# Jupyter + Spark + MySQL Challenge

In [2]:
!pip install delta-spark

Collecting delta-spark
  Downloading delta_spark-3.3.0-py3-none-any.whl.metadata (2.0 kB)
Downloading delta_spark-3.3.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.3.0


Initializing a Spark session with the Delta Lake SQL extension and catalog. This configuration is required for working with Delta Lake tables and is used later when saving the results locally in Delta format.

In [3]:
import pyspark
from delta import configure_spark_with_delta_pip

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e111d04e-bd36-4851-981c-b41a6a514d25;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.3.0 in central
	found io.delta#delta-storage;3.3.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.3.0/delta-spark_2.12-3.3.0.jar ...
	[SUCCESSFUL ] io.delta#delta-spark_2.12;3.3.0!delta-spark_2.12.jar (1432ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/3.3.0/delta-storage-3.3.0.jar ...
	[SUCCESSFUL ] io.delta#delta-storage;3.3.0!delta-storage.jar (271ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.9.3/antlr4-runtime-4.9.3.jar ...
	[SUCCESSFUL ] org.antlr#antlr4-runtime;4.9.3!antlr4-runtime.jar (280ms)
:: resolution report :: resolve 4277ms :

Establishing a connection to the MySQL database and retrieving the list of tables in the `test_db` database:

In [8]:
database_url = "jdbc:mysql://localhost:3306/test_db?useSSL=false&allowPublicKeyRetrieval=true"

database_properties = {
    "user":  "jovyan",
    "password": "password",
    "driver": "com.mysql.jdbc.Driver"
}
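
The credentials above are hard-coded, which is fine for a local challenge. As a minimal sketch of an alternative, the properties could be built from environment variables. The variable names `MYSQL_USER` and `MYSQL_PASSWORD` and the helper `build_database_properties` are hypothetical; the defaults fall back to the values used in this notebook.

```python
import os


def build_database_properties(env=os.environ):
    """Build the JDBC properties dict, preferring environment variables.

    MYSQL_USER and MYSQL_PASSWORD are hypothetical names chosen for this
    sketch; when they are unset, the notebook's own values are used.
    """
    return {
        "user": env.get("MYSQL_USER", "jovyan"),
        "password": env.get("MYSQL_PASSWORD", "password"),
        "driver": "com.mysql.jdbc.Driver",
    }


database_properties = build_database_properties()
```

The resulting dictionary has the same keys as the one defined above, so it can be passed to `spark.read.jdbc` unchanged.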

In [9]:
df = spark.read.jdbc(url=database_url, table="INFORMATION_SCHEMA.TABLES", properties=database_properties)
df.createOrReplaceTempView("tables_metadatas")

spark.sql("""
    SELECT TABLE_NAME 
    FROM tables_metadatas
    WHERE TABLE_SCHEMA = 'test_db'
""").show()

+-----------+
| TABLE_NAME|
+-----------+
|order_items|
|     orders|
|   products|
|      users|
+-----------+



Creating a function that automates the process of:
1. **Creating a DataFrame:** Reads data from the specified table.
2. **Creating a Temporary View:** Registers the DataFrame as a temporary view, under the same name as the table, for SQL queries.
3. **Displaying Table Output:** Shows the contents of the table (`show()` prints the first 20 rows by default).

The function takes the table name as its only argument.

In [10]:
def create_and_show_view(table):
    df = spark.read.jdbc(url=database_url, table=table, properties=database_properties)
    df.createOrReplaceTempView(table)
    spark.sql(f"select * from {table}").show()
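
Because `create_and_show_view` interpolates the table name directly into the SQL text, it can help to check the name first. The following is a minimal sketch, not part of the original notebook; `checked_table_name` and its `allowed` parameter are hypothetical helpers introduced here for illustration.

```python
import re

# A plain SQL identifier: letters, digits and underscores, not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def checked_table_name(table, allowed=None):
    """Return `table` unchanged if it looks safe to interpolate into SQL.

    `allowed` is an optional collection of permitted table names
    (a hypothetical allow-list); when given, the name must be in it.
    """
    if not _IDENTIFIER.fullmatch(table):
        raise ValueError(f"not a plain SQL identifier: {table!r}")
    if allowed is not None and table not in allowed:
        raise ValueError(f"table not in allow-list: {table!r}")
    return table
```

With this helper, a call would look like `create_and_show_view(checked_table_name("orders"))`, and a malformed name raises `ValueError` before any query is sent.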

In [11]:
create_and_show_view("order_items")

+---+--------+----------+--------+
| id|order_id|product_id|quantity|
+---+--------+----------+--------+
|  1|       1|         1|       1|
|  2|       1|         3|       2|
|  3|       2|         2|       1|
|  4|       3|         4|       1|
|  5|       4|         5|       3|
|  6|       5|         1|       2|
|  7|       6|         3|       1|
+---+--------+----------+--------+



In [12]:
create_and_show_view("orders")

+---+-------+----------+------------+
| id|user_id|order_date|total_amount|
+---+-------+----------+------------+
|  1|      1|2024-01-10|      100.50|
|  2|      1|2024-02-15|      200.75|
|  3|      2|2024-01-05|       50.00|
|  4|      3|2024-03-01|      150.00|
|  5|      4|2024-02-20|      300.99|
|  6|      5|2024-01-25|       75.25|
+---+-------+----------+------------+



In [13]:
create_and_show_view("products")

+---+----------+------+
| id|      name| price|
+---+----------+------+
|  1|    Laptop|999.99|
|  2|Smartphone|599.99|
|  3|Headphones| 79.99|
|  4|  Keyboard| 49.99|
|  5|     Mouse| 39.99|
+---+----------+------+



In [14]:
create_and_show_view("users")

+---+-------+---+-------------------+
| id|   name|age|              email|
+---+-------+---+-------------------+
|  1|  Alice| 30|  alice@example.com|
|  2|    Bob| 25|    bob@example.com|
|  3|Charlie| 35|charlie@example.com|
|  4|  David| 40|  david@example.com|
|  5|    Eve| 28|    eve@example.com|
+---+-------+---+-------------------+



Creating a query that joins the four tables to show what each user bought. Each row is one order item with its order date, quantity, and product price, plus the line total calculated as `price * quantity`, which is the basis for the total spending per user.

In [16]:
spark.sql("""
    select 
         users.name
        ,orders.order_date
        ,order_items.quantity as order_item_quantity
        ,products.name as product_name
        ,orders.total_amount as order_total_amount
        ,products.price as product_price
        ,products.price * order_items.quantity as total_price
    from users
    left join orders
        on users.id = orders.user_id
    left join order_items
        on orders.id = order_items.order_id
    left join products
        on order_items.product_id = products.id
    order by name, order_id
    """).show()

+-------+----------+-------------------+------------+------------------+-------------+-----------+
|   name|order_date|order_item_quantity|product_name|order_total_amount|product_price|total_price|
+-------+----------+-------------------+------------+------------------+-------------+-----------+
|  Alice|2024-01-10|                  2|  Headphones|            100.50|        79.99|     159.98|
|  Alice|2024-01-10|                  1|      Laptop|            100.50|       999.99|     999.99|
|  Alice|2024-02-15|                  1|  Smartphone|            200.75|       599.99|     599.99|
|    Bob|2024-01-05|                  1|    Keyboard|             50.00|        49.99|      49.99|
|Charlie|2024-03-01|                  3|       Mouse|            150.00|        39.99|     119.97|
|  David|2024-02-20|                  2|      Laptop|            300.99|       999.99|    1999.98|
|    Eve|2024-01-25|                  1|  Headphones|             75.25|        79.99|      79.99|
+-------+----------+-------------------+------------+------------------+-------------+-----------+

### Data Analysis

The joined output above exposes an inconsistency in the data. The **total price of each product purchased by the user** is calculated by multiplying the `price` from the `products` table by the `quantity` from the `order_items` table. This calculation is shown in the `total_price` column.

The inconsistency arises when comparing this calculated `total_price` to the `order_total_amount` column, which comes from the `orders` table and is repeated on every item row of the same order. The values in `order_total_amount` show **significant discrepancies**: Alice's order of 2024-01-10 has an `order_total_amount` of 100.50, yet its two item rows add up to 999.99 + 159.98 = 1159.97, and David's order shows 300.99 against a calculated 1999.98. This suggests one of the following:

1. **Incorrect Data Population**: The `order_total_amount` values in the `orders` table may have been incorrectly populated.
2. **Different Meaning**: The `order_total_amount` might represent something other than the sum of product prices, such as:
   - **Discounts**: The total amount after applying discounts.
   - **Taxes**: The total amount including taxes or fees.
   - **Historical Prices**: The product prices at the time of purchase, which may differ from the current prices in the `products` table.

To resolve this inconsistency, further clarification is needed on what the `order_total_amount` column represents, and the data may need to be corrected or adjusted accordingly.
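
To quantify the discrepancy per order, the stored amount can be compared with the sum of the line totals. The sketch below is an assumption-laden stand-in rather than the notebook's method: it loads the rows shown above into an in-memory SQLite database so that it runs without Spark or MySQL, and it stores prices as floats, hence the `round`. The same query text should also be usable with `spark.sql` against the temporary views, although that is not verified here.

```python
import sqlite3

# In-memory stand-in for the MySQL tables, filled with the rows shown above.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table orders (id integer, user_id integer, order_date text, total_amount real);
    create table order_items (id integer, order_id integer, product_id integer, quantity integer);
    create table products (id integer, name text, price real);
""")
conn.executemany("insert into orders values (?, ?, ?, ?)", [
    (1, 1, "2024-01-10", 100.50), (2, 1, "2024-02-15", 200.75),
    (3, 2, "2024-01-05", 50.00), (4, 3, "2024-03-01", 150.00),
    (5, 4, "2024-02-20", 300.99), (6, 5, "2024-01-25", 75.25),
])
conn.executemany("insert into order_items values (?, ?, ?, ?)", [
    (1, 1, 1, 1), (2, 1, 3, 2), (3, 2, 2, 1), (4, 3, 4, 1),
    (5, 4, 5, 3), (6, 5, 1, 2), (7, 6, 3, 1),
])
conn.executemany("insert into products values (?, ?, ?)", [
    (1, "Laptop", 999.99), (2, "Smartphone", 599.99), (3, "Headphones", 79.99),
    (4, "Keyboard", 49.99), (5, "Mouse", 39.99),
])

# One row per order: the stored total next to the total computed from its items.
reconciliation_sql = """
    select
         orders.id as order_id
        ,orders.total_amount as order_total_amount
        ,round(sum(products.price * order_items.quantity), 2) as computed_total
    from orders
    join order_items
        on order_items.order_id = orders.id
    join products
        on products.id = order_items.product_id
    group by orders.id, orders.total_amount
    order by orders.id
"""
rows = conn.execute(reconciliation_sql).fetchall()

for order_id, stored, computed in rows:
    print(order_id, stored, computed, round(computed - stored, 2))
```

On this data none of the six orders reconciles, which points to a systematic difference in what `total_amount` means rather than a single bad row.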

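Creating a DataFrame from a query that links the same four tables (`order_items`, `orders`, `products`, and `users`), this time starting from `order_items` and using the `SUM` function to calculate the total amount spent by each user. Because the query starts from `order_items`, a user without any orders would not appear in the result; in this dataset every user has at least one order.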

In [18]:
results_df = spark.sql("""
    select 
         users.id as user_id
        ,users.name
        ,sum(products.price * order_items.quantity) as total_spent
    from order_items
    left join orders
        on order_items.order_id = orders.id
    left join products
        on products.id = order_items.product_id
    left join users
        on users.id = orders.user_id
    group by users.name, users.id
    order by name
  """)

results_df.show()

+-------+-------+-----------+
|user_id|   name|total_spent|
+-------+-------+-----------+
|      1|  Alice|    1759.96|
|      2|    Bob|      49.99|
|      3|Charlie|     119.97|
|      4|  David|    1999.98|
|      5|    Eve|      79.99|
+-------+-------+-----------+
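
As a worked check of the aggregation, the same totals can be recomputed by hand from the table contents shown earlier. This is a minimal sketch in plain Python; it uses `Decimal` on the assumption that the price columns are exact decimal types, as the fixed two-digit formatting in the outputs suggests. The variable names are chosen for this example only.

```python
from collections import defaultdict
from decimal import Decimal

# products: id -> price
prices = {
    1: Decimal("999.99"),  # Laptop
    2: Decimal("599.99"),  # Smartphone
    3: Decimal("79.99"),   # Headphones
    4: Decimal("49.99"),   # Keyboard
    5: Decimal("39.99"),   # Mouse
}

# orders: order id -> user id
order_user = {1: 1, 2: 1, 3: 2, 4: 3, 5: 4, 6: 5}

# order_items: (order_id, product_id, quantity)
order_items = [
    (1, 1, 1), (1, 3, 2), (2, 2, 1), (3, 4, 1),
    (4, 5, 3), (5, 1, 2), (6, 3, 1),
]

# Sum price * quantity per user, following order_items -> orders -> users.
total_spent = defaultdict(Decimal)
for order_id, product_id, quantity in order_items:
    total_spent[order_user[order_id]] += prices[product_id] * quantity

for user_id in sorted(total_spent):
    print(user_id, total_spent[user_id])
```

For Alice (user 1) this gives 999.99 + 2 × 79.99 + 599.99 = 1759.96, matching the `total_spent` value in the query result.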



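Writing the `results_df` DataFrame to the `Results` table in the MySQL database (`mode="overwrite"` replaces the table if it already exists), then listing the tables again and showing the new table's contents: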

In [22]:
results_df.write.jdbc(url=database_url, table="Results", mode="overwrite", properties=database_properties)

In [23]:
spark.sql("""
    SELECT TABLE_NAME 
    FROM tables_metadatas
    WHERE TABLE_SCHEMA = 'test_db'
""").show()

+-----------+
| TABLE_NAME|
+-----------+
|    Results|
|order_items|
|     orders|
|   products|
|      users|
+-----------+



In [24]:
create_and_show_view("Results")

+-------+-------+-----------+
|user_id|   name|total_spent|
+-------+-------+-----------+
|      1|  Alice|    1759.96|
|      2|    Bob|      49.99|
|      3|Charlie|     119.97|
|      4|  David|    1999.98|
|      5|    Eve|      79.99|
+-------+-------+-----------+



Writing the `results_df` DataFrame to local disk as a Delta table:

In [25]:
delta_table_path = "/home/jovyan/spark-delta-table"
results_df.write.format("delta").mode("overwrite").save(delta_table_path)

25/02/06 12:35:54 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.