**Project 1 ETL**

Import Libraries

In [3]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

Connection Variables, Strings, Functions

In [4]:
mysql_args = {
    "uid" : "ds2002",
    "pwd" : "ds2002",
    "hostname" : "localhost",
    "dbname" : "classic_dw"
}

mongodb_args = {
    "user_name" : "ds2002",
    "password" : "ds2002",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "ba2iiiz",
    "cluster_location" : "atlas", # "local"
    "db_name" : "classicmodels"
}

def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

    
def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
    
# CREATE DATA WAREHOUSE    
  
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

connection = sqlEngine.connect()
connection.execute(f"DROP DATABASE IF EXISTS `{mysql_args['dbname']}`;")
connection.execute(f"CREATE DATABASE `{mysql_args['dbname']}`;")
connection.execute(f"USE {mysql_args['dbname']};")

connection.close()

# POPULATE MONGODB

client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {'orders':'classic_orders.json',
              'order_details':'classic_ods.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)         

**Extract from SQL**

In [5]:
# Customers
sql_customers = "SELECT * FROM classicmodels.customers;"
df_customers = get_sql_dataframe(sql_customers, **mysql_args)
df_customers.head(2)

Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0


In [22]:
# Employees
sql_employees = "SELECT * FROM classicmodels.employees"
df_employees = get_sql_dataframe(sql_employees,**mysql_args)
df_employees.head(2)

Unnamed: 0,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales


In [7]:
# Payments
sql_payments = "SELECT * FROM classicmodels.payments"
df_payments = get_sql_dataframe(sql_payments, **mysql_args)
df_payments.head(2)

Unnamed: 0,customerNumber,checkNumber,paymentDate,amount
0,103,HQ336336,2004-10-19,6066.78
1,103,JM555205,2003-06-05,14571.44


In [8]:
# Products
sql_products = "SELECT * FROM classicmodels.products"
df_products = get_sql_dataframe(sql_products, **mysql_args)
df_products.head(2)

Unnamed: 0,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


**Extract from MongoDB**

In [9]:
# Order Details
client = get_mongo_client(**mongodb_args)
collection = 'order_details'
df_ods = get_mongo_dataframe(client, mongodb_args['db_name'],collection,{})
df_ods.head(2)

Unnamed: 0,orderNumber,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,S18_1749,30,136.0,3
1,10100,S18_2248,50,55.09,2


In [10]:
# Orders
client = get_mongo_client(**mongodb_args)
collection = 'orders'
df_orders = get_mongo_dataframe(client, mongodb_args['db_name'],collection,{})
df_orders.head(2)

Unnamed: 0,orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128


**Extract from CSV**

In [11]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'classic_plines.csv')

df_plines = pd.read_csv(data_file, header=0, index_col=0)
df_plines.head(2)

Unnamed: 0_level_0,textDescription,htmlDescription,image
productLine,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Classic Cars,Attention car enthusiasts: Make your wildest c...,,
Motorcycles,Our motorcycles are state of the art replicas ...,,


Date Dimmension

In [12]:
sql_dim_date = "SELECT date_key, full_date FROM classic_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [13]:
# Order Date
df_orders.rename(columns={'orderDate':'order_date'},inplace=True)
df_dim_order_date = df_dim_date.rename(columns={'date_key':'order_date_key','full_date':'order_date'})
df_orders.order_date = df_orders.order_date.astype('datetime64[ns]').dt.date
df_orders = pd.merge(df_orders, df_dim_order_date, on='order_date',how='left')
df_orders.drop(['order_date'],axis=1,inplace=True)
df_orders.head(2)

# Required Date
df_orders.rename(columns={'requiredDate':'required_date'},inplace=True)
df_dim_req_date = df_dim_date.rename(columns={'date_key':'required_date_key','full_date':'required_date'})
df_orders.required_date = df_orders.required_date.astype('datetime64[ns]').dt.date
df_orders = pd.merge(df_orders, df_dim_req_date, on='required_date',how='left')
df_orders.drop(['required_date'],axis=1,inplace=True)
df_orders.head(2)

