## Import the Necessary Libraries

In [80]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [82]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.30
Running PyMongo Version: 4.10.1


## Declare & Assign Connection Variables for MongoDB Server, MySQL Server & Databases

In [84]:
mysql_args = {
    "uid" : "root",
    "pwd" : "evie6044",
    "hostname" : "localhost",
    "dbname" : "sakila_dw"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "nhk9hb",
    "password" : "B3ZgCYqqw3RFNFCq",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "mnm8h",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_dw"
}

## Define Functions for Getting Data From and Setting Data Into Databases

In [86]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

## Create the New Data Warehouse (sakila_dw) and Switch the Connection Context.

In [88]:
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{mysql_args['dbname']}`;"))
connection.execute(text(f"CREATE DATABASE `{mysql_args['dbname']}`;"))
connection.execute(text(f"USE {mysql_args['dbname']};"))

connection.close()

## Create and populate the Date Dimension Table (done in mySQL Server)

#### Get the Data from the Date Dimension Table

In [92]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


## Load Film Dimension Data from a Comma-Separated Values (CSV) File on local file system

In [94]:
data_dir = os.path.join(os.getcwd())
data_file = os.path.join(data_dir, 'sakila_films.csv')

df_films = pd.read_csv(data_file, header=0)
df_films.head()

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42
2,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...,2006,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15 05:03:42
3,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...,2006,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15 05:03:42
4,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...,2006,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15 05:03:42


#### Perform Any Necessary Transformations to the DataFrame

In [96]:
# drop empty columns
df_films.drop(['original_language_id','language_id'], axis=1, inplace=True)
df_films.head(2)

# insert new column with ever-incrementing numeric value to serve as primary key
df_films.insert(0, "film_key", range(1, df_films.shape[0]+1))
df_films.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


#### Lookup the Last_Update Date Key from the Date Dimension Table

In [98]:
# Lookup the Primary Key (date_key) that Corresponds to the "last_update" Column.
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_films.last_update = df_films.last_update.astype('datetime64[ns]').dt.date
df_films = pd.merge(df_films, df_dim_last_update, on='last_update', how='left')
df_films.drop(['last_update'], axis=1, inplace=True)
df_films.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_key
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215


#### Write back to SQL database as new dimension table

In [100]:
dataframe = df_films
table_name = 'dim_films'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### Validate that the New Dimension Table was Created

In [102]:
sql_films = "SELECT * FROM sakila_dw.dim_films;"
df_dim_films = get_sql_dataframe(sql_films, **mysql_args)
df_dim_films.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_key
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215


## Populate MongoDB with Source Data (JSON files)
#### For customer dimension -- has been joined with address entity in mySQL server before exporting to JSON) 

In [104]:
client = get_mongo_client(**mongodb_args)

data_dir = os.path.join(os.getcwd())

json_files = {"customer" : 'sakila_customer.json'}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)     

#### Extract Data from the Source MongoDB Collections Into DataFrames

In [106]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "customer"

df_customer = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,active,create_date,last_update,address,address2,district,city_id,postal_code,phone
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-15 04:57:20,1913 Hanoi Way,,Nagasaki,463,35200,28303384290
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-15 04:57:20,1121 Loja Avenue,,California,449,17886,838635286649


#### Perform Any Necessary Transformations to the DataFrame

In [108]:
# drop unnecessary/empty columns
df_customer.drop(['address2','store_id','city_id'], axis=1, inplace=True)

# insert new column with ever-incrementing numeric value to serve as primary key
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1))
df_customer.head(2)

Unnamed: 0,customer_key,customer_id,first_name,last_name,email,active,create_date,last_update,address,district,postal_code,phone
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-15 04:57:20,1913 Hanoi Way,Nagasaki,35200,28303384290
1,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-14 22:04:36,2006-02-15 04:57:20,1121 Loja Avenue,California,17886,838635286649


#### Lookup the Last_Update and Create_Date Date Keys from the Date Dimension Table

In [110]:
# CUSTOMER TABLE
# Lookup the Primary Key (date_key) that Corresponds to the "create_date" Column.
df_dim_create_date = df_dim_date.rename(columns={"date_key" : "create_date_key", "full_date" : "create_date"})
df_customer.create_date = df_customer.create_date.astype('datetime64[ns]').dt.date
df_customer = pd.merge(df_customer, df_dim_create_date, on='create_date', how='left')
df_customer.drop(['create_date'], axis=1, inplace=True)

# Lookup the Primary Key (date_key) that Corresponds to the "last_update" Column.
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_customer.last_update = df_customer.last_update.astype('datetime64[ns]').dt.date
df_customer = pd.merge(df_customer, df_dim_last_update, on='last_update', how='left')
df_customer.drop(['last_update'], axis=1, inplace=True)
df_customer.head(2)

Unnamed: 0,customer_key,customer_id,first_name,last_name,email,active,address,district,postal_code,phone,create_date_key,last_update_key
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1913 Hanoi Way,Nagasaki,35200,28303384290,20060214,20060215
1,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,1121 Loja Avenue,California,17886,838635286649,20060214,20060215


#### Load Newly Transformed MongoDB Data into the sakila_dw Data Warehouse

In [112]:
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### Validate that the New dimension Tables were Created

In [114]:
sql_dim_customer = "SELECT * FROM sakila_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_dim_customer, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id,first_name,last_name,email,active,address,district,postal_code,phone,create_date_key,last_update_key
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,1913 Hanoi Way,Nagasaki,35200,28303384290,20060214,20060215
1,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,1,1121 Loja Avenue,California,17886,838635286649,20060214,20060215


## Read staff entity data from source database

In [116]:
sql_dim_staff = "SELECT * FROM sakila.staff;"
df_dim_staff = get_sql_dataframe(sql_dim_staff, **mysql_args)
df_dim_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


#### Perform Any Necessary Transformations to the Dataframe

In [118]:
# drop unnecessary columns
df_dim_staff.drop(['picture','address_id','password','last_update','store_id'], axis=1, inplace=True)

# insert new column with ever-incrementing numeric value to serve as primary key
df_dim_staff.insert(0, "staff_key", range(1, df_dim_staff.shape[0]+1))
df_dim_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,email,active,username
0,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike
1,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,Jon


#### Load Data back to new sakila_dw warehouse

In [120]:
dataframe = df_dim_staff
table_name = 'dim_staff'
primary_key = 'staff_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [122]:
# Validate the correctness of the new dimension table
sql_dim_staff = "SELECT * FROM sakila_dw.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_dim_staff, **mysql_args)
df_dim_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,email,active,username
0,1,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike
1,2,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,1,Jon


## Create the Fact Table

#### Read data from the transaction table in your source database
##### This requires combining inventory and rental data

In [124]:
sql_fact_rentals = """SELECT i.film_id, 
       r.*
FROM sakila.inventory AS i
RIGHT OUTER JOIN sakila.rental AS r
    ON i.inventory_id = r.inventory_id;"""

df_fact_rentals = get_sql_dataframe(sql_fact_rentals, **mysql_args)
df_fact_rentals.head(2)

Unnamed: 0,film_id,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,80,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,333,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


## Perform Lookup Operations to replace business codes in your fact dataframe with the corresponding Surrogate Primary Keys from each of the dimension tables

#### Lookup Datekeys for rental fact table from Date Dimension Table

In [126]:
# Lookup the Primary Key (date_key) that Corresponds to the "rental_date" Column.
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rentals.rental_date = df_fact_rentals.rental_date.astype('datetime64[ns]').dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_rental_date, on='rental_date', how='left')
df_fact_rentals.drop(['rental_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,film_id,rental_id,inventory_id,customer_id,return_date,staff_id,last_update,rental_date_key
0,80,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,20050524
1,333,2,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,20050524


In [128]:
# Lookup the Primary Key (date_key) that Corresponds to the "return_date" Column.
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rentals.return_date = df_fact_rentals.return_date.astype('datetime64[ns]').dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_return_date, on='return_date', how='left')
df_fact_rentals.drop(['return_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,film_id,rental_id,inventory_id,customer_id,staff_id,last_update,rental_date_key,return_date_key
0,80,1,367,130,1,2006-02-15 21:30:53,20050524,20050526.0
1,333,2,1525,459,1,2006-02-15 21:30:53,20050524,20050528.0


#### Lookup the Primary Keys from the Dimension Tables and establish foreign key relationships between fact and dimension tables

##### First, fetch the Surrogate Primary Key and the Business Key from each Dimension table.

In [130]:
# Extract the 'primary key' and the 'business key' from new "dim_customer" dimension table
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_dim_customer, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


In [132]:
# Extract the 'primary key' and the 'business key' from new "dim_staff" dimension table
sql_dim_staff = "SELECT staff_key, staff_id FROM sakila_dw.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_dim_staff, **mysql_args)
df_dim_staff.head(2)

Unnamed: 0,staff_key,staff_id
0,1,1
1,2,2


In [134]:
# Extract the 'primary key' and the 'business key' from new "dim_films" dimension table
sql_dim_films = "SELECT film_key, film_id FROM sakila_dw.dim_films;"
df_dim_films = get_sql_dataframe(sql_dim_films, **mysql_args)
df_dim_films.head(2)

Unnamed: 0,film_key,film_id
0,1,1
1,2,2


#### Use Business Keys to lookup corresponding Surrogate Primary Keys in the Dimension tables

#### link *fact_rentals* to *dim_customer*
##### rental fact table contains reference to *dim_customer* table by way of the *customer_id* column; lookup the corresponding *customer_key*

In [136]:
# Merge the "fact_rentals" and "dim_customer" dataframes on the 'customer_id' to
# get the 'customer_key', Then drop the 'customer_id' column and display the results.
df_fact_rentals = pd.merge(df_fact_rentals,df_dim_customer,on='customer_id',how='left')
df_fact_rentals.drop(['customer_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,film_id,rental_id,inventory_id,staff_id,last_update,rental_date_key,return_date_key,customer_key
0,80,1,367,1,2006-02-15 21:30:53,20050524,20050526.0,130
1,333,2,1525,1,2006-02-15 21:30:53,20050524,20050528.0,459


#### link *fact_rentals* to *dim_staff*
##### rental fact table contains reference to *dim_staff* table by way of the *staff_id* column; lookup the corresponding *staff_key*

In [138]:
df_fact_rentals = pd.merge(df_fact_rentals,df_dim_staff,on='staff_id',how='left')
df_fact_rentals.drop(['staff_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,film_id,rental_id,inventory_id,last_update,rental_date_key,return_date_key,customer_key,staff_key
0,80,1,367,2006-02-15 21:30:53,20050524,20050526.0,130,1
1,333,2,1525,2006-02-15 21:30:53,20050524,20050528.0,459,1


#### link *fact_rentals* to *dim_films*
##### rental fact table contains reference to *dim_films* table by way of the *film_id* column; lookup the corresponding *film_key*

In [140]:
df_fact_rentals = pd.merge(df_fact_rentals,df_dim_films,on='film_id',how='left')
df_fact_rentals.drop(['film_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,last_update,rental_date_key,return_date_key,customer_key,staff_key,film_key
0,1,367,2006-02-15 21:30:53,20050524,20050526.0,130,1,80
1,2,1525,2006-02-15 21:30:53,20050524,20050528.0,459,1,333


#### Make any additional transformations to the fact table

In [142]:
# Insert a new 'rental_key' column, with an ever-incrementing numeric value, to serve as the surrogate primary key
df_fact_rentals.insert(0, "rental_key", range(1, df_fact_rentals.shape[0]+1))
df_fact_rentals.head(2)

Unnamed: 0,rental_key,rental_id,inventory_id,last_update,rental_date_key,return_date_key,customer_key,staff_key,film_key
0,1,1,367,2006-02-15 21:30:53,20050524,20050526.0,130,1,80
1,2,2,1525,2006-02-15 21:30:53,20050524,20050528.0,459,1,333


In [144]:
# Drop unnecessary columns
df_fact_rentals.drop(['last_update'], axis=1, inplace=True)

# Reorder the Columns
ordered_columns = ['rental_key','rental_id'
                   ,'inventory_id','film_key','staff_key','customer_key'
                   ,'rental_date_key','return_date_key']

df_fact_rentals = df_fact_rentals[ordered_columns]
df_fact_rentals.head(2)

Unnamed: 0,rental_key,rental_id,inventory_id,film_key,staff_key,customer_key,rental_date_key,return_date_key
0,1,1,367,80,1,130,20050524,20050526.0
1,2,2,1525,333,1,459,20050524,20050528.0


## Load Newly Transformed MongoDB Data into the sakila_dw Data Warehouse

In [146]:
dataframe = df_fact_rentals
table_name = 'fact_rentals'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [148]:
# Verify that new fact table was created
sql_fact_rentals = "SELECT * FROM sakila_dw.fact_rentals;"
df_fact_rentals = get_sql_dataframe(sql_fact_rentals, **mysql_args)
df_fact_rentals.head(2)

Unnamed: 0,rental_key,rental_id,inventory_id,film_key,staff_key,customer_key,rental_date_key,return_date_key
0,1,1,367,80,1,130,20050524,20050526.0
1,2,2,1525,333,1,459,20050524,20050528.0


## Query the new Data Warehouse 
#### Statement that returns rental counts for each film grouped by staff member's last name

In [150]:
sql_film_rentals = """
SELECT f.title AS 'Film Title',
    s.last_name AS 'Staff Last Name',
    FORMAT(COUNT(r.rental_id),0) AS 'Total Rentals'
FROM sakila_dw.fact_rentals AS r
JOIN sakila_dw.dim_films AS f
ON r.film_key = f.film_key
JOIN sakila_dw.dim_staff AS s
ON r.staff_key = s.staff_key
GROUP BY f.title, s.last_name
ORDER BY `Total Rentals` DESC;
"""

In [152]:
df_film_rentals = get_sql_dataframe(sql_film_rentals, **mysql_args)
df_film_rentals

Unnamed: 0,Film Title,Staff Last Name,Total Rentals
0,INDEPENDENCE HOTEL,Stephens,9
1,MURDER ANTITRUST,Stephens,9
2,USUAL UNTOUCHABLES,Hillyer,9
3,JACKET FRISCO,Stephens,9
4,POLLOCK DELIVERANCE,Stephens,9
...,...,...,...
1907,CASPER DRAGONFLY,Stephens,1
1908,HARDLY ROBBERS,Hillyer,1
1909,PHANTOM GLORY,Stephens,1
1910,BUNCH MINDS,Stephens,1
