### Import Libraries

In [0]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SparkSession

### Load Bronze Layer (Unprocessed)

In [0]:
# Create the Spark session; getOrCreate() returns the already-active session if one exists.
spark = SparkSession.builder.appName('app').getOrCreate()

# Bronze layer: load the raw source files without any transformation.

# Drivers data (JSON). multiLine=False means one JSON record per line.
# The JSON reader always infers its schema and has no header row, so the
# CSV-only 'inferSchema' / 'header' options are deliberately not passed.
df_json = (
    spark.read.format('json')
    .option('multiLine', False)
    .load('/Volumes/ben-demo/default/bigmart/drivers.json')
)

# Sales data (CSV with a header row; column types inferred from the data).
df = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load('/Volumes/ben-demo/default/bigmart/BigMart Sales.csv')
)

df.display()       # sales (CSV)
df_json.display()  # drivers (JSON)

### Clean Data (Silver)

In [0]:
# Silver layer: keep only the columns needed downstream and persist the results.

# Wanted columns for each source, expressed as StructTypes.
# NOTE: these are only used below to pick column *names*; the StringType
# declarations are not applied, so selected columns keep the types that were
# inferred when the bronze data was loaded.
shopInfo_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_MRP', StringType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', StringType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Item_Outlet_Sales', StringType(), True)
])

drivers_schema = StructType([
    StructField("driverId", StringType(), True),
    StructField("number", StringType(), True),
    StructField("code", StringType(), True),
    StructField("name", StringType(), True),
    StructField("nationality", StringType(), True),
    StructField("url", StringType(), True)
])

selected_shop_cols = [field.name for field in shopInfo_schema.fields]
selected_driver_cols = [field.name for field in drivers_schema.fields]

df_shopInfo = df.select(*selected_shop_cols)
df_drivers = df_json.select(*selected_driver_cols)

df_shopInfo.display()
df_drivers.display()

# Persist the cleaned sales data as CSV with a header row.
(
    df_shopInfo.write.mode('overwrite')
    .option('header', True)
    .csv('/Volumes/ben-demo/default/bigmart/BigMart_Cleaned')
)

# Persist the cleaned drivers data. This must write df_drivers (not df_shopInfo),
# and in JSON format to match the '.json' target name; 'header' is a CSV-only option.
(
    df_drivers.write.mode('overwrite')
    .json('/Volumes/ben-demo/default/bigmart/drivers_cleaned.json')
)