Ethan Buckner - Data Project 1

I used the sample database 'sakila', provided by MySQL.

We start by establishing the credentials to a local MySQL server:

In [7]:
import json
import os
import numpy
import pandas as pd
import pymysql.cursors
import pymongo
from sqlalchemy import create_engine, text

mysql_host_name = "localhost"
mysql_host_ip = "127.0.0.1"
mysql_port = "3306"
mysql_user_id = "root"
mysql_pwd = "king in the north"

atlas_cluster_name = "EthanCluster"
atlas_user_name = "root"
atlas_password = "kinginthenorth"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : "mongodb+srv://root:kinginthenorth@ethancluster.tnalhre.mongodb.net/?retryWrites=true&w=majority"
}

src_dbname = "sakila"
dst_dbname = "sakila_fact"

I decided to drop the column 'last_update' from all tables because the data was not necessary and the timestamp data type would be difficult to work with.

In [44]:
conn = pymysql.connect(host=mysql_host_name, user=mysql_user_id, password=mysql_pwd, database=src_dbname)
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute("ALTER TABLE actor DROP COLUMN last_update;")
cursor.execute("ALTER TABLE address DROP COLUMN last_update;")
cursor.execute("ALTER TABLE category DROP COLUMN last_update;")
cursor.execute("ALTER TABLE city DROP COLUMN last_update;")
cursor.execute("ALTER TABLE country DROP COLUMN last_update;")
cursor.execute("ALTER TABLE customer DROP COLUMN last_update;")
cursor.execute("ALTER TABLE film DROP COLUMN last_update;")
cursor.execute("ALTER TABLE film_actor DROP COLUMN last_update;")
cursor.execute("ALTER TABLE film_category DROP COLUMN last_update;")
cursor.execute("ALTER TABLE inventory DROP COLUMN last_update;")
cursor.execute("ALTER TABLE language DROP COLUMN last_update;")
cursor.execute("ALTER TABLE payment DROP COLUMN last_update;")
cursor.execute("ALTER TABLE rental DROP COLUMN last_update;")
cursor.execute("ALTER TABLE staff DROP COLUMN last_update;")
cursor.execute("ALTER TABLE store DROP COLUMN last_update;")
cursor.close()
conn.close()

I then executed the queries in sakila-schema.sql and sakila-data.sql using MySQLWorkbench. I also created my destination database: sakila_fact. I used the script provided to create a dim_date table and populate it with dates from 1/1/2001 to 12/31/2024.

I now need to edit all of my tables that include date information (customer, payment, and rental) to store dim_date foreign keys rather than date objects. I decided to use pandas dataframes to do this rather than sql.

In [8]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()

    return dframe

customer_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.customer;")
payment_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.payment;")
rental_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.rental;")

dim_date = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, "SELECT * FROM sakila_fact.dim_date;")
dim_date['full_date'] = dim_date['full_date'].astype(str)

customer_df['create_date'] = customer_df['create_date'].astype(str).str[:10]
customer_date = customer_df.merge(dim_date, left_on='create_date', right_on='full_date')
customer_df = customer_df.drop(columns=['create_date'])
customer_df['create_date_id'] = customer_date['date_key']

payment_df['payment_date'] = payment_df['payment_date'].astype(str).str[:10]
payment_date = payment_df.merge(dim_date, left_on='payment_date', right_on='full_date')
payment_df = payment_df.drop(columns=['payment_date'])
payment_df['payment_date_id'] = payment_date['date_key']

rental_df['rental_date'] = rental_df['rental_date'].astype(str).str[:10]
rental_date = rental_df.merge(dim_date, left_on='rental_date', right_on='full_date')
rental_df = rental_df.drop(columns=['rental_date', 'return_date'])
rental_df['rental_date_id'] = rental_date['date_key']

Next we will extract the tables related to information about the store involved in a transaction. This includes the tables: 'address', 'city', 'country', and 'store'. We will then store the contents of these tables in a MongoDB database.

