In [0]:
# -----------------------------------
# 1. Import PySpark types & functions
# -----------------------------------
# SparkSession is required to obtain the session below; the types are used to declare explicit
# CSV schemas, and col/sum are used for DataFrame transformations.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
# NOTE: importing `sum` here shadows Python's built-in sum() for the rest of the notebook.
from pyspark.sql.functions import col, sum

# ---------------------------
# 2. Initialize Spark Session
# ---------------------------
# Databricks creates a SparkSession named `spark` automatically. getOrCreate() returns that
# existing session when one is active and only builds a new one otherwise (e.g. when the notebook
# runs outside Databricks), so this cell is safe in both environments.

spark = (SparkSession
         .builder
         .appName("DataflowPySpark")
         .getOrCreate())

In [0]:
# -------------------------------------------------------------
# 1. Read Sales.csv file located in the distributed file system
# -------------------------------------------------------------
# Source file and reader format. `file_type` is also reused by the read cells that follow.
sales_file_location = "dbfs:/FileStore/tables/Sales.csv"
file_type = "csv"

# CSV reader options; schema inference stays off because the schema is declared explicitly.
infer_schema = "False"
first_row_is_header = "True"
delimiter = ","
csv_options = {"inferSchema": infer_schema, "header": first_row_is_header, "sep": delimiter}

# Explicit schema for Sales.csv: one StructField per column, all nullable.
# NOTE(review): OrderDate/StockDate are parsed as DateType with Spark's default date pattern
# (no `dateFormat` option is set); confirm the file's date strings match it, otherwise these
# values may load as null.
sales_schema = StructType([
    StructField('OrderDate', DateType(), True),
    StructField('StockDate', DateType(), True),
    StructField('OrderNumber', StringType(), True),
    StructField('ProductKey', IntegerType(), True),
    StructField('CustomerKey', IntegerType(), True),
    StructField('TerritoryKey', IntegerType(), True),
    StructField('OrderLineItem', IntegerType(), True),
    StructField('OrderQuantity', IntegerType(), True),
])

# Load Sales.csv into Sales_df using the declared schema and the options above.
Sales_df = (spark.read
            .schema(sales_schema)
            .options(**csv_options)
            .format(file_type)
            .load(sales_file_location))

In [0]:
# -----------------------------------------------------------------
# 2. Read Customers.csv file located in the distributed file system
# -----------------------------------------------------------------
# Source file; the reader format comes from `file_type`, set in the Sales read cell.
customers_file_location = "dbfs:/FileStore/tables/Customers.csv"

# CSV reader options; schema inference stays off because the schema is declared explicitly.
infer_schema = "False"
first_row_is_header = "True"
delimiter = ","
csv_options = {"inferSchema": infer_schema, "header": first_row_is_header, "sep": delimiter}

# Explicit schema for Customers.csv. AnnualIncome is kept as a string column.
# NOTE(review): BirthDate is parsed as DateType with Spark's default date pattern (no `dateFormat`
# option is set); confirm the file's dates match it, otherwise they may load as null.
# NOTE(review): file-based readers generally treat every column as nullable, so nullable=False on
# CustomerKey is presumably not enforced at read time; confirm if non-null keys matter.
customers_schema = StructType([
    StructField('CustomerKey', IntegerType(), nullable=False),
    StructField('Prefix', StringType(), True),
    StructField('FirstName', StringType(), True),
    StructField('LastName', StringType(), True),
    StructField('BirthDate', DateType(), True),
    StructField('MaritalStatus', StringType(), True),
    StructField('Gender', StringType(), True),
    StructField('EmailAddress', StringType(), True),
    StructField('AnnualIncome', StringType(), True),
    StructField('TotalChildren', IntegerType(), True),
    StructField('EducationLevel', StringType(), True),
    StructField('Occupation', StringType(), True),
    StructField('HomeOwner', StringType(), True),
])

# Load Customers.csv into Customers_df using the declared schema and the options above.
Customers_df = (spark.read
                .schema(customers_schema)
                .options(**csv_options)
                .format(file_type)
                .load(customers_file_location))


In [0]:
# ----------------------------------------------------------------
# 3. Read Products.csv file located in the distributed file system
# ----------------------------------------------------------------
# 1. File location (the reader format comes from `file_type`, set in the Sales read cell)
products_file_location = "dbfs:/FileStore/tables/Products.csv"

# 2. CSV options; schema inference stays off because an explicit schema is supplied below
infer_schema = "False"
first_row_is_header = "True"
delimiter = ","

# 3. Define the Schema for the products File (cost and price are read as doubles)
# NOTE(review): file-based readers generally treat every column as nullable, so nullable=False on
# ProductKey is presumably not enforced at read time; confirm if non-null keys matter.
products_schema = StructType([StructField('ProductKey', IntegerType(), nullable=False),
                              StructField('ProductSubcategoryKey', IntegerType(), True),
                              StructField('ProductSKU', StringType(), True),
                              StructField('ProductName', StringType(), True),
                              StructField('ModelName', StringType(), True),
                              StructField('ProductDescription', StringType(), True),
                              StructField('ProductColor', StringType(), True),
                              StructField('ProductSize', StringType(), True),
                              StructField('ProductStyle', StringType(), True),
                              StructField('ProductCost', DoubleType(), True),
                              StructField('ProductPrice', DoubleType(), True)])

# 4. Read the data from a CSV file, with user defined schema into a dataframe products_df
products_df = (spark
               .read
               .format(file_type)
               .option("inferSchema", infer_schema)
               .schema(products_schema)
               .option("header", first_row_is_header)
               .option("sep", delimiter)
               .load(products_file_location))


