In [0]:
from pyspark.sql.functions import *

#Load Customer Data

In [0]:
dfCustomer = spark.read.table("dev_bronze.sales.customer")
dfGeography = spark.read.table("dev_bronze.sales.geography")

In [0]:
dfJoined = dfCustomer.join(dfGeography, dfCustomer.GeographyKey == dfGeography.GeographyKey,"inner")

dfJoined = dfJoined.select("CustomerKey", 
  concat( col("FirstName"), lit(" "),  col("LastName") ).alias("FullName"), 
  "Gender","MaritalStatus","YearlyIncome","TotalChildren","EnglishEducation","EnglishOccupation",
  "City","StateProvinceName","EnglishCountryRegionName")

dfJoined = dfJoined.withColumnRenamed("EnglishEducation","Education")
dfJoined = dfJoined.withColumnRenamed("EnglishOccupation","Occupation")

dfJoined = dfJoined.withColumnRenamed("StateProvinceName","StateProvince")
dfJoined = dfJoined.withColumnRenamed("EnglishCountryRegionName","Country")

dfJoined.write.insertInto("dev_gold.sales.dimcustomer")

#Load Sales Data

In [0]:
dfSales = spark.read.table("dev_bronze.sales.sales")
dfSales = dfSales.select("ProductKey","CustomerKey","OrderDateKey","OrderQuantity","SalesAmount","DiscountAmount")

dfSales.write.insertInto("dev_gold.sales.factsales")

#Load Date Data

In [0]:
dfDate = spark.read.table("dev_bronze.sales.date")
dfDate = dfDate.select("DateKey","FullDateAlternateKey","EnglishMonthName","MonthNumberOfYear","CalendarQuarter","CalendarYear")

dfDate = dfDate.withColumnRenamed("FullDateAlternateKey","TheDate")
dfDate =dfDate.withColumnRenamed("EnglishMonthName","MonthName")

dfDate = dfDate.withColumn("CalendarQuarter", concat(lit("Q"), col("CalendarQuarter").cast("STRING")))


dfDate.write.insertInto("dev_gold.sales.dimdate")

#Load Product Data

In [0]:
dfProduct = spark.read.table("dev_bronze.sales.product")
dfProductSubcategory = spark.read.table("dev_bronze.sales.productsubcategory")
dfProductCategory = spark.read.table("dev_bronze.sales.productcategory")

In [0]:
dfJoined = dfProduct.join(dfProductSubcategory, dfProduct.ProductSubcategoryKey == dfProductSubcategory.ProductSubcategoryKey,"inner")

dfJoined = dfJoined.join(dfProductCategory, dfJoined.ProductCategoryKey == dfProductCategory.ProductCategoryKey,"inner")

dfJoined = dfJoined.select("ProductKey", "EnglishProductName","EnglishProductSubcategoryName","EnglishProductCategoryName","Color","ListPrice")

dfJoined = dfJoined.withColumnRenamed("EnglishProductName","ProductName")
dfJoined = dfJoined.withColumnRenamed("EnglishProductSubcategoryName","Subcategory")
dfJoined = dfJoined.withColumnRenamed("EnglishProductCategoryName","Category")

dfJoined = dfJoined.na.fill(0,subset=["ListPrice"])

dfJoined.write.insertInto("dev_gold.sales.dimproduct")

In [0]:
%sql
SELECT * FROM dev_gold.sales.factsales LIMIT 5;
SELECT * FROM dev_gold.sales.dimcustomer LIMIT 5;
SELECT * FROM dev_gold.sales.dimdate LIMIT 5;
SELECT * FROM dev_gold.sales.dimproduct LIMIT 5;
