## Load and Transform Raw Store Data into Delta Tables

This notebook will:
- read the **CSV** flat files from the landing zone
- standardize and transform them according to the business logic
- store them in **Delta** tables for further analytical processing

> **Note**: This notebook is invoked by a Data Factory pipeline.

The pipeline first copies data from the external data source to a **Landing Store**.  
This notebook then transforms that data and stores it in **Delta Tables**. 


In [7]:
#Import all the required modules
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

StatementMeta(, 301c9984-8b9c-4e4b-8d8c-6711b09714d0, 9, Finished, Available)

In [2]:
# --- Customers --------------------------------------------------------------
# Expected layout of customers.txt.
# NOTE(review): this schema is declared but NOT passed to the reader below.
# Column names come from the CSV header and types from inferSchema.
# Applying it as-is would also need a matching "dateFormat" option for
# `orderDate`. 'compnayName' looks like a typo of 'companyName'; confirm
# against the CSV header before wiring it in.
schema = StructType([
    StructField('companyID', StringType(), True), 
    StructField('compnayName', StringType(), True), 
    StructField('companyAddress', StringType(), True), 
    StructField('country', StringType(), True), 
    StructField('contactName', StringType(), True),
    StructField('phone', StringType(), True), 
    StructField('eMail', StringType(), True),
    StructField('orderDate', DateType(), True), 
    StructField('salesPerson', StringType(), True),
    StructField('region', StringType(), True), 
    StructField('comments', StringType(), True)  
]) 

# Load the raw customers file from the landing zone (header row, inferred types).
df = spark.read.format("csv")\
.option("header", True)\
.option("inferschema", True)\
.load("Files/tank_landing_zone/customers.txt")

# Standardise phone numbers and order dates.
# - phone: prefixed with "+1-".
#   NOTE(review): inferSchema may read an all-digit phone column as a number,
#   in which case concat() renders it as plain digits. Confirm the raw format.
# - orderDate: parsed directly to a DateType from month/day/two-digit-year text.
#   The pattern must use upper-case `M` (month-of-year). Lower-case `m` means
#   minute-of-hour, so the month would be dropped and every date would
#   default to January.
df = df.withColumn("phone", concat(lit("+1-"), col("phone")))\
.withColumn("orderDate", to_date(col("orderDate"), "M/d/yy"))

# Overwrite the conformed `customers` Delta table with this load.
df.write.format("delta")\
.mode("overwrite")\
.save("Tables/customers")

StatementMeta(, , , Waiting, )

In [3]:
# --- Tanks ------------------------------------------------------------------
# Unlike the other tables in this notebook, tanks.txt is read with an explicit
# schema. With Spark's CSV defaults (enforceSchema=true, mode=PERMISSIVE):
# - the header row is skipped;
# - these field names and types are applied by position;
# - values that cannot be converted to the declared type load as null.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('tankID', StringType()),
        ('companyID', StringType()),
        ('Owner', StringType()),
        ('tankLocation', StringType()),
        ('latitude', DecimalType(7, 4)),
        ('longitude', DecimalType(7, 4)),
        ('facility', StringType()),
        ('manufacturer', StringType()),
        ('unitPrice', IntegerType()),
        ('status', StringType()),
        ('comments', StringType()),
    ]
])

df = spark.read.csv("Files/tank_landing_zone/tanks.txt", schema=schema, header=True)

# Replace the conformed `tanks` Delta table with this load.
df.write.mode("overwrite").format("delta").save("Tables/tanks")

StatementMeta(, , , Waiting, )

In [10]:
# --- Designs ----------------------------------------------------------------
# Expected layout of designs.txt.
# NOTE(review): this schema is declared but NOT passed to the reader below.
# Column names come from the CSV header and types from inferSchema.
# 'desginStandard', 'capcityNominal' and 'gemoertryDetails' look like typos
# (designStandard / capacityNominal / geometryDetails). Confirm against the
# CSV header before wiring this schema in.
schema = StructType([
    StructField('designID', StringType(), True), 
    StructField('tankID', StringType(), True), 
    StructField('constructionYear', IntegerType(), True), 
    StructField('installationDate', DateType(), True), 
    StructField('desginStandard', StringType(), True),
    StructField('material', StringType(), True), 
    StructField('diameter', StringType(), True),
    StructField('height', StringType(), True), 
    StructField('thickness', StringType(), True),
    StructField('capacityGross', StringType(), True), 
    StructField('capcityNominal', StringType(), True),
    StructField('operatingHeight', StringType(), True),
    StructField('gemoertryDetails', StringType(), True), 
    StructField('comments', StringType(), True)   
]) 

