In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
import logging
import findspark

builder = (
        SparkSession.builder
        .appName("pyspark-deltaLake-local-example")
        .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
        .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')

)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

findspark.init()
findspark.add_packages('mysql:mysql-connector-java:9.2.0')

# Configure logging settings
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)
logger.info( f"running spark v{spark.version}" )

import configparser

# Create a ConfigParser object
config = configparser.ConfigParser()

# Read the .ini file
config.read('settings.ini')  # Replace with the path to your .ini file
# Accessing data from the .ini file
username = config['settings']['username']
password = config['settings']['password']
host = config['settings']['host']
port = config['settings']['port']
schema = config['database']['schema']
logger.info('loaded ini')



2025-02-12 20:14:35,813 - __main__ - INFO - running spark v3.5.4
2025-02-12 20:14:35,815 - __main__ - INFO - loaded ini


In [5]:
# Define the MySQL connection properties
url = f"jdbc:mysql://{host}:{port}/{schema}"  # Replace with your MySQL server details
# print( url ) 
properties = {
    "user": username,  # Replace with your MySQL username
    "password": password,  # Replace with your MySQL password
    "driver": "com.mysql.cj.jdbc.Driver"  # MySQL JDBC driver
}


# Load data from MySQL table into a PySpark DataFrame
df = spark.read.jdbc(url=url, table="flights", properties=properties)

# Show the data
df.show(5)

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

In [14]:
# Register the DataFrame as a temporary view
df.createOrReplaceTempView( "flights_merge" )

# setup two dataframes to "merge"
# Show the result of the SQL query
query_sql = "SELECT * FROM flights_merge WHERE month = 1"
first_month = spark.sql(query_sql)
first_month.createOrReplaceTempView( "flights_Jan")
first_month.show(3)

# df.show(5)
query_sql_2 = "SELECT * FROM flights_merge WHERE month <= 2 order by month desc"
second_month = spark.sql(query_sql_2)
second_month.createOrReplaceTempView( "flights_Feb")
second_month.show(3)

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

In [15]:
# one time run 
delta_path = "./bronze/flights"
delta_table_exists = DeltaTable.isDeltaTable(spark, delta_path)

if delta_table_exists:
    logger.info( 'delta exists! skipping...')
    # Load the Delta table
    delta_table = DeltaTable.forPath(spark, delta_path)

    # Convert the Delta table to a DataFrame
    delta_df = delta_table.toDF()
    delta_df.createOrReplaceTempView( "delta_flights")
    query_sql = "SELECT max(month) , max(day) FROM delta_flights "
    delta_results = spark.sql(query_sql)
    
    delta_results.show(3)


else:
    logger.info( f'delta table {delta_path} does not exist, creating')
    # If the Delta table does not exist, create one by writing out the current aggregation
    first_month.write.format('delta').mode('overwrite').save( delta_path )

2025-02-12 20:34:16,093 - __main__ - INFO - delta exists! skipping...


+----------+--------+
|max(month)|max(day)|
+----------+--------+
|         2|      31|
+----------+--------+



In [None]:

delta_path = "./bronze/flights"
delta_table_exists = DeltaTable.isDeltaTable(spark, delta_path)

if delta_table_exists:

    # Load the Delta table
    delta_table = DeltaTable.forPath(spark, delta_path)

    # Perform UPSERT (MERGE)
    # Define the condition for the merge with multiple columns
    merge_condition = """source.month = target.month 
    AND source.day = target.day
    AND source.flight = target.flight
    AND source.tailnum = target.tailnum
    AND source.carrier = target.carrier"""

    # Perform the merge operation
    delta_table.alias("target").merge(
        second_month.alias("source"),
        merge_condition
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute() 

    logger.info( "merge completed")

else:

    logger.error( f" delta table does not exist should have been made in previous cell" )  
    # If the Delta table does not exist, create one by writing out the current aggregation
    # df.write.format('delta').mode('overwrite').save( delta_path )

2025-02-12 20:55:30,414 - __main__ - INFO - merge compelted


In [None]:
delta_table_exists = DeltaTable.isDeltaTable(spark, delta_path)

if delta_table_exists:

    delta_table_df = delta_table.toDF().createOrReplaceTempView( "delta_flights" )
    query_sql = "SELECT max(month) , max(day) FROM delta_flights "
    delta_results = spark.sql(query_sql)
    delta_results.show()

    query_sql = "SELECT count(*) FROM delta_flights group by month"
    delta_results = spark.sql(query_sql)
    delta_results.show()


+----------+--------+
|max(month)|max(day)|
+----------+--------+
|         2|      31|
+----------+--------+

+--------+
|count(1)|
+--------+
|   27004|
|   24951|
+--------+

