In [None]:
from pyspark.sql.types import IntegerType
# importing pyspark
import pyspark
  
# importing SparkSession
from pyspark.sql import SparkSession
  
# importing all from pyspark.sql.function
from pyspark.sql.functions import *

In [None]:
###### Mount point 1 (populating data), authenticated through OAuth.
storageAccount = "gen10datafund2205"
storageContainer = "capstone-group6-data"
# Service-principal credentials are read from a Databricks secret scope so that
# they never appear in the notebook source or its outputs.
# TODO(review): the scope and key names below are placeholders - point them at
# the secret scope that actually stores this service principal.
clientSecret = dbutils.secrets.get(scope="capstone-group6", key="sp-client-secret")
clientid = dbutils.secrets.get(scope="capstone-group6", key="sp-client-id")
mount_point = "/mnt/gavanvanover/capstone-group6-populating"

# OAuth client-credentials settings handed to the ABFS driver when mounting.
configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": clientid,
       "fs.azure.account.oauth2.client.secret": clientSecret,
       "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

# Remount from scratch so the cell can be re-run. Unmount only when the mount
# point is actually present; any other failure is allowed to surface instead of
# being swallowed by a bare except.
if any(existing.mountPoint == mount_point for existing in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source = f"abfss://{storageContainer}@{storageAccount}.dfs.core.windows.net/",
    mount_point = mount_point,
    extra_configs = configs)

## Create dataframes from the MACRO and CPI data sets

In [None]:
# List the files available under the populating mount point.
populating_listing = dbutils.fs.ls("/mnt/gavanvanover/capstone-group6-populating")
display(populating_listing)

path,name,size,modificationTime
dbfs:/mnt/gavanvanover/capstone-group6-populating/Salestable_final.csv,Salestable_final.csv,553423,1660244005000
dbfs:/mnt/gavanvanover/capstone-group6-populating/USACPIALLMINMEI.csv,USACPIALLMINMEI.csv,20709,1659496423000
dbfs:/mnt/gavanvanover/capstone-group6-populating/US_MACRO.csv,US_MACRO.csv,627940,1659496423000
dbfs:/mnt/gavanvanover/capstone-group6-populating/retail_census_data_NAICS_NAPCS_2017.csv,retail_census_data_NAICS_NAPCS_2017.csv,675608,1659496178000


## CPI Data Set

In [None]:
# Load the CPI series; the first row of the file is used as the header.
cpi_df = spark.read.csv(
    '/mnt/gavanvanover/capstone-group6-populating/USACPIALLMINMEI.csv',
    header = 'True',
)

# DATE is split on '-': item 0 becomes Year2 and item 1 becomes Month2.
# The value column is renamed to CPI in the same chain.
cpi_date_parts = split(cpi_df['DATE'], '-')
cpi_df2 = (
    cpi_df
    .withColumn('Year2', cpi_date_parts.getItem(0))
    .withColumn('Month2', cpi_date_parts.getItem(1))
    .withColumnRenamed('USACPIALLMINMEI', 'CPI')
)

# DATE is no longer needed; strip leading zeroes from the month text.
cpi_df3 = (
    cpi_df2
    .drop(col('DATE'))
    .withColumn('Month2', pyspark.sql.functions.regexp_replace('Month2', r'^[0]*', ''))
)

cpi_df3.dtypes

## Macroeconomics Dataset

In [None]:
# Load the macroeconomic data; the first row of the file is used as the header.
macro_df = spark.read.csv(
    '/mnt/gavanvanover/capstone-group6-populating/US_MACRO.csv',
    header = 'True',
)

# Keep the date plus the four series of interest, with the date renamed to DATE.
macro_df_cleaned = (
    macro_df
    .select('observation_date', 'RPI', 'RETAIL', 'USTRADE', 'USWTRADE')
    .withColumnRenamed('observation_date', 'DATE')
)

# DATE is split on '/': item 2 becomes Year and item 0 becomes Month,
# after which the DATE column is dropped.
macro_date_parts = split(macro_df_cleaned['DATE'], '/')
macro_df_cleaned = (
    macro_df_cleaned
    .withColumn('Year', macro_date_parts.getItem(2))
    .withColumn('Month', macro_date_parts.getItem(0))
    .drop(col('DATE'))
)

macro_df_cleaned.dtypes

## Merge the dataframes

