In [1]:
# Import required libraries

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
from pyspark.sql import SQLContext
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [2]:
# Set spark session

spark_session = SparkSession.builder \
          .getOrCreate()

In [3]:
# Create struct schema and load data

schema = StructType([StructField("InvoiceID", StringType(),True), \
                    StructField("Branch", StringType(), False),
                    StructField("City", StringType(),False), \
                    StructField("Customer", StringType(), False), \
                    StructField("Gender", StringType(),False), \
                    StructField("ProductLine", StringType(), False), \
                    StructField("UnitPrice", DoubleType(),False), \
                    StructField("Quantity", IntegerType(), False),\
                    StructField("Tax", DoubleType(), False), \
                    StructField("Sales", DoubleType(),False), \
                    StructField("Date_", StringType(), True), \
                    StructField("Time_", StringType(),False), \
                    StructField("Payment", StringType(), False), \
                    StructField("cog", DoubleType(),False), \
                    StructField("grossMargin", DoubleType(), False),\
                    StructField("grossIncome", DoubleType(), False), \
                    StructField("rating", DoubleType(),False)] )


In [4]:
dataset = spark_session.read.csv(
        path="/home/master/Descargas/supermarketsales.csv",
        sep=",",
        header=True,
        schema=schema)

In [5]:
dataset.show(2)

+-----------+------+---------+--------+------+--------------------+---------+--------+-------+--------+--------+-----+-------+------+-----------+-----------+------+
|  InvoiceID|Branch|     City|Customer|Gender|         ProductLine|UnitPrice|Quantity|    Tax|   Sales|   Date_|Time_|Payment|   cog|grossMargin|grossIncome|rating|
+-----------+------+---------+--------+------+--------------------+---------+--------+-------+--------+--------+-----+-------+------+-----------+-----------+------+
|750-67-8428|     A|   Yangon|  Member|Female|   Health and beauty|    74.69|       7|26.1415|548.9715|1/5/2019|13:08|Ewallet|522.83|4.761904762|    26.1415|   9.1|
|226-31-3081|     C|Naypyitaw|  Normal|Female|Electronic access...|    15.28|       5|   3.82|   80.22|3/8/2019|10:29|   Cash|  76.4|4.761904762|       3.82|   9.6|
+-----------+------+---------+--------+------+--------------------+---------+--------+-------+--------+--------+-----+-------+------+-----------+-----------+------+
only showi

In [6]:
dataset.printSchema()

