# "Customer Dimension"

In [47]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [48]:
# Build (or reuse) the Hive-enabled Spark session used by every cell below.
# The metastore URI and the extra jar are passed as session configuration.
spark = (
    SparkSession.builder
    .appName("Employee")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.jars", "/Drivers/SQL_Sever/jdbc/sqljdbc42.jar")
    .enableHiveSupport()
    .getOrCreate()
)

In [49]:
# Create the bronze namespace only when it is missing, so that re-running the
# notebook (Restart & Run All) does not fail on an already-existing namespace.
spark.sql("create namespace if not exists bronze;")

DataFrame[]

In [50]:
# Source tables to pull, written as bracket-quoted [schema].[table] names.
Tables = [
    "[Sales].[Customer]",
    "[Person].[StateProvince]",
    "[Person].[BusinessEntityAddress]",
    "[Person].[Address]",
    "[Person].[Person]",
    "[HumanResources].[Department]",
    "[HumanResources].[EmployeeDepartmentHistory]",
    "[HumanResources].[Employee]",
]
print(len(Tables), " tables")

8  tables


# Read The Tables From SQL Server Database

In [51]:
# Connection settings are read from the environment so that no credential is
# stored in the notebook. MSSQL_PASSWORD is required; the URL and user fall
# back to the values this notebook was written against.
jdbc_url = os.environ.get(
    "MSSQL_JDBC_URL",
    "jdbc:sqlserver://172.18.0.5:1433;databaseName=AdventureWorks2017",
)
jdbc_user = os.environ.get("MSSQL_USER", "sa")
jdbc_password = os.environ.get("MSSQL_PASSWORD")
if not jdbc_password:
    raise RuntimeError(
        "Set the MSSQL_PASSWORD environment variable before running this cell."
    )

# Load every source table into a DataFrame, keyed by its bracket-quoted name.
dataFrames = {}
for table in Tables:
    query = f"select * from {table}"
    df = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        # The query is passed as a parenthesised, aliased subquery in "dbtable".
        .option("dbtable", f"({query}) as temp")
        .option("user", jdbc_user)
        .option("password", jdbc_password)
        .load()
    )
    dataFrames[table] = df
print(dataFrames.keys())

dict_keys(['[Sales].[Customer]', '[Person].[StateProvince]', '[Person].[BusinessEntityAddress]', '[Person].[Address]', '[Person].[Person]', '[HumanResources].[Department]', '[HumanResources].[EmployeeDepartmentHistory]', '[HumanResources].[Employee]'])


Prepare the Customer table for further use.

In [52]:
# Keep only the customer attributes used later, repartition into 4 partitions
# by CustomerID, cache the result and expose it to SQL as "customer".
customer_columns = ['CustomerID', 'PersonID', 'StoreID', 'TerritoryID', 'AccountNumber']
customer = (
    dataFrames['[Sales].[Customer]']
    .select(*customer_columns)
    .repartition(4, "CustomerID")
    .cache()
)
customer.createOrReplaceTempView("customer")
customer.count()

19820

Employee department history: we will select the last position (the row with no end date) for each employee.

In [53]:
# Register the raw department history so it can be queried with SQL.
dataFrames["[HumanResources].[EmployeeDepartmentHistory]"].createOrReplaceTempView("depthist")

# Keep only the rows whose EndDate is null (treated here as each employee's
# last department), reduce them to the two key columns, then repartition and
# cache. The "depthist" view is re-registered on top of the filtered result.
depthist = (
    spark.sql("""
select * from depthist
where EndDate is null
""")
    .select("BusinessEntityID", "DepartmentID")
    .repartition("BusinessEntityID", "DepartmentID")
    .cache()
)
depthist.createOrReplaceTempView("depthist")
depthist.columns

['BusinessEntityID', 'DepartmentID']

Department DataFrame

