## ETL based on TPC-DI Project

Prepared By:

    1. Ahmed El-Sayed Hamdan - 191035
    2. Mohamed Ahmed Elkhateeb - 191017
    3. Moaaz Youssef Ghonaimy - 191036
    4. Maryam Akram Elghalban - 191084

===============================================================

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("ETL based on TPC-DI") \
    .getOrCreate()

In [3]:
import os
os.environ['PYSPARK_PYTHON'] = 'python3'

## Building the dimensions by reading text, csv or xml files into dataframes then make a transfromation

### Building StatusType DIM 

In [4]:
# Reading Status from text file using csv function but by passing the delimiter which is |
# and the schema has 2 colums. (ST_ID and ST_NAME)

statusdf = spark.read.option("header","false")\
    .option("delimiter","|")\
    .option("inferSchema","true")\
    .schema(
        StructType(
            [
                StructField("ST_ID",StringType()),
                StructField("ST_NAME",StringType())
            ]
        )
    )\
    .csv('Data/StatusType.txt') 

In [5]:
# check if the reading is done successfully by show the data
statusdf.show(5)

+-----+---------+
|ST_ID|  ST_NAME|
+-----+---------+
| ACTV|   Active|
| CMPT|Completed|
| CNCL| Canceled|
| PNDG|  Pending|
| SBMT|Submitted|
+-----+---------+
only showing top 5 rows



### Read data from Tax Rate file 

In [6]:
# read the data of tax becaseu it will be needed later in customer dimension
taxrate = spark.read.option("header","false")\
    .option("delimiter","|")\
    .option("inferSchema","true")\
    .schema(
        StructType(
            [
                StructField("TX_ID",StringType()),
                StructField("TX_NAME",StringType()),
                StructField("TX_RATE",FloatType())
            ]
        )
    )\
    .csv('Data/TaxRate.txt')

In [7]:
# check if the reading is done successfully by show the data
taxrate.show(5)

+-----+--------------------+-------+
|TX_ID|             TX_NAME|TX_RATE|
+-----+--------------------+-------+
|  US1|U.S. Income Tax B...|   0.15|
|  US2|U.S. Income Tax B...|  0.275|
|  US3|U.S. Income Tax B...|  0.305|
|  US4|U.S. Income Tax B...|  0.355|
|  US5|U.S. Income Tax B...|  0.391|
+-----+--------------------+-------+
only showing top 5 rows



### Building Date DIM 

In [8]:
# Data dimension is has a very nice calculated columns week in year, day of week, and quarter in the year
# which will facilitae makeing the queries later
DimDate = spark.read.option("header","false")\
    .option("delimiter","|")\
    .option("inferSchema","true")\
    .schema(
        StructType(
                [
                    StructField("SK_DateID",IntegerType()),
                    StructField("DateValue",DateType()),                
                    StructField("DateDesc",StringType()),
                    StructField("CalendarYearID",IntegerType()),
                    StructField("CalendarYearDesc",StringType()),
                    StructField("CalendarQtrID",IntegerType()),
                    StructField("CalendarQtrDesc",StringType()),
                    StructField("CalendarMonthID",IntegerType()),
                    StructField("CalendarMonthDesc",StringType()),
                    StructField('CalendarWeekID',IntegerType()),
                    StructField('CalendarWeekDesc',StringType()),
                    StructField('DayOfWeekNum',IntegerType()),
                    StructField("DayOfWeekDesc",StringType()),
                    StructField("FiscalYearID",IntegerType()),
                    StructField("FiscalYearDesc",StringType()),
                    StructField('FiscalQtrID',IntegerType()),
                    StructField('FiscalQtrDesc',StringType()),
                    StructField("HolidayFlag",BooleanType())
                ]
                )
            )\
            .csv('Data/Date.txt') 


In [9]:
# check if the reading is done successfully by show the data
DimDate.select('DateValue','CalendarYearID','CalendarMonthID','CalendarQtrID').show(5)

+----------+--------------+---------------+-------------+
| DateValue|CalendarYearID|CalendarMonthID|CalendarQtrID|
+----------+--------------+---------------+-------------+
|1950-01-01|          1950|          19501|        19501|
|1950-01-02|          1950|          19501|        19501|
|1950-01-03|          1950|          19501|        19501|
|1950-01-04|          1950|          19501|        19501|
|1950-01-05|          1950|          19501|        19501|
+----------+--------------+---------------+-------------+
only showing top 5 rows



### Building Time DIM 

In [10]:
# read time dimension
Dimtime = spark.read.option("header","false")\
        .option("delimiter","|")\
        .option("inferSchema","true")\
       .schema(
            StructType(
                [
                    StructField("SK_TimeID",IntegerType()),
                    StructField("TimeValue",StringType()),
                    StructField("HourID",IntegerType()),
                    StructField("HourDesc",StringType()),
                    StructField("MinuteID",IntegerType()),
                    StructField("MinuteDesc",StringType()),
                    StructField("SecondID",IntegerType()),
                    StructField("SecondDesc",StringType()),
                    StructField("MarketHoursFlag",BooleanType()),
                    StructField("OfficeHoursFlag",BooleanType())
                ]
            )
        )\
       .csv("Data/Time.txt")

