In [93]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, current_timestamp
from pyspark.sql.context import SQLContext

## Load preferences 

In [94]:
sparkSession = SparkSession.builder.master("local")\
                              .appName("project2")\
                              .getOrCreate()

In [95]:
def get_schema(column_names, required_columns_type):
    """
    To get schema of the file.
    """
    struct_field_list = [StructField(name, column_type, True)
                         for name, column_type in zip(column_names, required_columns_type)]
    return StructType(struct_field_list)


def load_data(file_path, schema, delimiter):
    return sparkSession.read.format("csv").option("delimiter", delimiter)\
                           .schema(schema)\
                           .load(file_path)

#### Load HR.CSV

In [38]:
column_names = ["EmployeeID", "ManagerID", "EmployeeFirstName", "EmployeeLastName",
               "EmployeeMI", "EmployeeJobCode", "EmployeeBranch", "EmployeeOffice",
               "EmployeePhone"]
required_columns_type = [StringType(), StringType(), StringType(), StringType(),
                        StringType(), StringType(), StringType(), StringType(),
                        StringType(),BooleanType(), IntegerType(), DateType(), DateType()]
file_path= "Dataset/Batch1/HR.csv"
schema = get_schema(column_names, required_columns_type)
hr_df = load_data(file_path, schema, ",")
hr_df.show(5)

+----------+---------+-----------------+----------------+----------+---------------+--------------------+--------------+--------------+
|EmployeeID|ManagerID|EmployeeFirstName|EmployeeLastName|EmployeeMI|EmployeeJobCode|      EmployeeBranch|EmployeeOffice| EmployeePhone|
+----------+---------+-----------------+----------------+----------+---------------+--------------------+--------------+--------------+
|         0|      702|            Ozkan|         Douglas|      null|            647|EGZKSobTeknHCbLuH...|    OFFICE7152|(726) 088-3331|
|         1|     1377|             Suer|         Candice|      null|            314|OfOBVvpzNvHCebxyu...|    OFFICE8586|(344) 999-2652|
|         2|      819|        Somisetty|            Jami|         P|            534|rAHWYkktOXAyPAYHl...|          null|(984) 538-5366|
|         3|      824|          Mazurek|       Rosalinda|         J|            364|TJQqsUQQGqWG QleL...|    OFFICE8487|(860) 037-6897|
|         4|     4345|        Aronovich|        

#### Load Date.txt

In [29]:
# Load Date
column_names = ["SK_DateID", "DateValue", "DateDesc", "CalendarYearID",
               "CalendarYearDesc", "CalendarQtrID", "CalendarQtrDesc", "CalendarMonthID",
               "CalendarMonthDesc", "CalendarWeekID", "CalendarWeekDesc", "DayOfWeekNum",
               "DayOfWeekDesc", "FiscalYearID", "FiscalYearDesc", "FiscalQtrID", 
               "FiscalQtrDesc", "HolidayFlag"]
required_columns_type = [IntegerType(), DateType(), StringType(), IntegerType(), StringType(),
                        IntegerType(), StringType(), IntegerType(), StringType(), IntegerType(),
                        StringType(), IntegerType(), StringType(), IntegerType(), 
                        StringType(), IntegerType(), StringType(), BooleanType()]

file_path= "Dataset/Batch1/Date.txt"
schema = get_schema(column_names, required_columns_type)
date_df = load_data(file_path, schema, "|")
date_df.show(5)

+---------+----------+---------------+--------------+----------------+-------------+---------------+---------------+-----------------+--------------+----------------+------------+-------------+------------+--------------+-----------+-------------+-----------+
|SK_DateID| DateValue|       DateDesc|CalendarYearID|CalendarYearDesc|CalendarQtrID|CalendarQtrDesc|CalendarMonthID|CalendarMonthDesc|CalendarWeekID|CalendarWeekDesc|DayOfWeekNum|DayOfWeekDesc|FiscalYearID|FiscalYearDesc|FiscalQtrID|FiscalQtrDesc|HolidayFlag|
+---------+----------+---------------+--------------+----------------+-------------+---------------+---------------+-----------------+--------------+----------------+------------+-------------+------------+--------------+-----------+-------------+-----------+
| 19500101|1950-01-01|January 1, 1950|          1950|            1950|        19501|        1950 Q1|          19501|     1950 January|         19501|         1950-W1|           7|       Sunday|        1950|          1950

