In [1]:

# Main.py
import os
import findspark 
findspark.init("/Users/almerstaines/anaconda3/envs/pyspark_env/spark-3.5.5-bin-hadoop3")

## PySpark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, ArrayType, StringType
from pyspark.context import SparkContext
from pyspark.sql.functions import row_number, concat, col, lit, regexp_extract, count, when, isnan, explode, from_json, udf, to_date, split, trim, monotonically_increasing_id, from_unixtime

# Pandas
import pandas as pd

# Staging
from delta.tables import DeltaTable
from delta import configure_spark_with_delta_pip

# Others
import re

# SQL Implementation
import pymysql
import mysql.connector

# Load Data
import json


In [2]:
driver = "/Users/almerstaines/Documents/CineScope/driver/mysql-connector-j-8.0.33/mysql-connector-j-8.0.33.jar"

# builder was configured for delta lake integration into PySpark. 
builder = SparkSession.builder \
    .appName("CineScope") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars", driver) \
    .config("spark.driver.extraClassPath", driver) \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.network.timeout", "800s") \
    .config("spark.rpc.askTimeout", "300s")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

25/04/21 14:02:32 WARN Utils: Your hostname, Almers-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.113.190 instead (on interface en0)
25/04/21 14:02:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/almerstaines/.ivy2/cache
The jars for the packages stored in: /Users/almerstaines/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e6c487fb-5ee1-4c16-b173-1eac6ee4587a;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/almerstaines/anaconda3/envs/pyspark_env/spark-3.5.5-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found io.delta#delta-spark_2.12;3.3.0 in central
	found io.delta#delta-storage;3.3.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 134ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.3.0 from central in [default]
	io.delta#delta-storage;3.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-e6c487fb-5ee1-4c16-b173-1eac6ee4587a
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/4ms)
25/04/21 14:02:32

# Loading the Data

The provided dataset contains the following: 

- `movies_main.csv` with 
    - some duplicated entries
    - Mixed date formats
    - Missing budget data

- `ratings.json` with
    - Nested rating summaries
    - Multiple metrics per movie

- `movie_extended.csv` with
    - Multiple genres per movie
    - Production Company Info
    - Country Data

All data were compiled under `data.json`

## Bronze Layer - Delta Lake Integration

Delta Lake is an open-source storage framework that enables building a format agnostic Lakehouse architecture with compute engines including Spark.

Objective: Due to different file formats, it would be better to unify formats into a delta table. 

In [None]:
with open("data.json", "r") as f:
    sources = json.load(f)

for name, config, in sources.items():
    data = spark.read.format(config["format"]).options(**config["options"]).load(config["path"])
    data.write.format("delta").mode("overwrite").save(f"delta_lake/{name}")                                                                    

25/04/09 19:33:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [5]:
movies_main_path = "delta_lake/movies_main" # Path for Movies_Main delta table
movie_extended_path = "delta_lake/movie_extended" # Path for movie_extended delta table
ratings_path = "delta_lake/ratings" # Path for ratings delta table

## Data Quality Check

In [None]:
def data_quality(data, name): 
    print(f"\n-- Data Quality Report for: {name}---")

    # Tables
    print(f"\nDela Table: {name}")
    data.show()
    
    # Schema Check
    print("\nSchema:")
    data.printSchema()

    # Data Count
    data_count = data.count()
    print(f"Data count: {data_count}")
        
    # Null Counts
    nulls = data.select([
        count(when(col(c).isNull(), c)).alias(c)
        for c in data.columns
    ])
    print("Nulls per column:")
    nulls.show(truncate = False)

    # Duplicate Counts
    duplicates = data.count() - data.dropDuplicates().count()
    print(f"Duplicate rows: {duplicates}")

for name, config, in sources.items():
    data = spark.read.format(config["format"]).options(**config["options"]).load(config["path"])
    data_quality(data, name)



-- Data Quality Report for: movie_extended---