In [11]:
# check if the reading is done successfully by show the data
Dimtime.select("SK_TimeID", "TimeValue", "HourID", "MinuteID", "SecondID").show(5)

+---------+---------+------+--------+--------+
|SK_TimeID|TimeValue|HourID|MinuteID|SecondID|
+---------+---------+------+--------+--------+
|        0| 00:00:00|     0|       0|       0|
|        1| 00:00:01|     0|       0|       1|
|        2| 00:00:02|     0|       0|       2|
|        3| 00:00:03|     0|       0|       3|
|        4| 00:00:04|     0|       0|       4|
+---------+---------+------+--------+--------+
only showing top 5 rows



### Building Broker DIM 

In [12]:
# rading data from HR.csv file to get the broker information for later usage in customer dimenstion
DimBroker = spark.read\
           .format("csv")\
           .option("header", "false")\
           .option("inferSchema","true")\
           .schema(
                StructType(
                    [
                        StructField("EmployeeID",StringType()),
                        StructField("ManagerID",StringType()),
                        StructField("EmployeeFirstName",StringType()),
                        StructField("EmployeeLastName",StringType()),
                        StructField("EmployeeMI",StringType()),
                        StructField("EmployeeJobCode",StringType()),
                        StructField("EmployeeBranch",StringType()),
                        StructField("EmployeeOffice",StringType()),
                        StructField("EmployeePhone",StringType())
                    ]
                )
            )\
           .load("Data/HR.csv")

In [13]:
# check if the reading is done successfully by show the data
DimBroker.select("EmployeeID", "EmployeeFirstName", "EmployeeLastName").show(5)

+----------+-----------------+----------------+
|EmployeeID|EmployeeFirstName|EmployeeLastName|
+----------+-----------------+----------------+
|         0|            Ozkan|         Douglas|
|         1|             Suer|         Candice|
|         2|        Somisetty|            Jami|
|         3|          Mazurek|       Rosalinda|
|         4|        Aronovich|        Delphine|
+----------+-----------------+----------------+
only showing top 5 rows



In [14]:
# check the count of records before applying the dondition of JOBCODE
DimBroker.count()

50000

In [15]:
# Apply the condition of job code which is the brokers are only 314
DimBroker = DimBroker.where('EmployeeJobCode == 314')
# check the count again to validate that the count after condition is less that the original one
DimBroker.count()

14239

In [16]:
# adding some UDF to apply then on records of broker dimention
from pyspark.sql.functions import udf

# here we set IsCurrent column by true  as descriped in documentation
#we can use  lit("true") but we choose UDF to be more filixble if we need to apply with condition of check another values

def Set_IsCurrent_Col():
    return 'true'

Set_IsCurrent_Col_UDF = udf(Set_IsCurrent_Col, StringType())
DimBroker = DimBroker.withColumn("IsCurrent", Set_IsCurrent_Col_UDF())

In [17]:
# here we set EndDate column by '9999-12-31' as descriped in documentation
def Set_EndDate_Col():
    return '9999-12-31'

Set_EndDate_Col_UDF = udf(Set_EndDate_Col, StringType())
DimBroker = DimBroker.withColumn("EndDate", Set_EndDate_Col_UDF())


In [18]:
# here we set EffectiveDat column by '1950-01-01' which is the erailer date in the date dimension
# but it can be alos the current date
def Set_EffectiveDate_Col():
    return '1950-01-01'

Set_EffectiveDate_Col_UDF = udf(Set_EffectiveDate_Col, StringType())
DimBroker = DimBroker.withColumn("EffectiveDate", Set_EffectiveDate_Col_UDF())

In [19]:
# here we set BatchID column by '1' as descriped in documentation becaseu it is historal load and the batch id is 1
def Set_BatchID_Col():
    return '1'

Set_BatchID_Col_UDF = udf(Set_BatchID_Col, StringType())
DimBroker = DimBroker.withColumn("BatchID", Set_BatchID_Col_UDF())

In [20]:
# prepare the dimenion with columns that just need renaming from orignal names in csv file as descriped in documenation
DimBroker = DimBroker.withColumnRenamed("EmployeeID","BrokerID")           
DimBroker = DimBroker.withColumnRenamed("EmployeeFirstName","FirstName")
DimBroker = DimBroker.withColumnRenamed("EmployeeLastName","LastName")
DimBroker = DimBroker.withColumnRenamed("EmployeeMI","MiddleInitial")
DimBroker = DimBroker.withColumnRenamed("EmployeeBranch","Branch")
DimBroker = DimBroker.withColumnRenamed("EmployeeOffice","Office")
DimBroker = DimBroker.withColumnRenamed("EmployeePhone","Phone")
                        
DimBroker = DimBroker.drop("EmployeeJobCode")

# print schema just to check the columns names and types  
DimBroker.printSchema()

root
 |-- BrokerID: string (nullable = true)
 |-- ManagerID: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- MiddleInitial: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- Office: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- IsCurrent: string (nullable = true)
 |-- EndDate: string (nullable = true)
 |-- EffectiveDate: string (nullable = true)
 |-- BatchID: string (nullable = true)