# Shipped Date
df_orders.rename(columns={'shippedDate':'shipped_date'},inplace=True)
df_dim_ship_date = df_dim_date.rename(columns={'date_key':'shipped_date_key','full_date':'shipped_date'})
df_orders.shipped_date = df_orders.shipped_date.astype('datetime64[ns]').dt.date
df_orders = pd.merge(df_orders, df_dim_ship_date, on='shipped_date', how='left')
df_orders.drop(['shipped_date'],axis=1,inplace=True)
df_orders.head(2)

Unnamed: 0,orderNumber,status,comments,customerNumber,order_date_key,required_date_key,shipped_date_key
0,10100,Shipped,,363,20030106,20030113,20030110.0
1,10101,Shipped,Check on availability.,128,20030109,20030118,20030111.0


In [14]:
# Payment Date
df_payments.rename(columns={'paymentDate':'payment_date'},inplace=True)
df_dim_pay_date = df_dim_date.rename(columns={'date_key':'payment_date_key','full_date':'payment_date'})
df_payments.payment_date = df_payments.payment_date.astype('datetime64[ns]').dt.date
df_payments = pd.merge(df_payments, df_dim_pay_date, on='payment_date',how='left')
df_payments.drop(['payment_date'],axis=1,inplace=True)
df_payments.head(2)

Unnamed: 0,customerNumber,checkNumber,amount,payment_date_key
0,103,HQ336336,6066.78,20041019
1,103,JM555205,14571.44,20030605


**Transformation**

In [15]:
# Customers Cleaning
df_customers.drop(['addressLine2'],axis=1,inplace=True)
df_customers.rename(columns={'customerNumber':'customer_id','customerName':'company','contactFirstName':'first_name',
                             'addressLine1':'address','postalCode':'postal_code','salesRepEmployeeNumber':'employee_id',
                            'creditLimit':'credit_limit', 'contactLastName':'last_name'},inplace=True)
df_customers.insert(0,'customer_key',range(1,df_customers.shape[0]+1))
df_customers.head(2)

Unnamed: 0,customer_key,customer_id,company,last_name,first_name,phone,address,city,state,postal_code,country,employee_id,credit_limit
0,1,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",Nantes,,44000,France,1370.0,21000.0
1,2,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,Las Vegas,NV,83030,USA,1166.0,71800.0


In [23]:
# Employees Cleaning
df_employees.rename(columns={'employeeNumber':'employee_id','lastName':'last_name','firstName':'first_name',
                             'officeCode':'office_code','reportsTo':'reports_to','jobTitle':'job_title'},inplace=True)
df_employees.drop(['office_code'],axis=1,inplace=True)
df_employees.insert(0, 'employee_key', range(1, df_employees.shape[0]+1))
df_employees.head(2)

Unnamed: 0,employee_key,employee_id,last_name,first_name,extension,email,reports_to,job_title
0,1,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,,President
1,2,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1002.0,VP Sales


In [15]:
# Payments Cleaning
df_payments.rename(columns={'customerNumber':'customer_id','checkNumber':'check_num'},inplace=True)
df_payments.insert(0, 'payment_key', range(1, df_payments.shape[0]+1))
df_payments.head(2)

Unnamed: 0,payment_key,customer_id,check_num,amount,payment_date_key
0,1,103,HQ336336,6066.78,20041019
1,2,103,JM555205,14571.44,20030605


In [16]:
# Products Cleaning
df_products.rename(columns={'productCode':'product_id','productName':'product','productLine':'product_line',
                    'productVendor':'vendor','productDescription':'description','quantityInStock':'in_stock',
                    'buyPrice':'buy_price'},inplace=True)
df_products.drop(['productScale','description'],axis=1,inplace=True)
df_products.insert(0, 'product_key', range(1, df_products.shape[0]+1))
df_products.head(2)

Unnamed: 0,product_key,product_id,product,product_line,vendor,in_stock,buy_price,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,7933,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,7305,98.58,214.3


