In [1]:
!pyspark --version

!pip install delta-spark==3.3.0

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.4
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.25
Branch HEAD
Compiled by user yangjie01 on 2024-12-17T04:51:46Z
Revision a6f220d951742f4074b37772485ee0ec7a774e7d
Url https://github.com/apache/spark
Type --help for more information.
Collecting delta-spark==3.3.0
  Downloading delta_spark-3.3.0-py3-none-any.whl.metadata (2.0 kB)
Downloading delta_spark-3.3.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.3.0


In [2]:
import os

from pyspark.sql import SparkSession
from delta import *  # provides configure_spark_with_delta_pip

def read_mysql_table(spark, db_url, user, password, query):
    """Run ``query`` against MySQL over JDBC and return the result as a Spark DataFrame.

    Args:
        spark: Active SparkSession; the MySQL Connector/J driver jar must be on its classpath.
        db_url: JDBC connection URL, e.g. ``jdbc:mysql://host:3306/db``.
        user: Database user name.
        password: Database password.
        query: SQL text passed as the JDBC ``query`` option.

    Returns:
        The DataFrame produced by ``spark.read...load()``.
    """
    jdbc_options = {
        "url": db_url,
        "user": user,
        "password": password,
        "query": query,
        "driver": "com.mysql.cj.jdbc.Driver",
    }
    reader = spark.read.format("jdbc")
    # Apply options in the same order as listed above.
    for option_name, option_value in jdbc_options.items():
        reader = reader.option(option_name, option_value)
    return reader.load()

# Maven coordinates Spark resolves at session start. The legacy
# "mysql:mysql-connector-java" artifact is relocated to "com.mysql:mysql-connector-j"
# (Ivy warns about the relocation on every run), so reference the new coordinates.
my_packages = ["com.mysql:mysql-connector-j:8.0.33"]