In [9]:
from pymongo.server_api import ServerApi

address_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.address;")
address_df = address_df.drop(columns=['location', 'address2'])

city_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.city;")
country_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.country;")
store_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.store;")

client = pymongo.MongoClient(conn_str['atlas'], server_api=ServerApi('1'))
db = client['sakila']
db['address'].insert_many(address_df.to_dict(orient='records'))
db['city'].insert_many(city_df.to_dict(orient='records'))
db['country'].insert_many(country_df.to_dict(orient='records'))
db['store'].insert_many(store_df.to_dict(orient='records'))

<pymongo.results.InsertManyResult at 0x127511d60>

Now I will extract the rental table and store the data in a csv file in a file called rental.csv.

In [114]:
rental_df.to_csv('rental.csv', index=False)

Now I have data from three of the different data sources required, pulling tables from MySQL, MongoDB, and a local csv file. Now I will put all the relevant tables into dataframes, create a fact dataframe for rentals, then save the dim tables and fact table to the sakila_fact MySQL db.

In [10]:
# MongoDB tables
address_df = pd.DataFrame(list(db['address'].find()))
city_df = pd.DataFrame(list(db['city'].find()))
country_df = pd.DataFrame(list(db['country'].find()))
store_df = pd.DataFrame(list(db['store'].find()))

# Drop ids made by MongoDB
address_df = address_df.drop(columns=["_id"])
city_df = city_df.drop(columns=["_id"])
country_df = country_df.drop(columns=["_id"])
store_df = store_df.drop(columns=["_id"])

# CSV table
rental_df = pd.read_csv('rental.csv')

# MySQL tables
# Already have customer_df and payment_df
inventory_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.inventory;")
film_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT * FROM sakila.film;")
staff_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, src_dbname, "SELECT staff_id, first_name, last_name, address_id, email, store_id FROM sakila.staff;")

fact_rentals = payment_df.merge(rental_df, on=['rental_id', 'staff_id', 'customer_id']).drop(columns=["payment_id"]) # No need to store these ids because all data is now in fact table
fact_rentals = fact_rentals.merge(inventory_df, on='inventory_id').drop(columns=["inventory_id"])
fact_rentals = fact_rentals.merge(film_df, on='film_id').drop(columns=["description", "release_year", "language_id", "original_language_id", "rental_duration", "rental_rate", "length", "replacement_cost", "rating", "special_features"]).rename(columns={"title": "film_title"})
fact_rentals = fact_rentals.merge(customer_df, on=['customer_id', 'store_id']).drop(columns=["first_name", "active", "create_date_id"]).rename(columns={"email": "customer_email", "address_id": "customer_address_id", "last_name": "customer_last_name"})
fact_rentals = fact_rentals.merge(address_df, left_on='customer_address_id', right_on='address_id').drop(columns=["district", "postal_code", "phone", "address_id"])
fact_rentals = fact_rentals.merge(city_df, on='city_id')
fact_rentals = fact_rentals.merge(country_df, on='country_id').drop(columns=['city_id', 'country_id']).rename(columns={'address': 'customer_address', 'city': 'customer_city', 'country': 'customer_country'})
fact_rentals = fact_rentals.merge(staff_df, on=['staff_id', 'store_id']).drop(columns=['address_id', 'email', 'first_name']).rename(columns={'last_name': 'staff_last_name'})

reordered = ['film_title', 'amount', 'staff_last_name', 'customer_last_name', 'customer_email', 'customer_address', 'customer_city', 'customer_country', 'rental_id', 'customer_id', 'customer_address_id', 'staff_id', 'film_id', 'store_id', 'payment_date_id', 'rental_date_id']

fact_rentals = fact_rentals[reordered]
fact_rentals.reset_index(inplace=True)
fact_rentals = fact_rentals.rename(columns={"index": "order_pk"})
dim_address = address_df.drop(columns=["phone"])
dim_city = city_df
dim_country = country_df
dim_store = store_df
dim_staff = staff_df
dim_film = film_df
dim_inventory = inventory_df
dim_customer = customer_df

