In [119]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import  col, count, sum, avg, to_timestamp, date_format, monotonically_increasing_id

# Creating session

In [120]:
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .getOrCreate()

# Data Import

In [121]:
df = spark.read.csv('Store_Sensors_data.csv', header=True, inferSchema=True)
df = df.withColumn('Hour', date_format(col('hour'), "HH:mm:ss"))
df.show()


+----------+--------+-------+---------+------------+-------+--------+
|      Date|    Hour|StoreNo|StoreName|EntranceName|InCount|OutCount|
+----------+--------+-------+---------+------------+-------+--------+
|2021-12-11|00:00:00|     30| My_Store|         001|      0|       0|
|2021-12-11|00:00:00|     30| My_Store|         002|      0|       0|
|2021-12-11|00:00:00|     30| My_Store|         003|      0|       0|
|2021-12-11|00:00:00|     30| My_Store|         004|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         001|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         002|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         003|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         004|      0|       0|
|2021-12-11|00:30:00|     30| My_Store|         001|      0|       0|
|2021-12-11|00:30:00|     30| My_Store|         002|      0|       0|
|2021-12-11|00:30:00|     30| My_Store|         003|      0|       0|
|2021-12-11|00:30:00

In [122]:
df.select('Date', 'Hour').show()

+----------+--------+
|      Date|    Hour|
+----------+--------+
|2021-12-11|00:00:00|
|2021-12-11|00:00:00|
|2021-12-11|00:00:00|
|2021-12-11|00:00:00|
|2021-12-11|00:15:00|
|2021-12-11|00:15:00|
|2021-12-11|00:15:00|
|2021-12-11|00:15:00|
|2021-12-11|00:30:00|
|2021-12-11|00:30:00|
|2021-12-11|00:30:00|
|2021-12-11|00:30:00|
|2021-12-11|00:45:00|
|2021-12-11|00:45:00|
|2021-12-11|00:45:00|
|2021-12-11|00:45:00|
|2021-12-11|01:00:00|
|2021-12-11|01:00:00|
|2021-12-11|01:00:00|
|2021-12-11|01:00:00|
+----------+--------+
only showing top 20 rows



In [123]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Hour: string (nullable = true)
 |-- StoreNo: integer (nullable = true)
 |-- StoreName: string (nullable = true)
 |-- EntranceName: string (nullable = true)
 |-- InCount: integer (nullable = true)
 |-- OutCount: integer (nullable = true)



### Without Infering the Schema

In [124]:
dff = spark.read.csv('Store_Sensors_data.csv', header=True)
dff.show()

+----------+------+-------+---------+------------+-------+--------+
|      Date|  Hour|StoreNo|StoreName|EntranceName|InCount|OutCount|
+----------+------+-------+---------+------------+-------+--------+
|2021-12-11| 0:0:0|     30| My_Store|         001|      0|       0|
|2021-12-11| 0:0:0|     30| My_Store|         002|      0|       0|
|2021-12-11| 0:0:0|     30| My_Store|         003|      0|       0|
|2021-12-11| 0:0:0|     30| My_Store|         004|      0|       0|
|2021-12-11|0:15:0|     30| My_Store|         001|      0|       0|
|2021-12-11|0:15:0|     30| My_Store|         002|      0|       0|
|2021-12-11|0:15:0|     30| My_Store|         003|      0|       0|
|2021-12-11|0:15:0|     30| My_Store|         004|      0|       0|
|2021-12-11|0:30:0|     30| My_Store|         001|      0|       0|
|2021-12-11|0:30:0|     30| My_Store|         002|      0|       0|
|2021-12-11|0:30:0|     30| My_Store|         003|      0|       0|
|2021-12-11|0:30:0|     30| My_Store|         00

In [125]:
dff.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Hour: string (nullable = true)
 |-- StoreNo: string (nullable = true)
 |-- StoreName: string (nullable = true)
 |-- EntranceName: string (nullable = true)
 |-- InCount: string (nullable = true)
 |-- OutCount: string (nullable = true)



In [126]:
# dff = dff.withColumn('Hour', col('Hour').cast("integer"))

In [127]:
# Define the columns and their new types
columnMapping = {
    'Date': 'date',
    'Hour': 'timestamp',
    'StoreName': 'string',
    'StoreNo' : 'integer'
,    'EntranceName': 'string',
    'InCount': 'integer',
    'OutCount': 'integer'
}

# Change the types of multiple columns
for column, new_type in columnMapping.items():
    dff = dff.withColumn(column, col(column).cast(new_type))