# Load the raw designs file from the landing zone (header row, inferred types).
df = spark.read.format("csv")\
.option("header", True)\
.option("inferschema", True)\
.load("Files/tank_landing_zone/designs.txt")

# Parse installationDate directly to a DateType from month/day/two-digit-year
# text.
# - Upper-case `M` is month-of-year. Lower-case `m` is minute-of-hour and would
#   silently default every month to January.
# - NOTE(review): Spark 3's default parser reads `yy` as 2000-2099. Confirm no
#   installation dates predate 2000.
df = df.withColumn("installationDate", to_date(col("installationDate"), "M/d/yy"))

# Overwrite the conformed `designs` Delta table with this load.
df.write.format("delta").mode("overwrite").save("Tables/designs")

StatementMeta(, 301c9984-8b9c-4e4b-8d8c-6711b09714d0, 12, Finished, Available)

In [11]:
# --- Inspections ------------------------------------------------------------
# Expected layout of inspections.txt.
# NOTE(review): this schema is declared but NOT passed to the reader below.
# Column names come from the CSV header and types from inferSchema.
schema = StructType([
    StructField('inspectionID', StringType(), True), 
    StructField('tankID', StringType(), True), 
    StructField('inspectionDate', DateType(), True), 
    StructField('inspectorName', StringType(), True),
    StructField('inspectionCategory', StringType(), True), 
    StructField('inspectionResult', StringType(), True),
    StructField('InspectionType', StringType(), True), 
    StructField('comments', StringType(), True)   
]) 

# Load the raw inspections file from the landing zone (header row, inferred types).
df = spark.read.format("csv")\
.option("header", True)\
.option("inferschema", True)\
.load("Files/tank_landing_zone/inspections.txt")

# Parse inspectionDate directly to a DateType from month/day/two-digit-year text.
# Upper-case `M` is month-of-year. Lower-case `m` is minute-of-hour and would
# silently default every month to January.
df = df.withColumn("inspectionDate", to_date(col("inspectionDate"), "M/d/yy"))

# Overwrite the conformed `inspections` Delta table with this load.
df.write.format("delta").mode("overwrite").save("Tables/inspections")

StatementMeta(, 301c9984-8b9c-4e4b-8d8c-6711b09714d0, 13, Finished, Available)

In [13]:
# --- Reports ----------------------------------------------------------------
# Expected layout of reports.txt.
# NOTE(review): this schema is declared but NOT passed to the reader below.
# Column names come from the CSV header and types from inferSchema.
schema = StructType([
    StructField('reportID', StringType(), True), 
    StructField('inspectionID', StringType(), True), 
    StructField('reportDate', DateType(), True), 
    StructField('reportsURL', StringType(), True),
    StructField('imagesURL', StringType(), True), 
    StructField('videosURL', StringType(), True),
    StructField('comments', StringType(), True)   
]) 

# Load the raw reports file from the landing zone (header row, inferred types).
df = spark.read.format("csv")\
.option("header", True)\
.option("inferschema", True)\
.load("Files/tank_landing_zone/reports.txt")

# Parse reportDate directly to a DateType from month/day/two-digit-year text.
# Upper-case `M` is month-of-year. Lower-case `m` is minute-of-hour and would
# silently default every month to January.
df = df.withColumn("reportDate", to_date(col("reportDate"), "M/d/yy"))

# Overwrite the conformed `reports` Delta table with this load.
df.write.format("delta").mode("overwrite").save("Tables/reports")

StatementMeta(, 301c9984-8b9c-4e4b-8d8c-6711b09714d0, 15, Finished, Available)

In [None]:
# Hand control back to the calling Data Factory pipeline.
# The string passed here is surfaced as the notebook's exit value.
NOTEBOOK_EXIT_VALUE = "SUCCESS"
mssparkutils.notebook.exit(NOTEBOOK_EXIT_VALUE)

StatementMeta(, , , Waiting, )

ExitValue: SUCCESS

#### ====== End of Notebook====== ####