In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Demo').getOrCreate()

In [5]:
spark

In [47]:
fact = spark.read.csv('fact_internet_sales.csv', header = True, inferSchema = True)
customer = spark.read.csv('dim_customer.csv', header = True, inferSchema = True)
currency = spark.read.csv('dim_currency.csv', header = True, inferSchema = True)
date = spark.read.csv('dim_date.csv', header = True, inferSchema = True)
product = spark.read.csv('dim_product.csv', header = True, inferSchema = True)


In [51]:
customer.show(1)

+-----------+---------+----------+--------+---------+----------+-------------+------+------+--------------------+------------+-------------+---------------+------------+-------------------+
|CustomerKey|FirstName|MiddleName|LastName|NameStyle| BirthDate|MaritalStatus|Suffix|Gender|        EmailAddress|YearlyIncome|TotalChildren|   AddressLine1|AddressLine2|              Phone|
+-----------+---------+----------+--------+---------+----------+-------------+------+------+--------------------+------------+-------------+---------------+------------+-------------------+
|      11000|      Jon|         V|    Yang|    false|1971-10-06|            M|  NULL|     M|jon24@adventure-w...|     90000.0|            2|3761 N. 14th St|        NULL|1 (11) 500 555-0162|
+-----------+---------+----------+--------+---------+----------+-------------+------+------+--------------------+------------+-------------+---------------+------------+-------------------+
only showing top 1 row



In [52]:
currency.show(1)

+-----------+--------------------+------------+
|CurrencyKey|CurrencyAlternateKey|CurrencyName|
+-----------+--------------------+------------+
|          1|                 AFA|     Afghani|
+-----------+--------------------+------------+
only showing top 1 row



In [53]:
product.show(1)

+----------+-------------------+------------------+------------------+-----+---------+----+------+-----------------+-------+
|ProductKey|ProductAlternateKey|EnglishProductName|SpanishProductName|Color|ListPrice|Size|Weight|DaysToManufacture| Status|
+----------+-------------------+------------------+------------------+-----+---------+----+------+-----------------+-------+
|         1|            AR-5381|   Adjustable Race|              NULL|   NA|     NULL|NULL|  NULL|                0|Current|
+----------+-------------------+------------------+------------------+-----+---------+----+------+-----------------+-------+
only showing top 1 row



In [54]:
date.show(1)

+--------+--------------------+---------------+--------------------+--------------------+----------------+---------------+----------------+----------------+----------------+-----------------+---------------+------------+----------------+-------------+----------+--------------+
| DateKey|FullDateAlternateKey|DayNumberOfWeek|EnglishDayNameOfWeek|SpanishDayNameOfWeek|DayNumberOfMonth|DayNumberOfYear|WeekNumberOfYear|EnglishMonthName|SpanishMonthName|MonthNumberOfYear|CalendarQuarter|CalendarYear|CalendarSemester|FiscalQuarter|FiscalYear|FiscalSemester|
+--------+--------------------+---------------+--------------------+--------------------+----------------+---------------+----------------+----------------+----------------+-----------------+---------------+------------+----------------+-------------+----------+--------------+
|20050101|          2005-01-01|              7|            Saturday|              Sábado|               1|              1|               1|         January|          

In [48]:
fact = fact.withColumnRenamed('OrderDateKey','DateKey')

In [49]:
FactInternetSales = (
fact
.join(customer,on='CustomerKey',how = 'inner')
.join(product,on='ProductKey',how = 'inner')
.join(date,on='DateKey',how = 'inner')
.join(currency,on='CurrencyKey',how = 'inner')
)

In [50]:
from pyspark.sql import functions as fn

In [69]:
Filtered = (
FactInternetSales
.filter(FactInternetSales['color'] == 'Silver')
.filter(FactInternetSales['EnglishDayNameOfWeek'] == 'Sunday')
.filter(FactInternetSales['Size'].isNotNull())
.filter(FactInternetSales['Weight'] > 10)
.filter(FactInternetSales['YearlyIncome']>100000.0)
.filter(FactInternetSales['TotalChildren']>1)
)

In [77]:
Filtered.show(10)