In [128]:
dff.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Hour: timestamp (nullable = true)
 |-- StoreNo: integer (nullable = true)
 |-- StoreName: string (nullable = true)
 |-- EntranceName: string (nullable = true)
 |-- InCount: integer (nullable = true)
 |-- OutCount: integer (nullable = true)



### Shallow copy problem does not exist

In [129]:
d = dff

In [130]:
d.show()

+----------+-------------------+-------+---------+------------+-------+--------+
|      Date|               Hour|StoreNo|StoreName|EntranceName|InCount|OutCount|
+----------+-------------------+-------+---------+------------+-------+--------+
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         001|      0|       0|
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         002|      0|       0|
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         003|      0|       0|
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         004|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         001|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         002|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         003|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         004|      0|       0|
|2021-12-11|2023-07-05 00:30:00|     30| My_Store|         001|      0|       0|
|2021-12-11|2023-07-05 00:30

In [131]:
dff = dff.withColumnRenamed('Hour', 'ZZZ')

In [132]:
dff.show()

+----------+-------------------+-------+---------+------------+-------+--------+
|      Date|                ZZZ|StoreNo|StoreName|EntranceName|InCount|OutCount|
+----------+-------------------+-------+---------+------------+-------+--------+
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         001|      0|       0|
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         002|      0|       0|
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         003|      0|       0|
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         004|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         001|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         002|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         003|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         004|      0|       0|
|2021-12-11|2023-07-05 00:30:00|     30| My_Store|         001|      0|       0|
|2021-12-11|2023-07-05 00:30

In [133]:
d.show()

+----------+-------------------+-------+---------+------------+-------+--------+
|      Date|               Hour|StoreNo|StoreName|EntranceName|InCount|OutCount|
+----------+-------------------+-------+---------+------------+-------+--------+
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         001|      0|       0|
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         002|      0|       0|
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         003|      0|       0|
|2021-12-11|2023-07-05 00:00:00|     30| My_Store|         004|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         001|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         002|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         003|      0|       0|
|2021-12-11|2023-07-05 00:15:00|     30| My_Store|         004|      0|       0|
|2021-12-11|2023-07-05 00:30:00|     30| My_Store|         001|      0|       0|
|2021-12-11|2023-07-05 00:30

### Continue

In [134]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Hour: string (nullable = true)
 |-- StoreNo: integer (nullable = true)
 |-- StoreName: string (nullable = true)
 |-- EntranceName: string (nullable = true)
 |-- InCount: integer (nullable = true)
 |-- OutCount: integer (nullable = true)



# Dim Tables

In [135]:
df.show()

+----------+--------+-------+---------+------------+-------+--------+
|      Date|    Hour|StoreNo|StoreName|EntranceName|InCount|OutCount|
+----------+--------+-------+---------+------------+-------+--------+
|2021-12-11|00:00:00|     30| My_Store|         001|      0|       0|
|2021-12-11|00:00:00|     30| My_Store|         002|      0|       0|
|2021-12-11|00:00:00|     30| My_Store|         003|      0|       0|
|2021-12-11|00:00:00|     30| My_Store|         004|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         001|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         002|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         003|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         004|      0|       0|
|2021-12-11|00:30:00|     30| My_Store|         001|      0|       0|
|2021-12-11|00:30:00|     30| My_Store|         002|      0|       0|
|2021-12-11|00:30:00|     30| My_Store|         003|      0|       0|
|2021-12-11|00:30:00

## my_store

In [136]:
df.count()

2652

In [137]:
my_store = df.select(['StoreNo', 'StoreName'])

In [138]:
my_store = my_store.dropDuplicates()

In [139]:
my_store.show()

+-------+----------------+
|StoreNo|       StoreName|
+-------+----------------+
|     34|     Their_Store|
|     31|       Our_Store|
|     36| Their_Our_Store|
|     38|  Their_My_Store|
|     32|      Your_Store|
|     35|mine_Their_Store|
|     30|        My_Store|
+-------+----------------+



In [140]:
my_store = my_store.withColumn("StoreID", monotonically_increasing_id())

In [141]:
my_store = my_store.select("StoreID", "StoreName", "StoreNo")

In [142]:
my_store.show()

+-------+----------------+-------+
|StoreID|       StoreName|StoreNo|
+-------+----------------+-------+
|      0|     Their_Store|     34|
|      1|       Our_Store|     31|
|      2| Their_Our_Store|     36|
|      3|  Their_My_Store|     38|
|      4|      Your_Store|     32|
|      5|mine_Their_Store|     35|
|      6|        My_Store|     30|
+-------+----------------+-------+



## my_sensor

In [143]:
my_sensor = df.select(['StoreNo', 'EntranceName']).dropDuplicates().sort('StoreNo')