### Building CashTransaction DIM 

In [21]:
# rading data from CashTransaction.txt file to get the CashTransaction information
cashtransaction_txt = spark.read.option("header","false")\
        .option("delimiter","|")\
        .option("inferSchema","true")\
        .schema(
            StructType(
                [
                    StructField("CT_CA_ID",IntegerType()),
                    StructField("CT_DTS",StringType()),
                    StructField("CT_AMT",FloatType()),
                    StructField("CT_NAME",StringType())
                ]
            )
        )\
       .csv("Data/CashTransaction.txt")

In [22]:
# check if the reading is done successfully by show the data
cashtransaction_txt.show(5)

+--------+-------------------+---------+--------------------+
|CT_CA_ID|             CT_DTS|   CT_AMT|             CT_NAME|
+--------+-------------------+---------+--------------------+
|       3|2012-07-11 08:09:52|-37215.14|TGDRsaHPherhApDuH...|
|      52|2012-07-07 17:08:29| -3178.67|PGwhaPC igAVOmHLJ...|
|      55|2012-07-12 17:34:13| -3172.19|uQOUlrpDGHQpeeBGx...|
|      61|2012-09-20 03:07:49| -16621.0|VRIGhrJYHmbmNyXtI...|
|      28|2012-07-09 07:26:05|  -1315.7|gySbOpZLevgVdfrrw...|
+--------+-------------------+---------+--------------------+
only showing top 5 rows



In [23]:
from pyspark.sql.functions import *
# we need to split the date and time in 2 differnet columns
# slit column CT_DTS by Space
split_col = split(cashtransaction_txt['CT_DTS'], ' ')
# get 1st elemetn as date
cashtransaction_txt = cashtransaction_txt.withColumn('Date', split_col.getItem(0))
# get second element as time
cashtransaction_txt = cashtransaction_txt.withColumn('Time', split_col.getItem(1))
# remove unneeded columns
cashtransaction_txt = cashtransaction_txt.drop('CT_DTS', 'CT_NAME')

# check if it works
cashtransaction_txt.show(5)

+--------+---------+----------+--------+
|CT_CA_ID|   CT_AMT|      Date|    Time|
+--------+---------+----------+--------+
|       3|-37215.14|2012-07-11|08:09:52|
|      52| -3178.67|2012-07-07|17:08:29|
|      55| -3172.19|2012-07-12|17:34:13|
|      61| -16621.0|2012-09-20|03:07:49|
|      28|  -1315.7|2012-07-09|07:26:05|
+--------+---------+----------+--------+
only showing top 5 rows



### Building Account and Customer DIM 

#### Reading XML - NEW

In [24]:
# Reading the XML file of CustomerMgmt.xml to build customer and account dimensions later form it
import xml.etree.ElementTree as ET

