Importing proper libraries.

In [1]:
import findspark
findspark.init()

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W
from pyspark.sql.functions import col, count, sum as _sum

Initializing Global Variables, Including MYSQL, MONGODB, DIRECTORY FOR SPARK

In [2]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "adventure_dw2",
    "conn_props" : {
        "user" : "root",
        "password" : "WreckedRatUVA123!",
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}

# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "user_name" : "dev",
    "password" : "abc123!",
    "cluster_name" : "Cluster0",
    "cluster_subnet" : "zyvokl5",
    "cluster_location" : "atlas", # "local"
    "collection" : "AdventureWorks",
    "null_column_threshold" : 0.5,
    "db_name" : "AdventureWorks"
}
    

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.getcwd()
batch_dir = os.path.join(base_dir, 'batch')
stream_dir = os.path.join(base_dir, 'streaming')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "adventureworks_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

sales_output_bronze = os.path.join(database_dir, 'fact_sales', 'bronze')
sales_output_silver = os.path.join(database_dir, 'fact_sales', 'silver')
sales_output_gold = os.path.join(database_dir, 'fact_sales', 'gold')


Global Functions to help simiplify process

In [3]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    
    return df_dropped
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark Northwind Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

Remove the Data Lakehouse Database Directory Structure to Ensure Idempotency

In [4]:
remove_directory_tree(database_dir)

"Directory 'C:\\Users\\devpp\\Desktop\\project1\\spark-warehouse\\adventureworks_dlh.db' has been removed successfully."

Creating a new Spark Session

In [5]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark

Create a New Metadata Database.