#### Load Time.txt

In [31]:
# Load Time
column_names = ["SK_TimeID", "TimeValue", "HourID", "HourDesc",
               "MinuteID", "MinuteDesc", "SecondID", "SecondDesc",
               "MarketHoursFlag", "OfficeHoursFlag"]
required_columns_type = [IntegerType(),StringType(), IntegerType(),
                        StringType(), IntegerType(), StringType(), IntegerType(), 
                        StringType(), BooleanType(), BooleanType()]
file_path= "Dataset/Batch1/Time.txt"
schema = get_schema(column_names, required_columns_type)
time_df = load_data(file_path, schema, "|")
time_df.show(5)

+---------+---------+------+--------+--------+----------+--------+----------+---------------+---------------+
|SK_TimeID|TimeValue|HourID|HourDesc|MinuteID|MinuteDesc|SecondID|SecondDesc|MarketHoursFlag|OfficeHoursFlag|
+---------+---------+------+--------+--------+----------+--------+----------+---------------+---------------+
|        0| 00:00:00|     0|      00|       0|     00:00|       0|  00:00:00|          false|          false|
|        1| 00:00:01|     0|      00|       0|     00:00|       1|  00:00:01|          false|          false|
|        2| 00:00:02|     0|      00|       0|     00:00|       2|  00:00:02|          false|          false|
|        3| 00:00:03|     0|      00|       0|     00:00|       3|  00:00:03|          false|          false|
|        4| 00:00:04|     0|      00|       0|     00:00|       4|  00:00:04|          false|          false|
+---------+---------+------+--------+--------+----------+--------+----------+---------------+---------------+
only showi

#### Load CashTransaction.txt

In [8]:
# Load CashTransaction
column_names = ["SK_TimeID", "TimeValue", "HourID", "HourDesc",
               "MinuteID", "MinuteDesc", "SecondID", "SecondDesc",
               "MarketHoursFlag", "OfficeHoursFlag"]
file_path= "Dataset/Batch1/Time.txt"
schema = get_schema(column_names)
date_df = load_data(file_path, schema, "|")
date_df.show(5)

+---------+---------+------+--------+--------+----------+--------+----------+---------------+---------------+
|SK_TimeID|TimeValue|HourID|HourDesc|MinuteID|MinuteDesc|SecondID|SecondDesc|MarketHoursFlag|OfficeHoursFlag|
+---------+---------+------+--------+--------+----------+--------+----------+---------------+---------------+
|   000000| 00:00:00|    00|      00|      00|     00:00|      00|  00:00:00|          false|          false|
|   000001| 00:00:01|    00|      00|      00|     00:00|      01|  00:00:01|          false|          false|
|   000002| 00:00:02|    00|      00|      00|     00:00|      02|  00:00:02|          false|          false|
|   000003| 00:00:03|    00|      00|      00|     00:00|      03|  00:00:03|          false|          false|
|   000004| 00:00:04|    00|      00|      00|     00:00|      04|  00:00:04|          false|          false|
+---------+---------+------+--------+--------+----------+--------+----------+---------------+---------------+
only showi

### Load StatusType.txt

In [40]:
column_names= ["ST_ID", "ST_NAME"]
required_columns_type = [StringType(), StringType()]
file_path= "Dataset/Batch1/StatusType.txt"
schema = get_schema(column_names, required_columns_type)
status_df = load_data(file_path, schema, "|")
date_df.show(5)

+-----+---------+
|ST_ID|  ST_NAME|
+-----+---------+
| ACTV|   Active|
| CMPT|Completed|
| CNCL| Canceled|
| PNDG|  Pending|
| SBMT|Submitted|
+-----+---------+
only showing top 5 rows