# here just check about the New action type
def parse_xml(): 
    results = []
    tree = ET.parse('Data/CustomerMgmt.xml')
    root = tree.getroot()
    for child in root.findall('{http://www.tpc.org/tpc-di}Action'):
        #print(child.get('ActionType'))
        if ((child.get('ActionType') == "NEW")):
            rec = []
            Customer = child.find('Customer')
            rec.append(Customer.get('C_ID'))
            rec.append(Customer.get('C_TAX_ID'))
            rec.append(Customer.get('C_GNDR'))
            rec.append(Customer.get('C_TIER'))
            rec.append(Customer.get('C_DOB'))
            Name = Customer.find('Name')
            C_L_NAME = ''
            if Name.find('C_L_NAME').text == None:
                rec.append(C_L_NAME)
            else:
                rec.append(Name.find('C_L_NAME').text)
            C_F_NAME = ''
            if Name.find('C_F_NAME').text == None:
                rec.append(C_F_NAME)
            else:
                rec.append(Name.find('C_F_NAME').text)
            C_M_NAME = ''
            if Name.find('C_M_NAME').text == None:
                rec.append(C_M_NAME)
            else:
                rec.append(Name.find('C_M_NAME').text)
            Address = Customer.find('Address')
            C_ADLINE1 = ''
            if Address.find('C_ADLINE1').text == None:
                rec.append(C_ADLINE1)
            else:
                rec.append(Address.find('C_ADLINE1').text)
            C_ADLINE2 = ''
            if Address.find('C_ADLINE2').text == None:
                rec.append(C_ADLINE2)
            else:
                rec.append(Address.find('C_ADLINE2').text)
            rec.append(Address.find('C_ZIPCODE').text)
            rec.append(Address.find('C_CITY').text)
            rec.append(Address.find('C_STATE_PROV').text)
            rec.append(Address.find('C_CTRY').text)
            ContactInfo = Customer.find('ContactInfo')
            C_PRIM_EMAIL = ''
            if ContactInfo.find('C_PRIM_EMAIL').text == None:
                rec.append(C_PRIM_EMAIL)
            else:
                rec.append(ContactInfo.find('C_PRIM_EMAIL').text)
            C_ALT_EMAIL = ''
            if ContactInfo.find('C_ALT_EMAIL').text == None:
                rec.append(C_ALT_EMAIL)
            else:
                rec.append(ContactInfo.find('C_ALT_EMAIL').text)
                
            # apply the transformation as the documanation for phone1
            #-----------------------------------------------------------
            PHONE_1 = ContactInfo.find('C_PHONE_1')
            C_PHONE_1 = ''
            e_C_PHONE_1 = PHONE_1.find('C_CTRY_CODE')
            if PHONE_1.find('C_CTRY_CODE').text == None:
                C_PHONE_1 = C_PHONE_1 
            else:
                C_PHONE_1 = C_PHONE_1 + PHONE_1.find('C_CTRY_CODE').text
            e_C_PHONE_1 = PHONE_1.find('C_AREA_CODE')
            if PHONE_1.find('C_AREA_CODE').text == None:
                C_PHONE_1 = C_PHONE_1 
            else:
                C_PHONE_1 = C_PHONE_1 + PHONE_1.find('C_AREA_CODE').text
            if PHONE_1.find('C_LOCAL').text == None:
                C_PHONE_1 = C_PHONE_1 
            else:
                C_PHONE_1 = C_PHONE_1 + PHONE_1.find('C_LOCAL').text
            if PHONE_1.find('C_EXT').text == None:
                C_PHONE_1 = C_PHONE_1 
            else:
                C_PHONE_1 = C_PHONE_1 + PHONE_1.find('C_EXT').text
            rec.append(C_PHONE_1)
            
             # apply the transformation as the documanation for phone2
            #-----------------------------------------------------------
            PHONE_2 = ContactInfo.find('C_PHONE_2')
            C_PHONE_2 = ''
            if PHONE_2.find('C_CTRY_CODE').text == None:
                C_PHONE_2 = C_PHONE_2
            else:    
                C_PHONE_2 = C_PHONE_2 + PHONE_2.find('C_CTRY_CODE').text
            if PHONE_2.find('C_AREA_CODE').text == None:
                C_PHONE_2 = C_PHONE_2
            else:
                C_PHONE_2 = C_PHONE_2 + PHONE_2.find('C_AREA_CODE').text
            if PHONE_2.find('C_LOCAL').text == None:
                C_PHONE_2 = C_PHONE_2
            else:    
                C_PHONE_2 = C_PHONE_2 + PHONE_2.find('C_LOCAL').text
            if PHONE_2.find('C_EXT').text == None:
                C_PHONE_2 = C_PHONE_2
            else:    
                C_PHONE_2 = C_PHONE_2 + PHONE_2.find('C_EXT').text
            rec.append(C_PHONE_2)
            
            # apply the transformation as the documanation for phone3
            #-----------------------------------------------------------
            PHONE_3 = ContactInfo.find('C_PHONE_3')
            C_PHONE_3 = ''
            if PHONE_3.find('C_CTRY_CODE').text == None:
                C_PHONE_3 = C_PHONE_3
            else:    
                C_PHONE_3 = C_PHONE_3 + PHONE_3.find('C_CTRY_CODE').text
            if PHONE_3.find('C_AREA_CODE').text == None:
                C_PHONE_3 = C_PHONE_3
            else:    
                C_PHONE_3 = C_PHONE_3 + PHONE_3.find('C_AREA_CODE').text
            if PHONE_3.find('C_LOCAL').text == None:
                C_PHONE_3 = C_PHONE_3
            else:    
                C_PHONE_3 = C_PHONE_3 + PHONE_3.find('C_LOCAL').text
            if PHONE_3.find('C_EXT').text == None:
                C_PHONE_3 = C_PHONE_3
            else:    
                C_PHONE_3 = C_PHONE_3 + PHONE_3.find('C_EXT').text
            rec.append(C_PHONE_3)    
            TaxInfo = Customer.find('TaxInfo')
            rec.append(TaxInfo.find('C_LCL_TX_ID').text)
            rec.append(TaxInfo.find('C_NAT_TX_ID').text)
            Account = Customer.find('Account')
            rec.append(Account.get('CA_ID'))
            rec.append(Account.get('CA_TAX_ST'))
            rec.append(Account.find('CA_B_ID').text)
            rec.append(Account.find('CA_NAME').text)
            results.append(rec)
    return results

In [25]:
new_schema = StructType([StructField('C_ID', StringType(), True),
                     StructField('C_TAX_ID', StringType(), True),
                     StructField('C_GNDR', StringType(), True),
                     StructField('C_TIER', StringType(), True),
                     StructField('C_DOB', StringType(), True),
                     StructField('C_L_NAME', StringType(), True),
                     StructField('C_F_NAME', StringType(), True),
                     StructField('C_M_NAME', StringType(), True),
                     StructField('C_ADLINE1', StringType(), True),
                     StructField('C_ADLINE2', StringType(), True),
                     StructField('C_ZIPCODE', StringType(), True),
                     StructField('C_CITY', StringType(), True),
                     StructField('C_STATE_PROV', StringType(), True),
                     StructField('C_CTRY', StringType(), True),
                     StructField('C_PRIM_EMAIL', StringType(), True),
                     StructField('C_ALT_EMAIL', StringType(), True),
                     StructField('C_PHONE_1', StringType(), True),
                     StructField('C_PHONE_2', StringType(), True),
                     StructField('C_PHONE_3', StringType(), True),
                     StructField('C_LCL_TX_ID', StringType(), True),
                     StructField('C_NAT_TX_ID', StringType(), True),
                     StructField('CA_ID', StringType(), True),
                     StructField('CA_TAX_ST', StringType(), True),
                     StructField('CA_B_ID', StringType(), True),
                     StructField('CA_NAME', StringType(), True)])