In [144]:
my_sensor.show()

+-------+-------------+
|StoreNo| EntranceName|
+-------+-------------+
|     30|          004|
|     30|          002|
|     30|          001|
|     30|          003|
|     31|Main Entrance|
|     32|         Main|
|     34|         Main|
|     35|         Main|
|     36|         Main|
|     38|         Main|
+-------+-------------+



In [145]:
my_sensor = my_sensor.join(my_store, how='inner', on='StoreNo')
my_sensor.show()

+-------+-------------+-------+----------------+
|StoreNo| EntranceName|StoreID|       StoreName|
+-------+-------------+-------+----------------+
|     34|         Main|      0|     Their_Store|
|     31|Main Entrance|      1|       Our_Store|
|     36|         Main|      2| Their_Our_Store|
|     38|         Main|      3|  Their_My_Store|
|     32|         Main|      4|      Your_Store|
|     35|         Main|      5|mine_Their_Store|
|     30|          003|      6|        My_Store|
|     30|          001|      6|        My_Store|
|     30|          002|      6|        My_Store|
|     30|          004|      6|        My_Store|
+-------+-------------+-------+----------------+



In [146]:
my_sensor = my_sensor.withColumn("SensorID", monotonically_increasing_id())

In [147]:
my_sensor.show()

+-------+-------------+-------+----------------+--------+
|StoreNo| EntranceName|StoreID|       StoreName|SensorID|
+-------+-------------+-------+----------------+--------+
|     34|         Main|      0|     Their_Store|       0|
|     31|Main Entrance|      1|       Our_Store|       1|
|     36|         Main|      2| Their_Our_Store|       2|
|     38|         Main|      3|  Their_My_Store|       3|
|     32|         Main|      4|      Your_Store|       4|
|     35|         Main|      5|mine_Their_Store|       5|
|     30|          003|      6|        My_Store|       6|
|     30|          001|      6|        My_Store|       7|
|     30|          002|      6|        My_Store|       8|
|     30|          004|      6|        My_Store|       9|
+-------+-------------+-------+----------------+--------+



In [148]:
my_sensor = my_sensor.select('SensorID', 'StoreID', 'EntranceName')

In [149]:
my_sensor.show()

+--------+-------+-------------+
|SensorID|StoreID| EntranceName|
+--------+-------+-------------+
|       0|      0|         Main|
|       1|      1|Main Entrance|
|       2|      2|         Main|
|       3|      3|         Main|
|       4|      4|         Main|
|       5|      5|         Main|
|       6|      6|          003|
|       7|      6|          001|
|       8|      6|          002|
|       9|      6|          004|
+--------+-------+-------------+



## dim_date

In [150]:
def get_dim_date_df(start_date: str, end_date: str, upsert_dataframe: pyspark.sql.dataframe.DataFrame = None) -> pyspark.sql.dataframe.DataFrame:
    """
    Generate a dimension date data frame with date-related information between the specified start and end dates.

    Args:
        start_date (str): The start date in the format 'YYYY-MM-DD'.
        end_date (str): The end date in the format 'YYYY-MM-DD'.
        upsert_dataframe (pd.DataFrame, optional): An existing dataframe to perform an upsert (update or insert) operation. 
                                                   If provided, new dates will be appended to the dataframe based on the 'Date_Key' column.
                                                   Defaults to None.

    Returns:
        pd.DataFrame: The generated dimension table with the following columns:

        - Date_Key: A unique key representing the date in the format 'YYYYMMDD'.
        - Full_Date: The full date in the format 'YYYY-MM-DD'.
        - Day_of_Week: The day of the week as an integer (1 for Monday, 2 for Tuesday, etc.).
        - Day_of_Month: The day of the month as an integer.
        - Day_of_Year: The day of the year as an integer.
        - Day_Name: The name of the day of the week.
        - Week_of_month: The week of the month as an integer.
        - Week_of_year: The week of the year as an integer.
        - Month_Of_year: The month of the year as an integer.
        - Days_in_Month: The number of days in the month.
        - Month_Name: The name of the month.
        - Year: The year as an integer.

    Raises:
        KeyError: Raised if the 'Date_Key' column is not present in the upsert_dataframe when performing an upsert operation.
    """
    dim_date_dict = {
        'Date_Key': [],
        'Full_Date': [],
        'Day_of_Week': [],
        'Day_of_Month': [],
        'Day_of_Year': [],
        'Day_Name': [],
        'Week_of_month': [],
        'Week_of_year': [],
        'Month_Of_year': [],
        'Days_in_Month': [],
        'Month_Name': [],
        'Year': []
    }

    for date_ in pd.date_range(start_date, end_date):
        full_date = str(date_)[:-9].strip()
        date_key = int(full_date.replace('-', ''))

        if upsert_dataframe is not None:
            if 'Date_Key' not in upsert_dataframe.columns:
                raise KeyError("No 'Date_Key' column found in the upsert dataframe.")
            if upsert_dataframe.filter(col('Date_Key').isin(date_key)).count() > 0:
                continue

        dim_date_dict['Date_Key'].append(date_key)
        dim_date_dict['Full_Date'].append(full_date)
        dim_date_dict['Day_of_Week'].append(date_.day_of_week + 1)
        dim_date_dict['Day_of_Month'].append(date_.day)
        dim_date_dict['Day_of_Year'].append(date_.day_of_year)
        dim_date_dict['Day_Name'].append(date_.day_name())
        dim_date_dict['Week_of_month'].append(date_.weekday() + 1)
        dim_date_dict['Week_of_year'].append(date_.weekofyear)
        dim_date_dict['Month_Of_year'].append(date_.month)
        dim_date_dict['Days_in_Month'].append(date_.days_in_month)
        dim_date_dict['Month_Name'].append(date_.month_name())
        dim_date_dict['Year'].append(date_.year)
    if len(dim_date_dict['Date_Key']) > 0:
        df = spark.createDataFrame(pd.DataFrame(dim_date_dict))
        if upsert_dataframe is not None:
            df = upsert_dataframe.union(df)
    else:
        df = upsert_dataframe
    return df