In [None]:
# Full outer join of the macro and CPI data on year and month.
merged_df = macro_df_cleaned.join(
    cpi_df3,
    (macro_df_cleaned.Year == cpi_df3.Year2) & (macro_df_cleaned.Month == cpi_df3.Month2),
    'full',
)
# A full join leaves Year/Month null on rows that exist only in the CPI data.
# Fall back to the CPI-side keys before discarding them, otherwise those rows
# lose their date and later feed null Year/Month values into the date table.
merged_df = (
    merged_df
    .withColumn('Year', coalesce(col('Year'), col('Year2')))
    .withColumn('Month', coalesce(col('Month'), col('Month2')))
    .drop('Year2', 'Month2')
)
# Month and Year are read as strings; cast them to integers so the sort below
# is numeric rather than lexicographic.
merged_df_cleaned = merged_df.withColumn('Month', merged_df.Month.cast(IntegerType())) \
    .withColumn('Year', merged_df.Year.cast(IntegerType()))
# Sort the data in time order and preview it.
merged_and_ordered_df = merged_df_cleaned.orderBy(col('Year').asc(), col('Month').asc())
merged_and_ordered_df.show()

In [None]:
# Year/Month pairs only - this is what gets loaded into the date table.
time_df = merged_and_ordered_df.select('Year', 'Month')

# The CPI and macroeconomic measures on their own, previewed below.
macro_and_cpi_df = merged_and_ordered_df.select('CPI', 'RPI', 'USTRADE', 'USWTRADE')
macro_and_cpi_df.show()

## Database Information

In [None]:
# Target Azure SQL database and the tables this notebook reads and writes.
database = "capstone-group6-database"
table1 = "dbo.MacroTable"
table2 = "dbo.DateTable"
table3 = "dbo.NAICS_NAPCS"
table4 = "dbo.NAICSTable"
table5 = "dbo.NAPCSTable"
table6 = "dbo.SalesTable"
# SQL credentials are read from a Databricks secret scope so that they never
# appear in the notebook source or its outputs.
# TODO(review): the scope and key names below are placeholders - point them at
# the secret scope that actually stores the SQL login.
user = dbutils.secrets.get(scope="capstone-group6", key="sql-user")
password = dbutils.secrets.get(scope="capstone-group6", key="sql-password")
server = "gen10-data-fundamentals-22-05-sql-server.database.windows.net"

## Populate the date table

In [None]:
# Append the Year/Month rows to the date table over JDBC.
# NOTE(review): the Spark JDBC docs describe 'truncate' as applying to
# overwrite mode, so it presumably has no effect on this append - confirm.
date_table_writer = (
    time_df.write
    .format('jdbc')
    .mode("append")
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};")
    .option("dbtable", table2)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option('truncate', 'true')
)
date_table_writer.save()

In [None]:
# Read the date table back from the database into a dataframe.
date_table_reader = spark.read.format("jdbc").options(
    url=f"jdbc:sqlserver://{server}:1433;databaseName={database};",
    dbtable=table2,
    user=user,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)
dateid_df = date_table_reader.load()

# Preview the rows that were loaded.
dateid_df.show()

In [None]:
# Full join of the macro/CPI data with the date table on Year and Month, then
# discard any row that ended up without a DateID.
macro_and_dateid_df = (
    merged_and_ordered_df
    .join(dateid_df, on=['Year', 'Month'], how='full')
    .dropna(subset=['DateID'])
)
# Keep the DateID and the measures (Year and Month are left out), and drop
# rows that have no RPI value.
macro_table_df = (
    macro_and_dateid_df
    .select('DateID', 'CPI', 'RPI', 'USTRADE', 'USWTRADE')
    .dropna(subset=['RPI'])
)
macro_table_df.show()

## Populate the macro table

In [None]:
# Overwrite the macro table over JDBC, with the 'truncate' option enabled.
macro_table_writer = (
    macro_table_df.write
    .format('jdbc')
    .mode("overwrite")
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};")
    .option("dbtable", table1)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option('truncate', 'true')
)
macro_table_writer.save()

## Read the data from the consumed folder into dataframes

Create a mount point

In [None]:
# Mount point 2: the consumed-data container, using the same OAuth
# service-principal credentials (clientid / clientSecret) as the first mount.
storageAccount2 = "gen10datafund2205"
storageContainer2 = "capstone-group6-consumed"
mount_point2 = "/mnt/gavanvanover/consumed_data_for_use"