In [26]:
# get the data from xml and build a datafrmae 
NEW_df = parse_xml()
new_array = spark.sparkContext.parallelize(NEW_df)
NEW_df = spark.createDataFrame(NEW_df,new_schema)

In [27]:
# make sure that the data is retrieved successfully
NEW_df.select("C_ID", "C_F_NAME", "C_L_NAME", "C_CITY").show(5)

+----+--------+--------+----------------+
|C_ID|C_F_NAME|C_L_NAME|          C_CITY|
+----+--------+--------+----------------+
|   0|   Adara| Joannis|        Columbus|
|   1|  Jirina| Paperno|       Inglewood|
|   2|  Mariam| McBryan|        Berkeley|
|   3| Robinia|    Adey|            Hull|
|   4|    Lulu| Haubert|Rancho Cucamonga|
+----+--------+--------+----------------+
only showing top 5 rows



#### Building Customer Dimenstion when ActionType is NEW

In [28]:
# update the names of columsn from original name in xml to custome dimenstion names as documented
CustomerDim = NEW_df.withColumnRenamed("C_ID","CustomerID")           
CustomerDim = CustomerDim.withColumnRenamed("C_TAX_ID","TaxID")
CustomerDim = CustomerDim.withColumnRenamed("C_L_NAME","LastName")
CustomerDim = CustomerDim.withColumnRenamed("C_F_NAME","FirstName")
CustomerDim = CustomerDim.withColumnRenamed("C_M_NAME","MiddleInitial")
CustomerDim = CustomerDim.withColumnRenamed("C_TIER","Tier")
CustomerDim = CustomerDim.withColumnRenamed("C_DOB","DOB")
CustomerDim = CustomerDim.withColumnRenamed("C_PRIM_EMAIL","Email1")
CustomerDim = CustomerDim.withColumnRenamed("C_ALT_EMAIL","Email2")
CustomerDim = CustomerDim.withColumnRenamed("C_GNDR","Gender")

CustomerDim = CustomerDim.withColumnRenamed("C_ADLINE1","AddressLine1")
CustomerDim = CustomerDim.withColumnRenamed("C_ADLINE2","AddressLine2")
CustomerDim = CustomerDim.withColumnRenamed("C_ZIPCODE","PostalCode")
CustomerDim = CustomerDim.withColumnRenamed("C_CITY","City")
CustomerDim = CustomerDim.withColumnRenamed("C_STATE_PROV","State_Prov")
CustomerDim = CustomerDim.withColumnRenamed("C_CTRY","Country")

#Adding status column to be 'ACTV' for all new accounts
CustomerDim = CustomerDim.withColumn('Status', lit("ACTV"))

CustomerDim = CustomerDim.withColumnRenamed("C_PHONE_1","Phone1")
CustomerDim = CustomerDim.withColumnRenamed("C_PHONE_2","Phone2")
CustomerDim = CustomerDim.withColumnRenamed("C_PHONE_3","Phone3")

# check if teh data is successfully added to the dimension
CustomerDim.select("CustomerID", "LastName", "FirstName", "Gender").show(5)

+----------+--------+---------+------+
|CustomerID|LastName|FirstName|Gender|
+----------+--------+---------+------+
|         0| Joannis|    Adara|     F|
|         1| Paperno|   Jirina|     F|
|         2| McBryan|   Mariam|  null|
|         3|    Adey|  Robinia|     B|
|         4| Haubert|     Lulu|     m|
+----------+--------+---------+------+
only showing top 5 rows



In [29]:
#Add IsCurrent, EndDate, EffectiveDate and BatchID columns using the UDF created before 
CustomerDim = CustomerDim.withColumn("IsCurrent", Set_IsCurrent_Col_UDF())
CustomerDim = CustomerDim.withColumn("EndDate", Set_EndDate_Col_UDF())
CustomerDim = CustomerDim.withColumn("EffectiveDate", Set_EffectiveDate_Col_UDF())
CustomerDim = CustomerDim.withColumn("BatchID", Set_BatchID_Col_UDF())
CustomerDim.select("CustomerID", "LastName", "FirstName", "Gender","IsCurrent", "EndDate", "EffectiveDate", "BatchID").show(5)