print(fact_rentals)

      order_pk           film_title  amount staff_last_name  \
0            0   GALAXY SWEETHEARTS    4.99        Stephens   
1            1  LIBERTY MAGNIFICENT    8.99        Stephens   
2            2    NECKLACE OUTBREAK    5.99        Stephens   
3            3        CLERKS ANGELS    6.99        Stephens   
4            4      HOLLOW JEOPARDY    4.99        Stephens   
...        ...                  ...     ...             ...   
2014      2014    CONTACT ANONYMOUS    2.99         Hillyer   
2015      2015     WASTELAND DIVINE    2.99         Hillyer   
2016      2016           LION UNCUT    3.99         Hillyer   
2017      2017      HIGHBALL POTTER    0.99         Hillyer   
2018      2018            TAXI KICK    0.99         Hillyer   

     customer_last_name                      customer_email  \
0                  LONG  JACQUELINE.LONG@sakilacustomer.org   
1                  LONG  JACQUELINE.LONG@sakilacustomer.org   
2                  LONG  JACQUELINE.LONG@sakilacustome

Now that we have our fact and dimension tables, lets save them to our sakila_fact database.

In [11]:
def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    with sqlEngine.connect() as conn:
        if db_operation == "insert":
            df.to_sql(table_name, con=conn, index=False, if_exists='replace')
            conn.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))

        elif db_operation == "update":
            df.to_sql(table_name, con=conn, index=False, if_exists='append')

set_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, fact_rentals, "fact_rentals", "order_pk", "insert")
set_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, dim_address, "dim_address", "address_id", "insert")
set_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, dim_city, "dim_city", "city_id", "insert")
set_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, dim_country, "dim_country", "country_id", "insert")
set_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, dim_store, "dim_store", "store_id", "insert")
set_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, dim_staff, "dim_staff", "staff_id", "insert")
set_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, dim_film, "dim_film", "film_id", "insert")
set_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, dim_inventory, "dim_inventory", "inventory_id", "insert")

Proof of functionality: Get total amount spent on rentals for each customer email. Get all rentals with less than an R rating checked out by staff with the first name of Mike

In [14]:
spent_sql = "SELECT customer_email, SUM(amount) AS total_spent FROM sakila_fact.fact_rentals GROUP BY customer_email ORDER BY total_spent;"

total_spent = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, spent_sql)

print(total_spent)

mike_family_sql = "SELECT fact_rentals.film_title, fact_rentals.amount, dim_staff.first_name, dim_film.rating FROM sakila_fact.fact_rentals JOIN sakila_fact.dim_staff ON sakila_fact.fact_rentals.staff_id = sakila_fact.dim_staff.staff_id JOIN sakila_fact.dim_film ON sakila_fact.fact_rentals.film_id = sakila_fact.dim_film.film_id WHERE dim_film.rating != 'R' AND dim_staff.first_name = 'Mike';"

mike_family_df = get_dataframe(mysql_user_id, mysql_pwd, mysql_host_name, dst_dbname, mike_family_sql)

print(mike_family_df)

              film_title  amount first_name rating
0      FELLOWSHIP AUTUMN    4.99       Mike  NC-17
1          VAMPIRE WHALE    7.99       Mike  NC-17
2         SWEDEN SHINING    7.99       Mike     PG
3    LEAGUE HELLFIGHTERS    8.99       Mike  PG-13
4    COLDBLOODED DARLING    5.99       Mike      G
..                   ...     ...        ...    ...
841       IMAGE PRINCESS    8.99       Mike  PG-13
842    CONTACT ANONYMOUS    2.99       Mike  PG-13
843     WASTELAND DIVINE    2.99       Mike     PG
844           LION UNCUT    3.99       Mike     PG
845            TAXI KICK    0.99       Mike  PG-13

[846 rows x 4 columns]