In [6]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Lab 06 Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Lab 6.0');
"""
spark.sql(sql_create_db)

DataFrame[]

#### Lets populate the Dimensions now. 

Lets see If the dim_customer is in the batch directory. That we did in Project 1.

In [7]:
get_file_info(batch_dir)

Unnamed: 0,name,size,modification_time
0,dim_customer.csv,1438010,2025-12-19 15:33:13.960261106


Lets populate the dim_customer_table and save it as dim_customers in the Data Lakehouse

In [8]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

customers_csv = os.path.join(batch_dir, 'dim_customer.csv')
print(customers_csv)
df_dim_customers = spark.read.format('csv').options(header='true', inferSchema='true').load(customers_csv)
window_spec = Window.orderBy("CustomerID")
df_dim_customers = df_dim_customers.withColumn("customer_key", F.row_number().over(window_spec))
df_dim_customers.toPandas().head(2)
df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

C:\Users\devpp\Desktop\project1\batch\dim_customer.csv


Lets test the table to make sure it displays properly

In [9]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customers;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|          CustomerID|                 int|   NULL|
|       AccountNumber|              string|   NULL|
|        CustomerType|              string|   NULL|
| SalesTerritoryGroup|              string|   NULL|
|      SalesTerritory|              string|   NULL|
|        AddressLine1|              string|   NULL|
|                City|              string|   NULL|
|          PostalCode|              string|   NULL|
|        customer_key|                 int|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|       dim_customers|       |
|        Created Time|Fri Dec 19 17:19:...|       |
|         Last Access|             UNKNOWN|       |
|          C

Unnamed: 0,CustomerID,AccountNumber,CustomerType,SalesTerritoryGroup,SalesTerritory,AddressLine1,City,PostalCode,customer_key
0,1,AW00000001,S,North America,Northwest,2251 Elliot Avenue,Seattle,98104,1
1,2,AW00000002,S,North America,Northwest,7943 Walnut Ave,Renton,98055,2


Lets insert the product.json file into Mongo Atlas

In [10]:
client = get_mongo_client(**mongodb_args)

json_files = {"Products" : "dim_product.json"}

set_mongo_collections(client, mongodb_args["db_name"], os.getcwd(), json_files) 
print("done")

done


In [11]:
mongodb_args["collection"] = "Products"
df_dim_Products = get_mongodb_dataframe(spark, **mongodb_args)
df_dim_Products.toPandas().head(2)
window_spec = Window.orderBy("ProductID")
df_dim_Products = df_dim_Products.withColumn("product_key", F.row_number().over(window_spec))

Lets reorder the table

In [12]:
df_dim_Products = df_dim_Products[[
    'product_key',
    'ProductID',
    'Name',
    'ProductNumber',
    'MakeFlag',
    'FinishedGoodsFlag',
    'SafetyStockLevel',
    'ReorderPoint',
    'DaysToManufacture',
    'SellStartDate',
]]
df_dim_Products.toPandas().head(2)

Unnamed: 0,product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,SafetyStockLevel,ReorderPoint,DaysToManufacture,SellStartDate
0,1,1,Adjustable Race,AR-5381,b'\x00',b'\x00',1000,750,0,1998-06-01
1,2,2,Bearing Ball,BA-8327,b'\x00',b'\x00',1000,750,0,1998-06-01


Lets save the dim_Products table

In [13]:
df_dim_Products.write.saveAsTable(f"{dest_database}.dim_Products", mode="overwrite")
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_Products;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_Products LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|         product_key|                 int|   NULL|
|           ProductID|                 int|   NULL|
|                Name|              string|   NULL|
|       ProductNumber|              string|   NULL|
|            MakeFlag|              string|   NULL|
|   FinishedGoodsFlag|              string|   NULL|
|    SafetyStockLevel|                 int|   NULL|
|        ReorderPoint|                 int|   NULL|
|   DaysToManufacture|                 int|   NULL|
|       SellStartDate|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|        dim_products|       |
|        Created Time|Fri Dec 19 17:19:...|       |
|         La

Unnamed: 0,product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,SafetyStockLevel,ReorderPoint,DaysToManufacture,SellStartDate
0,1,1,Adjustable Race,AR-5381,b'\x00',b'\x00',1000,750,0,1998-06-01
1,2,2,Bearing Ball,BA-8327,b'\x00',b'\x00',1000,750,0,1998-06-01


Lets populate from MySQL Database.
We need the Date Dimension Table and the Employee dimension Table.

Here is the Date Dimension Table

In [14]:
sql_dim_date = f"SELECT * FROM {mysql_args['db_name']}.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|           full_date|timestamp|   NULL|
|            date_key|   bigint|   NULL|
|           date_name|   string|   NULL|
|        date_name_us|   string|   NULL|
|        date_name_eu|   string|   NULL|
|         day_of_week|      int|   NULL|
|    day_name_of_week|   string|   NULL|
|        day_of_month|      int|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend|   string|   NULL|
|        week_of_year|   bigint|   NULL|
|          month_name|   string|   NULL|
|       month_of_year|      int|   NULL|
|is_last_day_of_month|   string|   NULL|
|    calendar_quarter|      int|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month|   string|   NULL|
|   calendar_year_qtr|   string|   NULL|
|fiscal_month_of_year|      int|   NULL|
|      fiscal_quarter|      int|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,full_date,date_key,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,2000-01-01,20000101,2000/01/01,01/01/2000,01/01/2000,6,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,2000-01-02,20000102,2000/01/02,01/02/2000,02/01/2000,7,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


Here is the Employee Dimension Table

In [15]:
sql_employee = f"SELECT * FROM {mysql_args['db_name']}.dim_employee"
df_dim_employee = get_mysql_dataframe(spark, sql_employee, **mysql_args)
window_spec = Window.orderBy("EmployeeID")
df_dim_employee = df_dim_employee.withColumn("employee_key", F.row_number().over(window_spec))
df_dim_employee.write.saveAsTable(f"{dest_database}.dim_employees", mode="overwrite")

In [16]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_employees;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_employees LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|          EmployeeID|              bigint|   NULL|
|               Title|              string|   NULL|
|           BirthDate|           timestamp|   NULL|
|            HireDate|           timestamp|   NULL|
|              Gender|              string|   NULL|
|       MaritalStatus|              string|   NULL|
|      DepartmentName|              string|   NULL|
|           ManagerID|              double|   NULL|
|       VacationHours|              bigint|   NULL|
|      SickLeaveHours|              bigint|   NULL|
|        employee_key|                 int|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|  adventureworks_dlh|       |
|               Table|       dim_employees|       |
|        Cre

Unnamed: 0,EmployeeID,Title,BirthDate,HireDate,Gender,MaritalStatus,DepartmentName,ManagerID,VacationHours,SickLeaveHours,employee_key
0,1,Production Technician - WC60,1972-05-15,1996-07-31,M,M,Production,16.0,21,30,1
1,2,Marketing Assistant,1977-06-03,1997-02-26,M,S,Marketing,6.0,42,41,2


We have now gotten all of the Dimensions set up, 

We now need to create a facts table that Bronze - Silver - Gold Layer Facts Sales Table

Bronze:

In [17]:
df_sales_bronze = (
    spark.readStream \
    .option("schemaLocation", sales_output_bronze) \
    .option("maxFilesPerTrigger", 1) \
    .option("multiLine", "true") \
    .json(stream_dir)
)

df_sales_bronze.isStreaming

True

Write the streaming data to a parquet file

In [18]:
sales_checkpoint_bronze = os.path.join(sales_output_bronze, '_checkpoint')

sales_bronze_query = (
    df_sales_bronze
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    .writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("sales_bronze")
    .trigger(availableNow=True)
    .option("checkpointLocation", sales_checkpoint_bronze)
    .option("compression", "snappy")
    .start(sales_output_bronze)
)

In [19]:
print(f"Query ID: {sales_bronze_query.id}")
print(f"Query Name: {sales_bronze_query.name}")
print(f"Query Status: {sales_bronze_query.status}")

Query ID: 25044b80-5f73-47fa-80fb-1992b3338c24
Query Name: sales_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [20]:
sales_bronze_query.awaitTermination()

Silver Layer

In [21]:
df_dim_order_date = df_dim_date.select(col("date_key").alias("order_date_key"), col("full_date").alias("order_full_date"))
df_dim_shipped_date = df_dim_date.select(col("date_key").alias("shipped_date_key"), col("full_date").alias("shipped_full_date"))

In [22]:
df_sales_stream = spark.readStream.format("parquet")    .option("maxFilesPerTrigger", 5) \
.load(sales_output_bronze) 

df_sales_silver = (
    df_sales_stream
    .join(df_dim_customers, on="CustomerID")
    .join(df_dim_Products, on="ProductID")
    .join(df_dim_employee, on="EmployeeID")
    .join(df_dim_order_date, df_dim_order_date.order_full_date.cast(DateType()) == col("order_full_date").cast(DateType()), "inner") \
    .join(df_dim_shipped_date, df_dim_shipped_date.shipped_full_date.cast(DateType()) == col("shipped_full_date").cast(DateType()), "left_outer") \
    .select(
        col("salesorderid").cast(LongType()),
        df_dim_customers.customer_key.cast(LongType()), \
        df_dim_employee.employee_key.cast(LongType()), \
        df_dim_Products.product_key.cast(LongType()), \
        df_dim_order_date.order_date_key.cast(LongType()), \
        df_dim_shipped_date.shipped_date_key.cast(LongType()), \
        col("orderqty"),
        col("unitprice"),
        col("linetotal")
    )
)

In [23]:
df_sales_silver.isStreaming

True

In [24]:
df_sales_silver.printSchema()

root
 |-- salesorderid: long (nullable = true)
 |-- customer_key: long (nullable = false)
 |-- employee_key: long (nullable = false)
 |-- product_key: long (nullable = false)
 |-- order_date_key: long (nullable = true)
 |-- shipped_date_key: long (nullable = true)
 |-- orderqty: long (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- linetotal: double (nullable = true)



In [25]:
sales_checkpoint_silver = os.path.join(sales_output_silver, '_checkpoint')

sales_silver_query = (
    df_sales_silver.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("sales_silver") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", sales_checkpoint_silver) \
    .option("compression", "snappy") \
    .start(sales_output_silver))

print(f"Query ID: {sales_silver_query.id}")
print(f"Query Name: {sales_silver_query.name}")
print(f"Query Status: {sales_silver_query.status}")

Query ID: 232b04e6-5d09-4c29-a935-6537d3dc8d5f
Query Name: sales_silver
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [26]:
sales_silver_query.awaitTermination()
print("done")

done


Gold Layer

Lets make a table that shows how much sales each manager is responsible for.

In [34]:
df_dim_order_date = df_dim_date.select(
    col("date_key").alias("order_date_key"),
    col("calendar_year"),
    col("month_of_year"),
    col("month_name")
)

df_sales_gold = (
    df_sales_silver.alias("f")
    .join(
        df_dim_order_date.alias("d"),
        col("f.order_date_key") == col("d.order_date_key"),
        "inner"
    )
    .join(
        df_dim_employee.alias("e"),
        col("f.employee_key") == col("e.employee_key"),
        "inner"
    )
)

df_employee_sales_monthly = (
    df_sales_gold
    .groupBy(
        col("d.calendar_year"),
        col("d.month_of_year"),
        col("e.ManagerID")
    )
    .agg(
        count("f.salesorderid").alias("monthly_sales_count"),
        _sum("f.linetotal").alias("monthly_revenue")
    )
    .orderBy("calendar_year","month_of_year", "ManagerID")
)

employee_sales_gold_query = (
    df_employee_sales_monthly.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("employee_sales")
    .start()
)

IllegalArgumentException: Cannot start query with name employee_sales as a query with that name is already active in this SparkSession

In [35]:
wait_until_stream_is_ready(employee_sales_gold_query, 1)

The stream has processed 18 batchs


In [36]:
df_employee_sales = spark.sql("SELECT * FROM employee_sales")
df_employee_sales.printSchema()

root
 |-- calendar_year: integer (nullable = true)
 |-- month_of_year: integer (nullable = true)
 |-- ManagerID: double (nullable = true)
 |-- monthly_sales_count: long (nullable = false)
 |-- monthly_revenue: double (nullable = true)



In [37]:
df_employee_sales_final = df_employee_sales \
.select(
        col("ManagerID"),
        col("calendar_year").alias("year"),
        col("month_of_year").alias("month"),
        col("monthly_revenue"),
        col("monthly_sales_count")
    )

In [38]:
df_employee_sales_final.write.saveAsTable(f"{dest_database}.employee_sales_final", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.employee_sales_final").toPandas()

Unnamed: 0,ManagerID,year,month,monthly_revenue,monthly_sales_count
0,268.0,2000,1,2.522292e+08,124558
1,284.0,2000,1,6.845708e+05,124558
2,268.0,2000,2,2.359564e+08,116522
3,284.0,2000,2,6.404049e+05,116522
4,268.0,2000,3,2.522292e+08,124558
...,...,...,...,...,...
259,284.0,2007,6,6.624878e+05,120540
260,268.0,2007,7,2.522292e+08,124558
261,284.0,2007,7,6.845708e+05,124558
262,268.0,2007,8,2.522292e+08,124558


Stop The Spark Session

In [39]:
spark.stop()