In [151]:
dim_date = get_dim_date_df('2020-01-01', '2020-01-05')
dim_date.show()

[Stage 40:>                                                         (0 + 1) / 1]

+--------+----------+-----------+------------+-----------+---------+-------------+------------+-------------+-------------+----------+----+
|Date_Key| Full_Date|Day_of_Week|Day_of_Month|Day_of_Year| Day_Name|Week_of_month|Week_of_year|Month_Of_year|Days_in_Month|Month_Name|Year|
+--------+----------+-----------+------------+-----------+---------+-------------+------------+-------------+-------------+----------+----+
|20200101|2020-01-01|          3|           1|          1|Wednesday|            3|           1|            1|           31|   January|2020|
|20200102|2020-01-02|          4|           2|          2| Thursday|            4|           1|            1|           31|   January|2020|
|20200103|2020-01-03|          5|           3|          3|   Friday|            5|           1|            1|           31|   January|2020|
|20200104|2020-01-04|          6|           4|          4| Saturday|            6|           1|            1|           31|   January|2020|
|20200105|2020-01-05

                                                                                

In [152]:
dim_date = get_dim_date_df('2020-01-01', '2020-01-06', dim_date)
dim_date.show()

+--------+----------+-----------+------------+-----------+---------+-------------+------------+-------------+-------------+----------+----+
|Date_Key| Full_Date|Day_of_Week|Day_of_Month|Day_of_Year| Day_Name|Week_of_month|Week_of_year|Month_Of_year|Days_in_Month|Month_Name|Year|
+--------+----------+-----------+------------+-----------+---------+-------------+------------+-------------+-------------+----------+----+
|20200101|2020-01-01|          3|           1|          1|Wednesday|            3|           1|            1|           31|   January|2020|
|20200102|2020-01-02|          4|           2|          2| Thursday|            4|           1|            1|           31|   January|2020|
|20200103|2020-01-03|          5|           3|          3|   Friday|            5|           1|            1|           31|   January|2020|
|20200104|2020-01-04|          6|           4|          4| Saturday|            6|           1|            1|           31|   January|2020|
|20200105|2020-01-05

## dim_time

- Have to create pd df first and then convert to spark df beacuse of hustle:

`TypeError: Can not infer schema for type:`

In [153]:
def get_dim_time_df(frequency: str = 'H') -> pyspark.sql.dataframe.DataFrame:
    """
    Generate a pandas DataFrame with start and end times based on a specified frequency.

    Args:
        frequency (str): Frequency of the time intervals. Defaults to 'H' (hourly).

    Returns:
        pd.DataFrame: DataFrame with columns 'StartTime' and 'EndTime'.

    """
    startTime = []
    timekey = []
    for date_ in pd.date_range(start='2023-01-01', end='2023-01-02', freq=frequency):
        time = date_.time()
        startTime.append(str(time))
        timekey.append(int(str(time).replace(':', '')))
    startTime = startTime[:-1]
    timekey = timekey[:-1]
    endTime = startTime[1:] + startTime[0:1]

    # return spark.createDataFrame(pd.DataFrame({'Time_Key':timekey,'Start_Time': startTime, 'End_Time': endTime}))
    res = spark.createDataFrame(pd.DataFrame({'Time_Key':timekey,'Start_Time': startTime, 'End_Time': endTime}))

    # res = res.withColumn('Start_Time', date_format(col('Start_Time'), format="HH:mm:ss"))

    return res