## Create dimensions

### 1 - Create DimDate

**The source file of this dimention is: Date.txt.**
    
    -- We have already load Date.txt file as dataframe with required types.

In [18]:
DimDate = date_df

### 2 - Create DimTime

**The source file of this dimention is: Date.txt.**
    
    -- We have already load time.txt file as dataframe with required types.

In [32]:
DimTime = time_df

### 3 - Create DimBroker

**The source file of this dimention is: HR.txt.**
    
    -- We have already load HR.txt file as dataframe with required types. But we need to filter employees with code = 314

In [37]:
DimBroker = hr_df.filter(hr_df.EmployeeJobCode == 314)
DimBroker.show(5)

+----------+---------+-----------------+----------------+----------+---------------+--------------------+--------------+--------------+
|EmployeeID|ManagerID|EmployeeFirstName|EmployeeLastName|EmployeeMI|EmployeeJobCode|      EmployeeBranch|EmployeeOffice| EmployeePhone|
+----------+---------+-----------------+----------------+----------+---------------+--------------------+--------------+--------------+
|         1|     1377|             Suer|         Candice|      null|            314|OfOBVvpzNvHCebxyu...|    OFFICE8586|(344) 999-2652|
|         4|     4345|        Aronovich|        Delphine|         M|            314|IEMJHuQgCPDHCwwJk...|    OFFICE9420|(604) 387-9350|
|         8|     2146|           Hansen|        Montreal|         T|            314|sGIpORbLsRjTdhqBN...|    OFFICE6343|(991) 491-4907|
|        11|     2259|       Charchanko|          Sheela|      null|            314|Cw QJMHPgpozCKsFZ...|    OFFICE7705|(977) 726-0106|
|        14|     3663|            Knorp|        

### 4 - Create DimStatusType

In [41]:
DimStatusType = date_df

### 5 - Create DimAccount

In [70]:
import xml.etree.ElementTree as ET
from xml.dom import minidom
import numpy as np
import pandas as pd
from datetime import datetime

**NOTE**

    -- we use 'minidom' beacuse ET can't see tag <TPCDI:Action>.
    

In [62]:
xmldoc = minidom.parse('Dataset/CustomerMgmt.xml')
action_tag_for_customers = xmldoc.getElementsByTagName('TPCDI:Action')

# Get actions types.
action_type_for_customers = [item.attributes["ActionType"].value 
                             for item in action_tag_for_customers]
print("The number of actions in customer file is {}.".format(len(action_type_for_customers)))

The number of actions in customer file is 50000.


In [109]:
columns_name_for_dim_account = ["SK_AccountID", "AccountID", "SK_BrokerID", "SK_CustomerID", "Status", 
                "AccountDesc", "TaxStatus", "IsCurrent", "BatchID", "EffectiveDate", 
                "EndDate"]

In [None]:
# We use if condition to handle the actiontype cases 
# 1 - ["NEW", "ADDACCT"] all data exist
# 2 - UPDACCT only updated part exist.
# 3 - Other: the account is closed or deactvated.

accounts = ET.parse('Dataset/CustomerMgmt.xml').iter("Customer")    