Dela Table: movie_extended
+-----+--------------------+--------------------+--------------------+--------------------+
|   id|              genres|production_companies|production_countries|    spoken_languages|
+-----+--------------------+--------------------+--------------------+--------------------+
|  862|Animation,Comedy,...|Pixar Animation S...|[{'iso_3166_1': '...|[{'iso_639_1': 'e...|
| 8844|Adventure,Fantasy...|TriStar Pictures,...|[{'iso_3166_1': '...|[{'iso_639_1': 'e...|
|15602|      Romance,Comedy|Warner Bros.,Lanc...|[{'iso_3166_1': '...|[{'iso_639_1': 'e...|
|31357|Comedy,Drama,Romance|Twentieth Century...|[{'iso_3166_1': '...|[{'iso_639_1': 'e...|
|11862|              Comedy|Sandollar Product...|[{'iso_3166_1': '...|[{'iso_639_1': 'e...|
|  949|Action,Crime,Dram...|Regency Enterpris...|[{'iso_3166_1': '...|[{'iso_639_1': 'e...|
|11860|      Comedy,Romance|Paramount Picture...|[{'iso_3166_1': '...|[{'iso_639_1': 'f...|
|4532

## Silver Layer - Transformation through Apache Spark

### Key Observations based on the Data Quality Report
- For `movie_extended`:
    - `production_countries` can be normalized.
    - `spoken_languages` can be normalized.
    - `genres` can be normalized.
    - `production_companies` can be normalized. 

- For `movies_main`:
    - different date formats
    - High null values in budget
    - Null values in release_date
    - Null values in title
    - Null values in revenue. 

- For `ratings`: 
    - `ratings_summary` can be normalized

#### Transformations to `movie_extended` Delta Table

In [None]:
movie_extended = spark.read.format("delta").load(movie_extended_path)

country_schema = ArrayType(StructType([
    StructField("iso_3166_1", StringType(), True), 
    StructField("name", StringType(), True)
]))

language_schema = ArrayType(StructType([
    StructField("iso_639_1", StringType(), True), 
    StructField("name", StringType(), True)
]))

movie_extended = movie_extended.withColumn("production_countries", from_json(col("production_countries"), country_schema))
movie_extended = movie_extended.withColumn("spoken_languages", from_json(col("spoken_languages"), language_schema))

# Exploding JSON cols
movie_extended = movie_extended.withColumn("production_countries", explode(col("production_countries")))
movie_extended = movie_extended.withColumn("spoken_languages", explode(col("spoken_languages")))

movie_extended = movie_extended.select(
    col("id"), col("genres"), col("production_companies"), 
    col("production_countries.iso_3166_1").alias("country_code"), 
    col("production_countries.name").alias("country_name"), 
    col("spoken_languages.iso_639_1").alias("language_code"),
    col("spoken_languages.name").alias("language_name")
)

movie_extended.show(5)

+-----+--------------------+--------------------+------------+--------------------+-------------+-------------+
|   id|              genres|production_companies|country_code|        country_name|language_code|language_name|
+-----+--------------------+--------------------+------------+--------------------+-------------+-------------+
|  862|Animation,Comedy,...|Pixar Animation S...|          US|United States of ...|           en|      English|
| 8844|Adventure,Fantasy...|TriStar Pictures,...|          US|United States of ...|           en|      English|
| 8844|Adventure,Fantasy...|TriStar Pictures,...|          US|United States of ...|           fr|     Français|
|15602|      Romance,Comedy|Warner Bros.,Lanc...|          US|United States of ...|           en|      English|
|31357|Comedy,Drama,Romance|Twentieth Century...|          US|United States of ...|           en|      English|
+-----+--------------------+--------------------+------------+--------------------+-------------+-------

In [None]:
movie_extended.write.format("delta").mode("overwrite").option("overwriteSchema", True).save(movie_extended_path)

#### Transformations for `movie_main` Delta Table

In [9]:
movie_main = spark.read.format("delta").load(movies_main_path)

unique_values = movie_main.select("id").distinct()
movie_main = movie_main.join(unique_values, on = "id", how = "inner")


In [None]:
# Fixing the Data Formats

def date_format(date_str): 
    if date_str is None:
        return "Null"
    patterns = {
        "%Y-%m-%d": r"\d{4}-\d{2}-\d{2}",           # 1995-12-22
        "%Y_%m_%d": r"\d{4}_\d{2}_\d{2}",           # 1995_12_22
        "%d-%m-%Y": r"\d{2}-\d{2}-\d{4}",           # 22-12-1995
        "%m/%d/%Y": r"\d{2}/\d{2}/\d{4}",           # 12/22/1995 
        "%m-%d-%Y": r"\d{2}-\d{2}-\d{4}",           # 12-22-1995
        "%b %d, %Y": r"[A-Za-z]{3} \d{2}, \d{4}",   # Jan 22, 1995
        "%d-%b-%Y": r"\d{2}-[A-Za-z]{3}-\d{4}",     # 22-Jan-1995
        "%Y/%m/%d": r"\d{4}/\d{2}/\d{2}"
    }
    for fmt, pattern, in patterns.items():
        if re.fullmatch(pattern, date_str):
            return fmt
    return "Unknown"

detect_format_udf = udf(date_format, StringType())

format_date = movie_main.withColumn("date_format", detect_format_udf(col("release_date")))
format_date.select("date_format").distinct().show(truncate = False)

+-----------+
|date_format|
+-----------+
|%m/%d/%Y   |
|%Y-%m-%d   |
|%d-%m-%Y   |
|Null       |
+-----------+



                                                                                

In [None]:
parsed = format_date.withColumn(
    "released_date", 
    when(col("date_format") == "%Y-%m-%d", to_date(col("release_date"), "yyyy-MM-dd")) #
    .when(col("date_format") == "%d/%m/%Y", to_date(col("release_date"), "dd/MM/yyyy")) #
    .when(col("date_format") == "%m/%d/%Y", to_date(col("release_date"), "MM/dd/yyyy")) #
)

transformed_main = parsed.drop("release_date", "date_format")
transformed_main.show()

+-----+--------------------+--------+------------+-------------+
|   id|               title|  budget|     revenue|released_date|
+-----+--------------------+--------+------------+-------------+
|  862|           Toy Story|30000000|3.73554033E8|   1995-10-30|
| 8844|             Jumanji|65000000|2.62797249E8|   1995-12-15|
|15602|    Grumpier Old Men|       0|         0.0|   1995-12-22|
|31357|   Waiting to Exhale|       0| 8.1452156E7|   1995-12-22|
|11862|Father of the Bri...|       0| 7.6578911E7|   1995-02-10|
|  949|                Heat|60000000|1.87436818E8|   1995-12-15|
|11860|             Sabrina|58000000|         0.0|   1995-12-15|
|45325|        Tom and Huck|       0|         0.0|   1995-12-22|
| 9091|        Sudden Death|35000000| 6.4350171E7|   1995-12-22|
|  710|           GoldenEye|       0|3.52194034E8|   1995-11-16|
| 9087|The American Pres...|62000000|1.07879496E8|   1995-11-17|
|12110|Dracula: Dead and...|       0|         0.0|         NULL|
|21032|               Bal

In [12]:
transformed_main.write.format("delta").mode("overwrite").option("overwriteSchema", True).save(movies_main_path)

#### Transformations on `ratings` Delta Table

In [13]:
ratings = spark.read.format("delta").load(ratings_path)

ratings_schema = StructType([
    StructField("movie_id", LongType(), True), 
    StructField("last_rated", LongType(), True), 
    StructField("ratings_summary", StructType([
        StructField("avg_rating", DoubleType(), True), 
        StructField("total_ratings", LongType(), True), 
        StructField("std_dev", DoubleType(), True)
    ]))
])

ratings = ratings \
    .withColumn("avg_rating", col("ratings_summary.avg_rating")) \
    .withColumn("total_ratings", col("ratings_summary.total_ratings")) \
    .withColumn("std_dev", col("ratings_summary.std_dev")) \
    .drop("ratings_summary")

ratings.show()

+----------+--------+------------------+-------------+------------------+
|last_rated|movie_id|        avg_rating|total_ratings|           std_dev|
+----------+--------+------------------+-------------+------------------+
|1475783711|       1|3.8724696356275303|          247|0.9589814920649079|
|1470073353|       2|3.4018691588785046|          107|0.8807138301648417|
|1471385241|       3|3.1610169491525424|           59|1.1501149791325278|
|1090908852|       4|2.3846153846153846|           13|0.9388345202485134|
|1471385747|       5| 3.267857142857143|           56|0.9485121636939214|
|1471386884|       6|3.8846153846153846|          104|0.8309275704499349|
|1462639898|       7|3.2830188679245285|           53|1.1029219668778798|
|1049685242|       8|               3.8|            5|1.6431676725154982|
|1016317600|       9|              3.15|           20| 0.812727700887249|
|1471385100|      10|3.4508196721311477|          122|  0.76980687732063|
|1462947988|      11|3.689024390243902

In [None]:
nan_ratings = ratings.select([count(when(isnan(col(c)), c)).alias(c) for c in ratings.columns])
nan_ratings.show()

+----------+--------+----------+-------------+-------+
|last_rated|movie_id|avg_rating|total_ratings|std_dev|
+----------+--------+----------+-------------+-------+
|         0|       0|         0|            0|   3063|
+----------+--------+----------+-------------+-------+



In [15]:
ratings = ratings.withColumn("std_dev", when(isnan("std_dev"), 0).otherwise(col("std_dev")))

In [None]:
nan_ratings = ratings.select([count(when(isnan(col(c)), c)).alias(c) for c in ratings.columns])
nan_ratings.show()

+----------+--------+----------+-------------+-------+
|last_rated|movie_id|avg_rating|total_ratings|std_dev|
+----------+--------+----------+-------------+-------+
|         0|       0|         0|            0|      0|
+----------+--------+----------+-------------+-------+



In [38]:
# Converting last_rated unix timestamp into a readable date. 
ratings = spark.read.format("delta").load(ratings_path)
ratings = ratings.withColumn("last_rated", from_unixtime(col("last_rated"), "yyyy-MM-dd HH:mm:ss")) 

ratings.show()

+-------------------+--------+------------------+-------------+------------------+
|         last_rated|movie_id|        avg_rating|total_ratings|           std_dev|
+-------------------+--------+------------------+-------------+------------------+
|2016-10-07 03:55:11|       1|3.8724696356275303|          247|0.9589814920649079|
|2016-08-02 01:42:33|       2|3.4018691588785046|          107|0.8807138301648417|
|2016-08-17 06:07:21|       3|3.1610169491525424|           59|1.1501149791325278|
|2004-07-27 14:14:12|       4|2.3846153846153846|           13|0.9388345202485134|
|2016-08-17 06:15:47|       5| 3.267857142857143|           56|0.9485121636939214|
|2016-08-17 06:34:44|       6|3.8846153846153846|          104|0.8309275704499349|
|2016-05-08 00:51:38|       7|3.2830188679245285|           53|1.1029219668778798|
|2003-04-07 11:14:02|       8|               3.8|            5|1.6431676725154982|
|2002-03-17 06:26:40|       9|              3.15|           20| 0.812727700887249|
|201

In [39]:
ratings.write.format("delta").mode("overwrite").option("overwriteSchema", True).save(ratings_path)

### Data Quality Check After Transformations

In [40]:
delta_lake = "delta_lake" # delta-lake directory

delta_tables = [name for name in os.listdir(delta_lake) if not name.startswith(".")]

for table in delta_tables:
    path = os.path.join(delta_lake, table)
    data = spark.read.format("delta").load(path)
    data_quality(data, table)


-- Data Quality Report for: movies_main---

Dela Table: movies_main
+-----+--------------------+--------+------------+-------------+
|   id|               title|  budget|     revenue|released_date|
+-----+--------------------+--------+------------+-------------+
|  862|           Toy Story|30000000|3.73554033E8|   1995-10-30|
| 8844|             Jumanji|65000000|2.62797249E8|   1995-12-15|
|15602|    Grumpier Old Men|       0|         0.0|   1995-12-22|
|31357|   Waiting to Exhale|       0| 8.1452156E7|   1995-12-22|
|11862|Father of the Bri...|       0| 7.6578911E7|   1995-02-10|
|  949|                Heat|60000000|1.87436818E8|   1995-12-15|
|11860|             Sabrina|58000000|         0.0|   1995-12-15|
|45325|        Tom and Huck|       0|         0.0|   1995-12-22|
| 9091|        Sudden Death|35000000| 6.4350171E7|   1995-12-22|
|  710|           GoldenEye|       0|3.52194034E8|   1995-11-16|
| 9087|The American Pres...|62000000|1.07879496E8|   1995-11-17|
|12110|Dracula: Dead 

                                                                                

+-------------------+--------+------------------+-------------+------------------+
|         last_rated|movie_id|        avg_rating|total_ratings|           std_dev|
+-------------------+--------+------------------+-------------+------------------+
|2016-10-07 03:55:11|       1|3.8724696356275303|          247|0.9589814920649079|
|2016-08-02 01:42:33|       2|3.4018691588785046|          107|0.8807138301648417|
|2016-08-17 06:07:21|       3|3.1610169491525424|           59|1.1501149791325278|
|2004-07-27 14:14:12|       4|2.3846153846153846|           13|0.9388345202485134|
|2016-08-17 06:15:47|       5| 3.267857142857143|           56|0.9485121636939214|
|2016-08-17 06:34:44|       6|3.8846153846153846|          104|0.8309275704499349|
|2016-05-08 00:51:38|       7|3.2830188679245285|           53|1.1029219668778798|
|2003-04-07 11:14:02|       8|               3.8|            5|1.6431676725154982|
|2002-03-17 06:26:40|       9|              3.15|           20| 0.812727700887249|
|201

## Gold Layer - SQL Integration and Data Modeling

For normalizing delta tables and creating a pipeline through a JDBC driver. 

In [30]:
# Normalizing genres from the movie_extended table
movie_extended = spark.read.format("delta").load(movie_extended_path)
genres = movie_extended.withColumn("genre", explode(split("genres", ","))) \
    .withColumn("Genre", trim("Genre")) \
    .select("id", "Genre")

dim_genres = genres.select("Genre").distinct() \
    .withColumn("GenreID", monotonically_increasing_id())

dim_genres.show()

+---------------+-------+
|          Genre|GenreID|
+---------------+-------+
|          Crime|      0|
|        Romance|      1|
|       TV Movie|      2|
|       Thriller|      3|
|      Adventure|      4|
|        Foreign|      5|
|          Drama|      6|
|            War|      7|
|    Documentary|      8|
|         Family|      9|
|        Fantasy|     10|
|        History|     11|
|        Mystery|     12|
|      Animation|     13|
|          Music|     14|
|Science Fiction|     15|
|         Horror|     16|
|        Western|     17|
|         Comedy|     18|
|         Action|     19|
+---------------+-------+



In [None]:
movie_genres = genres.join(dim_genres, on = "Genre", how = "Inner") \
    .select("id", "GenreID")

movie_genres.count()

145188

In [None]:
# Creating a connection from PySpark to MySQL

jdbc_url = "jdbc:mysql://127.0.0.1:3306/CineScope"
properties = {
    "user": "stratpoint", 
    "password": "stratpoint", 
    "driver": "com.mysql.cj.jdbc.Driver"
}

for table_name in os.listdir(delta_lake):
    table_path = os.path.join(delta_lake, table_name)
    
    if os.path.isdir(table_path):
        try:
            print(f"processing table: {table_name}")
            data = spark.read.format("delta").load(table_path)

            data.write.jdbc(
                url = jdbc_url, 
                table = table_name, 
                mode = "overwrite", 
                properties = properties
            )

            print(f"Delta tables: {table_name} routed successfully to MySQL")
        except Exception as e:
            print(f"{table_name} failed to connect: {e}")
                  

processing table: movies_main


                                                                                

Delta tables: movies_main routed successfully to MySQL
processing table: ratings
Delta tables: ratings routed successfully to MySQL
processing table: movie_extended




Delta tables: movie_extended routed successfully to MySQL


                                                                                

## Gold Layer - Loading to PowerBI

Since I do not have access to PowerBI Desktop, and Microsoft Fabric, thus, I'll utilize microsoft excel




In [42]:
movies_ratings_table = spark.read.jdbc(
    url = jdbc_url,
    table = "(SELECT * FROM ratings) as ratings", 
    properties = properties
)

movies_ratings_table.show(5)

+-------------------+--------+------------------+-------------+------------------+
|         last_rated|movie_id|        avg_rating|total_ratings|           std_dev|
+-------------------+--------+------------------+-------------+------------------+
|2016-10-07 03:55:11|       1|3.8724696356275303|          247|0.9589814920649079|
|2016-08-02 01:42:33|       2|3.4018691588785046|          107|0.8807138301648417|
|2016-08-17 06:07:21|       3|3.1610169491525424|           59|1.1501149791325278|
|2004-07-27 14:14:12|       4|2.3846153846153846|           13|0.9388345202485134|
|2016-08-17 06:15:47|       5| 3.267857142857143|           56|0.9485121636939214|
+-------------------+--------+------------------+-------------+------------------+
only showing top 5 rows



In [43]:
Excel_directory = "Directory/CineScope directory.xlsx"

conn = mysql.connector.connect(
    host = "127.0.0.1", 
    user = "stratpoint", 
    password = "stratpoint", 
    database = "CineScope"
)

cursor = conn.cursor()
cursor.execute("SHOW TABLES")
tables = [row[0] for row in cursor.fetchall()]

with pd.ExcelWriter(Excel_directory, engine = 'openpyxl') as writer:
    for table in tables:
        data = pd.read_sql(f"SELECT * FROM {table}", conn)
        data.to_excel(writer, sheet_name = table[:31], index = False)

print("All tables exported to Excel")

  data = pd.read_sql(f"SELECT * FROM {table}", conn)


All tables exported to Excel


25/04/11 08:41:39 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1057246 ms exceeds timeout 800000 ms
25/04/11 08:41:39 WARN SparkContext: Killing executors is not supported by current scheduler.
25/04/11 08:55:23 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$