# OAuth client-credentials settings handed to the ABFS driver when mounting.
configs2 = {"fs.azure.account.auth.type": "OAuth",
   "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
   "fs.azure.account.oauth2.client.id": clientid,
   "fs.azure.account.oauth2.client.secret": clientSecret,
   "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
   "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

# Remount from scratch so the cell can be re-run. Unmount only when the mount
# point is actually present; any other failure is allowed to surface instead of
# being swallowed by a bare except.
if any(existing.mountPoint == mount_point2 for existing in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point2)

dbutils.fs.mount(
    source = f"abfss://{storageContainer2}@{storageAccount2}.dfs.core.windows.net/",
    mount_point = mount_point2,
    extra_configs = configs2)

In [None]:
# List what is available under the consumed-data mount point.
consumed_listing = dbutils.fs.ls("/mnt/gavanvanover/consumed_data_for_use")
display(consumed_listing)

path,name,size,modificationTime
dbfs:/mnt/gavanvanover/consumed_data_for_use/NAICS_NAPCS_2017.csv/,NAICS_NAPCS_2017.csv/,0,1660141174000
dbfs:/mnt/gavanvanover/consumed_data_for_use/NAICS_meanings.csv/,NAICS_meanings.csv/,0,1660141179000
dbfs:/mnt/gavanvanover/consumed_data_for_use/NAPCS_meanings.csv/,NAPCS_meanings.csv/,0,1660141190000


Read in the CSV files

In [None]:
# Read the three consumed CSVs from the mount point created above.
# The previous paths pointed at /mnt/gavanvanover/consumed_data, which this
# notebook never mounts; the files are listed under mount_point2
# (/mnt/gavanvanover/consumed_data_for_use).
census_df = spark.read.csv(f"{mount_point2}/NAICS_NAPCS_2017.csv", header = 'True')
naics_df = spark.read.csv(f"{mount_point2}/NAICS_meanings.csv", header = 'True')
napcs_df = spark.read.csv(f"{mount_point2}/NAPCS_meanings.csv", header = 'True')
naics_df.show()

Write to the database

In [None]:
# Append the NAPCS rows to the NAPCS table over JDBC.
# NOTE(review): the Spark JDBC docs describe 'truncate' as applying to
# overwrite mode, so it presumably has no effect on this append - confirm.
napcs_table_writer = (
    napcs_df.write
    .format('jdbc')
    .mode("append")
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};")
    .option("dbtable", table5)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option('truncate', 'true')
)
napcs_table_writer.save()


In [None]:
# Append the NAICS rows to the NAICS table over JDBC.
# NOTE(review): the Spark JDBC docs describe 'truncate' as applying to
# overwrite mode, so it presumably has no effect on this append - confirm.
naics_table_writer = (
    naics_df.write
    .format('jdbc')
    .mode("append")
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};")
    .option("dbtable", table4)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option('truncate', 'true')
)
naics_table_writer.save()

In [None]:
# Read the NAPCS table back from the database into a dataframe.
napcs_table_reader = spark.read.format("jdbc").options(
    url=f"jdbc:sqlserver://{server}:1433;databaseName={database};",
    dbtable=table5,
    user=user,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)
napcs_id_df = napcs_table_reader.load()

# Preview the rows that were loaded.
napcs_id_df.show()

In [None]:
# Read the NAICS table back from the database into a dataframe.
naics_table_reader = spark.read.format("jdbc").options(
    url=f"jdbc:sqlserver://{server}:1433;databaseName={database};",
    dbtable=table4,
    user=user,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)
naics_id_df = naics_table_reader.load()

# Preview the rows that were loaded.
naics_id_df.show()

## Add the NAPCS ID and NAICS ID to the census table

In [None]:
# Full join of the census data with the NAICS table on the NAICS code, keeping
# the NAICSID in place of the code itself along with the remaining census fields.
census_and_naics_id_df = (
    census_df
    .join(naics_id_df, on=['2017 NAICS code (NAICS2017)'], how='full')
    .select(
        'NAICSID',
        '2017 NAPCS collection code (NAPCS2017)',
        'Number of establishments (ESTAB)',
        'Sales, value of shipments, or revenue of NAPCS collection code ($1,000) (NAPCSDOL)',
    )
)
census_and_naics_id_df.show()