Dim_account_list= []
sk = 0
for i, x in enumerate(accounts, 0):
    
    if action_type_for_customers[i] in ["NEW", "ADDACCT"]:
        # Here: We extract all data we need from xml 
        C_ID = x.attrib["C_ID"]
        CA_ID = x.find("Account").attrib["CA_ID"]
        CA_TAX_ST = x.find("Account").attrib["CA_TAX_ST"]
        CA_B_ID = x.find("Account/CA_B_ID").text
        CA_NAME = x.find("Account/CA_NAME").text
        Dim_account_list.append((sk, CA_ID, CA_B_ID, C_ID, "Active", CA_NAME, CA_TAX_ST, True, 
               1, datetime.now(), "9999-12-31"))
        
    elif action_type_for_customers[i] == "UPDACCT":
        C_ID = x.attrib["C_ID"]
        CA_ID = x.find("Account").attrib["CA_ID"]
        CA_TAX_ST = x.find("Account").attrib["CA_TAX_ST"] if "CA_TAX_ST" in x.find("Account").attrib else None
        
        # check if part is exist if not set it to None.
        CA_B_ID = x.find("Account/CA_B_ID").text if x.find("Account/CA_B_ID") != None else None
        CA_NAME = x.find("Account/CA_NAME").text if x.find("Account/CA_NAME") != None else None
        new_values = (sk, CA_ID, CA_B_ID, C_ID, "Active", CA_NAME, CA_TAX_ST, True, 
               1, datetime.now(), "9999-12-31")
        
        # Get the last record with selected CA_ID.
        last_account = DimAccount_new_addacc.filter(DimAccount_new_addacc.AccountID == CA_ID).collect()[-1]  
        
        # Get the upaded value if exist if not set it to old value 
        updated_account = tuple([update if update != None else last 
                                     for update, last in zip(new_values, last_account)])  
        
        # Append it 
        Dim_account_list.append(updated_account)
    elif action_type_for_customers[i] in ["CLOSEACCT", "INACT"]:
        # The account is colosed or inactive
        last_account = list(DimAccount_new_addacc.filter(DimAccount_new_addacc.AccountID == CA_ID).collect()[-1]) 
        last_account[5] = "closed"
        Dim_account_list.append(tuple(last_account))
    sk+=1

DimAccount = sparkSession.createDataFrame(Dim_account_list, columns_name)
DimAccount.show(5)

In [None]:
# Let's save this dim as it takes a lot of time to process.


In [None]:
customer_column_name= ["SK_CustomerID", "CustomerID", "TaxID", "Status", "LastName", "FirstName",
                      "MiddleInitial", "Gender", "Tier", "DOB", "AddressLine1", "AddressLine2",
                      "PostalCode", "City", "StateProv", "Country", "Phone1", "Phone2", "Phone3",
                      "Email1", "Email2", "NationalTaxRateDesc", "NationalTaxRate", "LocalTaxRateDesc",
                      "LocalTaxRate", "AgencyID", "CreditRating", "NetWorth", "MarketingNameplate", 
                      "IsCurrent", "BatchID", "EffectiveDate", "EndDate"]

In [None]:
accounts = ET.parse('Dataset/CustomerMgmt.xml').iter("Customer")    

Dim_account_list= []
sk = 0
for i, x in enumerate(accounts, 0):
    
    if action_type_for_customers[i] == "NEW":
        # Here: We extract all data we need from xml 
        C_ID = x.attrib["C_ID"]
        C_TAX_ID = x.attrib["C_TAX_ID"]
        C_GNDR = x.attrib["C_GNDR"]
        Tier = x.attrib["C_GNDR"]
        LastName = x.find("Name/C_F_NAME").text
        FirstName = x.find("Name/C_F_NAME").text
        MiddleInitial = x.find("Name/MiddleInitial").text if x.find("Name/MiddleInitial") else None
        CA_B_ID = x.find("Name/C_F_NAME").text
        CA_NAME = x.find("Account/CA_NAME").text
        Dim_account_list.append((sk, CA_ID, CA_B_ID, C_ID, "Active", CA_NAME, CA_TAX_ST, True, 
               1, datetime.now(), "9999-12-31"))
        
    elif action_type_for_customers[i] == "UPDACCT":
        C_ID = x.attrib["C_ID"]
        CA_ID = x.find("Account").attrib["CA_ID"]
        CA_TAX_ST = x.find("Account").attrib["CA_TAX_ST"] if "CA_TAX_ST" in x.find("Account").attrib else None
        
        # check if part is exist if not set it to None.
        CA_B_ID = x.find("Account/CA_B_ID").text if x.find("Account/CA_B_ID") != None else None
        CA_NAME = x.find("Account/CA_NAME").text if x.find("Account/CA_NAME") != None else None
        new_values = (sk, CA_ID, CA_B_ID, C_ID, "Active", CA_NAME, CA_TAX_ST, True, 
               1, datetime.now(), "9999-12-31")

