In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, year, month,to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,DecimalType,FloatType,ShortType, DateType

In [60]:
spark = SparkSession.builder.config("spark.jars","/home/bigdata/postgresql-42.6.0.jar") \
   .master("local").appName("ETL Pipeline by Spark").getOrCreate()

# Read Csv Files data into PySpark DataFrame(Data Extraction)

In [113]:
Calender = spark.read.option("header","true").csv("/home/bigdata/ETL/Adventure_Works/AdventureWorks_Calendar.csv")
Customer = spark.read.option("header","true").csv("/home/bigdata/ETL/Adventure_Works/AdventureWorks_Customers.csv")
Product_Categories = spark.read.option("header","true").csv("/home/bigdata/ETL/Adventure_Works/AdventureWorks_Product_Categories.csv")
Product = spark.read.option("header","true").csv("/home/bigdata/ETL/Adventure_Works/AdventureWorks_Products.csv" )
Product_Sub_Category = spark.read.option("header","true").csv("/home/bigdata/ETL/Adventure_Works/AdventureWorks_Product_Subcategories.csv" )
Sales_2015 = spark.read.option("header","true").csv("/home/bigdata/ETL/Adventure_Works/AdventureWorks_Sales_2015.csv")
Sales_2016 = spark.read.option("header","true").csv("/home/bigdata/ETL/Adventure_Works/AdventureWorks_Sales_2016.csv")
Sales_2017 = spark.read.option("header","true").csv("/home/bigdata/ETL/Adventure_Works/AdventureWorks_Sales_2017.csv")
Territories = spark.read.option("header","true").csv("/home/bigdata/ETL/Adventure_Works/AdventureWorks_Territories.csv")




# Data Transformation

In [114]:
#Customer Table
Customer.createOrReplaceTempView("Customer_Dim")
query = "SELECT INT(CustomerKey),FirstName,LastName,MaritalStatus,Gender,AnnualIncome,EducationLevel,Occupation  from Customer_Dim ;"
Customer_Dim = spark.sql(query)
Customer_Dim.show()
Customer_Dim.printSchema()

+-----------+---------+--------+-------------+------+------------+---------------+--------------+
|CustomerKey|FirstName|LastName|MaritalStatus|Gender|AnnualIncome| EducationLevel|    Occupation|
+-----------+---------+--------+-------------+------+------------+---------------+--------------+
|      11000|      JON|    YANG|            M|     M|    $90,000 |      Bachelors|  Professional|
|      11001|   EUGENE|   HUANG|            S|     M|    $60,000 |      Bachelors|  Professional|
|      11002|    RUBEN|  TORRES|            M|     M|    $60,000 |      Bachelors|  Professional|
|      11003|  CHRISTY|     ZHU|            S|     F|    $70,000 |      Bachelors|  Professional|
|      11004|ELIZABETH| JOHNSON|            S|     F|    $80,000 |      Bachelors|  Professional|
|      11005|    JULIO|    RUIZ|            S|     M|    $70,000 |      Bachelors|  Professional|
|      11007|    MARCO|   MEHTA|            M|     M|    $60,000 |      Bachelors|  Professional|
|      11008|    ROB

In [115]:
#Product Table
Product_Categories.createOrReplaceTempView("ProductCateg")
Product.createOrReplaceTempView("Product")
Product_Sub_Category.createOrReplaceTempView("ProductSub")
query_prod = " Select INT(P.ProductKey) as ProductId , P.ModelName as ProductName ,PC.CategoryName as Product_Cat_Name ,PSC.SubcategoryName as Product_Sub_Cat from Product P  inner join ProductSub PSC on P.ProductSubcategoryKey = PSC.ProductSubcategoryKey inner join ProductCateg PC on PSC.ProductSubcategoryKey = PC.ProductCategoryKey  ;"
Product_Dim = spark.sql(query_prod)
Product_Dim.show()
Product_Dim.printSchema()

+---------+-----------+----------------+---------------+
|ProductId|ProductName|Product_Cat_Name|Product_Sub_Cat|
+---------+-----------+----------------+---------------+
|      310|   Road-150|      Components|     Road Bikes|
|      311|   Road-150|      Components|     Road Bikes|
|      312|   Road-150|      Components|     Road Bikes|
|      313|   Road-150|      Components|     Road Bikes|
|      314|   Road-150|      Components|     Road Bikes|
|      315|   Road-450|      Components|     Road Bikes|
|      316|   Road-450|      Components|     Road Bikes|
|      317|   Road-450|      Components|     Road Bikes|
|      318|   Road-450|      Components|     Road Bikes|
|      319|   Road-450|      Components|     Road Bikes|
|      320|   Road-650|      Components|     Road Bikes|
|      322|   Road-650|      Components|     Road Bikes|
|      324|   Road-650|      Components|     Road Bikes|
|      326|   Road-650|      Components|     Road Bikes|
|      328|   Road-650|      Co

In [116]:
#Date Table
Calender.createOrReplaceTempView("Date_Dim")
date_query ="Select Date_Format(to_date(regexp_replace(Date,'/','-'),'M-d-yyyy'),'yyyyMd') as DateKey ,year(to_date(regexp_replace(Date,'/','-'),'M-d-yyyy')) as year , concat('Quarter',quarter(to_date(regexp_replace(Date,'/','-'),'M-d-yyyy'))) as Quarter , month(to_date(regexp_replace(Date,'/','-'),'M-d-yyyy')) as Month , day(to_date(regexp_replace(Date,'/','-'),'M-d-yyyy')) as Day from Date_Dim ; "
Date_Dim = spark.sql(date_query)
Date_Dim.show()
Date_Dim.printSchema()