In [154]:
p = get_dim_time_df('100S')
p.show()

+--------+----------+--------+
|Time_Key|Start_Time|End_Time|
+--------+----------+--------+
|       0|  00:00:00|00:01:40|
|     140|  00:01:40|00:03:20|
|     320|  00:03:20|00:05:00|
|     500|  00:05:00|00:06:40|
|     640|  00:06:40|00:08:20|
|     820|  00:08:20|00:10:00|
|    1000|  00:10:00|00:11:40|
|    1140|  00:11:40|00:13:20|
|    1320|  00:13:20|00:15:00|
|    1500|  00:15:00|00:16:40|
|    1640|  00:16:40|00:18:20|
|    1820|  00:18:20|00:20:00|
|    2000|  00:20:00|00:21:40|
|    2140|  00:21:40|00:23:20|
|    2320|  00:23:20|00:25:00|
|    2500|  00:25:00|00:26:40|
|    2640|  00:26:40|00:28:20|
|    2820|  00:28:20|00:30:00|
|    3000|  00:30:00|00:31:40|
|    3140|  00:31:40|00:33:20|
+--------+----------+--------+
only showing top 20 rows



# Fact Tables

In [155]:
df.show()
df.count()

+----------+--------+-------+---------+------------+-------+--------+
|      Date|    Hour|StoreNo|StoreName|EntranceName|InCount|OutCount|
+----------+--------+-------+---------+------------+-------+--------+
|2021-12-11|00:00:00|     30| My_Store|         001|      0|       0|
|2021-12-11|00:00:00|     30| My_Store|         002|      0|       0|
|2021-12-11|00:00:00|     30| My_Store|         003|      0|       0|
|2021-12-11|00:00:00|     30| My_Store|         004|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         001|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         002|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         003|      0|       0|
|2021-12-11|00:15:00|     30| My_Store|         004|      0|       0|
|2021-12-11|00:30:00|     30| My_Store|         001|      0|       0|
|2021-12-11|00:30:00|     30| My_Store|         002|      0|       0|
|2021-12-11|00:30:00|     30| My_Store|         003|      0|       0|
|2021-12-11|00:30:00

2652

In [156]:
dim_date = get_dim_date_df('2019-01-01', '2025-01-01')

In [157]:
dim_date.limit(5).show()

+--------+----------+-----------+------------+-----------+---------+-------------+------------+-------------+-------------+----------+----+
|Date_Key| Full_Date|Day_of_Week|Day_of_Month|Day_of_Year| Day_Name|Week_of_month|Week_of_year|Month_Of_year|Days_in_Month|Month_Name|Year|
+--------+----------+-----------+------------+-----------+---------+-------------+------------+-------------+-------------+----------+----+
|20190101|2019-01-01|          2|           1|          1|  Tuesday|            2|           1|            1|           31|   January|2019|
|20190102|2019-01-02|          3|           2|          2|Wednesday|            3|           1|            1|           31|   January|2019|
|20190103|2019-01-03|          4|           3|          3| Thursday|            4|           1|            1|           31|   January|2019|
|20190104|2019-01-04|          5|           4|          4|   Friday|            5|           1|            1|           31|   January|2019|
|20190105|2019-01-05

## Completing look ups for `date`, `time`, `sensor` and `store` IDs

In [158]:
dim_quarter_hourly_time = get_dim_time_df('900S')
dim_quarter_hourly_time.show()
dim_quarter_hourly_time.printSchema()

+--------+----------+--------+
|Time_Key|Start_Time|End_Time|
+--------+----------+--------+
|       0|  00:00:00|00:15:00|
|    1500|  00:15:00|00:30:00|
|    3000|  00:30:00|00:45:00|
|    4500|  00:45:00|01:00:00|
|   10000|  01:00:00|01:15:00|
|   11500|  01:15:00|01:30:00|
|   13000|  01:30:00|01:45:00|
|   14500|  01:45:00|02:00:00|
|   20000|  02:00:00|02:15:00|
|   21500|  02:15:00|02:30:00|
|   23000|  02:30:00|02:45:00|
|   24500|  02:45:00|03:00:00|
|   30000|  03:00:00|03:15:00|
|   31500|  03:15:00|03:30:00|
|   33000|  03:30:00|03:45:00|
|   34500|  03:45:00|04:00:00|
|   40000|  04:00:00|04:15:00|
|   41500|  04:15:00|04:30:00|
|   43000|  04:30:00|04:45:00|
|   44500|  04:45:00|05:00:00|
+--------+----------+--------+
only showing top 20 rows

