In [46]:
from pyspark.sql import SparkSession

sp = SparkSession.builder.appName('Demo').getOrCreate()

In [47]:
internet = sp.read.csv('data/fact_internet_sales.csv', header=True, inferSchema=True)
currency = sp.read.csv('data/dim_currency.csv', header=True, inferSchema=True)
customer = sp.read.csv('data/dim_customer.csv', header=True, inferSchema=True)
date = sp.read.csv('data/dim_date.csv', header=True, inferSchema=True)
products = sp.read.csv('data/dim_product.csv', header=True, inferSchema=True)

In [48]:
internet.show(3)

+----------+------------+-----------+-----------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+
|ProductKey|OrderDateKey|CustomerKey|CurrencyKey|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|SalesAmount|  TaxAmt|Freight|
+----------+------------+-----------+-----------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+
|       310|    20101229|      21768|         19|            1|  3578.27|       3578.27|                 0.0|           0.0|          2171.2942|       2171.2942|    3578.27|286.2616|89.4568|
|       346|    20101229|      28389|         39|            1|  3399.99|       3399.99|                 0.0|           0.0|          1912.1544|       1912.1544|    3399.99|271.9992|84.9998|
|       346|    20101229|      25863|        

In [49]:
products.show(3)

+----------+-------------------+------------------+------------------+-----+---------+----+------+-----------------+-------+
|ProductKey|ProductAlternateKey|EnglishProductName|SpanishProductName|Color|ListPrice|Size|Weight|DaysToManufacture| Status|
+----------+-------------------+------------------+------------------+-----+---------+----+------+-----------------+-------+
|         1|            AR-5381|   Adjustable Race|              NULL|   NA|     NULL|NULL|  NULL|                0|Current|
|         2|            BA-8327|      Bearing Ball|              NULL|   NA|     NULL|NULL|  NULL|                0|Current|
|         3|            BE-2349|   BB Ball Bearing|              NULL|   NA|     NULL|NULL|  NULL|                1|Current|
+----------+-------------------+------------------+------------------+-----+---------+----+------+-----------------+-------+
only showing top 3 rows



In [50]:
df = (
    internet
      .join(currency, internet.CurrencyKey == currency.CurrencyKey, how='inner')
      .join(customer, internet.CustomerKey == customer.CustomerKey, how='inner')
      .join(date, internet.OrderDateKey == date.DateKey, how='inner')
      .join(products, internet.ProductKey == products.ProductKey, how='inner')
)

In [51]:
df.show(3)

+----------+------------+-----------+-----------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+-----------+--------------------+---------------+-----------+---------+----------+--------+---------+----------+-------------+------+------+--------------------+------------+-------------+-------------------+------------+-------------------+--------+--------------------+---------------+--------------------+--------------------+----------------+---------------+----------------+----------------+----------------+-----------------+---------------+------------+----------------+-------------+----------+--------------+----------+-------------------+--------------------+--------------------+------+---------+----+------+-----------------+------+
|ProductKey|OrderDateKey|CustomerKey|CurrencyKey|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|Sale

In [52]:
from pyspark.sql.functions import col

df1 = df.filter(
    (col('EnglishDayNameOfWeek') == 'Sunday') &
    (col('Color') == 'Silver') &
    (col('Size').isNotNull()) &
    (col('Weight') >= 10) &
    (col('YearlyIncome') >= 100_000) &
    (col('TotalChildren') > 1)
)

In [53]:
df1.show()

+----------+------------+-----------+-----------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+-----------+--------------------+--------------------+-----------+---------+----------+---------+---------+----------+-------------+------+------+--------------------+------------+-------------+--------------------+------------+-------------------+--------+--------------------+---------------+--------------------+--------------------+----------------+---------------+----------------+----------------+----------------+-----------------+---------------+------------+----------------+-------------+----------+--------------+----------+-------------------+--------------------+--------------------+------+---------+----+------+-----------------+-------+
|ProductKey|OrderDateKey|CustomerKey|CurrencyKey|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductC

In [62]:
from pyspark.sql.functions import sum, avg

df2 = df.groupBy('OrderDateKey', 'FirstName').agg(
    sum('TaxAmt'),  
    avg('SalesAmount'),
    avg('TotalProductCost')
)

df2.show(3)


+------------+---------+-----------+----------------+---------------------+
|OrderDateKey|FirstName|sum(TaxAmt)|avg(SalesAmount)|avg(TotalProductCost)|
+------------+---------+-----------+----------------+---------------------+
|    20110112|  Shannon|   269.9992|         3374.99|            1898.0944|
|    20110205|    Andre|   286.2616|         3578.27|            2171.2942|
|    20110209|    Paula|   286.2616|         3578.27|            2171.2942|
+------------+---------+-----------+----------------+---------------------+
only showing top 3 rows



In [None]:
df2.orderBy('Firstname')
df2.show()

+------------+---------+-----------+----------------+---------------------+
|OrderDateKey|FirstName|sum(TaxAmt)|avg(SalesAmount)|avg(TotalProductCost)|
+------------+---------+-----------+----------------+---------------------+
|    20110112|  Shannon|   269.9992|         3374.99|            1898.0944|
|    20110205|    Andre|   286.2616|         3578.27|            2171.2942|
|    20110209|    Paula|   286.2616|         3578.27|            2171.2942|
|    20110216|     Kyle|   286.2616|         3578.27|            2171.2942|
|    20110322|   Sydney|   286.2616|         3578.27|            2171.2942|
|    20110429|   Briana|   286.2616|         3578.27|            2171.2942|
|    20110502|   Isaiah|   286.2616|         3578.27|            2171.2942|
|    20110512|   Dennis|    55.9279|        699.0982|             413.1463|
|    20110513|   Jessie|   271.9992|         3399.99|            1912.1544|
|    20110607| Isabella|    55.9279|        699.0982|             413.1463|
|    2011062

In [65]:
df2 = df2.drop('OrderDateKey')
df2.show()

+---------+-----------+----------------+---------------------+
|FirstName|sum(TaxAmt)|avg(SalesAmount)|avg(TotalProductCost)|
+---------+-----------+----------------+---------------------+
|  Shannon|   269.9992|         3374.99|            1898.0944|
|    Andre|   286.2616|         3578.27|            2171.2942|
|    Paula|   286.2616|         3578.27|            2171.2942|
|     Kyle|   286.2616|         3578.27|            2171.2942|
|   Sydney|   286.2616|         3578.27|            2171.2942|
|   Briana|   286.2616|         3578.27|            2171.2942|
|   Isaiah|   286.2616|         3578.27|            2171.2942|
|   Dennis|    55.9279|        699.0982|             413.1463|
|   Jessie|   271.9992|         3399.99|            1912.1544|
| Isabella|    55.9279|        699.0982|             413.1463|
|   Edward|   269.9992|         3374.99|            1898.0944|
|    Jenna|   269.9992|         3374.99|            1898.0944|
|  Shannon|   286.2616|         3578.27|            217