In [None]:
# Full join with the NAPCS table on the NAPCS collection code, keeping the
# NAPCSID in place of the code itself. No null filtering is applied here.
complete_census_df = (
    census_and_naics_id_df
    .join(napcs_id_df, on=['2017 NAPCS collection code (NAPCS2017)'], how='full')
    .select(
        'NAICSID',
        'NAPCSID',
        'Number of establishments (ESTAB)',
        'Sales, value of shipments, or revenue of NAPCS collection code ($1,000) (NAPCSDOL)',
    )
)
complete_census_df.show()

In [None]:
# Overwrite the NAICS/NAPCS census table over JDBC, with 'truncate' enabled.
census_table_writer = (
    complete_census_df.write
    .format('jdbc')
    .mode("overwrite")
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};")
    .option("dbtable", table3)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option('truncate', 'true')
)
census_table_writer.save()

## Read the monthly Sales Table CSV into a dataframe, then write it to the database

In [None]:
# Load the sales CSV from the populating mount; the first row is the header.
sales_df = spark.read.csv(
    '/mnt/gavanvanover/capstone-group6-populating/Salestable_final.csv',
    header = 'True',
)
sales_df.show()

In [None]:
# Distinct NAICS code/name pairs found in the sales data, with the columns
# renamed to the names used when joining against the NAICS table.
append_NAICS_df = (
    sales_df
    .select('NAICScode', 'NAICSname')
    .distinct()
    .withColumnRenamed('NAICScode', '2017 NAICS code (NAICS2017)')
    .withColumnRenamed('NAICSname', 'Meaning of NAICS code (NAICS2017_LABEL)')
)

append_NAICS_df.show()

In [None]:
# Append the NAICS code/name pairs taken from the sales data to the NAICS table.
naics_append_writer = (
    append_NAICS_df.write
    .format('jdbc')
    .mode("append")
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};")
    .option("dbtable", table4)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
)
naics_append_writer.save()

In [None]:
# Discard the '_c0' column (presumably a leftover index column - verify).
sales_df = sales_df.drop('_c0')

# Inner join with the date table on Year and Month so every sales row carries
# its DateID.
sales_and_dateid_df = sales_df.join(dateid_df, on=['Year', 'Month'], how='inner')

# Year and Month are dropped once the DateID is attached.
cleaned_sales_df = sales_and_dateid_df.drop('Year', 'Month')
cleaned_sales_df.show()

In [None]:
# Re-read the NAICS table so the rows appended from the sales data are included.
naics_id_df2 = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
    .option("dbtable", table4) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

# Show the dataframe that was just loaded (previously this displayed the older
# naics_id_df by mistake).
naics_id_df2.show()

In [None]:
# Join the sales data with the NAICS table by matching the sales NAICS name to
# the NAICS label, then drop the code/label columns from both sides.
# NOTE(review): this is a full join, so NAICS rows with no matching sales row
# are kept with null sales fields - confirm that is intended.
naics_label_match = (
    cleaned_sales_df['NAICSname'] == naics_id_df2['Meaning of NAICS code (NAICS2017_LABEL)']
)
cleaned_sales_df2 = cleaned_sales_df.join(naics_id_df2, on=naics_label_match, how='full')

naics_lookup_columns = [
    'NAICScode',
    'NAICSname',
    '2017 NAICS code (NAICS2017)',
    'Meaning of NAICS code (NAICS2017_LABEL)',
]
cleaned_sales_df3 = cleaned_sales_df2.drop(*naics_lookup_columns)
cleaned_sales_df3.dtypes

In [None]:
# Replace the "(S)" marker with null in both sales columns; every other value
# is passed through unchanged.
for sales_column in ("Unadjusted Sales", "Adjusted Sales"):
    cleaned_sales_df3 = cleaned_sales_df3.withColumn(
        sales_column,
        when(col(sales_column) == "(S)", None).otherwise(col(sales_column)),
    )
cleaned_sales_df3.dtypes

In [None]:
# Overwrite the sales table over JDBC, with the 'truncate' option enabled.
sales_table_writer = (
    cleaned_sales_df3.write
    .format('jdbc')
    .mode("overwrite")
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};")
    .option("dbtable", table6)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option('truncate', 'true')
)
sales_table_writer.save()