In [12]:
import sqlite3
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg as spark_avg,
    coalesce,
    col,
    count,
    current_timestamp,
    lit,
    sum as spark_sum,
)

def incremental_etl():
    """Refresh the customer summary table for customers whose source rows changed.

    Steps:
      1. Read the Customer, Invoice and InvoiceLine CSV extracts with Spark.
      2. Use ``MAX(updated_at)`` of the target SQLite table as the watermark
         (``'1900-01-01 00:00:00'`` when the table is missing or empty).
      3. Collect the ids of every customer that has a customer, invoice or
         invoice-line row with ``UpdatedAt`` later than the watermark.
      4. Recompute ``loyalty_score`` (number of invoices) and
         ``avg_invoice_size`` (mean of per-invoice ``UnitPrice * Quantity``
         totals) for those customers from the complete extracts, so the stored
         values never mix old aggregates with partial new data.
      5. Replace those customers' rows in SQLite inside a single transaction,
         keeping any previously stored ``created_at``.

    Customers without any invoice that has invoice lines get no row, which is
    the same rule the inner joins of the full ``load()`` job apply.

    Returns:
        None. Progress is printed; results are written to SQLite.
    """
    table_name = 'customer_loyalty_and_invoice_size_ETL'
    db_path = '../../../Customers_ETL.db'

    # Fixed-width text timestamp: SQLite compares these as strings, so a
    # constant format keeps MAX(updated_at) in chronological order.
    run_ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')

    # Initialize Spark session
    spark = SparkSession.builder.appName("Customer ETL").getOrCreate()
    conn = None

    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # EXTRACT (Load CSVs using PySpark)
        customers_df = spark.read.csv("../../../Customer.csv", header=True, inferSchema=True)
        invoices_df = spark.read.csv("../../../Invoice.csv", header=True, inferSchema=True)
        invoice_lines_df = spark.read.csv("../../../InvoiceLine.csv", header=True, inferSchema=True)

        # Check whether the target table already exists in SQLite
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,),
        )
        table_exists = cursor.fetchone() is not None

        # Watermark and previously stored creation timestamps
        last_updated = None
        created_at_by_customer = {}
        if table_exists:
            cursor.execute(f"SELECT MAX(updated_at) FROM {table_name}")
            last_updated = cursor.fetchone()[0]
            cursor.execute(f"SELECT CustomerId, created_at FROM {table_name}")
            created_at_by_customer = dict(cursor.fetchall())

        if not last_updated:
            last_updated = '1900-01-01 00:00:00'  # Default to a very old timestamp
        print(f"Last Updated: {last_updated}")

        # Typed literal instead of interpolating the value into SQL text
        watermark = lit(str(last_updated)).cast('timestamp')

        # Customers touched directly, through an invoice, or through an invoice line
        changed_customers = customers_df.filter(col('UpdatedAt') > watermark).select('CustomerId')
        changed_invoice_owners = invoices_df.filter(col('UpdatedAt') > watermark).select('CustomerId')
        changed_line_owners = (
            invoice_lines_df.filter(col('UpdatedAt') > watermark)
            .select('InvoiceId')
            .distinct()
            .join(invoices_df.select('InvoiceId', 'CustomerId'), 'InvoiceId')
            .select('CustomerId')
        )
        affected_df = (
            changed_customers
            .union(changed_invoice_owners)
            .union(changed_line_owners)
            .filter(col('CustomerId').isNotNull())
            .distinct()
        )
        # One id per customer; assumed small enough to hold on the driver.
        affected_ids = [row['CustomerId'] for row in affected_df.collect()]

        # Skip if no new data
        if not affected_ids:
            print("No new data to process.")
            return

        # TRANSFORM
        # Total spend per invoice, computed over all of its lines
        invoice_totals_df = invoice_lines_df.groupBy('InvoiceId').agg(
            spark_sum(col('UnitPrice') * col('Quantity')).alias('invoice_total')
        )

        # One row per invoice after the left join, so count('InvoiceId') counts
        # invoices; avg() ignores invoices that have no lines.
        customer_metrics_df = (
            invoices_df.filter(col('CustomerId').isin(affected_ids))
            .select('InvoiceId', 'CustomerId')
            .join(invoice_totals_df, 'InvoiceId', 'left')
            .groupBy('CustomerId')
            .agg(
                count('InvoiceId').alias('loyalty_score'),
                spark_avg('invoice_total').alias('avg_invoice_size'),
            )
            .filter(col('avg_invoice_size').isNotNull())
        )

        summary_df = customers_df.select('CustomerId', 'FirstName', 'LastName').join(
            customer_metrics_df, 'CustomerId'
        )

        records = [
            (
                row['CustomerId'],
                row['FirstName'],
                row['LastName'],
                row['loyalty_score'],
                float(row['avg_invoice_size']),
                # Keep the first-seen timestamp for customers already stored
                created_at_by_customer.get(row['CustomerId']) or run_ts,
                run_ts,
                'process:ETL',
            )
            for row in summary_df.collect()
        ]

        # LOAD
        # The connection context manager commits on success and rolls back on
        # error, so the delete and the insert are applied together or not at all.
        with conn:
            conn.execute(
                f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    CustomerId INTEGER,
                    FirstName TEXT,
                    LastName TEXT,
                    loyalty_score INTEGER,
                    avg_invoice_size REAL,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP,
                    updated_by TEXT
                )
                """
            )
            # One bound parameter per statement avoids SQLite's limit on the
            # number of variables in a single IN (...) list.
            conn.executemany(
                f"DELETE FROM {table_name} WHERE CustomerId = ?",
                [(customer_id,) for customer_id in affected_ids],
            )
            conn.executemany(
                f"""
                INSERT INTO {table_name} (
                    CustomerId, FirstName, LastName, loyalty_score,
                    avg_invoice_size, created_at, updated_at, updated_by
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                records,
            )

    finally:
        # Close the SQLite connection (if it was opened) and stop Spark
        if conn is not None:
            conn.close()
        spark.stop()