In [44]:
# Order Details Cleaning
df_ods.rename(columns={'orderNumber':'order_id','productCode':'product_id','quantityOrdered':'order_quantity',
                       'priceEach':'unit_price'},inplace=True)
df_ods.drop(['orderLineNumber'],axis=1,inplace=True)
df_ods.insert(0, 'ods_key', range(1, df_ods.shape[0]+1))
df_ods.head(2)

Unnamed: 0,ods_key,order_id,product_id,order_quantity,unit_price
0,1,10100,S18_1749,30,136.0
1,2,10100,S18_2248,50,55.09


In [19]:
# Orders Cleaning
df_orders.rename(columns={'orderNumber':'order_id','customerNumber':'customer_id'},inplace=True)
df_orders.drop(['comments'],axis=1,inplace=True)
df_orders.insert(0, 'order_key', range(1, df_orders.shape[0]+1))
df_orders.head(2)

Unnamed: 0,order_key,order_id,status,customer_id,order_date_key,required_date_key,shipped_date_key
0,1,10100,Shipped,363,20030106,20030113,20030110.0
1,2,10101,Shipped,128,20030109,20030118,20030111.0


In [20]:
# Product Lines Cleaning
df_plines.head()
df_plines.insert(0, 'product_line_key', range(1, df_plines.shape[0]+1))
df_plines.reset_index(inplace=True)
df_plines.set_index('product_line_key')
df_plines.rename(columns={'productLine':'product_line'},inplace=True)
df_plines.drop(['textDescription','htmlDescription','image'],axis=1,inplace=True)
df_plines.head(2)

Unnamed: 0,product_line,product_line_key
0,Classic Cars,1
1,Motorcycles,2


**Load Cleaned Dimmensions**

In [21]:
tables = [('dim_customers',df_customers,'customer_key'),
          ('dim_employees',df_employees,'employee_key'),
          ('dim_ods',df_ods,'ods_key'),
          ('dim_orders',df_orders,'order_key'),
          ('dim_payments',df_payments,'payment_key'),
          ('dim_product_lines',df_plines,'product_line_key'),
          ('dim_products',df_products,'product_key')
]
for table_name, df, pk in tables:
    set_dataframe(df, table_name, pk, 'insert', **mysql_args)

**Create Fact Table**

Fetch Keys

In [22]:
sql_customer = "SELECT customer_key, customer_id, employee_id FROM classic_dw.dim_customers"
df_dim_customers = get_sql_dataframe(sql_customer,**mysql_args)
df_dim_customers.head()

Unnamed: 0,customer_key,customer_id,employee_id
0,1,103,1370.0
1,2,112,1166.0
2,3,114,1611.0
3,4,119,1370.0
4,5,121,1504.0


In [29]:
sql_employee = "SELECT employee_key, employee_id FROM classic_dw.dim_employees"
df_dim_employees = get_sql_dataframe(sql_employee, **mysql_args)
df_dim_employees.head()

Unnamed: 0,employee_key,employee_id
0,1,1002
1,2,1056
2,3,1076
3,4,1088
4,5,1102


In [23]:
sql_products = "SELECT product_key, product_id FROM classic_dw.dim_products"
df_dim_products = get_sql_dataframe(sql_products, **mysql_args)
df_dim_products.head()

Unnamed: 0,product_key,product_id
0,1,S10_1678
1,2,S10_1949
2,3,S10_2016
3,4,S10_4698
4,5,S10_4757


In [24]:
sql_product_lines = "SELECT product_line_key FROM classic_dw.dim_product_lines"
df_dim_plines = get_sql_dataframe(sql_product_lines, **mysql_args)
df_dim_plines.head()

Unnamed: 0,product_line_key
0,1
1,2
2,3
3,4
4,5


In [25]:
sql_orders = "SELECT order_key, order_id, customer_id FROM classic_dw.dim_orders"
df_dim_orders = get_sql_dataframe(sql_orders, **mysql_args)
df_dim_orders.head()

