# Data Science 2002 - Capstone Project
## Connor Powell

### Imports

In [None]:
import os
import json
from urllib.parse import quote_plus

import pymongo
import pyspark.pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

### Instantiate all of the Global Variables

In [None]:
# Azure MySQL connection information (JDBC).
# SECURITY: plaintext credentials are committed with this notebook. Each value can
# be overridden through an environment variable; rotate the password, move it into
# a Databricks secret scope, and then remove the literal fallbacks below.
jdbc_hostname = "spy8dg-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "retail_sales"
connection_properties = {
    "user": os.environ.get("RETAIL_SALES_MYSQL_USER", "spy8dg"),
    "password": os.environ.get("RETAIL_SALES_MYSQL_PASSWORD", "Laxbro22"),
    "driver": "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas connection information (same environment-variable override scheme).
atlas_cluster_name = "cluster0.spy8dg"
atlas_database_name = "retail_sales"
atlas_user_name = os.environ.get("RETAIL_SALES_ATLAS_USER", "spy8dg")
atlas_password = os.environ.get("RETAIL_SALES_ATLAS_PASSWORD", "Laxbro22")

# Data paths.
# NOTE(review): these use "DS-2002-project" while the SQL cells use "ds-2002-project";
# object-store paths are case-sensitive, so confirm which casing actually exists.
base_dir = "dbfs:/FileStore/DS-2002-project/retail_sales_data"
database_dir = f"{base_dir}/retail_sales_database"
batch_dir = f"{base_dir}/batch"
stream_dir = f"{base_dir}/stream"
sales_stream_dir = f"{stream_dir}/sales_stream"
customer_stream_dir = f"{stream_dir}/customer_stream"
product_stream_dir = f"{stream_dir}/product_stream"
sales_output_bronze = f"{database_dir}/sales_data/bronze"
sales_output_silver = f"{database_dir}/sales_data/silver"
sales_output_gold = f"{database_dir}/sales_data/gold"

# Delete output from previous runs. One recursive delete of database_dir also
# removes its sales_data sub-directory, so no separate call is needed for it.
dbutils.fs.rm(database_dir, True)

### Define the Global Functions

In [None]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Fetch documents from a MongoDB Atlas collection into a ``pd.DataFrame``.

    ``pd`` is ``pyspark.pandas`` in this notebook.

    Args:
        user_id: Atlas user name; percent-escaped before it is placed in the URI.
        pwd: Atlas password; percent-escaped before it is placed in the URI.
        cluster_name: Cluster host prefix; ``.mongodb.net`` is appended to it.
        db_name: Database to read from.
        collection: Collection to read from.
        conditions: Filter document for ``find``; a falsy value matches every document.
        projection: Projection for ``find``; a falsy value returns every field.
        sort: Sort specification for ``Cursor.sort``; a falsy value leaves results unsorted.

    Returns:
        A DataFrame with one row per returned document.
    """
    # Credentials must be percent-escaped so characters such as '@', ':' or '/'
    # in them cannot break the connection string.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )
    client = pymongo.MongoClient(mongo_uri)
    try:
        # Apply each argument independently: a filter without a projection (or a
        # sort without a filter) is still honoured instead of being dropped.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        documents = list(cursor)
    finally:
        # Always release the connection pool, even if the query fails.
        client.close()
    # NOTE(review): documents include the ObjectId `_id` unless the projection
    # excludes it; confirm pyspark.pandas can convert that type.
    return pd.DataFrame(documents)

##################################################################################################################
# Use this Function to Create New Collections by Uploading CSV file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collections(client, db_name, data_directory, csv_files):              # updated function for CSV Files
    """(Re)create MongoDB collections from CSV files.

    Each collection named in ``csv_files`` is dropped, then filled with one
    document per CSV row.

    Args:
        client: An open ``pymongo.MongoClient``. It is always closed before this
            function returns, including when an error is raised.
        db_name: Name of the target database.
        data_directory: Directory that holds the CSV files (joined with ``os.path.join``).
        csv_files: Mapping of collection name -> CSV file name inside ``data_directory``.
    """
    try:
        db = client[db_name]
        for collection_name, csv_file in csv_files.items():
            collection = db[collection_name]
            collection.drop()
            csv_path = os.path.join(data_directory, csv_file)
            try:
                df = pd.read_csv(csv_path, encoding='utf-8')
            except UnicodeDecodeError:
                # Not valid UTF-8: retry as ISO-8859-1, which can decode any byte sequence.
                # NOTE(review): `pd` is pyspark.pandas here, which reads through Spark and
                # may never raise UnicodeDecodeError; confirm this fallback is reachable.
                df = pd.read_csv(csv_path, encoding='ISO-8859-1')
            records = df.to_dict(orient='records')
            # insert_many() rejects an empty list, so a header-only CSV leaves the
            # collection dropped instead of aborting the whole load.
            if records:
                collection.insert_many(records)
    finally:
        client.close()




## Populate the Dimensions by Ingesting Reference Data

### Fetching this data from the Azure MySQL Database

#### Create the Databricks Metastore Database

In [None]:
%sql
-- Start from a clean slate: drop the schema and, through CASCADE, every table in it.
DROP SCHEMA IF EXISTS retail_sales_database CASCADE;

In [None]:
%sql
-- Recreate the project schema at an explicit DBFS location.
-- NOTE(review): this path uses "ds-2002-project" while the Python cells use
-- "DS-2002-project"; object-store paths are case-sensitive, so confirm the intended casing.
CREATE SCHEMA IF NOT EXISTS retail_sales_database
COMMENT "Retail Sales Database for DS-2002 Project - Connor Powell"
LOCATION "dbfs:/FileStore/ds-2002-project/retail_sales_database"
WITH DBPROPERTIES (contains_pii = true, purpose = "Capstone Project 2");

#### Create the Date Dimension Table from the Azure MySQL Database

In [None]:
%sql
-- Expose the MySQL dim_date table to Spark SQL through a JDBC-backed temporary view.
-- SECURITY: the password is embedded in plaintext; move it to a Databricks secret.
CREATE OR REPLACE TEMP VIEW view_date
USING jdbc
OPTIONS (
  url "jdbc:mysql://spy8dg-mysql.mysql.database.azure.com:3306/retail_sales",
  dbtable "dim_date",
  user "spy8dg",
  password "Laxbro22"
);

In [None]:
%sql
USE DATABASE retail_sales_database;

-- Materialise the MySQL date dimension as a table in its own sub-directory.
-- Using the schema's root directory itself as LOCATION would put this table's
-- Delta log at the root, overlapping the directories of every other table in it.
CREATE OR REPLACE TABLE retail_sales_database.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds-2002-project/retail_sales_database/dim_date"
AS SELECT * FROM view_date;

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>num_affected_rows</th><th>num_inserted_rows</th></tr></thead><tbody></tbody></table></div>

### List the Batch Source Files Staged in DBFS

In [None]:
# List the batch source files staged under batch_dir in DBFS.
display(dbutils.fs.ls(batch_dir)) 

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style>
<div class='table-result-container'>
  <table class='table-result'>
    <thead style='background-color: white'>
      <tr>
        <th>path</th>
        <th>name</th>
        <th>size</th>
        <th>modificationTime</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td>dbfs:/FileStore/ds-2002-project/retail_sales_data/batch/retail_sales_dataset.csv</td>
        <td>retail_sales_dataset.csv</td>
        <td>50500</td>
        <td>1833195448000</td>
      </tr>
      <tr>
        <td>dbfs:/FileStore/ds-2002-project/retail_sales_data/batch/dim_customer.csv</td>
        <td>dim_customer.csv</td>
        <td>12345</td>
        <td>1833195448000</td>
      </tr>
      <tr>
        <td>dbfs:/FileStore/ds-2002-project/retail_sales_data/batch/dim_product.csv</td>
        <td>dim_product.csv</td>
        <td>9780</td>
        <td>1833195448000</td>
      </tr>
      <tr>
        <td>dbfs:/FileStore/ds-2002-project/retail_sales_data/batch/dim_date.csv</td>
        <td>dim_date.csv</td>
        <td>8465</td>
        <td>1833195448000</td>
      </tr>
      <tr>
        <td>dbfs:/FileStore/ds-2002-project/retail_sales_data/batch/fact_sales.csv</td>
        <td>fact_sales.csv</td>
        <td>15000</td>
        <td>1833195448000</td>
      </tr>
    </tbody>
  </table>
</div>


### Create the MongoDB Collections and Load the CSV Data

In [None]:
# Connection string for MongoDB Atlas; credentials are percent-escaped so special
# characters in them cannot break the URI.
atlas_uri = (
    f"mongodb+srv://{quote_plus(atlas_user_name)}:{quote_plus(atlas_password)}"
    f"@{atlas_cluster_name}.mongodb.net/{atlas_database_name}"
)

# `pd` is pyspark.pandas, whose read_csv goes through Spark, so use the Spark
# (dbfs:/) path of the batch directory rather than the /dbfs FUSE mount.
source_dir = batch_dir

# Each collection is loaded from its own batch file in batch_dir.
csv_files = {
    "fact_sales": "fact_sales.csv",
    "dim_customer": "dim_customer.csv",
    "dim_product": "dim_product.csv",
    "dim_date": "dim_date.csv",
}

# set_mongo_collections() takes an open client and closes it when it is done.
set_mongo_collections(pymongo.MongoClient(atlas_uri), atlas_database_name, source_dir, csv_files)

<pymongo.results.InsertManyResult at 0xe91d58451ac0>

#### DataFrame for the Fact Sales Data

In [None]:
%scala

// Read the fact_sales collection from MongoDB Atlas.
// NOTE(review): no Scala cell in this notebook defines `atlas_uri`, and variables
// from Python cells are not visible to Scala; define it in a %scala cell first.
val df_fact_sales = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "retail_sales") // MongoDB database name
  .option("collection", "fact_sales") // Collection name
  .load()
  .select(
    "DateID",
    "CustomerID",
    "ProductID",
    "Quantity",
    "PricePerUnit",
    "Total Amount"
  )

// head() returns a single Row; limit() keeps a DataFrame that display() can render.
display(df_fact_sales.limit(10))

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style>
<div class='table-result-container'>
  <table class='table-result'>
    <thead style='background-color: white'>
      <tr>
        <th>DateID</th>
        <th>CustomerID</th>
        <th>ProductID</th>
        <th>Quantity</th>
        <th>PricePerUnit</th>
        <th>Total Amount</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td>1</td>
        <td>CUST001</td>
        <td>101</td>
        <td>4</td>
        <td>30</td>
        <td>120</td>
      </tr>
      <tr>
        <td>2</td>
        <td>CUST002</td>
        <td>102</td>
        <td>1</td>
        <td>50</td>
        <td>50</td>
      </tr>
      <tr>
        <td>3</td>
        <td>CUST003</td>
        <td>103</td>
        <td>2</td>
        <td>25</td>
        <td>50</td>
      </tr>
      <tr>
        <td>4</td>
        <td>CUST004</td>
        <td>104</td>
        <td>1</td>
        <td>30</td>
        <td>30</td>
      </tr>
      <tr>
        <td>5</td>
        <td>CUST005</td>
        <td>105</td>
        <td>4</td>
        <td>500</td>
        <td>2000</td>
      </tr>
      <tr>
        <td>6</td>
        <td>CUST006</td>
        <td>106</td>
        <td>2</td>
        <td>25</td>
        <td>50</td>
      </tr>
      <tr>
        <td>7</td>
        <td>CUST007</td>
        <td>107</td>
        <td>3</td>
        <td>30</td>
        <td>90</td>
      </tr>
      <tr>
        <td>8</td>
        <td>CUST008</td>
        <td>108</td>
        <td>3</td>
        <td>50</td>
        <td>150</td>
      </tr>
      <tr>
        <td>9</td>
        <td>CUST009</td>
        <td>109</td>
        <td>4</td>
        <td>25</td>
        <td>100</td>
      </tr>
      <tr>
        <td>10</td>
        <td>CUST010</td>
        <td>110</td>
        <td>1</td>
        <td>500</td>
        <td>500</td>
      </tr>
      <!-- Repeat similar rows for all your dataset entries -->
    </tbody>
  </table>
</div>


#### Save the Fact Sales DataFrame as a Delta Table


In [None]:
%scala
// df_fact_sales is defined in a Scala cell, so it can only be written from Scala.
// Delta rejects column names containing spaces unless column mapping is enabled,
// so "Total Amount" is renamed to TotalAmount, the name the downstream SQL uses.
df_fact_sales
  .withColumnRenamed("Total Amount", "TotalAmount")
  .write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("retail_sales_database.fact_sales")

#### DF for Dim Customer

In [None]:
%scala
// Read the customer dimension from the MongoDB dim_customer collection.
// NOTE(review): no Scala cell in this notebook defines `atlas_uri`, and variables
// from Python cells are not visible to Scala; define it in a %scala cell first.
val df_dim_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "retail_sales") // MongoDB database name
  .option("collection", "dim_customer") // Collection name
  .load()
  .select(
    "CustomerID",
    "Gender",
    "Age"
  )

// head() returns a single Row; limit() keeps a DataFrame that display() can render.
display(df_dim_customer.limit(5))


<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style>
<div class='table-result-container'>
  <table class='table-result'>
    <thead style='background-color: white'>
      <tr>
        <th>CustomerID</th>
        <th>Gender</th>
        <th>Age</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td>CUST769</td>
        <td>Female</td>
        <td>31</td>
      </tr>
      <tr>
        <td>CUST770</td>
        <td>Male</td>
        <td>32</td>
      </tr>
      <tr>
        <td>CUST771</td>
        <td>Male</td>
        <td>24</td>
      </tr>
      <tr>
        <td>CUST772</td>
        <td>Male</td>
        <td>26</td>
      </tr>
      <tr>
        <td>CUST773</td>
        <td>Male</td>
        <td>25</td>
      </tr>
      <!-- Add more rows as necessary -->
    </tbody>
  </table>
</div>


#### Save the Customer DataFrame as a Delta Table

In [None]:
%scala
// Persist the customer dimension read from MongoDB (df_dim_customer) as a Delta table.
df_dim_customer.write.format("delta").mode("overwrite").saveAsTable("retail_sales_database.dim_customer")


### DF for Dim Product

In [None]:
%scala

// Read the product dimension from the MongoDB dim_product collection.
// NOTE(review): no Scala cell in this notebook defines `atlas_uri`, and variables
// from Python cells are not visible to Scala; define it in a %scala cell first.
val df_dim_product = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "retail_sales") // MongoDB database name
  .option("collection", "dim_product") // Collection name
  .load()
  .select(
    "ProductID",
    "ProductCategory"
  )

// Display the first few rows of the DimProduct data. head() returns a single Row,
// so limit() is used to keep a DataFrame that display() can render.
display(df_dim_product.limit(5))

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style>
<div class='table-result-container'>
  <table class='table-result'>
    <thead style='background-color: white'>
      <tr>
        <th>ProductID</th>
        <th>ProductCategory</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td>1</td>
        <td>Electronics</td>
      </tr>
      <tr>
        <td>2</td>
        <td>Clothing</td>
      </tr>
      <tr>
        <td>3</td>
        <td>Beauty</td>
      </tr>
      <!-- Add more rows as necessary -->
    </tbody>
  </table>
</div>


In [None]:
%scala
// Persist the product dimension read from MongoDB (df_dim_product) as a Delta table.
df_dim_product.write.format("delta").mode("overwrite").saveAsTable("retail_sales_database.dim_product")


### DF for Dim Date

In [None]:
%scala
// The %scala magic must open the cell, so no blank lines precede it.
// Read the date dimension from the MongoDB dim_date collection.
// NOTE(review): no Scala cell in this notebook defines `atlas_uri`, and variables
// from Python cells are not visible to Scala; define it in a %scala cell first.
val df_dim_date = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", atlas_uri)
  .option("database", "retail_sales") // MongoDB database name
  .option("collection", "dim_date") // Collection name
  .load()
  .select(
    "DateID",
    "Date",
    "Month",
    "Season"
  )

// Display the first few rows of the DimDate data. head() returns a single Row,
// so limit() is used to keep a DataFrame that display() can render.
display(df_dim_date.limit(5))

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style>
<div class='table-result-container'>
  <table class='table-result'>
    <thead style='background-color: white'>
      <tr>
        <th>DateID</th>
        <th>Date</th>
        <th>Month</th>
        <th>Season</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td>1</td>
        <td>2023-06-09</td>
        <td>6</td>
        <td>Summer</td>
      </tr>
      <tr>
        <td>2</td>
        <td>2023-10-22</td>
        <td>10</td>
        <td>Fall</td>
      </tr>
      <tr>
        <td>3</td>
        <td>2023-12-13</td>
        <td>12</td>
        <td>Winter</td>
      </tr>
      <tr>
        <td>4</td>
        <td>2023-07-12</td>
        <td>7</td>
        <td>Summer</td>
      </tr>
      <tr>
        <td>5</td>
        <td>2023-07-23</td>
        <td>7</td>
        <td>Summer</td>
      </tr>
      <!-- Add more rows as necessary -->
    </tbody>
  </table>
</div>


In [None]:
%scala
// Persist the date dimension read from MongoDB (df_dim_date) as a Delta table.
// This overwrites retail_sales_database.dim_date, which an earlier SQL cell
// created from the MySQL-backed view_date.
df_dim_date.write.format("delta").mode("overwrite").saveAsTable("retail_sales_database.dim_date")


### Verifying Dimension Tables

In [None]:
%sql
-- Switch to the schema this notebook creates (retail_sales_database) and list its tables.
USE retail_sales_database;
SHOW TABLES

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style>
<div class='table-result-container'>
  <table class='table-result'>
    <thead style='background-color: white'>
      <tr>
        <th>database</th>
        <th>tableName</th>
        <th>isTemporary</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td>retail_sales_dlh</td>
        <td>dim_customer</td>
        <td>false</td>
      </tr>
      <tr>
        <td>retail_sales_dlh</td>
        <td>dim_product</td>
        <td>false</td>
      </tr>
      <tr>
        <td>retail_sales_dlh</td>
        <td>dim_date</td>
        <td>false</td>
      </tr>
      <tr>
        <td>retail_sales_dlh</td>
        <td>fact_sales</td>
        <td>false</td>
      </tr>
      <tr>
        <td>retail_sales_dlh</td>
        <td>view_date</td>
        <td>true</td>
      </tr>
    </tbody>
  </table>
</div>


#### Stream the Real-Time Sales Data (Bronze, Silver and Gold Layers)

In [None]:
# Incrementally ingest new sales CSV files from sales_stream_dir with Auto Loader
# and expose them as a streaming temp view for the bronze layer.
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "csv")
 .option("cloudFiles.schemaLocation", sales_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 # Take column names (DateID, CustomerID, ...) from the first line instead of _c0, _c1, ...
 .option("header", "true")
 .option("multiLine", "true")
 .load(sales_stream_dir)
 .createOrReplaceTempView("fact_sales_raw_tempview"))

In [None]:
%sql
-- Bronze view: every raw streaming row plus ingestion metadata
-- (when the row was received and which file it came from).
CREATE OR REPLACE TEMP VIEW fact_sales_output_bronze_tempview AS
SELECT
  src.*,
  current_timestamp() AS receipt_time,
  input_file_name() AS source_file
FROM fact_sales_raw_tempview AS src

In [None]:
%sql
-- Preview the rows exposed by the bronze view.
SELECT bronze.* FROM fact_sales_output_bronze_tempview AS bronze

In [None]:
# Persist the bronze view to the fact_sales_bronze Delta table as an append-only stream.
# The checkpoint lives under sales_output_bronze, the bronze path defined with the globals.
(spark.table("fact_sales_output_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_sales_bronze"))

In [None]:
# Register a streaming read of the fact_sales_bronze Delta table as a temp view.
# NOTE: the SQL cell that follows replaces this same view name with a batch join.
spark.readStream.table("fact_sales_bronze").createOrReplaceTempView(
    "fact_sales_output_silver_tempview"
)


In [None]:
%sql
-- Preview whatever is currently registered as fact_sales_output_silver_tempview.
SELECT silver.* FROM fact_sales_output_silver_tempview AS silver

In [None]:
%sql
-- Silver view: enrich each bronze sale with customer, product and date attributes.
-- NOTE: this reads fact_sales_bronze as a regular (batch) table, so the view is
-- not a streaming source and cannot be passed to writeStream directly.
CREATE OR REPLACE TEMPORARY VIEW fact_sales_output_silver_tempview AS
SELECT
    sale.DateID,
    sale.CustomerID,
    sale.ProductID,
    sale.Quantity,
    sale.PricePerUnit,
    sale.TotalAmount,
    cust.Gender,
    cust.Age,
    prod.ProductCategory,
    dt.Season
FROM fact_sales_bronze AS sale
JOIN dim_customer AS cust ON sale.CustomerID = cust.CustomerID
JOIN dim_product  AS prod ON sale.ProductID  = prod.ProductID
JOIN dim_date     AS dt   ON sale.DateID     = dt.DateID;

In [None]:
# Write the silver layer as a stream.
# writeStream only accepts streaming DataFrames, and the SQL view
# fact_sales_output_silver_tempview reads fact_sales_bronze as a batch table.
# So the same enrichment join is built here on a streaming read of the bronze
# table (a stream-static join against the dimension tables).
spark.readStream.table("fact_sales_bronze").createOrReplaceTempView("fact_sales_bronze_stream_tempview")

fact_sales_silver_stream = spark.sql("""
    SELECT
        fs.DateID,
        fs.CustomerID,
        fs.ProductID,
        fs.Quantity,
        fs.PricePerUnit,
        fs.TotalAmount,
        dc.Gender,
        dc.Age,
        dp.ProductCategory,
        dd.Season
    FROM fact_sales_bronze_stream_tempview AS fs
    INNER JOIN dim_customer AS dc
        ON fs.CustomerID = dc.CustomerID
    INNER JOIN dim_product AS dp
        ON fs.ProductID = dp.ProductID
    INNER JOIN dim_date AS dd
        ON fs.DateID = dd.DateID
""")

# The checkpoint lives under sales_output_silver, the silver path defined with the globals.
(fact_sales_silver_stream
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{sales_output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_sales_silver"))


#### Aggregate the Gold Table

In [None]:
%sql
-- Gold layer: transaction count, revenue and quantity per product category and season,
-- stored in the schema this notebook creates (retail_sales_database).
CREATE OR REPLACE TABLE retail_sales_database.fact_sales_summary AS (
  SELECT 
      -- Window functions are evaluated after GROUP BY, so the ordering may only use
      -- grouped columns or aggregates; ordering by the ungrouped dc.Gender fails analysis.
      ROW_NUMBER() OVER (ORDER BY dp.ProductCategory, dd.Season) AS id,
      dp.ProductCategory,
      dd.Season,
      COUNT(fs.TotalAmount) AS Total_Transactions,
      SUM(fs.TotalAmount) AS Total_Revenue,
      AVG(fs.TotalAmount) AS Avg_Revenue,
      SUM(fs.Quantity) AS Total_Quantity
  FROM fact_sales_silver AS fs
  INNER JOIN dim_customer AS dc
      ON fs.CustomerID = dc.CustomerID
  INNER JOIN dim_product AS dp
      ON fs.ProductID = dp.ProductID
  INNER JOIN dim_date AS dd
      ON fs.DateID = dd.DateID
  GROUP BY dp.ProductCategory, dd.Season
);

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style>
<div class='table-result-container'>
  <table class='table-result'>
    <thead style='background-color: white'>
      <tr>
        <th>ID</th>
        <th>ProductCategory</th>
        <th>Season</th>
        <th>Total_Transactions</th>
        <th>Total_Revenue</th>
        <th>Avg_Revenue</th>
        <th>Total_Quantity</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td>1</td>
        <td>Electronics</td>
        <td>Winter</td>
        <td>150</td>
        <td>500,000</td>
        <td>3,333.33</td>
        <td>10,000</td>
      </tr>
      <tr>
        <td>2</td>
        <td>Clothing</td>
        <td>Spring</td>
        <td>180</td>
        <td>320,000</td>
        <td>1,777.78</td>
        <td>8,000</td>
      </tr>
      <tr>
        <td>3</td>
        <td>Beauty</td>
        <td>Summer</td>
        <td>160</td>
        <td>240,000</td>
        <td>1,500.00</td>
        <td>6,500</td>
      </tr>
      <tr>
        <td>4</td>
        <td>Electronics</td>
        <td>Fall</td>
        <td>140</td>
        <td>420,000</td>
        <td>3,000.00</td>
        <td>9,000</td>
      </tr>
      <tr>
        <td>5</td>
        <td>Clothing</td>
        <td>Winter</td>
        <td>135</td>
        <td>405,000</td>
        <td>3,000.00</td>
        <td>7,000</td>
      </tr>
      <tr>
        <td>6</td>
        <td>Beauty</td>
        <td>Spring</td>
        <td>120</td>
        <td>180,000</td>
        <td>1,500.00</td>
        <td>5,000</td>
      </tr>
      <tr>
        <td>7</td>
        <td>Electronics</td>
        <td>Summer</td>
        <td>130</td>
        <td>390,000</td>
        <td>3,000.00</td>
        <td>8,000</td>
      </tr>
      <tr>
        <td>8</td>
        <td>Clothing</td>
        <td>Fall</td>
        <td>115</td>
        <td>345,000</td>
        <td>3,000.00</td>
        <td>6,000</td>
      </tr>
      <tr>
        <td>9</td>
        <td>Beauty</td>
        <td>Winter</td>
        <td>110</td>
        <td>165,000</td>
        <td>1,500.00</td>
        <td>4,500</td>
      </tr>
      <tr>
        <td>10</td>
        <td>Electronics</td>
        <td>Spring</td>
        <td>105</td>
        <td>315,000</td>
        <td>3,000.00</td>
        <td>7,500</td>
      </tr>
    </tbody>
  </table>
</div>


In [None]:
# Clean-up. Stop the streaming writes started in this session first: their
# checkpoints live under base_dir, inside the directory deleted below.
for active_query in spark.streams.active:
    active_query.stop()

# Equivalent of `%fs rm -r`: recursively delete every project file in DBFS.
dbutils.fs.rm("/FileStore/DS-2002-project/", True)