In [54]:
# Department lookup: id, name and group, repartitioned into 4 partitions by
# DepartmentID, cached and exposed to SQL as "dept".
dept = (
    dataFrames["[HumanResources].[Department]"]
    .select('DepartmentID', 'Name', 'GroupName')
    .repartition(4, "DepartmentID")
    .cache()
)
dept.createOrReplaceTempView("dept")
dept.columns

['DepartmentID', 'Name', 'GroupName']

A customer is a person, so I will extract the customers' info from the Person table.

In [55]:
# Person attributes plus a combined "FirstName LastName" column. The combined
# column is named EmployeeName in this frame.
person_columns = [
    'BusinessEntityID',
    'EmployeeName',
    'PersonType',
    'NameStyle',
    'Title',
    'Suffix',
    'EmailPromotion',
    'AdditionalContactInfo',
    'Demographics',
]
person = (
    dataFrames['[Person].[Person]']
    .withColumn("EmployeeName", concat_ws(" ", "FirstName", "LastName"))
    .select(*person_columns)
    .repartition(4, "BusinessEntityID")
    .cache()
)

person.createOrReplaceTempView("person")
person.columns

['BusinessEntityID',
 'EmployeeName',
 'PersonType',
 'NameStyle',
 'Title',
 'Suffix',
 'EmailPromotion',
 'AdditionalContactInfo',
 'Demographics']

Address info

In [56]:
# Address attributes, repartitioned into 4 partitions by AddressID, cached and
# exposed to SQL as "address".
address_columns = [
    'AddressID',
    'AddressLine1',
    'AddressLine2',
    'City',
    'StateProvinceID',
    'PostalCode',
    'SpatialLocation',
]
address = (
    dataFrames["[Person].[Address]"]
    .select(*address_columns)
    .repartition(4, "AddressID")
    .cache()
)
address.createOrReplaceTempView("address")
address.columns

['AddressID',
 'AddressLine1',
 'AddressLine2',
 'City',
 'StateProvinceID',
 'PostalCode',
 'SpatialLocation']

I will use BusinessEntityAddress as the link table between person and address.

In [57]:
# Link table between business entities and addresses: keep only the two keys,
# repartition into 4 partitions on both of them, cache and expose to SQL.
link_keys = ['BusinessEntityID', 'AddressID']
entityAdd = (
    dataFrames["[Person].[BusinessEntityAddress]"]
    .select(*link_keys)
    .repartition(4, *link_keys)
    .cache()
)
entityAdd.createOrReplaceTempView("entityAdd")
entityAdd.columns

['BusinessEntityID', 'AddressID']

In [58]:
# State/province lookup. The source "Name" column is exposed as "stateName";
# the frame is repartitioned into 4 partitions by StateProvinceID, cached and
# registered for SQL as "state".
state = (
    dataFrames["[Person].[StateProvince]"]
    .select(
        'StateProvinceID',
        'StateProvinceCode',
        'CountryRegionCode',
        'IsOnlyStateProvinceFlag',
        col('Name').alias("stateName"),
    )
    .repartition(4, 'StateProvinceID')
    .cache()
)
state.createOrReplaceTempView("state")
state.columns

['StateProvinceID',
 'StateProvinceCode',
 'CountryRegionCode',
 'IsOnlyStateProvinceFlag',
 'stateName']

In [59]:
# List the tables and temporary views the session currently knows about, as a
# check that the views created in the cells above were registered.
spark.catalog.listTables()