+----------+--------+---------+------+---------+----------+-------------+-------+
|CustomerID|LastName|FirstName|Gender|IsCurrent|   EndDate|EffectiveDate|BatchID|
+----------+--------+---------+------+---------+----------+-------------+-------+
|         0| Joannis|    Adara|     F|     true|9999-12-31|   1950-01-01|      1|
|         1| Paperno|   Jirina|     F|     true|9999-12-31|   1950-01-01|      1|
|         2| McBryan|   Mariam|  null|     true|9999-12-31|   1950-01-01|      1|
|         3|    Adey|  Robinia|     B|     true|9999-12-31|   1950-01-01|      1|
|         4| Haubert|     Lulu|     m|     true|9999-12-31|   1950-01-01|      1|
+----------+--------+---------+------+---------+----------+-------------+-------+
only showing top 5 rows



In [30]:
# gender column transformation is to change any value rather than F or M to U
# so first we change m small to be M uppercase, aslo f small to F uppercase
# then Transfrom others to U
from pyspark.sql.functions import when   
CustomerDim = CustomerDim.withColumn('Gender', when(CustomerDim.Gender == 'f', 'F').otherwise(CustomerDim.Gender))
CustomerDim = CustomerDim.withColumn('Gender', when(CustomerDim.Gender == 'm', 'M').otherwise(CustomerDim.Gender))
CustomerDim = CustomerDim.withColumn('Gender', when(((CustomerDim.Gender != 'F') & (CustomerDim.Gender != 'M')) | (CustomerDim.Gender.isNull()), 'U').otherwise(CustomerDim.Gender))
CustomerDim.select("CustomerID", "LastName", "FirstName", "Gender").show(5)

+----------+--------+---------+------+
|CustomerID|LastName|FirstName|Gender|
+----------+--------+---------+------+
|         0| Joannis|    Adara|     F|
|         1| Paperno|   Jirina|     F|
|         2| McBryan|   Mariam|     U|
|         3|    Adey|  Robinia|     U|
|         4| Haubert|     Lulu|     M|
+----------+--------+---------+------+
only showing top 5 rows



In [31]:
# make a join with tax to get 4 new columns related to taxes
# which are NationalTaxRateDesc, NationalTaxRate , LocalTaxRateDesc, LocalTaxRate
# by making 2 diffeent joi n with the same table but with different consition 
# for national the used column is C_NAT_TX_ID
# FRO LOCAL the used column is C_LCL_TX_ID
CustomerDim = CustomerDim.join(taxrate, CustomerDim.C_NAT_TX_ID == taxrate.TX_ID,how='left')
CustomerDim = CustomerDim.drop('C_NAT_TX_ID','TX_ID')
CustomerDim = CustomerDim.withColumnRenamed("TX_NAME","NationalTaxRateDesc")
CustomerDim = CustomerDim.withColumnRenamed("TX_RATE","NationalTaxRate")

# chekc if it works!
CustomerDim.select("CustomerID", "LastName", "FirstName", "NationalTaxRateDesc", "NationalTaxRate").show(5)


+----------+--------+---------+--------------------+---------------+
|CustomerID|LastName|FirstName| NationalTaxRateDesc|NationalTaxRate|
+----------+--------+---------+--------------------+---------------+
|         0| Joannis|    Adara|Yukon Income Tax ...|         0.2336|
|         1| Paperno|   Jirina|Nunavut Income Ta...|          0.377|
|         2| McBryan|   Mariam|Iowa Income Tax f...|         0.0036|
|         3|    Adey|  Robinia|Missouri Income T...|          0.015|
|         4| Haubert|     Lulu|Manitoba Income T...|          0.394|
+----------+--------+---------+--------------------+---------------+
only showing top 5 rows



In [32]:
CustomerDim = CustomerDim.join(taxrate, CustomerDim.C_LCL_TX_ID == taxrate.TX_ID,how='left')
CustomerDim = CustomerDim.drop('C_LCL_TX_ID','TX_ID')
CustomerDim = CustomerDim.withColumnRenamed("TX_NAME","LocalTaxRateDesc")
CustomerDim = CustomerDim.withColumnRenamed("TX_RATE","LocalTaxRate")

In [33]:
# chekc if it works!
CustomerDim.select("CustomerID", "LastName", "FirstName", "NationalTaxRateDesc", "NationalTaxRate", "LocalTaxRateDesc", "LocalTaxRate").show(5)

+----------+--------+---------+--------------------+---------------+--------------------+------------+
|CustomerID|LastName|FirstName| NationalTaxRateDesc|NationalTaxRate|    LocalTaxRateDesc|LocalTaxRate|
+----------+--------+---------+--------------------+---------------+--------------------+------------+
|         0| Joannis|    Adara|Yukon Income Tax ...|         0.2336|California Tax fo...|      0.0432|
|         1| Paperno|   Jirina|Nunavut Income Ta...|          0.377|British Columbia ...|       0.357|
|         2| McBryan|   Mariam|Iowa Income Tax f...|         0.0036|New York Income T...|      0.0614|
|         3|    Adey|  Robinia|Missouri Income T...|          0.015|Wisconsin Income ...|      0.0675|
|         4| Haubert|     Lulu|Manitoba Income T...|          0.394|Ontario Income Ta...|      0.2216|
+----------+--------+---------+--------------------+---------------+--------------------+------------+
only showing top 5 rows



#### Reading XML - ADDACCT