root
 |-- Time_Key: long (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)



### `time key` lookup

In [159]:
df = df.join(dim_quarter_hourly_time, col('Hour') == col('Start_Time'))
df = df.select(["Date", 'Time_Key', "StoreNo", "EntranceName", "InCount", "OutCount"])

In [160]:
df.show()

+----------+--------+-------+-------------+-------+--------+
|      Date|Time_Key|StoreNo| EntranceName|InCount|OutCount|
+----------+--------+-------+-------------+-------+--------+
|2021-12-13|       0|     38|         Main|      0|       0|
|2021-12-12|       0|     38|         Main|      0|       0|
|2021-12-11|       0|     38|         Main|      0|       0|
|2021-12-13|       0|     36|         Main|      0|       0|
|2021-12-12|       0|     36|         Main|      0|       0|
|2021-12-11|       0|     36|         Main|      0|       0|
|2021-12-13|       0|     35|         Main|      0|       0|
|2021-12-12|       0|     35|         Main|      0|       0|
|2021-12-11|       0|     35|         Main|      0|       0|
|2021-12-13|       0|     34|         Main|      0|       0|
|2021-12-12|       0|     34|         Main|      0|       0|
|2021-12-11|       0|     34|         Main|      0|       0|
|2021-12-13|       0|     32|         Main|      0|       0|
|2021-12-12|       0|   

### `date key` lookup

In [161]:
df = df.join(dim_date, col('Date') == col('Full_Date'))
df = df.select(['Date_Key', 'Time_Key', "StoreNo", "EntranceName", "InCount", "OutCount"] )

In [162]:
df.show()

+--------+--------+-------+-------------+-------+--------+
|Date_Key|Time_Key|StoreNo| EntranceName|InCount|OutCount|
+--------+--------+-------+-------------+-------+--------+
|20211213|       0|     38|         Main|      0|       0|
|20211212|       0|     38|         Main|      0|       0|
|20211211|       0|     38|         Main|      0|       0|
|20211213|       0|     36|         Main|      0|       0|
|20211212|       0|     36|         Main|      0|       0|
|20211211|       0|     36|         Main|      0|       0|
|20211213|       0|     35|         Main|      0|       0|
|20211212|       0|     35|         Main|      0|       0|
|20211211|       0|     35|         Main|      0|       0|
|20211213|       0|     34|         Main|      0|       0|
|20211212|       0|     34|         Main|      0|       0|
|20211211|       0|     34|         Main|      0|       0|
|20211213|       0|     32|         Main|      0|       0|
|20211212|       0|     32|         Main|      0|       

### `StoreID` lookup

In [163]:
df = df.join(my_store, on=['StoreNo']).select('Date_Key', 'Time_Key', 'StoreID','EntranceName', 'InCount', 'OutCount')

In [164]:
df.show()

+--------+--------+-------+-------------+-------+--------+
|Date_Key|Time_Key|StoreID| EntranceName|InCount|OutCount|
+--------+--------+-------+-------------+-------+--------+
|20211212|       0|      3|         Main|      0|       0|
|20211212|       0|      2|         Main|      0|       0|
|20211212|       0|      5|         Main|      0|       0|
|20211212|       0|      0|         Main|      0|       0|
|20211212|       0|      4|         Main|      0|       0|
|20211212|       0|      1|Main Entrance|      0|       0|
|20211212|       0|      6|          004|      0|       0|
|20211212|       0|      6|          003|      0|       0|
|20211212|       0|      6|          002|      0|       0|
|20211212|       0|      6|          001|      0|       0|
|20211212|    1500|      3|         Main|      0|       0|
|20211212|    1500|      2|         Main|      0|       0|
|20211212|    1500|      5|         Main|      0|       0|
|20211212|    1500|      0|         Main|      0|       

### `Sensor ID` Lookup

In [165]:
df = df.join(my_sensor, on=['StoreID', 'EntranceName']).select(['Date_Key', 'Time_Key', 'StoreID', 'SensorID', 'InCount', 'OutCount'])

In [166]:
df.show()
df.count()