root
 |-- InvoiceID: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Customer: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ProductLine: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Tax: double (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Date_: string (nullable = true)
 |-- Time_: string (nullable = true)
 |-- Payment: string (nullable = true)
 |-- cog: double (nullable = true)
 |-- grossMargin: double (nullable = true)
 |-- grossIncome: double (nullable = true)
 |-- rating: double (nullable = true)



In [7]:
# Create payment dimension

DimPayment = dataset.select("Payment").distinct().withColumn("PaymentKey", row_number().over(Window.orderBy(monotonically_increasing_id())))
DimPayment.show()

+-----------+----------+
|    Payment|PaymentKey|
+-----------+----------+
|    Ewallet|         1|
|       Cash|         2|
|Credit card|         3|
+-----------+----------+



In [8]:
# Create customer dimension

DimCustomer = dataset.select("Customer","Gender").distinct().withColumn("CustomerKey", row_number().over(Window.orderBy(monotonically_increasing_id())))
DimCustomer.show()

+--------+------+-----------+
|Customer|Gender|CustomerKey|
+--------+------+-----------+
|  Normal|  Male|          1|
|  Member|  Male|          2|
|  Normal|Female|          3|
|  Member|Female|          4|
+--------+------+-----------+



In [9]:
# Create branch dimension

DimBranch = dataset.select("Branch","City").distinct().withColumn("BranchKey", row_number().over(Window.orderBy(monotonically_increasing_id())))
DimBranch.show()

+------+---------+---------+
|Branch|     City|BranchKey|
+------+---------+---------+
|     B| Mandalay|        1|
|     C|Naypyitaw|        2|
|     A|   Yangon|        3|
+------+---------+---------+



In [10]:
# Create productline dimension

DimProductLine = dataset.select("ProductLine").distinct().withColumn("ProductKey", row_number().over(Window.orderBy(monotonically_increasing_id())))
DimProductLine.show()

+--------------------+----------+
|         ProductLine|ProductKey|
+--------------------+----------+
|  Home and lifestyle|         1|
| Fashion accessories|         2|
|   Health and beauty|         3|
|Electronic access...|         4|
|  Food and beverages|         5|
|   Sports and travel|         6|
+--------------------+----------+



In [11]:
#Create date dimension and its additions

Date= dataset.select(to_date(dataset.Date_,"MM/dd/yyyy").alias("Date"),dataset.Date_.alias("Style112")).distinct()
DimDate= Date.select("Date", "Style112",\
                     dayofmonth("Date").alias('day'),\
                     month("Date").alias('month') , \
                     F.trunc(("Date"),"Month").alias("FirstOfMonth"), \
                     date_format(Date.Date,"MMMM").alias("MonthName") , \
                     date_format(Date.Date,"EEEE").alias("DayName") , \
                     weekofyear("Date").alias('WeekOfYear'),\
                     dayofweek("Date").alias('DayOfWeek'),\
                     quarter("Date").alias('quarter') , \
                     year("Date").alias('year') , \
                     F.trunc(("Date"),"Year").alias("FirstOfYear")) \
                    .withColumn("DateKey",regexp_replace(col("Date"),"-","").cast(IntegerType())) 
      
DimDate = DimDate.withColumn("IsWeekend",F.when(DimDate.DayName =="Saturday",1)\
                                        .when(DimDate.DayName =="Sunday",1)\
                                        .otherwise(0))
DimDate.show(5)
DimDate.printSchema()

+----------+---------+---+-----+------------+---------+--------+----------+---------+-------+----+-----------+--------+---------+
|      Date| Style112|day|month|FirstOfMonth|MonthName| DayName|WeekOfYear|DayOfWeek|quarter|year|FirstOfYear| DateKey|IsWeekend|
+----------+---------+---+-----+------------+---------+--------+----------+---------+-------+----+-----------+--------+---------+
|2019-02-19|2/19/2019| 19|    2|  2019-02-01| February| Tuesday|         8|        3|      1|2019| 2019-01-01|20190219|        0|
|2019-03-17|3/17/2019| 17|    3|  2019-03-01|    March|  Sunday|        11|        1|      1|2019| 2019-01-01|20190317|        1|
|2019-03-29|3/29/2019| 29|    3|  2019-03-01|    March|  Friday|        13|        6|      1|2019| 2019-01-01|20190329|        0|
|2019-03-07| 3/7/2019|  7|    3|  2019-03-01|    March|Thursday|        10|        5|      1|2019| 2019-01-01|20190307|        0|
|2019-03-01| 3/1/2019|  1|    3|  2019-03-01|    March|  Friday|         9|        6|     

In [12]:
#Create time dimension

time = dataset.select(from_unixtime(unix_timestamp(dataset.Time_,"HH:mm")).alias("Time"),dataset.Time_.alias("TimeString")).distinct()
DimTime=time.withColumn("TimeKey", row_number().over(Window.orderBy(monotonically_increasing_id())))
DimTime.show()

+-------------------+----------+-------+
|               Time|TimeString|TimeKey|
+-------------------+----------+-------+
|1970-01-01 10:37:00|     10:37|      1|
|1970-01-01 11:36:00|     11:36|      2|
|1970-01-01 17:49:00|     17:49|      3|
|1970-01-01 18:58:00|     18:58|      4|
|1970-01-01 10:36:00|     10:36|      5|
|1970-01-01 19:35:00|     19:35|      6|
|1970-01-01 20:33:00|     20:33|      7|
|1970-01-01 20:44:00|     20:44|      8|
|1970-01-01 19:54:00|     19:54|      9|
|1970-01-01 12:08:00|     12:08|     10|
|1970-01-01 16:49:00|     16:49|     11|
|1970-01-01 11:22:00|     11:22|     12|
|1970-01-01 20:35:00|     20:35|     13|
|1970-01-01 13:14:00|     13:14|     14|
|1970-01-01 18:37:00|     18:37|     15|
|1970-01-01 15:24:00|     15:24|     16|
|1970-01-01 11:00:00|     11:00|     17|
|1970-01-01 10:26:00|     10:26|     18|
|1970-01-01 13:10:00|     13:10|     19|
|1970-01-01 11:16:00|     11:16|     20|
+-------------------+----------+-------+
only showing top

In [13]:
#Create fact table

FactSales = dataset.join(DimPayment,dataset["Payment"] == DimPayment["Payment"]) \
                .join(DimProductLine,dataset["ProductLine"] == DimProductLine["ProductLine"])  \
               .join(DimBranch, (dataset["Branch"] == DimBranch["Branch"])&(dataset["City"] == DimBranch["City"]) )  \
               .join(DimCustomer, (dataset["Customer"] == DimCustomer["Customer"])&(dataset["Gender"] == DimCustomer["Gender"]))\
               .join(DimDate,dataset["Date_"] == DimDate["Style112"]) \
               .join(DimTime,dataset["Time_"] == DimTime["TimeString"])  \
               .select("PaymentKey","DateKey","TimeKey","ProductKey","BranchKey","CustomerKey",
                       "InvoiceID","UnitPrice","Quantity","Tax","cog","grossMargin","grossIncome","rating")


FactSales.show(5)

+----------+--------+-------+----------+---------+-----------+-----------+---------+--------+-------+------+-----------+-----------+------+
|PaymentKey| DateKey|TimeKey|ProductKey|BranchKey|CustomerKey|  InvoiceID|UnitPrice|Quantity|    Tax|   cog|grossMargin|grossIncome|rating|
+----------+--------+-------+----------+---------+-----------+-----------+---------+--------+-------+------+-----------+-----------+------+
|         1|20190105|    345|         3|        3|          4|750-67-8428|    74.69|       7|26.1415|522.83|4.761904762|    26.1415|   9.1|
|         2|20190308|     68|         4|        2|          3|226-31-3081|    15.28|       5|   3.82|  76.4|4.761904762|       3.82|   9.6|
|         3|20190303|     22|         1|        3|          1|631-41-3108|    46.33|       7|16.2155|324.31|4.761904762|    16.2155|   7.4|
|         1|20190127|      7|         3|        3|          2|123-19-1176|    58.22|       8| 23.288|465.76|4.761904762|     23.288|   8.4|
|         1|20190208

In [14]:
FactSales.toPandas().head()

Unnamed: 0,PaymentKey,DateKey,TimeKey,ProductKey,BranchKey,CustomerKey,InvoiceID,UnitPrice,Quantity,Tax,cog,grossMargin,grossIncome,rating
0,1,20190105,345,3,3,4,750-67-8428,74.69,7,26.1415,522.83,4.761905,26.1415,9.1
1,2,20190308,68,4,2,3,226-31-3081,15.28,5,3.82,76.4,4.761905,3.82,9.6
2,3,20190303,22,1,3,1,631-41-3108,46.33,7,16.2155,324.31,4.761905,16.2155,7.4
3,1,20190127,7,3,3,2,123-19-1176,58.22,8,23.288,465.76,4.761905,23.288,8.4
4,1,20190208,1,6,3,1,373-73-7910,86.31,7,30.2085,604.17,4.761905,30.2085,5.3


In [15]:
sqlContext = SQLContext(spark_session)
sqlContext.registerDataFrameAsTable(FactSales,"FactTable")
sqlContext.registerDataFrameAsTable(DimDate,"DimDate")
sqlContext.registerDataFrameAsTable(DimTime,"DimTime")
sqlContext.registerDataFrameAsTable(DimProductLine,"DimProductLine")
sqlContext.registerDataFrameAsTable(DimBranch,"DimBranch")
sqlContext.registerDataFrameAsTable(DimCustomer,"DimCustomer")
sqlContext.registerDataFrameAsTable(DimPayment,"DimPayment")

In [16]:
sqlContext.tables().show()

+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
|        |     dimbranch|       true|
|        |   dimcustomer|       true|
|        |       dimdate|       true|
|        |    dimpayment|       true|
|        |dimproductline|       true|
|        |       dimtime|       true|
|        |     facttable|       true|
+--------+--------------+-----------+