In [117]:
# Let's handle update action
accounts = ET.parse('Dataset/CustomerMgmt.xml').iter("Customer")    
for i, x in enumerate(accounts, 0):
    if action_type_for_customers[i] == "UPDACCT":
        C_ID = x.attrib["C_ID"]
        CA_ID = x.find("Account").attrib["CA_ID"]
        CA_TAX_ST = x.find("Account").attrib["CA_TAX_ST"] if "CA_TAX_ST" in x.find("Account").attrib else "None"
        CA_B_ID = x.find("Account/CA_B_ID").text if x.find("Account/CA_B_ID") != None else None
        CA_NAME = x.find("Account/CA_NAME").text if x.find("Account/CA_NAME") != None else None
        updated_account_values = (sk, CA_ID, CA_B_ID, C_ID, "Active", CA_NAME, CA_TAX_ST, True, 
               1, datetime.now(), "9999-12-31")
        
        
        matched = DimAccount_new_addacc.filter(DimAccount_new_addacc.AccountID == CA_ID).collect()[-1]  
        full_updated_values = tuple([update if update != "None" else old 
                                     for update, old in zip(updated_account_values, matched)])  
        try:
            
            new_record = sparkSession.createDataFrame([full_updated_values,], columns_name)
        except:
            print(full_updated_values)
        DimAccount_new_addacc = DimAccount_new_addacc.union(new_record)
        sk +=1
        
DimAccount_new_addacc.show(10)