+--------+--------+-------+--------+-------+--------+
|Date_Key|Time_Key|StoreID|SensorID|InCount|OutCount|
+--------+--------+-------+--------+-------+--------+
|20211213|       0|      0|       1|      0|       0|
|20211212|       0|      0|       1|      0|       0|
|20211211|       0|      0|       1|      0|       0|
|20211213|    1500|      0|       1|      0|       0|
|20211212|    1500|      0|       1|      0|       0|
|20211211|    1500|      0|       1|      0|       0|
|20211213|    3000|      0|       1|      0|       0|
|20211212|    3000|      0|       1|      0|       0|
|20211211|    3000|      0|       1|      0|       0|
|20211213|    4500|      0|       1|      0|       0|
|20211212|    4500|      0|       1|      0|       0|
|20211211|    4500|      0|       1|      0|       0|
|20211213|   10000|      0|       1|      0|       0|
|20211212|   10000|      0|       1|      0|       0|
|20211211|   10000|      0|       1|      0|       0|
|20211213|   11500|      0| 

2652

## Fact Hourly Table

In [167]:
df.show()
df.count()

+--------+--------+-------+--------+-------+--------+
|Date_Key|Time_Key|StoreID|SensorID|InCount|OutCount|
+--------+--------+-------+--------+-------+--------+
|20211213|       0|      3|       5|      0|       0|
|20211212|       0|      3|       5|      0|       0|
|20211211|       0|      3|       5|      0|       0|
|20211213|       0|      2|       0|      0|       0|
|20211212|       0|      2|       0|      0|       0|
|20211211|       0|      2|       0|      0|       0|
|20211213|       0|      5|       8|      0|       0|
|20211212|       0|      5|       8|      0|       0|
|20211211|       0|      5|       8|      0|       0|
|20211213|       0|      0|       1|      0|       0|
|20211212|       0|      0|       1|      0|       0|
|20211211|       0|      0|       1|      0|       0|
|20211213|       0|      4|       9|      0|       0|
|20211212|       0|      4|       9|      0|       0|
|20211211|       0|      4|       9|      0|       0|
|20211213|       0|      1| 

2652

In [168]:
dim_hourly_data = get_dim_time_df()
dim_hourly_data.show()
dim_hourly_data.count()

+--------+----------+--------+
|Time_Key|Start_Time|End_Time|
+--------+----------+--------+
|       0|  00:00:00|01:00:00|
|   10000|  01:00:00|02:00:00|
|   20000|  02:00:00|03:00:00|
|   30000|  03:00:00|04:00:00|
|   40000|  04:00:00|05:00:00|
|   50000|  05:00:00|06:00:00|
|   60000|  06:00:00|07:00:00|
|   70000|  07:00:00|08:00:00|
|   80000|  08:00:00|09:00:00|
|   90000|  09:00:00|10:00:00|
|  100000|  10:00:00|11:00:00|
|  110000|  11:00:00|12:00:00|
|  120000|  12:00:00|13:00:00|
|  130000|  13:00:00|14:00:00|
|  140000|  14:00:00|15:00:00|
|  150000|  15:00:00|16:00:00|
|  160000|  16:00:00|17:00:00|
|  170000|  17:00:00|18:00:00|
|  180000|  18:00:00|19:00:00|
|  190000|  19:00:00|20:00:00|
+--------+----------+--------+
only showing top 20 rows



24

In [169]:
fact_hourly_data = df
fact_hourly_data.count()

2652

In [170]:
fact_hourly_data = fact_hourly_data.withColumn(
    'Time_Key', col('Time_Key') - col('Time_Key')%10000
)

In [171]:
fact_hourly_data.show()
fact_hourly_data.count()

+--------+--------+-------+--------+-------+--------+
|Date_Key|Time_Key|StoreID|SensorID|InCount|OutCount|
+--------+--------+-------+--------+-------+--------+
|20211213|       0|      3|       5|      0|       0|
|20211212|       0|      3|       5|      0|       0|
|20211211|       0|      3|       5|      0|       0|
|20211213|       0|      2|       0|      0|       0|
|20211212|       0|      2|       0|      0|       0|
|20211211|       0|      2|       0|      0|       0|
|20211213|       0|      5|       8|      0|       0|
|20211212|       0|      5|       8|      0|       0|
|20211211|       0|      5|       8|      0|       0|
|20211213|       0|      0|       1|      0|       0|
|20211212|       0|      0|       1|      0|       0|
|20211211|       0|      0|       1|      0|       0|
|20211213|       0|      4|       9|      0|       0|
|20211212|       0|      4|       9|      0|       0|
|20211211|       0|      4|       9|      0|       0|
|20211213|       0|      1| 

2652

In [172]:
fact_hourly_data = fact_hourly_data.groupBy(['Date_Key', 'Time_Key', 'StoreID', 'SensorID']).agg(
    sum("InCount").alias("InCount"),
    sum("OutCount").alias("OutCount")
    ).sort(['SensorID'])
fact_hourly_data.show()
fact_hourly_data.count()