+-------+----+--------+-----+---+
|DateKey|year| Quarter|Month|Day|
+-------+----+--------+-----+---+
| 201511|2015|Quarter1|    1|  1|
| 201512|2015|Quarter1|    1|  2|
| 201513|2015|Quarter1|    1|  3|
| 201514|2015|Quarter1|    1|  4|
| 201515|2015|Quarter1|    1|  5|
| 201516|2015|Quarter1|    1|  6|
| 201517|2015|Quarter1|    1|  7|
| 201518|2015|Quarter1|    1|  8|
| 201519|2015|Quarter1|    1|  9|
|2015110|2015|Quarter1|    1| 10|
|2015111|2015|Quarter1|    1| 11|
|2015112|2015|Quarter1|    1| 12|
|2015113|2015|Quarter1|    1| 13|
|2015114|2015|Quarter1|    1| 14|
|2015115|2015|Quarter1|    1| 15|
|2015116|2015|Quarter1|    1| 16|
|2015117|2015|Quarter1|    1| 17|
|2015118|2015|Quarter1|    1| 18|
|2015119|2015|Quarter1|    1| 19|
|2015120|2015|Quarter1|    1| 20|
+-------+----+--------+-----+---+
only showing top 20 rows

root
 |-- DateKey: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Month: integer (nullable = true)


In [117]:
#Territory Table
Territories.createOrReplaceTempView("Territory_Dim")
Query ="Select INT(SalesTerritoryKey),Region,Country,Continent FROM Territory_Dim ; " 
Territory_Dim = spark.sql(Query) 
Territory_Dim.show()
Territory_Dim.printSchema()


+-----------------+--------------+--------------+-------------+
|SalesTerritoryKey|        Region|       Country|    Continent|
+-----------------+--------------+--------------+-------------+
|                1|     Northwest| United States|North America|
|                2|     Northeast| United States|North America|
|                3|       Central| United States|North America|
|                4|     Southwest| United States|North America|
|                5|     Southeast| United States|North America|
|                6|        Canada|        Canada|North America|
|                7|        France|        France|       Europe|
|                8|       Germany|       Germany|       Europe|
|                9|     Australia|     Australia|      Pacific|
|               10|United Kingdom|United Kingdom|       Europe|
+-----------------+--------------+--------------+-------------+

root
 |-- SalesTerritoryKey: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Count

In [118]:
#Sales Fact Table
Sales_2015.createOrReplaceTempView("Sales_2015")
Sales_2016.createOrReplaceTempView("Sales_2016")
Sales_2017.createOrReplaceTempView("Sales_2017")
Product.createOrReplaceTempView("Product")
Query ="Select OrderNumber,INT(CustomerKey),INT(S15.ProductKey),Date_Format(to_date(regexp_replace(OrderDate,'/','-'),'M-d-yyyy'),'yyyyMd') as DateKey,INT(TerritoryKey),OrderQuantity,P.ProductCost as ProdcutCost,P.ProductPrice as ProductPrice FROM Sales_2015 S15 inner join Product P on S15.ProductKey = P.ProductKey union select OrderNumber,INT(CustomerKey),INT(S16.ProductKey),Date_Format(to_date(regexp_replace(OrderDate,'/','-'),'M-d-yyyy'),'yyyyMd') as DateKey,INT(TerritoryKey),OrderQuantity,P.ProductCost as ProdcutCost,P.ProductPrice as ProductPrice FROM Sales_2016 S16 inner join Product P on S16.ProductKey = P.ProductKey  union select OrderNumber,INT(CustomerKey),INT(S17.ProductKey),Date_Format(to_date(regexp_replace(OrderDate,'/','-'),'M-d-yyyy'),'yyyyMd') as DateKey,INT(TerritoryKey),OrderQuantity,P.ProductCost as ProdcutCost,P.ProductPrice as ProductPrice FROM Sales_2017 S17 inner join Product P on S17.ProductKey = P.ProductKey  ; " 
Sales_Fact = spark.sql(Query) 
Sales_Fact.show()
Sales_Fact.printSchema()

+-----------+-----------+----------+--------+------------+-------------+-----------+------------+
|OrderNumber|CustomerKey|ProductKey| DateKey|TerritoryKey|OrderQuantity|ProdcutCost|ProductPrice|
+-----------+-----------+----------+--------+------------+-------------+-----------+------------+
|    SO45164|      11453|       349| 2015115|           9|            1|  1898.0944|     3374.99|
|    SO45240|      29155|       310| 2015128|           1|            1|  2171.2942|     3578.27|
|    SO45466|      19611|       311| 2015221|           9|            1|  2171.2942|     3578.27|
|    SO45641|      23306|       311| 2015310|           6|            1|  2171.2942|     3578.27|
|    SO45946|      11914|       350| 2015420|           9|            1|  1898.0944|     3374.99|
|    SO46453|      20626|       313| 2015610|           9|            1|  2171.2942|     3578.27|
|    SO46677|      15460|       371|  201571|           8|            1|  1320.6838|   2181.5625|
|    SO47094|      1

# Load transformed data 

In [119]:
Customer_Dim.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('/home/bigdata/ETL/Output/Customer_Dim.csv')
Product_Dim.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('/home/bigdata/ETL/Output/Product_Dim.csv')
Territory_Dim.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('/home/bigdata/ETL/Output/Territory_Dim.csv')
Date_Dim.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('/home/bigdata/ETL/Output/Date_Dim.csv')
Sales_Fact.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save('/home/bigdata/ETL/Output/Sales_Fact.csv')