builder = (
    SparkSession.builder
    .appName("DE Challenge")
    .master("local[*]")  # local mode, one worker thread per logical core
    # Enable Delta Lake SQL support and make the default catalog Delta-aware.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip adds the delta-spark jar matching the installed pip
# package to the session's packages, together with the extra packages listed above.
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
mysql#mysql-connector-java added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ab4e0453-844f-419b-9cfb-4bc4475b1158;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.3.0 in central
	found io.delta#delta-storage;3.3.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
mysql#mysql-connector-java;8.0.33 is relocated to com.mysql#mysql-connector-j;8.0.33. Please update your dependencies.
	found mysql#mysql-connector-java;8.0.33 in central
	found com.mysql#mysql-connector-j;8.0.33 in central
	found com.google.protobuf#protobuf-java;3.21.9 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.3.0/delta-spark_2.12-3.3.0.jar ...
	[SUCCESSFUL ] io.delta#delta-spark_2.12;3.3.0!delta-spark_2.12.jar (1355ms)
downloading https://repo1.maven.org/maven2/io/delta/de

In [3]:
# Connection settings are read from the environment so credentials need not be stored
# in the notebook. The fallbacks reproduce the original local-development values.
# TODO: drop the password fallback once MYSQL_PASSWORD is provisioned everywhere.
database_url = os.environ.get(
    "MYSQL_URL",
    "jdbc:mysql://localhost:3306/test_db?useSSL=false&allowPublicKeyRetrieval=true",
)
user = os.environ.get("MYSQL_USER", "jovyan")
password = os.environ.get("MYSQL_PASSWORD", "password")

# List the tables in the test_db schema to confirm connectivity and see what is available.
table_list_query = "SELECT TABLE_NAME FROM information_schema.tables WHERE table_schema = 'test_db'"
tables = read_mysql_table(spark, database_url, user, password, table_list_query)
tables.show()

+-----------+
| TABLE_NAME|
+-----------+
|order_items|
|     orders|
|   products|
|      users|
+-----------+



In [4]:
# Load each source table from MySQL. Rows are fetched only when an action
# (e.g. .show(), .count()) runs on the resulting DataFrame.
def _read_test_db_table(table_name):
    """Return ``SELECT * FROM test_db.<table_name>`` as a Spark DataFrame."""
    return read_mysql_table(
        spark, database_url, user, password, f"SELECT * FROM test_db.{table_name}"
    )


order_items = _read_test_db_table("order_items")
orders = _read_test_db_table("orders")
products = _read_test_db_table("products")
users = _read_test_db_table("users")

In [5]:
# Register each DataFrame as a temporary view so the SQL below can refer to it by name.
for _view_name, _frame in (
    ("order_items", order_items),
    ("orders", orders),
    ("products", products),
    ("users", users),
):
    _frame.createOrReplaceTempView(_view_name)

# Per user, compare the spend implied by line items (quantity * unit price) with the
# order totals recorded on `orders`.
#
# Line items are aggregated to one row per order *before* joining to `orders`: joining
# `order_items` directly to `orders` repeats each order's total_amount once per line
# item, so SUM(o.total_amount) would over-count every multi-item order.
# As with the original inner joins, only orders that have at least one line item with a
# matching product (and a matching user) are included.
# Users are grouped by id rather than name so two users sharing a name are not merged.
# ORDER BY is on the outer query; ordering inside a CTE is not guaranteed to carry through.
query = """WITH order_item_totals AS (
    SELECT
            oi.order_id
        ,   SUM(oi.quantity * p.price) AS items_total
    FROM order_items oi
    JOIN products p
        ON oi.product_id = p.id
    GROUP BY oi.order_id
),

expenditure_by_user AS (
    SELECT
            u.id AS user_id
        ,   u.name AS user_name
        ,   SUM(oit.items_total) AS total_spent
        ,   SUM(o.total_amount) AS total_amount
    FROM order_item_totals oit
    JOIN orders o
        ON oit.order_id = o.id
    JOIN users u
        ON o.user_id = u.id
    GROUP BY u.id, u.name
)

SELECT
    user_name,
    total_spent,
    total_amount,
    total_spent - total_amount AS total_diff
FROM expenditure_by_user
ORDER BY total_spent DESC
"""

results = spark.sql(query)

# Persist to MySQL. With mode("overwrite") and the default truncate=false, Spark drops
# and recreates test_db.results, so re-running this cell replaces the previous output.
(
    results.write
    .format("jdbc")
    .option("url", database_url)
    .option("dbtable", "test_db.results")
    .option("user", user)
    .option("password", password)
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .mode("overwrite")
    .save()
)

# Read the table back to verify the write. A SQL table has no inherent row order, so
# sort explicitly instead of relying on insertion order.
read_mysql_table(
    spark,
    database_url,
    user,
    password,
    "SELECT * FROM test_db.results ORDER BY total_spent DESC",
).show()

+---------+-----------+------------+----------+
|user_name|total_spent|total_amount|total_diff|
+---------+-----------+------------+----------+
|    David|    1999.98|      300.99|   1698.99|
|    Alice|    1759.96|      401.75|   1358.21|
|  Charlie|     119.97|      150.00|    -30.03|
|      Eve|      79.99|       75.25|      4.74|
|      Bob|      49.99|       50.00|     -0.01|
+---------+-----------+------------+----------+



In [6]:
# Also persist the results as a Delta table at ./results (relative to the kernel's
# working directory). The table is fully recomputed on every run, so overwriteSchema
# replaces the stored schema outright. mergeSchema would instead keep stale columns, and
# it fails on incompatible type changes such as a different decimal precision.
DELTA_RESULTS_PATH = "results"

(
    results.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(DELTA_RESULTS_PATH)
)
delta_results = spark.read.format("delta").load(DELTA_RESULTS_PATH)
delta_results.show()

25/02/06 12:00:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+---------+-----------+------------+----------+
|user_name|total_spent|total_amount|total_diff|
+---------+-----------+------------+----------+
|    David|    1999.98|      300.99|   1698.99|
|    Alice|    1759.96|      401.75|   1358.21|
|  Charlie|     119.97|      150.00|    -30.03|
|      Eve|      79.99|       75.25|      4.74|
|      Bob|      49.99|       50.00|     -0.01|
+---------+-----------+------------+----------+