+-----------+--------+----------+-----------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+---------+----------+---------+---------+----------+-------------+------+------+--------------------+------------+-------------+--------------------+------------+-------------------+-------------------+--------------------+--------------------+------+---------+----+------+-----------------+-------+--------------------+---------------+--------------------+--------------------+----------------+---------------+----------------+----------------+----------------+-----------------+---------------+------------+----------------+-------------+----------+--------------+--------------------+--------------------+
|CurrencyKey| DateKey|ProductKey|CustomerKey|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|SalesAmount|  TaxAmt|Freight|FirstName|MiddleNam

In [85]:
Aggregations = (
Filtered
.groupBy('CustomerKey','FirstName')
.agg(fn.sum('TaxAmt'),fn.mean('SalesAmount'),fn.mean('TotalProductCost'))
.withColumnRenamed('sum(TaxAmt)','TotalTaxAmt')
.withColumnRenamed('avg(SalesAmount)','AverageSalesAmount')
.withColumnRenamed('avg(TotalProductCost)','AverageTotalProductCost')
)

In [86]:
Aggregations.show(10)

+-----------+---------+-----------+------------------+-----------------------+
|CustomerKey|FirstName|TotalTaxAmt|AverageSalesAmount|AverageTotalProductCost|
+-----------+---------+-----------+------------------+-----------------------+
|      12637|  Leonard|   185.5992|           2319.99|              1265.6195|
|      18953|      Mya|    61.5592|            769.49|               419.7784|
|      15194|     Erik|   185.5992|           2319.99|              1265.6195|
|      12364|     Erin|   185.5992|           2319.99|              1265.6195|
|      15100|   Brenda|   185.5992|           2319.99|              1265.6195|
|      13658|    Wyatt|   185.5992|           2319.99|              1265.6195|
|      13278|    Jorge|   185.5992|           2319.99|              1265.6195|
|      19057|   Joanna|   185.5992|           2319.99|              1265.6195|
|      11249|    Cindy|   271.9992|           3399.99|              1912.1544|
|      13263|     Kate|   185.5992|           2319.9

In [87]:
Sorted = (
    Aggregations
    .orderBy(fn.col('FirstName'))
)

In [88]:
Sorted.show(10)

+-----------+---------+------------------+------------------+-----------------------+
|CustomerKey|FirstName|       TotalTaxAmt|AverageSalesAmount|AverageTotalProductCost|
+-----------+---------+------------------+------------------+-----------------------+
|      13581|   Albert|          185.5992|           2319.99|              1265.6195|
|      26107|   Alexis|          271.9992|           3399.99|              1912.1544|
|      11244|   Alexis|          185.5992|           2319.99|              1265.6195|
|      16623|   Andres|          185.5992|           2319.99|              1265.6195|
|      18131|      Ann|          185.5992|           2319.99|              1265.6195|
|      11240|     Anne|457.59839999999997|           2859.99|             1588.88695|
|      12188|   Ashley|          185.5992|           2319.99|              1265.6195|
|      17536| Benjamin|          185.5992|           2319.99|              1265.6195|
|      15100|   Brenda|          185.5992|           2

In [89]:
FinalOutput = (
    Sorted.drop(fn.col('CustomerKey'))
)

In [90]:
FinalOutput.show(10)

+---------+------------------+------------------+-----------------------+
|FirstName|       TotalTaxAmt|AverageSalesAmount|AverageTotalProductCost|
+---------+------------------+------------------+-----------------------+
|   Albert|          185.5992|           2319.99|              1265.6195|
|   Alexis|          271.9992|           3399.99|              1912.1544|
|   Alexis|          185.5992|           2319.99|              1265.6195|
|   Andres|          185.5992|           2319.99|              1265.6195|
|      Ann|          185.5992|           2319.99|              1265.6195|
|     Anne|457.59839999999997|           2859.99|             1588.88695|
|   Ashley|          185.5992|           2319.99|              1265.6195|
| Benjamin|          185.5992|           2319.99|              1265.6195|
|   Brenda|          185.5992|           2319.99|              1265.6195|
|    Cindy|          271.9992|           3399.99|              1912.1544|
+---------+------------------+--------