# Run the incremental ETL once when this cell is executed.
incremental_etl()


Last Updated: 2024-09-15 01:26:43.642000
Column<'UpdatedAt'>
Column<'UpdatedAt'>
No new data to process.


In [7]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
import sqlite3  # Assuming you're using sqlite3
import pandas as pd

def load():
    """Rebuild the customer loyalty / invoice-size table from the full CSV extracts.

    Reads Customer, Invoice and InvoiceLine CSVs with Spark, computes per
    customer the number of invoices (``loyalty_score``) and the mean invoice
    total (``avg_invoice_size``), stamps audit columns, and replaces the
    ``customer_loyalty_and_invoice_size_ETL`` table in the SQLite database.

    Any exception is reported with ``print`` rather than re-raised. The SQLite
    connection (when one was opened) and the Spark session are always released.

    Returns:
        None.
    """
    # Step 1: Initialize Spark session
    spark = SparkSession.builder.appName("CustomerLoyaltyAndInvoiceSize").getOrCreate()
    # Opened only after the transform succeeds; ``finally`` must cope with None.
    conn = None

    try:
        customers = spark.read.csv("../../../Customer.csv", header=True, inferSchema=True)
        invoices = spark.read.csv("../../../Invoice.csv", header=True, inferSchema=True)
        invoice_lines = spark.read.csv("../../../InvoiceLine.csv", header=True, inferSchema=True)

        # Calculate Loyalty Score: Number of purchases (invoices per customer)
        loyalty_score = invoices.groupBy("CustomerId").agg(F.count("*").alias("loyalty_score"))

        # Calculate total amount spent per invoice
        invoice_totals = invoice_lines.groupBy("InvoiceId").agg(
            F.sum(F.col("UnitPrice") * F.col("Quantity")).alias("total_amount")
        )

        # Calculate average invoice size per customer by joining with Invoices
        avg_invoice_size = invoices.join(invoice_totals, "InvoiceId") \
            .groupBy("CustomerId") \
            .agg(F.avg("total_amount").alias("avg_invoice_size"))

        # Join loyalty score and average invoice size with customers
        # (inner joins: customers lacking either metric are left out)
        result = customers.join(loyalty_score, "CustomerId").join(avg_invoice_size, "CustomerId")

        # Add metadata columns
        result = result.withColumn("created_at", F.current_timestamp()) \
                   .withColumn("updated_at", F.current_timestamp()) \
                   .withColumn("updated_by", F.lit("DailyETL:SL"))

        # Select only the desired columns
        final_result = result.select(
            "CustomerId", "FirstName", "LastName", "loyalty_score",
            "avg_invoice_size", "created_at", "updated_at", "updated_by"
        )

        final_df = final_result.toPandas()

        conn = sqlite3.connect('../../../Customers_ETL.db')
        # Full refresh: drop and recreate the table with the new contents
        final_df.to_sql('customer_loyalty_and_invoice_size_ETL', conn, if_exists='replace', index=False)
        conn.commit()

    except Exception as e:
        print(f"An error occurred: {e}")

    finally:
        # Single cleanup point: close SQLite only if it was opened, then stop Spark
        if conn is not None:
            conn.close()
        spark.stop()



# Run the full-refresh load once when this cell is executed.
load()

In [8]:
# Open the ETL database and obtain a cursor for ad-hoc inspection.
conn = sqlite3.connect('../../../Customers_ETL.db')
cursor = conn.cursor()

# Unfiltered query: selects every row stored in the summary table.
check_query = "SELECT * FROM customer_loyalty_and_invoice_size_ETL"

# execute() returns the cursor itself, so the fetch can be chained onto it.
results = cursor.execute(check_query).fetchall()

# Show the rows, one tuple per line.
for row in results:
    print(row)

(31, 'Martha', 'Silk', 8, 5.3742857142857146, '2024-09-15 01:26:43.642000', '2024-09-15 01:26:43.642000', 'DailyETL:SL')
(53, 'Phil', 'Hughes', 7, 5.3742857142857146, '2024-09-15 01:26:43.642000', '2024-09-15 01:26:43.642000', 'DailyETL:SL')
(34, 'João', 'Fernandes', 7, 5.659999999999999, '2024-09-15 01:26:43.642000', '2024-09-15 01:26:43.642000', 'DailyETL:SL')
(28, 'Julia', 'Barnett', 7, 6.231428571428572, '2024-09-15 01:26:43.642000', '2024-09-15 01:26:43.642000', 'DailyETL:SL')
(27, 'Patrick', 'Gray', 7, 5.374285714285714, '2024-09-15 01:26:43.642000', '2024-09-15 01:26:43.642000', 'DailyETL:SL')
(26, 'Richard', 'Cunningham', 7, 6.802857142857142, '2024-09-15 01:26:43.642000', '2024-09-15 01:26:43.642000', 'DailyETL:SL')
(44, 'Terhi', 'Hämäläinen', 7, 5.945714285714287, '2024-09-15 01:26:43.642000', '2024-09-15 01:26:43.642000', 'DailyETL:SL')
(12, 'Roberto', 'Almeida', 7, 5.3742857142857146, '2024-09-15 01:26:43.642000', '2024-09-15 01:26:43.642000', 'DailyETL:SL')
(22, 'Heather',