In [34]:
# in this section we read from CustomerMgmt.xml the reset of data but this time with actiontype = ADDACCT
def parse_addacct_xml(): 
    results = []
    tree = ET.parse('Data/CustomerMgmt.xml')
    root = tree.getroot()
    for child in root.findall('{http://www.tpc.org/tpc-di}Action'):
        #print(child.get('ActionType'))
        if child.get('ActionType') == "ADDACCT":
            rec = []
            Customer = child.find('Customer')
            rec.append(Customer.get('C_ID'))
            Account = Customer.find('Account')
            rec.append(Account.get('CA_ID'))
            rec.append(Account.get('CA_TAX_ST'))
            rec.append(Account.find('CA_B_ID').text)
            rec.append(Account.find('CA_NAME').text)
            results.append(rec)
    return results

In [35]:
ADDACCT_schema = StructType([StructField('C_ID', StringType(), True),
                     StructField('CA_ID', StringType(), True),
                     StructField('CA_TAX_ST', StringType(), True),
                     StructField('CA_B_ID', StringType(), True),
                     StructField('CA_NAME', StringType(), True)])

In [36]:
ADDACCT_df = parse_addacct_xml()
ADDACCT_df = spark.sparkContext.parallelize(ADDACCT_df)
ADDACCT_df = spark.createDataFrame(ADDACCT_df,ADDACCT_schema)

#### Reading XML - CLOSEACCT

In [37]:
# in this section we read from CustomerMgmt.xml the reset of data but this time with actiontype = CLOSEACCT
def parse_CLOSEACCT_xml(): 
    results = []
    tree = ET.parse('Data/CustomerMgmt.xml')
    root = tree.getroot()
    for child in root.findall('{http://www.tpc.org/tpc-di}Action'):
        if child.get('ActionType') == "CLOSEACCT":
            rec = []
            Customer = child.find('Customer')
            rec.append(Customer.get('C_ID'))
            Account = Customer.find('Account')
            rec.append(Account.get('CA_ID'))
            results.append(rec)
    return results

In [38]:
CLOSEACCT_schema = StructType([StructField('C_ID', StringType(), True),
                     StructField('CA_ID', StringType(), True)])

In [39]:
CLOSEACCT_df = parse_CLOSEACCT_xml()
CLOSEACCT_df = spark.sparkContext.parallelize(CLOSEACCT_df)
CLOSEACCT_df = spark.createDataFrame(CLOSEACCT_df,CLOSEACCT_schema)

In [40]:
CLOSEACCT_df.select("C_ID", "CA_ID").show(5)

+----+-----+
|C_ID|CA_ID|
+----+-----+
|  27|   27|
|  41|   41|
|  55|   55|
|  46|   46|
|  60|   60|
+----+-----+
only showing top 5 rows



#### Reading XML - INACT

In [41]:
# in this section we read from CustomerMgmt.xml the reset of data but this time with actiontype = INACT
def parse_INACT_xml(): 
    results = []
    tree = ET.parse('Data/CustomerMgmt.xml')
    root = tree.getroot()
    for child in root.findall('{http://www.tpc.org/tpc-di}Action'):
        if child.get('ActionType') == "INACT":
            rec = []
            Customer = child.find('Customer')
            rec.append(Customer.get('C_ID'))
            results.append(rec)
    return results

In [42]:
INACT_schema = StructType([StructField('C_ID', StringType(), True)])

In [43]:
INACT_df = parse_INACT_xml()
INACT_df = spark.sparkContext.parallelize(INACT_df)
INACT_df = spark.createDataFrame(INACT_df,INACT_schema)

In [44]:
INACT_df.select("C_ID").show(5)

+----+
|C_ID|
+----+
|  48|
|  41|
|  34|
|  62|
|  55|
+----+
only showing top 5 rows



#### Building Account Dimenstion when ActionType is NEW

In [45]:
# start building dimension account but with action type = NEW 
# Select columns needed from custoemr dimension
DimAccount = CustomerDim.select("CA_ID","CA_B_ID","CustomerID","CA_TAX_ST","CA_NAME")

# set staus to Active as documnetded
DimAccount = DimAccount.withColumn('Status', lit("ACTV"))
# usign user defined functions to set multiple columsn s happened before in customer dimentsion
DimAccount = DimAccount.withColumn("IsCurrent", Set_IsCurrent_Col_UDF())
DimAccount = DimAccount.withColumn("EndDate", Set_EndDate_Col_UDF())
DimAccount = DimAccount.withColumn("EffectiveDate", Set_EffectiveDate_Col_UDF())
DimAccount = DimAccount.withColumn("BatchID", Set_BatchID_Col_UDF())

# change names to match the columns names in dimension
DimAccount = DimAccount.withColumnRenamed("CA_ID","AccountID")
DimAccount = DimAccount.withColumnRenamed("CA_B_ID","SK_BrokerID")
DimAccount = DimAccount.withColumnRenamed("CustomerID","SK_CustomerID")
DimAccount = DimAccount.withColumnRenamed("CA_TAX_ST","TaxStatus")
DimAccount = DimAccount.withColumnRenamed("CA_NAME","AccountDesc")

