In [0]:
import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType,StructField,DateType,TimestampType,StringType,IntegerType,DecimalType
from pyspark.sql.functions import year, month, dayofmonth,to_timestamp,to_date,split,substring,col,when
from pyspark.sql.functions import udf

In [0]:
# Notebook parameters, supplied as Databricks text widgets (all default to "").
dbutils.widgets.text("storage_acct", "","")
storage_acct = dbutils.widgets.get("storage_acct")

dbutils.widgets.text("container", "","")
container_name = dbutils.widgets.get("container")
# Also expose the value as `container`, the name the mount cell has referenced.
container = container_name

# NOTE(review): this is a storage account key; prefer reading it from a secret
# scope (dbutils.secrets.get) rather than a plain-text widget.
dbutils.widgets.text("blob_key", "","")
blob_key = dbutils.widgets.get("blob_key")

# Each widget gets its own variable. Previously these two reads were assigned
# to `blob_key`, which overwrote the account key before the mount step ran.
dbutils.widgets.text("input_path", "","")
input_path = dbutils.widgets.get("input_path")

dbutils.widgets.text("output_path", "","")
output_path = dbutils.widgets.get("output_path")


In [0]:
mountName = "walmartSalesETL"
mountPoint = "/mnt/" + mountName

# Check the registered mount points directly. The previous check compared
# against str() of the FileInfo entries from ls('/mnt/'). That breaks whenever
# the FileInfo repr changes (newer runtimes add fields such as
# modificationTime), and then the cell tries to re-mount an existing mount
# point, which fails.
existingMountPoints = [m.mountPoint for m in dbutils.fs.mounts()]

if mountPoint in existingMountPoints:
  print("mount already created")
else:
  # Uses `container_name`, the variable the widget cell assigns from the
  # "container" widget.
  dbutils.fs.mount(
    source = "wasbs://" + container_name + "@" + storage_acct + ".blob.core.windows.net",
    mount_point = mountPoint,
    extra_configs = {"fs.azure.account.key." + storage_acct + ".blob.core.windows.net": blob_key})
  

In [0]:
def replaceNullCustomerID(df):
  """Fill missing customer identifiers with the placeholder 'Guest'.

  Args:
    df: Spark DataFrame containing a "Customer ID" column.

  Returns:
    A new DataFrame in which nulls in "Customer ID" become 'Guest'. Every
    other column is left untouched.

  Note: Spark ignores a string fill value for non-string columns, so this only
  has an effect when "Customer ID" is string-typed. It is when the CSV is read
  without schema inference, as main() does.
  """
  # DataFrame.fillna is the documented alias of DataFrame.na.fill.
  guest_filled = df.fillna(value='Guest', subset=["Customer ID"])
  return guest_filled

def replaceNullDescription(df):
  """Fill missing product descriptions with the placeholder 'Unlisted'.

  Args:
    df: Spark DataFrame containing a "Description" column.

  Returns:
    A new DataFrame in which nulls in "Description" become 'Unlisted'. All
    other columns are unchanged.
  """
  # fillna(...) and na.fill(...) are aliases in PySpark.
  described = df.fillna(value='Unlisted', subset=["Description"])
  return described

def addcolumnQuarter(df):
  """Add a string column ``Qtr`` ("Qtr1".."Qtr4") derived from ``InvoiceDate``.

  The month is the text before the first '/' in ``InvoiceDate``, cast to int.
  Dates are therefore assumed to be month-first (e.g. "12/1/2010 8:26").
  NOTE(review): confirm this against the source data.

  Rows whose month cannot be parsed (null InvoiceDate, a non month-first
  format) or falls outside 1-12 get a null ``Qtr``. Previously they were
  silently labelled "Qtr4".

  Args:
    df: Spark DataFrame with an ``InvoiceDate`` string column.

  Returns:
    ``df`` plus a ``Qtr`` column. No other columns are added, removed or
    replaced.
  """
  # Build the month as an expression rather than a temporary column. This
  # avoids overwriting (and then dropping) any existing "month"/"Year" column
  # in df. The old unused "Year" computation is gone.
  month_num = split(df['InvoiceDate'], '/').getItem(0).cast("int")

  # Column.between is inclusive on both ends. There is no .otherwise(): a null
  # or out-of-range month matches no branch and yields null.
  quarter = (when(month_num.between(1, 3), "Qtr1")
             .when(month_num.between(4, 6), "Qtr2")
             .when(month_num.between(7, 9), "Qtr3")
             .when(month_num.between(10, 12), "Qtr4"))

  return df.withColumn('Qtr', quarter)
  