(50092, '64', '34955', '64', 'Active', None, '1', True, 1, datetime.datetime(2020, 4, 4, 17, 30, 51, 267878), '9999-12-31')
(50149, '293', '45669', '11', 'Active', None, '1', True, 1, datetime.datetime(2020, 4, 4, 17, 34, 50, 956259), '9999-12-31')
(50152, '286', '33966', '110', 'Active', None, '0', True, 1, datetime.datetime(2020, 4, 4, 17, 35, 10, 359756), '9999-12-31')
(50154, '220', '15766', '114', 'Active', None, '1', True, 1, datetime.datetime(2020, 4, 4, 17, 35, 24, 611794), '9999-12-31')
(50166, '197', '38615', '123', 'Active', None, '1', True, 1, datetime.datetime(2020, 4, 4, 17, 36, 58, 866095), '9999-12-31')
(50175, '133', '28543', '31', 'Active', None, '0', True, 1, datetime.datetime(2020, 4, 4, 17, 38, 18, 340420), '9999-12-31')
(50191, '70', '13655', '70', 'Active', None, '1', True, 1, datetime.datetime(2020, 4, 4, 17, 41, 2, 525894), '9999-12-31')
(50205, '68', '46951', '68', 'Active', None, '2', True, 1, datetime.datetime(2020, 4, 4, 17, 43, 49, 18488), '9999-12-31')
(5

KeyboardInterrupt: 

In [83]:
accounts

<_elementtree._element_iterator at 0x7f8fc86a2870>

## #################################################

## الشغل الي تحت عك 

### Load status table

#### Let's join Account table with status table.

In [7]:
account_with_status = account_df.join(
                        broadcast(status_df), 
                        account_df.CA_ST_ID == status_df.ST_ID,   
                        'inner'
                  )



In [8]:
account_with_status.show(5)

+--------+-------+-----+--------+--------+--------------------+---------+--------+-----+--------+
|CDC_FLAG|CDC_DSN|CA_ID|CA_B_ID |CA_C_ID |             CA_NAME|CA_TAX_ST|CA_ST_ID|ST_ID| ST_NAME|
+--------+-------+-----+--------+--------+--------------------+---------+--------+-----+--------+
|       I|  43490|30470|   16206|   15280|XkRcJWPVFFSGAtTGo...|        1|    ACTV| ACTV|  Active|
|       U|  43491|13857|   35351|    4996|kXUQTTuZHQsJsIDcB...|        1|    ACTV| ACTV|  Active|
|       U|  43492|26685|   23304|    2762|ruXPPxRMDLjswZZHv...|        1|    INAC| INAC|Inactive|
|       I|  43493|30471|   43026|   15281|arQHNWBBCOGMxvWqT...|        2|    ACTV| ACTV|  Active|
|       I|  43494|30472|    5711|   15282|DuQgzgldMMnEnh Fh...|        1|    ACTV| ACTV|  Active|
+--------+-------+-----+--------+--------+--------------------+---------+--------+-----+--------+
only showing top 5 rows



### Load customer.xml file 

In [9]:
import xml.etree.ElementTree as ET
from xml.dom import minidom
import numpy as np
import pandas as pd

In [52]:
# Get a generator of dict containing account data 
accounts_dict = ET.parse('Dataset/CustomerMgmt.xml').iter("@ActionType")    
c_ids = []
for x in accounts_dict:
    print(x)
    if x.find("Account") != None:
        c_ids.append(x.attrib["C_ID"])

In [67]:
# Convert to Dataframe 
customer_pd = pd.DataFrame(customer_data_array, 
                 columns=["C_ID", "ACTION_TYPE", "CA_ID"], index=None)

In [68]:
mySchema = StructType([ StructField("C_ID", StringType(), True)\

                       ,StructField("ACTION_TYPE", StringType(), True)\

                       ,StructField("CA_ID", StringType(), True)])
customer_df = sparkSession.createDataFrame(customer_pd, schema = mySchema)

customer_df.show(5)

+----+-----------+-----+
|C_ID|ACTION_TYPE|CA_ID|
+----+-----------+-----+
|   0|        NEW|    0|
|   1|        NEW|    1|
|   2|        NEW|    2|
|   3|        NEW|    3|
|   4|        NEW|    4|
+----+-----------+-----+
only showing top 5 rows



### Join account_with_status with customer_df

In [69]:
dim_account_df = account_with_status.join(
                        customer_df, 
                        account_with_status.CA_ID == customer_df.CA_ID,   
                        'inner'
                  )

In [70]:
dim_account_df.show(5)

+--------+-------+-----+--------+--------+--------------------+---------+--------+-----+--------+----+-----------+-----+
|CDC_FLAG|CDC_DSN|CA_ID|CA_B_ID |CA_C_ID |             CA_NAME|CA_TAX_ST|CA_ST_ID|ST_ID| ST_NAME|C_ID|ACTION_TYPE|CA_ID|
+--------+-------+-----+--------+--------+--------------------+---------+--------+-----+--------+----+-----------+-----+
|       U|  43579|  751|   36480|     436|SBkXmBJLgAbOmSROj...|        1|    INAC| INAC|Inactive| 436|        NEW|  751|
|       U|  43574| 1143|    1474|     618|INkSQXOCuakseRkSa...|        2|    ACTV| ACTV|  Active| 618|        NEW| 1143|
|       U|  43561| 3568|   30347|    1761|LgEiiaOJQMRJNcDMm...|        1|    ACTV| ACTV|  Active|1761|    ADDACCT| 3568|
|       U|  43561| 3568|   30347|    1761|LgEiiaOJQMRJNcDMm...|        1|    ACTV| ACTV|  Active|1761|    UPDACCT| 3568|
|       U|  43553| 4128|   28792|    1607|FdnzvlBxEzFnsRpVd...|        1|    INAC| INAC|Inactive|1607|    ADDACCT| 4128|
+--------+-------+-----+--------

In [82]:
dim_account_df.columns

['CDC_FLAG',
 'CDC_DSN',
 'CA_B_ID ',
 'CA_C_ID ',
 'CA_NAME',
 'CA_TAX_ST',
 'ST_NAME',
 'C_ID',
 'ACTION_TYPE']

In [None]:
BinaryType – Binary data.
BooleanType – Boolean values.
ByteType – A byte value.
DateType – A datetime value.
DoubleType – A floating-point double value.
IntegerType – An integer value.
LongType – A long integer value.
NullType – A null value.