+--------+--------+-------+--------+-------+--------+
|Date_Key|Time_Key|StoreID|SensorID|InCount|OutCount|
+--------+--------+-------+--------+-------+--------+
|20211213|   80000|      2|       0|     10|       7|
|20211211|  120000|      2|       0|    153|     150|
|20211212|   60000|      2|       0|      0|       0|
|20211211|   10000|      2|       0|      0|       0|
|20211211|   70000|      2|       0|      0|       0|
|20211211|       0|      2|       0|      0|       0|
|20211211|   60000|      2|       0|      0|       0|
|20211213|   20000|      2|       0|      0|       0|
|20211212|   80000|      2|       0|      0|       0|
|20211212|   20000|      2|       0|      0|       0|
|20211213|   60000|      2|       0|      0|       0|
|20211212|   30000|      2|       0|      0|       0|
|20211211|   80000|      2|       0|      1|       0|
|20211211|   40000|      2|       0|      0|       0|
|20211212|  100000|      2|       0|    164|     152|
|20211212|   40000|      2| 

666

## Fact Quarter Hourly Data

In [173]:
fact_quarter_hourly_data = df

fact_quarter_hourly_data.show()
fact_quarter_hourly_data.count()

+--------+--------+-------+--------+-------+--------+
|Date_Key|Time_Key|StoreID|SensorID|InCount|OutCount|
+--------+--------+-------+--------+-------+--------+
|20211212|       0|      3|       5|      0|       0|
|20211212|       0|      2|       0|      0|       0|
|20211212|       0|      5|       8|      0|       0|
|20211212|       0|      0|       1|      0|       0|
|20211212|       0|      4|       9|      0|       0|
|20211212|       0|      1|       4|      0|       0|
|20211212|       0|      6|       2|      0|       0|
|20211212|       0|      6|       7|      0|       0|
|20211212|       0|      6|       3|      0|       0|
|20211212|       0|      6|       6|      0|       0|
|20211212|    1500|      3|       5|      0|       0|
|20211212|    1500|      2|       0|      0|       0|
|20211212|    1500|      5|       8|      0|       0|
|20211212|    1500|      0|       1|      0|       0|
|20211212|    1500|      4|       9|      0|       0|
|20211212|    1500|      1| 

2652

## Daily Data

In [174]:
daily_data = df
daily_data.show()

+--------+--------+-------+--------+-------+--------+
|Date_Key|Time_Key|StoreID|SensorID|InCount|OutCount|
+--------+--------+-------+--------+-------+--------+
|20211212|       0|      3|       5|      0|       0|
|20211212|       0|      2|       0|      0|       0|
|20211212|       0|      5|       8|      0|       0|
|20211212|       0|      0|       1|      0|       0|
|20211212|       0|      4|       9|      0|       0|
|20211212|       0|      1|       4|      0|       0|
|20211212|       0|      6|       2|      0|       0|
|20211212|       0|      6|       7|      0|       0|
|20211212|       0|      6|       3|      0|       0|
|20211212|       0|      6|       6|      0|       0|
|20211212|    1500|      3|       5|      0|       0|
|20211212|    1500|      2|       0|      0|       0|
|20211212|    1500|      5|       8|      0|       0|
|20211212|    1500|      0|       1|      0|       0|
|20211212|    1500|      4|       9|      0|       0|
|20211212|    1500|      1| 

In [175]:
daily_data = daily_data.groupBy(['Date_Key', 'StoreID', 'SensorID']).agg(
    sum('InCount').alias('InCount'),
    sum('OutCount').alias('OutCount'),
)

In [176]:
daily_data.show()
daily_data.count()

+--------+-------+--------+-------+--------+
|Date_Key|StoreID|SensorID|InCount|OutCount|
+--------+-------+--------+-------+--------+
|20211212|      3|       5|    636|     634|
|20211212|      6|       2|    243|     193|
|20211213|      5|       8|    141|     133|
|20211211|      6|       6|     60|      70|
|20211213|      0|       1|    338|     322|
|20211213|      6|       7|     35|      31|
|20211211|      5|       8|    424|     419|
|20211212|      6|       7|    216|     190|
|20211213|      6|       6|     20|      14|
|20211213|      1|       4|    208|     216|
|20211211|      3|       5|    421|     416|
|20211211|      6|       7|    156|     130|
|20211213|      3|       5|    142|     139|
|20211212|      5|       8|    580|     571|
|20211212|      0|       1|   1484|    1471|
|20211211|      0|       1|   1171|    1162|
|20211212|      4|       9|   1201|    1199|
|20211213|      6|       3|      5|      11|
|20211211|      4|       9|    936|     935|
|20211213|

30

# Session Halt

In [177]:
spark.stop()