# Building a Delta Lake using PySpark

This notebook was run on [Databricks Community Edition](https://community.cloud.databricks.com/). It walks through the data engineering process of loading data, creating Delta tables and enriching the data with Spark. In a business setting, the data usually comes from external sources such as Azure Data Lake Storage or AWS S3 buckets mounted to the Databricks workspace (more information [here](https://docs.databricks.com/data/data-sources/azure/adls-gen2/azure-datalake-gen2-sp-access.html)). Here we upload a dataset manually and go through the steps listed above.

In [None]:
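from pyspark.sql.types import IntegerType, BooleanType, DateType, StringType
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when
from pyspark.sql.window import Window

# Use the pre-Spark 3.0 datetime parser so that timestamps such as '12/1/2010 8:26'
# (single-digit day and hour) can be parsed with the pattern 'MM/dd/yyyy HH:mm'
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")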

Out[31]: DataFrame[key: string, value: string]
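The legacy parser setting matters because the raw `InvoiceDate` values, such as `12/1/2010 8:26`, use a single-digit day and hour. As far as we understand, the default parser in Spark 3 treats `dd` and `HH` as fixed two-digit fields and rejects such strings, while the legacy parser is lenient; using the pattern `M/d/yyyy H:mm` instead should be an alternative. The plain-Python sketch below (no Spark needed; `to_invoice_day` is a hypothetical helper) shows the conversion the notebook performs later, from the raw timestamp to a `yyyyMMdd` day string:

```python
from datetime import datetime


def to_invoice_day(raw: str) -> str:
    """Convert a raw InvoiceDate such as '12/1/2010 8:26' into a 'yyyyMMdd' string.

    Python's strptime accepts non-zero-padded month, day and hour values,
    similar to the lenient parsing the notebook relies on in Spark.
    """
    parsed = datetime.strptime(raw, "%m/%d/%Y %H:%M")
    return parsed.strftime("%Y%m%d")


print(to_invoice_day("12/1/2010 8:26"))  # 20101201
```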

In [None]:
# File location and type
file_location = "/FileStore/tables/data.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047.0,United Kingdom
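With `inferSchema` enabled, Spark scans the values of each column and picks a type that fits all of them. The sketch below is a deliberately simplified stand-in for that behaviour (not Spark's actual algorithm; `infer_column_types` and `value_type` are hypothetical helpers), applied to a few rows of the sample above. It illustrates why `CustomerID` ends up as a floating-point number such as `17850.0` rather than an identifier string, and why `InvoiceDate` most likely stays a plain string that we have to parse ourselves later.

```python
import csv
import io

# A few rows copied from the sample output above
SAMPLE = """InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047.0,United Kingdom
"""

# Widening order: a column is only as narrow as its widest value
RANK = {"int": 0, "float": 1, "str": 2}


def value_type(value: str) -> str:
    """Return the narrowest of 'int', 'float' or 'str' that can hold the value."""
    for name, cast in (("int", int), ("float", float)):
        try:
            cast(value)
            return name
        except ValueError:
            pass
    return "str"


def infer_column_types(text: str) -> dict:
    """Infer one type per column by widening over all rows."""
    types = {}
    for row in csv.DictReader(io.StringIO(text)):
        for column, value in row.items():
            current = types.get(column, "int")
            types[column] = max(current, value_type(value), key=RANK.get)
    return types


print(infer_column_types(SAMPLE))
```

Note how a single value like `85123A` is enough to widen `StockCode` to a string for the whole column.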


In [None]:
print('shape of the dataset ({},{})'.format(df.count(),len(df.columns)))

shape of the dataset (541909,8)


Now that the dataset is loaded into the workspace, let's persist it as a Delta table and register a table object so that we can query it with SQL.

In [None]:
( df
    .write
    .format('delta')
    .mode('overwrite')
    .save('/bronze/raw_data')
)



In [None]:
%fs ls

path,name,size
dbfs:/FileStore/,FileStore/,0
dbfs:/bronze/,bronze/,0
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-results/,databricks-results/,0
dbfs:/ml/,ml/,0
dbfs:/tmp/,tmp/,0


In [None]:
%fs ls /bronze/raw_data/

path,name,size
dbfs:/bronze/raw_data/_delta_log/,_delta_log/,0
dbfs:/bronze/raw_data/part-00000-a0032286-e077-4e95-baf5-a8515365a85b-c000.snappy.parquet,part-00000-a0032286-e077-4e95-baf5-a8515365a85b-c000.snappy.parquet,502410
dbfs:/bronze/raw_data/part-00001-9e30253e-ebfa-466b-a858-2414c30276c6-c000.snappy.parquet,part-00001-9e30253e-ebfa-466b-a858-2414c30276c6-c000.snappy.parquet,531089
dbfs:/bronze/raw_data/part-00002-df12d400-26e4-45d0-adc6-0a9f10982697-c000.snappy.parquet,part-00002-df12d400-26e4-45d0-adc6-0a9f10982697-c000.snappy.parquet,540861
dbfs:/bronze/raw_data/part-00003-be7ccb9d-45ae-4b70-ae6d-35f854bd8081-c000.snappy.parquet,part-00003-be7ccb9d-45ae-4b70-ae6d-35f854bd8081-c000.snappy.parquet,527476
dbfs:/bronze/raw_data/part-00004-864d39b0-772a-43b5-9fd6-846b11a4f4cb-c000.snappy.parquet,part-00004-864d39b0-772a-43b5-9fd6-846b11a4f4cb-c000.snappy.parquet,520928
dbfs:/bronze/raw_data/part-00005-1ecf7ded-6ee5-49d7-8e3f-7a0e5d0471d5-c000.snappy.parquet,part-00005-1ecf7ded-6ee5-49d7-8e3f-7a0e5d0471d5-c000.snappy.parquet,507572
dbfs:/bronze/raw_data/part-00006-b98eda72-ebc4-4d95-b0fc-518681c1b267-c000.snappy.parquet,part-00006-b98eda72-ebc4-4d95-b0fc-518681c1b267-c000.snappy.parquet,505392
dbfs:/bronze/raw_data/part-00007-a2aae27a-e1f2-44f5-a744-290981038820-c000.snappy.parquet,part-00007-a2aae27a-e1f2-44f5-a744-290981038820-c000.snappy.parquet,189525
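The `_delta_log/` directory is what turns this folder of Parquet files into a Delta table: each commit is written as a numbered JSON file (for example `00000000000000000000.json`) with one action per line, such as `add` or `remove` for a data file. Readers reconstruct the current state of the table by replaying these actions in order. The sketch below illustrates that idea in plain Python on a hypothetical, heavily abbreviated log; real entries carry more fields (partition values, modification time, statistics) and there are other action types such as `protocol`, `metaData` and `commitInfo`. The file names, sizes and the `live_files` helper are made up for illustration.

```python
import json

# Hypothetical, abbreviated commits: commit 0 adds two files, commit 1 replaces one of them
COMMITS = [
    [
        '{"add": {"path": "part-00000-a.snappy.parquet", "size": 100, "dataChange": true}}',
        '{"add": {"path": "part-00001-b.snappy.parquet", "size": 200, "dataChange": true}}',
    ],
    [
        '{"remove": {"path": "part-00001-b.snappy.parquet", "dataChange": true}}',
        '{"add": {"path": "part-00002-c.snappy.parquet", "size": 300, "dataChange": true}}',
    ],
]


def live_files(commits):
    """Replay add/remove actions in commit order; return {path: size} of the latest snapshot."""
    files = {}
    for commit in commits:
        for line in commit:
            action = json.loads(line)
            if "add" in action:
                files[action["add"]["path"]] = action["add"]["size"]
            elif "remove" in action:
                files.pop(action["remove"]["path"], None)
    return files


print(sorted(live_files(COMMITS)))
```

This replay model is also why `mode('overwrite')` can leave older Parquet files on disk: they are marked as removed in the log rather than deleted, until a `VACUUM` cleans them up.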


In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS db ") 

Out[23]: DataFrame[]

In [None]:
spark.sql('''
CREATE TABLE IF NOT EXISTS db.raw_data
  USING DELTA 
  LOCATION '/bronze/raw_data'
''')

Out[24]: DataFrame[]

In [None]:
%sql select * from db.raw_data

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerId,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047.0,United Kingdom


We have loaded the CSV file, converted it to Delta format and registered it as a table in a database. Next, we transform the data to enrich it and prepare datasets that can be used for analysis.

In [None]:
transactions = spark.sql('''select * from db.raw_data ''')

In [None]:
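# Parse InvoiceDate (e.g. '12/1/2010 8:26') and keep only the day as a 'yyyyMMdd' string;
# CustomerId was inferred as a double, so the cast yields strings such as '17850.0'
transactions = transactions.withColumn('InvoiceDate', F.from_unixtime(F.unix_timestamp(col('InvoiceDate'), 'MM/dd/yyyy HH:mm'), 'yyyyMMdd')) \
                           .withColumn('CustomerId', col('CustomerId').cast(StringType()))

# Window over all line items of the same invoice.
# ItemCost: line total; InvoiceItems: number of distinct stock codes on the invoice;
# InvoiceCost: sum of the line totals on the invoice
w = Window.partitionBy('InvoiceNo')
transactions = transactions.withColumn('ItemCost', F.round(col('Quantity') * col('UnitPrice'), 2)) \
                           .withColumn('InvoiceItems', F.size(F.collect_set('StockCode').over(w))) \
                           .withColumn('InvoiceCost', F.sum('ItemCost').over(w))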

In [None]:
display(transactions)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerId,Country,ItemCost,InvoiceItems,InvoiceCost
536366,22633,HAND WARMER UNION JACK,6,20101201,1.85,17850.0,United Kingdom,11.1,2,22.2
536366,22632,HAND WARMER RED POLKA DOT,6,20101201,1.85,17850.0,United Kingdom,11.1,2,22.2
536374,21258,VICTORIAN SEWING BOX LARGE,32,20101201,10.95,15100.0,United Kingdom,350.4,1,350.4
536386,84880,WHITE WIRE EGG HOLDER,36,20101201,4.95,16029.0,United Kingdom,178.2,3,508.2
536386,85099C,JUMBO BAG BAROQUE BLACK WHITE,100,20101201,1.65,16029.0,United Kingdom,165.0,3,508.2
536386,85099B,JUMBO BAG RED RETROSPOT,100,20101201,1.65,16029.0,United Kingdom,165.0,3,508.2
536398,21980,PACK OF 12 RED RETROSPOT TISSUES,24,20101201,0.29,13448.0,United Kingdom,6.96,17,426.56
536398,21844,RED RETROSPOT MUG,6,20101201,2.95,13448.0,United Kingdom,17.7,17,426.56
536398,22468,BABUSHKA LIGHTS STRING OF 10,4,20101201,6.75,13448.0,United Kingdom,27.0,17,426.56
536398,22637,PIGGY BANK RETROSPOT,8,20101201,2.55,13448.0,United Kingdom,20.4,17,426.56
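The window columns compute per-invoice figures and attach them to every line item of that invoice, so no rows are collapsed as they would be with a `groupBy`. For readers less familiar with window functions, here is a plain-Python sketch of the same computation (the `enrich` helper and the tuple layout are assumptions for illustration), using rows from the output above:

```python
from collections import defaultdict

# (InvoiceNo, StockCode, Quantity, UnitPrice) taken from the displayed output
ROWS = [
    ("536366", "22633", 6, 1.85),
    ("536366", "22632", 6, 1.85),
    ("536374", "21258", 32, 10.95),
    ("536386", "84880", 36, 4.95),
    ("536386", "85099C", 100, 1.65),
    ("536386", "85099B", 100, 1.65),
]


def enrich(rows):
    """Return (InvoiceNo, StockCode, ItemCost, InvoiceItems, InvoiceCost) per input row."""
    # Line total per row, rounded to 2 decimals like F.round(..., 2)
    item_costs = [round(qty * price, 2) for _, _, qty, price in rows]

    # "Partition by InvoiceNo": distinct stock codes and summed line totals per invoice
    codes = defaultdict(set)
    totals = defaultdict(float)
    for (invoice, stock_code, _, _), cost in zip(rows, item_costs):
        codes[invoice].add(stock_code)
        totals[invoice] += cost  # not rounded, as in the Spark code

    # Attach the per-invoice values back onto every line item
    return [
        (invoice, stock_code, cost, len(codes[invoice]), totals[invoice])
        for (invoice, stock_code, _, _), cost in zip(rows, item_costs)
    ]


for row in enrich(ROWS):
    print(row)
```

Because `InvoiceCost` is a sum of doubles, printed totals may show small floating-point noise; rounding or a decimal type would avoid that if exact currency values matter.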


In [None]:
( transactions
    .write
    .format('delta')
    .mode('overwrite')
    .partitionBy('InvoiceDate')
    .save('/silver/transactions')
)

spark.sql('''CREATE TABLE IF NOT EXISTS db.transactions
  USING DELTA 
  LOCATION '/silver/transactions' ''')

Out[59]: DataFrame[]
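Because the silver table is written with `partitionBy('InvoiceDate')`, Spark stores its rows in Hive-style sub-directories named `column=value` under `/silver/transactions`, one per distinct invoice day, and queries that filter on `InvoiceDate` can skip the directories of other days. The sketch below (a hypothetical `partition_dirs` helper in plain Python, with hypothetical input values, ignoring the escaping Spark applies to special characters) shows how those directory names follow from the data:

```python
def partition_dirs(base, column, values):
    """Return the Hive-style partition directories implied by the given column values."""
    return sorted({f"{base}/{column}={value}" for value in values})


# Hypothetical InvoiceDate values
invoice_days = ["20101201", "20101201", "20101202"]
for path in partition_dirs("/silver/transactions", "InvoiceDate", invoice_days):
    print(path)
```

A day-level column keeps the number of directories moderate; partitioning by a high-cardinality column such as `InvoiceNo` would instead produce a very large number of small files.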

In [None]:
%sql 
select * from db.transactions

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerId,Country,ItemCost,InvoiceItems,InvoiceCost
536366,22633,HAND WARMER UNION JACK,6,20101201,1.85,17850.0,United Kingdom,11.1,2,22.2
536366,22632,HAND WARMER RED POLKA DOT,6,20101201,1.85,17850.0,United Kingdom,11.1,2,22.2
536374,21258,VICTORIAN SEWING BOX LARGE,32,20101201,10.95,15100.0,United Kingdom,350.4,1,350.4
536386,84880,WHITE WIRE EGG HOLDER,36,20101201,4.95,16029.0,United Kingdom,178.2,3,508.2
536386,85099C,JUMBO BAG BAROQUE BLACK WHITE,100,20101201,1.65,16029.0,United Kingdom,165.0,3,508.2
536386,85099B,JUMBO BAG RED RETROSPOT,100,20101201,1.65,16029.0,United Kingdom,165.0,3,508.2
536398,21980,PACK OF 12 RED RETROSPOT TISSUES,24,20101201,0.29,13448.0,United Kingdom,6.96,17,426.56
536398,21844,RED RETROSPOT MUG,6,20101201,2.95,13448.0,United Kingdom,17.7,17,426.56
536398,22468,BABUSHKA LIGHTS STRING OF 10,4,20101201,6.75,13448.0,United Kingdom,27.0,17,426.56
536398,22637,PIGGY BANK RETROSPOT,8,20101201,2.55,13448.0,United Kingdom,20.4,17,426.56


We have completed the ETL process: the raw data is stored in the bronze layer and the enriched transactions in the silver layer. We are now ready to dive into data analysis.