def addcolumnInvoiceType(df):
  """Classify each row as a return or a purchase based on ``Quantity``.

  Args:
    df: Spark DataFrame with a ``Quantity`` column.

  Returns:
    ``df`` with an added ``InvoiceType`` column. It is "Return" when Quantity
    is zero or negative, and "Purchase" otherwise. A null comparison result
    (e.g. null Quantity) also falls through to "Purchase".
  """
  is_return = col("Quantity") <= 0
  invoice_type = when(is_return, "Return").otherwise("Purchase")
  return df.withColumn('InvoiceType', invoice_type)

def filterDf(df, country='United Kingdom'):
  """Split ``df`` into the rows for one country and all remaining rows.

  Args:
    df: Spark DataFrame with a ``Country`` column.
    country: the ``Country`` value routed to the first result. Defaults to
      'United Kingdom', the original hard-coded split.

  Returns:
    Tuple ``(df_uk, df_others)``:
      - ``df_uk``: rows whose Country equals ``country``.
      - ``df_others``: every other row, including rows with a null Country.
        A plain ``!=`` evaluates to null for those rows, so they were
        previously dropped from both outputs.
  """
  # eqNullSafe never returns null, so its negation is an exact complement:
  # together the two outputs cover every input row exactly once.
  is_match = df.Country.eqNullSafe(country)
  df_uk = df.filter(is_match)
  df_others = df.filter(~is_match)
  return df_uk, df_others

In [0]:
def main(inputFilePath="/mnt/walmartSalesETL/inputFile/",
         outputDir="/mnt/walmartSalesETL/outputFile",
         writeMode="append"):
  """Run the sales ETL: read CSV, clean and enrich, split by country, write CSV.

  Args:
    inputFilePath: path of the header-bearing sales CSV input.
    outputDir: parent directory. Results are written to
      ``<outputDir>/sales_uk`` and ``<outputDir>/sales_others``.
    writeMode: Spark save mode used for both writes. Defaults to "append" (the
      original behavior). Re-running with "append" adds the rows again; use
      "overwrite" for an idempotent re-run.

  Raises:
    Exception: any failure is printed and then re-raised. The cell, and any
      job running this notebook, then fails instead of appearing successful.
  """
  try:
    salesDf = spark.read.option("header","true").csv(inputFilePath)

    # Handle Null Customer values
    salesDf = replaceNullCustomerID(salesDf)

    # Handle Null Description
    salesDf = replaceNullDescription(salesDf)

    # Add Quarter column
    salesDf = addcolumnQuarter(salesDf)

    # Add Invoice Type column
    salesDf = addcolumnInvoiceType(salesDf)

    # Split files by UK and Others
    salesDf_uk, salesDf_others = filterDf(salesDf)

    # NOTE(review): outputs are written without a header row; consider
    # .option("header", "true") if downstream readers expect column names.
    # Write UK files to output folder
    salesDf_uk.write.mode(writeMode).csv(outputDir + "/sales_uk")

    # Write others files to output folder
    salesDf_others.write.mode(writeMode).csv(outputDir + "/sales_others")

  except Exception as e:
    print("Exception occured " + str(e))
    # Re-raise: swallowing the error made failed runs look like successes.
    raise

In [0]:
# Run the pipeline. Exceptions are deliberately not caught here: catching them
# and only printing made failed runs look successful.
main()