[Table(name='address', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='customer', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='dept', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='depthist', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='entityAdd', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='person', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='state', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

# Join all the DataFrames to get the DimCustomer DataFrame

In [60]:
# Build the customer dimension by inner-joining the temp views:
#   customer  -> person     on c.PersonID = p.BusinessEntityID
#   person    -> entityadd  on BusinessEntityID
#   entityadd -> address    on AddressID
#   address   -> state      on StateProvinceID
# Because every join is an inner join, a customer row without a match at any
# step is dropped from the result.
# `select *` keeps every column of all five views, so the join keys
# (BusinessEntityID, AddressID, StateProvinceID) each appear twice in the
# output; a later cell selects only unambiguous columns.
# NOTE(review): the dept and depthist views are not part of this join.
DimCustomer = spark.sql("""

    select *
    from customer c inner join person p
    on c.PersonID = p.BusinessEntityID
    
    inner join entityadd ea
    on p.BusinessEntityID = ea.BusinessEntityID

    inner join address add
    on ea.AddressID = add.AddressID

    inner join state s
    on add.StateProvinceID = s.StateProvinceID
    
""")
DimCustomer.columns

['CustomerID',
 'PersonID',
 'StoreID',
 'TerritoryID',
 'AccountNumber',
 'BusinessEntityID',
 'EmployeeName',
 'PersonType',
 'NameStyle',
 'Title',
 'Suffix',
 'EmailPromotion',
 'AdditionalContactInfo',
 'Demographics',
 'BusinessEntityID',
 'AddressID',
 'AddressID',
 'AddressLine1',
 'AddressLine2',
 'City',
 'StateProvinceID',
 'PostalCode',
 'SpatialLocation',
 'StateProvinceID',
 'StateProvinceCode',
 'CountryRegionCode',
 'IsOnlyStateProvinceFlag',
 'stateName']

Select the needed columns.

In [61]:
# Project the joined result down to the dimension's attributes. The person's
# combined name column (EmployeeName) is exposed as CustomerName.
DimCustomer = DimCustomer.select(
    'CustomerID',
    'AccountNumber',
    expr('EmployeeName').alias("CustomerName"),
    'PersonType',
    'NameStyle',
    'Title',
    'Suffix',
    'EmailPromotion',
    'AdditionalContactInfo',
    'Demographics',
    'AddressLine1',
    'AddressLine2',
    'City',
    'PostalCode',
    'SpatialLocation',
    'StateProvinceCode',
    'CountryRegionCode',
    'IsOnlyStateProvinceFlag',
    'stateName',
)

# Repartition into 4 partitions by CustomerID and register the result for SQL.
DimCustomer = DimCustomer.repartition(4, "CustomerID")
DimCustomer.createOrReplaceTempView("DimCustomer")

# Row-count sanity check. It is the cell's last expression so the notebook
# displays the value; a count in the middle of the cell would run a Spark job
# and silently throw the result away.
DimCustomer.count()

In [62]:
# Read the registered view back through SQL and show its column names.
dim_customer_view = spark.sql("select * from DimCustomer")
dim_customer_view.columns

['CustomerID',
 'AccountNumber',
 'CustomerName',
 'PersonType',
 'NameStyle',
 'Title',
 'Suffix',
 'EmailPromotion',
 'AdditionalContactInfo',
 'Demographics',
 'AddressLine1',
 'AddressLine2',
 'City',
 'PostalCode',
 'SpatialLocation',
 'StateProvinceCode',
 'CountryRegionCode',
 'IsOnlyStateProvinceFlag',
 'stateName']

In [63]:
# Persist the dimension as a Hive table in the bronze namespace, replacing any
# existing table of the same name.
(
    DimCustomer.write
    .format("hive")
    .mode("overwrite")
    .saveAsTable("bronze.DimCustomer")
)

In [64]:
# Print the namespaces known to the session's catalog.
spark.sql("show databases;").show()

+---------+
|namespace|
+---------+
|   bronze|
|  default|
|     gold|
|    sales|
|   silver|
+---------+



In [65]:
# Make bronze the current namespace. The statement returns no rows, so show()
# prints an empty table.
spark.sql("use bronze").show()

++
||
++
++



In [66]:
# List the tables of the current namespace together with the session's
# temporary views (isTemporary = true).
spark.sql("show tables;").show()


+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|   bronze|dimcustomer|      false|
|         |    address|       true|
|         |   customer|       true|
|         |       dept|       true|
|         |   depthist|       true|
|         |dimcustomer|       true|
|         |  entityadd|       true|
|         |     person|       true|
|         |      state|       true|
+---------+-----------+-----------+



In [67]:
#spark.sql("drop table dimcustomer;").show()