In [0]:
# --------------------------------------------------------------------------
# 4. Read product_categories.csv file located in the distributed file system
# --------------------------------------------------------------------------
# Source file; the reader format comes from `file_type`, set in the Sales read cell.
product_categories_file_location = "dbfs:/FileStore/tables/Product_Categories.csv"

# CSV reader options; schema inference stays off because the schema is declared explicitly.
infer_schema = "False"
first_row_is_header = "True"
delimiter = ","
csv_options = {"inferSchema": infer_schema, "header": first_row_is_header, "sep": delimiter}

# Explicit two-column schema: category key and its display name.
product_categories_schema = StructType([
    StructField('ProductCategoryKey', IntegerType(), nullable=False),
    StructField('CategoryName', StringType(), True),
])

# Load Product_Categories.csv into product_categories_df.
product_categories_df = (spark.read
                         .schema(product_categories_schema)
                         .options(**csv_options)
                         .format(file_type)
                         .load(product_categories_file_location))


In [0]:
# -----------------------------------------------------------------------------
# 5. Read product_subcategories.csv file located in the distributed file system
# -----------------------------------------------------------------------------
# Source file; the reader format comes from `file_type`, set in the Sales read cell.
product_subcategories_file_location = "dbfs:/FileStore/tables/Product_Subcategories.csv"

# CSV reader options; schema inference stays off because the schema is declared explicitly.
infer_schema = "False"
first_row_is_header = "True"
delimiter = ","
csv_options = {"inferSchema": infer_schema, "header": first_row_is_header, "sep": delimiter}

# Explicit schema: subcategory key, its name, and the parent category key used for joining.
product_subcategories_schema = StructType([
    StructField('ProductSubcategoryKey', IntegerType(), nullable=False),
    StructField('SubcategoryName', StringType(), True),
    StructField('ProductCategoryKey', IntegerType(), True),
])

# Load Product_Subcategories.csv into product_subcategories_df.
product_subcategories_df = (spark.read
                            .schema(product_subcategories_schema)
                            .options(**csv_options)
                            .format(file_type)
                            .load(product_subcategories_file_location))


In [0]:
# -------------------------------------------------------------------
# 6. Read territories.csv file located in the distributed file system
# -------------------------------------------------------------------
# Source file; the reader format comes from `file_type`, set in the Sales read cell.
territories_file_location = "dbfs:/FileStore/tables/Territories.csv"

# CSV reader options; schema inference stays off because the schema is declared explicitly.
infer_schema = "False"
first_row_is_header = "True"
delimiter = ","
csv_options = {"inferSchema": infer_schema, "header": first_row_is_header, "sep": delimiter}

# Explicit schema for Territories.csv. The key column is named SalesTerritoryKey
# (not TerritoryKey), so joins against sales data need an explicit join condition.
territories_schema = StructType([
    StructField('SalesTerritoryKey', IntegerType(), nullable=False),
    StructField('Region', StringType(), True),
    StructField('Country', StringType(), True),
    StructField('Continent', StringType(), True),
])

# Load Territories.csv into territories_df.
territories_df = (spark.read
                  .schema(territories_schema)
                  .options(**csv_options)
                  .format(file_type)
                  .load(territories_file_location))

In [0]:
# -----------------
# 7. Transformation
# -----------------
# Keep only Accessories sales in the United States and roll up sales revenue and quantity per customer.
# NOTE: the result keeps the name `bike_df` because the load cell writes that name, even though
# the filter selects the 'Accessories' category.
# A local functions alias avoids depending on the notebook-level `sum` import, which shadows the builtin.
from pyspark.sql import functions as F

bike_df = (Sales_df
           .join(other=products_df, on=['ProductKey'], how='left')
           .join(other=product_subcategories_df, on=['ProductSubcategoryKey'], how='left')
           .join(other=product_categories_df, on=['ProductCategoryKey'], how='left')
           .join(other=Customers_df, on='CustomerKey', how='left')
           # Territories use a differently named key, so join on an explicit column condition.
           .join(other=territories_df, on=[Sales_df["TerritoryKey"] == territories_df["SalesTerritoryKey"]], how='left')
           .filter((F.col('CategoryName') == 'Accessories') & (F.col('Country') == 'United States'))
           # Revenue must be computed per order line BEFORE aggregating:
           # sum(price) * sum(qty) != sum(price * qty) once a customer has more than one line.
           .withColumn('Line_Sales', F.col('ProductPrice') * F.col('OrderQuantity'))
           .groupBy('CustomerKey', 'FirstName', 'LastName', 'CategoryName', 'Country')
           .agg(F.sum('Line_Sales').alias('Total_Sales'),
                F.sum('OrderQuantity').alias('Total_Quantity'))
           .select('CustomerKey', 'FirstName', 'LastName', 'CategoryName', 'Country', 'Total_Sales', 'Total_Quantity'))


In [0]:
# ------------
# 8. Data Load
# ------------
# Write the processed data to DBFS as gzip-compressed CSV with a header row, replacing any
# existing output at the target path.
# NOTE(review): the output directory is a hard-coded, user-specific path containing a personal
# e-mail address; consider moving it to a configuration cell before sharing the notebook.
output_location = 'dbfs:/user/g.k.lohit@gmail.com'

(bike_df
 .coalesce(1)  # collapse to a single partition so one part file is written
 .write
 .mode("overwrite")
 .options(header=True, compression="gzip")
 .csv(output_location))