Unnamed: 0,order_key,order_id,customer_id
0,1,10100,363
1,2,10101,128
2,3,10102,181
3,4,10103,121
4,5,10104,141


In [26]:
sql_ods = "SELECT ods_key, order_id, product_id FROM classic_dw.dim_ods"
df_dim_ods = get_sql_dataframe(sql_ods, **mysql_args)
df_dim_ods.head()

Unnamed: 0,ods_key,order_id,product_id
0,1,10100,S18_1749
1,2,10100,S18_2248
2,3,10100,S18_4409
3,4,10100,S24_3969
4,5,10101,S18_2325


In [27]:
sql_p = "SELECT payment_key, customer_id FROM classic_dw.dim_payments"
df_dim_payments = get_sql_dataframe(sql_p, **mysql_args)
df_dim_payments.head()

Unnamed: 0,payment_key,customer_id
0,1,103
1,2,103
2,3,103
3,4,112
4,5,112


Orders Table

In [30]:
df_fact_pos = pd.merge(df_dim_orders, df_dim_ods, on='order_id',how='left')
df_fact_pos.head()

#Customers
df_fact_pos = pd.merge(df_fact_pos, df_dim_customers, on='customer_id', how='left')
df_fact_pos.head()

#Employees
df_fact_pos = pd.merge(df_fact_pos, df_dim_employees, on='employee_id', how='left')
df_fact_pos.head()

#Products
df_fact_pos = pd.merge(df_fact_pos, df_dim_products, on='product_id', how='left')
df_fact_pos.head()


#Cleaning
df_fact_pos.drop(['order_id','customer_id','product_id','employee_id'],axis=1,inplace=True)
df_fact_pos.insert(0, 'fact_order_key', range(1,df_fact_pos.shape[0]+1))
ordered_cols = ['fact_order_key','order_key','customer_key','ods_key','employee_key','product_key']
df_fact_pos = df_fact_pos[ordered_cols]
df_fact_pos.rename(columns={'employee_key':'sales_rep_key'},inplace=True)
df_fact_pos.head()

Unnamed: 0,fact_order_key,order_key,customer_key,ods_key,sales_rep_key,product_key
0,1,1,86,1.0,10,23.0
1,2,1,86,2.0,10,27.0
2,3,1,86,3.0,10,50.0
3,4,1,86,4.0,10,80.0
4,5,2,8,5.0,17,29.0


Payments Table

In [31]:
df_fact_transaction = pd.merge(df_dim_payments, df_dim_customers, on='customer_id',how='left')
df_fact_transaction = pd.merge(df_fact_transaction, df_dim_employees, on='employee_id',how='left')
df_fact_transaction.drop(['customer_id','employee_id'],axis=1,inplace=True)
ordered_cols = ['payment_key','customer_key','employee_key']
df_fact_transaction = df_fact_transaction[ordered_cols]
df_fact_transaction.rename(columns={'employee_key':'sales_rep_key'},inplace=True)
df_fact_transaction.insert(0, 'fact_payment_key', range(1, df_fact_transaction.shape[0]+1))
df_fact_transaction.head()

Unnamed: 0,fact_payment_key,payment_key,customer_key,sales_rep_key
0,1,1,1,14
1,2,2,1,14
2,3,3,1,14
3,4,4,2,8
4,5,5,2,8


Load to SQL + Verify Functionality with Select Statements

In [35]:
tables = [(df_fact_pos,'fact_purchase_orders','fact_order_key','insert'),
         (df_fact_transaction,'fact_inventory_transactions','fact_payment_key','insert')
    ]
for df, table, pk, db in tables:
    set_dataframe(df, table, pk, db, **mysql_args)
dfpo = get_sql_dataframe("SELECT * FROM classic_dw.fact_purchase_orders",**mysql_args)
dfpo

