Ethan Buckner - Final Project

I used the sample database ‘sakila’, provided by MySQL.

We start by installing SQLAlchemy and establishing our connection credentials.

In [0]:
%pip install sqlalchemy

Collecting sqlalchemy
  Downloading SQLAlchemy-2.0.13-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.7 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.7/2.7 MB 36.8 MB/s eta 0:00:00
Collecting greenlet!=0.4.17
  Downloading greenlet-2.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (613 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 613.7/613.7 kB 51.0 MB/s eta 0:00:00
Installing collected packages: greenlet, sqlalchemy
Successfully installed greenlet-2.0.2 sqlalchemy-2.0.13
Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.


In [0]:
import pandas as pd
import pymysql.cursors
import pymongo
from sqlalchemy import create_engine, text

# Azure MySQL Server Connection Information ###################
jdbc_hostname = "ds2002ethanbuckner.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila"

connection_properties = {
  "user" : "ethan_admin",
  "password" : "King in the north!",
  "driver" : "org.mariadb.jdbc.Driver"
}

mysql_user_id = "ethan_admin"
mysql_pwd = "King in the north!"
mysql_host_name = "ds2002ethanbuckner.mysql.database.azure.com"

atlas_cluster_name = "EthanCluster"
atlas_user_name = "root"
atlas_password = "kinginthenorth"

conn_str = {"local" : "mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@ethancluster.tnalhre.mongodb.net/?retryWrites=true&w=majority"
}

src_dbname = "sakila"
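
Hard-coding passwords in a notebook exposes them to anyone who can read it. The sketch below shows one alternative. It assumes the secrets are supplied as environment variables, and the names SAKILA_MYSQL_PWD and SAKILA_ATLAS_PWD are hypothetical. On Databricks, dbutils.secrets.get(scope=..., key=...) serves the same purpose.

```python
import os

def require_env(name):
    """Return an environment variable's value, failing loudly if it is missing or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

# Hypothetical variable names, set outside the notebook (cluster config, shell, secret store):
# mysql_pwd = require_env("SAKILA_MYSQL_PWD")
# atlas_password = require_env("SAKILA_ATLAS_PWD")
```

Failing immediately on a missing variable surfaces a configuration problem at startup instead of as an authentication error later in the pipeline.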

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS sakila_fact;

USE sakila_fact

I decided to drop the column ‘last_update’ from all tables because the data was not necessary and the timestamp data type would be difficult to work with.

In [0]:
# Drop last_update from every sakila table (MySQL commits DDL statements implicitly, so no conn.commit() is needed)
tables = ["actor", "address", "category", "city", "country", "customer", "film", "film_actor",
          "film_category", "inventory", "language", "payment", "rental", "staff", "store"]

conn = pymysql.connect(host=jdbc_hostname, user=mysql_user_id, password=mysql_pwd, database=src_dbname)
try:
    with conn.cursor() as cursor:
        for table in tables:
            cursor.execute(f"ALTER TABLE {table} DROP COLUMN last_update;")
finally:
    conn.close()

I then executed the queries in sakila-schema.sql and sakila-data.sql using MySQL Workbench, and created my destination database, sakila_fact. I used the provided script to create a dim_date table and populate it with dates from 1/1/2001 to 12/31/2024.
Next, every table that holds date information (customer, payment, and rental) needs to store dim_date foreign keys instead of date values. I chose to do this with pandas DataFrames rather than SQL.
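The sketch below shows what the date dimension and the key lookup amount to. It assumes that date_key is an integer of the form YYYYMMDD, but the provided dim_date script may use a different scheme. build_dim_date and date_to_key are hypothetical helpers. Returning None for a date that is not in the table mirrors a left join, so no row is silently dropped.

```python
from datetime import date, datetime, timedelta

def build_dim_date(start, end):
    """One row per calendar day from start to end, inclusive.

    Assumption: date_key is YYYYMMDD as an integer (hypothetical key scheme).
    """
    rows = []
    day = start
    while day <= end:
        rows.append({
            "date_key": int(day.strftime("%Y%m%d")),
            "full_date": day.isoformat(),  # 'YYYY-MM-DD', the same text str(date) produces
        })
        day += timedelta(days=1)
    return rows

def date_to_key(value, key_by_date):
    """Return the dim_date key for a date or datetime, or None if it is outside the table."""
    # str(datetime) looks like 'YYYY-MM-DD HH:MM:SS'; keep only the date part
    return key_by_date.get(str(value)[:10])

dim_date_rows = build_dim_date(date(2001, 1, 1), date(2024, 12, 31))
key_by_date = {row["full_date"]: row["date_key"] for row in dim_date_rows}

print(len(dim_date_rows))  # 8766
print(date_to_key(datetime(2005, 5, 24, 22, 53, 30), key_by_date))  # 20050524
```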

In [0]:
from sqlalchemy.engine import URL

def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    # URL.create escapes special characters (spaces, '!', '@', ...) in the password
    url = URL.create("mysql+pymysql", username=user_id, password=pwd, host=host_name, database=db_name)
    sql_engine = create_engine(url, pool_recycle=3600)
    with sql_engine.connect() as connection:
        dframe = pd.read_sql(text(sql_query), connection)
    sql_engine.dispose()
    return dframe

customer_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.customer;")
payment_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.payment;")
rental_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.rental;")

dim_date = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.dim_date;")
dim_date['full_date'] = dim_date['full_date'].astype(str)  # 'YYYY-MM-DD'
date_keys = dim_date[['full_date', 'date_key']]

def replace_date_with_key(df, date_col, key_col, extra_drop=()):
    """Swap a datetime column for its dim_date key.

    A left merge keeps every row in its original order, instead of relying on
    index alignment after an inner merge (which silently drops unmatched rows).
    """
    out = df.copy()
    out[date_col] = out[date_col].astype(str).str[:10]  # keep only the 'YYYY-MM-DD' part
    out = out.merge(date_keys, how='left', left_on=date_col, right_on='full_date')
    out = out.drop(columns=[date_col, 'full_date', *extra_drop])
    return out.rename(columns={'date_key': key_col})

customer_df = replace_date_with_key(customer_df, 'create_date', 'create_date_id')
payment_df = replace_date_with_key(payment_df, 'payment_date', 'payment_date_id')
rental_df = replace_date_with_key(rental_df, 'rental_date', 'rental_date_id', extra_drop=['return_date'])

Now we convert these pandas DataFrames to Spark DataFrames and save them as tables in sakila_fact.

In [0]:
s_customer_df = spark.createDataFrame(customer_df)
s_payment_df = spark.createDataFrame(payment_df)
s_rental_df = spark.createDataFrame(rental_df)
s_dim_date_df = spark.createDataFrame(dim_date)

s_customer_df.write.saveAsTable("sakila_fact.dim_customer")
s_payment_df.write.saveAsTable("sakila_fact.dim_payment")
s_rental_df.write.saveAsTable("sakila_fact.dim_rental")
s_dim_date_df.write.saveAsTable("sakila_fact.dim_date")

Next we will extract the tables that describe the store involved in a transaction: ‘address’, ‘city’, ‘country’, and ‘store’. We will then load the contents of these tables into a MongoDB database.

In [0]:
from pymongo.server_api import ServerApi

address_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.address;")
address_df = address_df.drop(columns=['location', 'address2'])

city_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.city;")
country_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.country;")
store_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.store;")

client = pymongo.MongoClient(conn_str['atlas'], server_api=ServerApi('1'))
db = client['sakila']
db['address'].insert_many(address_df.to_dict(orient='records'))
db['city'].insert_many(city_df.to_dict(orient='records'))
db['country'].insert_many(country_df.to_dict(orient='records'))
db['store'].insert_many(store_df.to_dict(orient='records'))
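
pandas' to_dict(orient='records') turns each row into one dict, which is the document shape pymongo's insert_many expects. The minimal stdlib sketch below does the same conversion. rows_to_documents is a hypothetical helper, and the sample rows are illustrative only.

```python
def rows_to_documents(columns, rows):
    """Zip column names with each row, producing one dict (document) per row."""
    return [dict(zip(columns, row)) for row in rows]

# Hypothetical sample rows shaped like sakila.country
country_docs = rows_to_documents(["country_id", "country"], [(1, "Afghanistan"), (2, "Algeria")])
print(country_docs)
# [{'country_id': 1, 'country': 'Afghanistan'}, {'country_id': 2, 'country': 'Algeria'}]
```

pymongo adds an _id field to every inserted document that lacks one. As a result, the collections read back from Atlas will carry an extra _id column.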

In [0]:
s_city_df = spark.createDataFrame(city_df)
s_country_df = spark.createDataFrame(country_df)
s_store_df = spark.createDataFrame(store_df)

Extract the data back from MongoDB and save each collection as a table in our database.

In [0]:
mongo_uri = "mongodb+srv://root:kinginthenorth@ethancluster.tnalhre.mongodb.net/sakila.address?retryWrites=true&w=majority"

df = spark.read.format("mongo").option("uri",mongo_uri).load()

df.write.saveAsTable("sakila_fact.dim_address")

In [0]:
mongo_uri = "mongodb+srv://root:kinginthenorth@ethancluster.tnalhre.mongodb.net/sakila.city?retryWrites=true&w=majority"

df = spark.read.format("mongo").option("uri",mongo_uri).load()

df.write.saveAsTable("sakila_fact.dim_city")

mongo_uri = "mongodb+srv://root:kinginthenorth@ethancluster.tnalhre.mongodb.net/sakila.country?retryWrites=true&w=majority"

df = spark.read.format("mongo").option("uri",mongo_uri).load()

df.write.saveAsTable("sakila_fact.dim_country")

mongo_uri = "mongodb+srv://root:kinginthenorth@ethancluster.tnalhre.mongodb.net/sakila.store?retryWrites=true&w=majority"

df = spark.read.format("mongo").option("uri",mongo_uri).load()

df.write.saveAsTable("sakila_fact.dim_store")
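
The four read cells differ only in the collection named in the URI path. The sketch below builds those URIs in one place. It assumes that the legacy "mongo" Spark connector reads the database and collection from the URI path, as the cells above rely on. collection_uri is a hypothetical helper and the credentials are placeholders.

```python
from urllib.parse import quote_plus

def collection_uri(user, password, host, database, collection,
                   options="retryWrites=true&w=majority"):
    """Build a mongodb+srv URI whose path selects one database.collection.

    quote_plus percent-encodes characters such as '@', ':' and '/' so that
    credentials containing them do not break URI parsing.
    """
    return (f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}@{host}/"
            f"{database}.{collection}?{options}")

# Placeholder credentials; the host is the Atlas cluster used above
uris = {
    name: collection_uri("db_user", "p@ss:word", "ethancluster.tnalhre.mongodb.net", "sakila", name)
    for name in ["address", "city", "country", "store"]
}
# In the notebook, each URI would feed:
#   spark.read.format("mongo").option("uri", uri).load().write.saveAsTable(f"sakila_fact.dim_{name}")
print(uris["city"])
```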

Now I will write the rental table as CSV to /FileStore/mydata/rental.csv. Spark treats this path as a directory and writes the data into part files inside it, rather than producing a single file.

In [0]:
s_rental_df.write.format("csv").option("header","true").save("/FileStore/mydata/rental.csv")

Let's set up a stream that reads this CSV.

In [0]:
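from pyspark.sql.types import StructType, StructField, IntegerType, ShortType, ByteType

# Define the schema. Spark applies it by position, so the field order must match the
# column order of the CSV written above: rental_id, inventory_id, customer_id, staff_id, rental_date_id
schema = StructType([
    StructField("rental_id", IntegerType(), True),
    StructField("inventory_id", IntegerType(), True),  # Assuming mediumint fits within IntegerType
    StructField("customer_id", ShortType(), True),  # smallint maps to ShortType in PySpark
    StructField("staff_id", ByteType(), True),  # tinyint maps to ByteType in PySpark
    StructField("rental_date_id", IntegerType(), True),
])

# The CSV was written with a header row, so skip it instead of reading it as data
df_stream = spark \
    .readStream \
    .option("header", "true") \
    .option("maxFilesPerTrigger", 1) \
    .schema(schema) \
    .csv("/FileStore/mydata/rental.csv")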
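
With the default enforceSchema=true, Spark applies a user-supplied CSV schema by position rather than by header name. The declared field order must therefore match the file's column order. The header line must also be skipped with header=true, or Spark reads it as a data row. The stdlib sketch below mimics that positional behavior using one hypothetical row. read_positionally is a hypothetical helper.

```python
import csv
import io

# Hypothetical file contents, in the column order the rental DataFrame was written
csv_text = "rental_id,inventory_id,customer_id,staff_id,rental_date_id\n1,367,130,1,20050524\n"

def read_positionally(text, field_names, skip_header=True):
    """Assign field names by position, ignoring the names in the header row."""
    rows = list(csv.reader(io.StringIO(text)))
    if skip_header:
        rows = rows[1:]
    return [dict(zip(field_names, row)) for row in rows]

wrong = read_positionally(csv_text, ["rental_id", "rental_date_id", "inventory_id", "customer_id", "staff_id"])
right = read_positionally(csv_text, ["rental_id", "inventory_id", "customer_id", "staff_id", "rental_date_id"])
print(wrong[0]["rental_date_id"])  # 367 (actually the inventory_id value)
print(right[0]["rental_date_id"])  # 20050524
```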


Join the remaining tables onto our streaming DataFrame to build the fact table.

In [0]:
s_inventory_df = spark.createDataFrame(get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.inventory;"))
s_film_df = spark.createDataFrame(get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.film;"))
s_staff_df = spark.createDataFrame(get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.staff;"))

# Stream-static inner joins against the batch tables
fact_rentals = df_stream.join(s_payment_df, on=['rental_id', 'staff_id', 'customer_id']).drop("payment_id")
fact_rentals = fact_rentals.join(s_inventory_df, on='inventory_id').drop("inventory_id")
fact_rentals = fact_rentals.join(s_film_df, on='film_id').drop("description", "release_year", "language_id", "original_language_id", "rental_duration", "rental_rate", "length", "replacement_cost", "rating", "special_features").withColumnRenamed("title", "film_title")
# Joining on store_id as well keeps only rows whose inventory store_id matches the customer's (and below, the staff member's) store_id
fact_rentals = (fact_rentals.join(s_customer_df, on=['customer_id', 'store_id'])
                .drop("first_name", "active", "create_date_id")
                .withColumnRenamed("email", "customer_email")
                .withColumnRenamed("address_id", "customer_address_id")
                .withColumnRenamed("last_name", "customer_last_name"))
fact_rentals = (fact_rentals.join(s_staff_df, on=['staff_id', 'store_id'])
                .drop("address_id", "email", "first_name")
                .withColumnRenamed("last_name", "staff_last_name"))
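
Joining on a composite key such as ['customer_id', 'store_id'] is an inner join on both columns, so a row survives only when both values match. The stdlib hash-join sketch below shows that behavior with hypothetical rows. inner_join is a hypothetical helper, not Spark's implementation.

```python
def inner_join(left, right, keys):
    """Hash-join two lists of dicts on the given key columns (inner join semantics)."""
    index = {}
    for row in right:
        index.setdefault(tuple(row[k] for k in keys), []).append(row)
    joined = []
    for row in left:
        for match in index.get(tuple(row[k] for k in keys), []):
            joined.append({**row, **match})  # on a name clash, the right-hand value wins
    return joined

# Hypothetical rows: rental 2 happened at store 2, but customer 7's store_id is 1
rentals = [
    {"rental_id": 1, "customer_id": 7, "store_id": 1},
    {"rental_id": 2, "customer_id": 7, "store_id": 2},
]
customers = [{"customer_id": 7, "store_id": 1, "customer_last_name": "SMITH"}]

print(len(inner_join(rentals, customers, ["customer_id", "store_id"])))  # 1
print(len(inner_join(rentals, customers, ["customer_id"])))              # 2
```

Joining on customer_id alone keeps both rentals, but the two store_id columns then collide. The dict merge here simply overwrites one of them. Spark would instead keep both columns, so later references to store_id would be ambiguous.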

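Start the streaming query. Note that the memory sink stores the results in an in-memory table named fact_rentals_table on the driver; it does not persist them to sakila_fact.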

In [0]:
query = fact_rentals \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("fact_rentals_table") \
    .start()