#check if it works!
DimAccount.select('AccountID','SK_BrokerID','SK_CustomerID','TaxStatus','AccountDesc').show(5)

+---------+-----------+-------------+---------+--------------------+
|AccountID|SK_BrokerID|SK_CustomerID|TaxStatus|         AccountDesc|
+---------+-----------+-------------+---------+--------------------+
|        0|      17713|            0|        1|CJlmMuFyibKOmKLHI...|
|        1|        615|            1|        2|BbxTgVGOlgyrYtVRj...|
|        2|       3927|            2|        1|IGzIDNTTRUDKwGaoV...|
|        3|       6256|            3|        1|ZHXwHtCcLZqdWhWOP...|
|        4|       3412|            4|        1|mzlYZlTIDmOGuKQHO...|
+---------+-----------+-------------+---------+--------------------+
only showing top 5 rows



#### Building Fact Cash Balances

In [46]:
# make a join with acconunt dimnsion as the documenation
cashtransaction = cashtransaction_txt.join(DimAccount,cashtransaction_txt.CT_CA_ID == DimAccount.AccountID)
# get distinc values to make sure that the join is done successulyy.
cashtransaction.select('AccountID').distinct().show(5)

+---------+
|AccountID|
+---------+
|     4937|
|     2294|
|     2069|
|     9009|
|     4032|
+---------+
only showing top 5 rows



In [47]:
# build the FACTCASHBALANCES by make a sum for all transaction for user in a day
# so we groub by the user and teh ransaction and sum the amount
FactCashBalances = cashtransaction.groupBy('Date','CT_CA_ID')\
                                    .agg({"CT_AMT":"sum"})\
                                    .withColumnRenamed("sum(CT_AMT)","Cash")\
                                    .withColumn("BatchID", Set_BatchID_Col_UDF())

In [48]:
FactCashBalances.count()  

555761

#### Running the Required Quries

In [49]:
# finaly we strt running the required qureis
# but in the beginig we make a join with date dimesion because all queries are dependent on it.
cashtransaction = cashtransaction.join(DimDate,FactCashBalances.Date == DimDate.DateValue,how='left')

In [50]:
# make sure the join is done succeesfulyy
cashtransaction.select('DateValue','CalendarYearID', 'CalendarMonthID', 'CalendarQtrID').show(5)

+----------+--------------+---------------+-------------+
| DateValue|CalendarYearID|CalendarMonthID|CalendarQtrID|
+----------+--------------+---------------+-------------+
|2012-10-09|          2012|         201210|        20124|
|2012-10-11|          2012|         201210|        20124|
|2012-11-05|          2012|         201211|        20124|
|2012-09-07|          2012|          20129|        20123|
|2012-09-08|          2012|          20129|        20123|
+----------+--------------+---------------+-------------+
only showing top 5 rows



In [51]:
# first query: Get the sum of transactions per customer for the first quarter of year 1950
# but unfortunatly there is no data in theis year so we get the data in 4th quarter in 2012
# just we need to replace 20124 by 19501

# we get all transaction in the specifed quarter the group by customer to sum them
query1 = cashtransaction.where('CalendarQtrID = 20124').groupBy('CT_CA_ID')\
                                    .agg({"CT_AMT":"sum"})\
                                    .withColumnRenamed("sum(CT_AMT)","Cash")

# make sure it works!
query1.show(5)

+--------+------------------+
|CT_CA_ID|              Cash|
+--------+------------------+
|     463|-789355.5455322266|
|     471| 775647.7464599609|
|    1088|-589189.6413574219|
|    1238|       -19597.6875|
|     243|-771182.8375244141|
+--------+------------------+
only showing top 5 rows



In [52]:
#query2 : Get max transaction amount per quarter

# we roup by the quarter and get max so the results would be the max transaction amoutn in each quarter
query2 = cashtransaction.groupBy('CalendarQtrID')\
                                    .agg({"CT_AMT":"max"})\
                                    .withColumnRenamed("max(CT_AMT)","Cash")

# make sure it works!
query2.show(5)


+-------------+---------+
|CalendarQtrID|     Cash|
+-------------+---------+
|        20134|998903.25|
|        20124| 999889.3|
|        20132| 999324.5|
|        20163| 998761.1|
|        20131| 999928.3|
+-------------+---------+
only showing top 5 rows



In [53]:
#query3: For a specific customer ID, get the total amounts of transactions per week.

# we choose customer of id =1 as examle for the query
# we group by weekID then sum to get all the transaction amount per week fro specific customer.
query3 = cashtransaction.where('SK_CustomerID ==1').groupBy('CalendarWeekID')\
                                    .agg({"CT_AMT":"sum"})\
                                    .withColumnRenamed("sum(CT_AMT)","Cash")

# make sure it works!
query3.show(5)

+--------------+-----------------+
|CalendarWeekID|             Cash|
+--------------+-----------------+
|         20135|597520.8928222656|
|        201252| -36.400146484375|
|        201415| 3438.14990234375|
|        201249|   -20448.0703125|
|         20134|532132.2292480469|
+--------------+-----------------+
only showing top 5 rows