Unnamed: 0,fact_order_key,order_key,customer_key,ods_key,sales_rep_key,product_key
0,1,1,86,1.0,10,23.0
1,2,1,86,2.0,10,27.0
2,3,1,86,3.0,10,50.0
3,4,1,86,4.0,10,80.0
4,5,2,8,5.0,17,29.0
...,...,...,...,...,...,...
1215,1216,322,6,,7,
1216,1217,323,17,,10,
1217,1218,324,66,,15,
1218,1219,325,11,,14,


In [38]:
dfit = get_sql_dataframe("SELECT * FROM classic_dw.fact_inventory_transactions",**mysql_args)
dfit

Unnamed: 0,fact_payment_key,payment_key,customer_key,sales_rep_key
0,1,1,1,14
1,2,2,1,14
2,3,3,1,14
3,4,4,2,8
4,5,5,2,8
...,...,...,...,...
268,269,269,121,9
269,270,270,121,9
270,271,271,122,19
271,272,272,122,19


In [39]:
sql_1 = """
    SELECT c.company AS 'Customer',
    SUM(ods.order_quantity) AS 'Total Quantity',
    SUM(ods.unit_price) AS 'Total Unit Price'
    FROM classic_dw.fact_purchase_orders as po
    INNER JOIN classic_dw.dim_customers AS c
    ON po.customer_key = c.customer_key
    INNER JOIN classic_dw.dim_ods AS ods
    ON po.ods_key = ods.ods_key
    GROUP BY c.company
    ORDER BY 'Total Quantity', 'Total Unit Price'
"""
df_1 = get_sql_dataframe(sql_1, **mysql_args)
df_1

Unnamed: 0,Customer,Total Quantity,Total Unit Price
0,Online Diecast Creations Co.,736.0,1842.92
1,"Blauer See Auto, Co.",142.0,352.00
2,Vitachrome Inc.,80.0,138.68
3,Baane Mini Imports,563.0,1588.16
4,Euro+ Shopping Channel,2153.0,5367.39
...,...,...,...
65,"Saveley & Henriot, Co.",396.0,1110.74
66,Mini Classics,385.0,911.46
67,Super Scale Inc.,316.0,978.59
68,West Coast Collectables Co.,115.0,189.66


In [40]:
sql_2 = """
    SELECT e.last_name AS 'Sales Rep',
    SUM(p.amount) AS 'Total Sales'
    FROM classic_dw.fact_inventory_transactions AS it
    INNER JOIN classic_dw.dim_employees AS e
    ON it.sales_rep_key = e.employee_key
    INNER JOIN classic_dw.dim_payments AS p
    ON it.payment_key = p.payment_key
    GROUP BY e.last_name
    ORDER BY SUM(p.amount) DESC;
"""

df_2 = get_sql_dataframe(sql_2, **mysql_args)
df_2

Unnamed: 0,Sales Rep,Total Sales
0,Hernandez,1112003.81
1,Jennings,989906.55
2,Castillo,750201.87
3,Bott,686653.25
4,Jones,637672.65
5,Vanauf,584406.8
6,Bondur,569485.75
7,Fixter,509385.82
8,Marsh,497907.16
9,Tseng,488212.67


In [41]:
sql_3 = """
    SELECT pl.product_line AS 'Product Line',
    SUM(ods.order_quantity) AS 'Total Quantity'
    FROM classic_dw.fact_purchase_orders AS po
    INNER JOIN classic_dw.dim_products AS p
    ON po.product_key = p.product_key
    INNER JOIN classic_dw.dim_product_lines as pl
    ON p.product_line = pl.product_line
    INNER JOIN classic_dw.dim_ods AS ods
    ON po.ods_key = ods.ods_key
    GROUP BY pl.product_line
    ORDER BY SUM(ods.order_quantity) DESC;
"""

df_3 = get_sql_dataframe(sql_3, **mysql_args)
df_3

Unnamed: 0,Product Line,Total Quantity
0,Classic Cars,11638.0
1,Vintage Cars,7655.0
2,Motorcycles,4031.0
3,Planes,3833.0
4,Trucks and Buses,3647.0
5,Ships,2844.0
6,Trains,955.0
