# Breakfast at the Frat: A Time Series Analysis

This dataset contains sales and promotion information on the top five products from each of the top three brands within four selected categories (mouthwash, pretzels, frozen pizza, and boxed cereal), gathered from a sample of stores over 156 weeks. It includes:

- Unit sales, households, visits, and spend data by product, store, and week
- Base Price and Actual Shelf Price, to determine a product’s discount, if any
- Promotional support details (e.g., sale tag, in-store display), if applicable for the given product/store/week
- Store information, including size and location, as well as a price tier designation (e.g., upscale vs. value)
- Product information, including UPC, size, and description
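
The discount mentioned in the list above follows from the two price columns. Here is a minimal sketch in plain Python, assuming the discount is expressed as a fraction of the base price. The function name `discount_fraction` is hypothetical; the sample values are taken from the first row of the transactions output shown further down.

```python
def discount_fraction(base_price: float, shelf_price: float) -> float:
    """Return the discount as a fraction of the base price (0.0 means no discount)."""
    if base_price <= 0:
        raise ValueError("base_price must be positive")
    return (base_price - shelf_price) / base_price


# BASE_PRICE = 1.57 and PRICE = 1.39 for UPC 1111009477 in the sample output.
print(round(discount_fraction(1.57, 1.39), 4))
```

A row whose shelf price equals its base price yields 0.0, which matches the rows where `TPR_ONLY` is 0 in the sample output.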

To identify outliers, look at:

- The ratio of units vs. number of visits
- The ratio of visits vs. number of households
- Items that may be out of stock or discontinued at a given store
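
The checks above can be sketched in plain Python as follows. The thresholds and the function names `visit_ratios` and `is_suspicious` are assumptions made for illustration only; suitable cut-offs would have to be chosen from the actual distribution of the data.

```python
from typing import Tuple


def visit_ratios(units: int, visits: int, households: int) -> Tuple[float, float]:
    """Return (units per visit, visits per household) for one product/store/week row."""
    return units / visits, visits / households


def is_suspicious(units: int, visits: int, households: int,
                  max_units_per_visit: float = 5.0,
                  max_visits_per_household: float = 3.0) -> bool:
    """Flag a row whose ratios exceed the assumed thresholds, or whose counts are zero."""
    if units == 0 or visits == 0 or households == 0:
        # Zero counts may point to an out-of-stock or discontinued item.
        return True
    units_per_visit, visits_per_household = visit_ratios(units, visits, households)
    return (units_per_visit > max_units_per_visit
            or visits_per_household > max_visits_per_household)


# (UNITS, VISITS, HHS) values taken from two rows of the sample transactions output.
print(visit_ratios(20, 18, 18))
print(is_suspicious(4, 3, 3))
```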

**Source:** https://www.dunnhumby.com/source-files/

In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
spark = SparkSession.builder \
    .appName("breakfast") \
    .getOrCreate()

In [3]:
product_data_folder = "dataset/products"
store_data_folder = "dataset/stores"
transaction_data_folder = "dataset/transactions"

### Perform ETL to Answer the Following Questions

1. What is the range of prices offered on products?
1. What is the impact of promotions on units per visit, by geography?
1. Which products' prices would you lower to increase sales?

In [4]:
product_df = spark. \
                read. \
                option('header',True). \
                csv(product_data_folder)

In [5]:
product_df.show(5)

+----------+--------------------+-------------+--------------------+--------------------+------------+
|       UPC|         DESCRIPTION| MANUFACTURER|            CATEGORY|        SUB_CATEGORY|PRODUCT_SIZE|
+----------+--------------------+-------------+--------------------+--------------------+------------+
|1111009477|PL MINI TWIST PRE...|PRIVATE LABEL|          BAG SNACKS|            PRETZELS|       15 OZ|
|1111009497|   PL PRETZEL STICKS|PRIVATE LABEL|          BAG SNACKS|            PRETZELS|       15 OZ|
|1111009507|   PL TWIST PRETZELS|PRIVATE LABEL|          BAG SNACKS|            PRETZELS|       15 OZ|
|1111035398|PL BL MINT ANTSPT...|PRIVATE LABEL|ORAL HYGIENE PROD...|MOUTHWASHES (ANTI...|      1.5 LT|
|1111038078|PL BL MINT ANTSPT...|PRIVATE LABEL|ORAL HYGIENE PROD...|MOUTHWASHES (ANTI...|      500 ML|
+----------+--------------------+-------------+--------------------+--------------------+------------+
only showing top 5 rows



In [6]:
store_df = spark. \
                read. \
                option('header',True). \
                csv(store_data_folder)

In [7]:
store_df.show(5)

+--------+------------------+-----------------+-----------------------+--------+--------------+-----------------+-------------------+------------------+
|STORE_ID|        STORE_NAME|ADDRESS_CITY_NAME|ADDRESS_STATE_PROV_CODE|MSA_CODE|SEG_VALUE_NAME|PARKING_SPACE_QTY|SALES_AREA_SIZE_NUM|AVG_WEEKLY_BASKETS|
+--------+------------------+-----------------+-----------------------+--------+--------------+-----------------+-------------------+------------------+
|     389|        SILVERLAKE|         ERLANGER|                     KY|   17140|    MAINSTREAM|              408|              46073|             24767|
|    2277|ANDERSON TOWNE CTR|       CINCINNATI|                     OH|   17140|       UPSCALE|             null|              81958|             54053|
|    4259|     WARSAW AVENUE|       CINCINNATI|                     OH|   17140|         VALUE|             null|              48813|             31177|
|    6379|          KINGWOOD|         KINGWOOD|                     TX|   26420|  

In [8]:
transactions_df = spark.read.option("header",True).csv(transaction_data_folder)

In [44]:
transactions_df.show(5)

+-------------+---------+----------+-----+------+---+-----+-----+----------+-------+-------+--------+
|WEEK_END_DATE|STORE_NUM|       UPC|UNITS|VISITS|HHS|SPEND|PRICE|BASE_PRICE|FEATURE|DISPLAY|TPR_ONLY|
+-------------+---------+----------+-----+------+---+-----+-----+----------+-------+-------+--------+
|    14-Jan-09|      367|1111009477|   13|    13| 13|18.07| 1.39|      1.57|      0|      0|       1|
|    14-Jan-09|      367|1111009497|   20|    18| 18| 27.8| 1.39|      1.39|      0|      0|       0|
|    14-Jan-09|      367|1111009507|   14|    14| 14|19.32| 1.38|      1.38|      0|      0|       0|
|    14-Jan-09|      367|1111035398|    4|     3|  3|   14|  3.5|      4.49|      0|      0|       1|
|    14-Jan-09|      367|1111038078|    3|     3|  3|  7.5|  2.5|       2.5|      0|      0|       0|
+-------------+---------+----------+-----+------+---+-----+-----+----------+-------+-------+--------+
only showing top 5 rows



In [11]:
# print(transactions_df.groupBy('UPC').agg({'PRICE': 'min'}).collect())
# print(transactions_df.groupBy('UPC').agg({'PRICE': 'max'}).collect())

In [12]:
product_df.createOrReplaceTempView('products')
transactions_df.createOrReplaceTempView('transactions')
store_df.createOrReplaceTempView('stores')

In [13]:
# 1. What is the range of prices offered on products?

products_price_range = spark.sql("""
                            select p.UPC
                                , p.DESCRIPTION
                                , p.CATEGORY
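                                -- PRICE is loaded as a string, so cast it before comparing
                                , min(cast(t.PRICE as double)) as MIN_PRICE
                                , max(cast(t.PRICE as double)) as MAX_PRICE
                                , avg(cast(t.PRICE as double)) as AVG_PRICE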
                            from products p
                            left join transactions t
                            on p.UPC = t.UPC
                            group by p.UPC, p.DESCRIPTION, p.CATEGORY
                            order by p.UPC
                        """)

products_price_range.show()

# destination = "products_price_range"
# products_price_range.write.mode("overwrite").option("header", True).csv(destination)

+----------+--------------------+--------------------+---------+---------+------------------+
|       UPC|         DESCRIPTION|            CATEGORY|MIN_PRICE|MAX_PRICE|         AVG_PRICE|
+----------+--------------------+--------------------+---------+---------+------------------+
|1111009477|PL MINI TWIST PRE...|          BAG SNACKS|     0.89|     1.83| 1.300309097001017|
|1111009497|   PL PRETZEL STICKS|          BAG SNACKS|     0.86|     1.69|1.3023260869563715|
|1111009507|   PL TWIST PRETZELS|          BAG SNACKS|      0.8|     1.69|1.3116138175375258|
|1111035398|PL BL MINT ANTSPT...|ORAL HYGIENE PROD...|        1|     4.69|3.1535704656228067|
|1111038078|PL BL MINT ANTSPT...|ORAL HYGIENE PROD...|     0.47|     3.08|1.4523977596016782|
|1111038080|PL ANTSPTC SPG MN...|ORAL HYGIENE PROD...|     0.46|     4.18|1.4451583583208103|
|1111085319|PL HONEY NUT TOAS...|         COLD CEREAL|     1.07|     1.99| 1.759916380968303|
|1111085345|      PL RAISIN BRAN|         COLD CEREAL|     0
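
The CSV files in this notebook are read with only the `header` option, so Spark treats every column, including `PRICE`, as a string. On strings, `min` and `max` compare character by character rather than numerically. The effect can be reproduced in plain Python with made-up prices (hypothetical values, not taken from the dataset):

```python
# Hypothetical prices as they would arrive from a CSV read without type inference.
prices_as_text = ["9.99", "10.49", "2.5"]

# Lexicographic comparison: "10.49" sorts before "2.5" because "1" < "2".
text_min = min(prices_as_text)
text_max = max(prices_as_text)

# Numeric comparison after an explicit conversion.
prices = [float(p) for p in prices_as_text]
numeric_min = min(prices)
numeric_max = max(prices)

print(text_min, text_max)
print(numeric_min, numeric_max)
```

In Spark, the equivalent guard is an explicit `cast(PRICE as double)` in the query, or reading the file with the `inferSchema` option enabled.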

In [20]:
# 2. What is the impact of promotions on units per visit, by geography?

store_count_trans = spark.sql("""
                        select s.STORE_ID
                            , s.STORE_NAME
                            , t.DISPLAY
                            , count(t.STORE_NUM) AS NUM_TRANSACTIONS
                        from stores s
                        left join transactions t 
                        on s.STORE_ID = t.STORE_NUM
                        group by s.STORE_ID, s.STORE_NAME, t.DISPLAY
                        order by s.STORE_ID, t.DISPLAY
                    """)

store_count_trans.show()

+--------+---------------+-------+----------------+
|STORE_ID|     STORE_NAME|DISPLAY|NUM_TRANSACTIONS|
+--------+---------------+-------+----------------+
|   10019| AT EASTEX FRWY|      0|            5107|
|   10019| AT EASTEX FRWY|      1|             370|
|   11757|   INDEPENDENCE|      0|            6458|
|   11757|   INDEPENDENCE|      1|             834|
|   11761| MIAMI TOWNSHIP|      0|            6704|
|   11761| MIAMI TOWNSHIP|      1|            1033|
|   11967|NORTHBOROUGH SQ|      0|            4822|
|   11967|NORTHBOROUGH SQ|      1|             282|
|   11993|         DALLAS|      0|            6360|
|   11993|         DALLAS|      1|             552|
|   12011|        SHERMAN|      0|            5608|
|   12011|        SHERMAN|      1|             530|
|   13609|       VANDALIA|      0|            6682|
|   13609|       VANDALIA|      1|            1133|
|   13827|       BEAUMONT|      0|            5893|
|   13827|       BEAUMONT|      1|             720|
|   13837|  
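
The query above counts transaction rows per store and display flag. To get closer to units per visit by geography, one possible approach is to total `UNITS` and `VISITS` per state and display flag and divide the two. The sketch below does this in plain Python. The store-to-state pairs come from the stores output above, but the transaction rows and the function name `units_per_visit_by_state` are hypothetical.

```python
from collections import defaultdict

# STORE_ID -> ADDRESS_STATE_PROV_CODE, as listed in the stores output.
store_state = {389: "KY", 2277: "OH", 4259: "OH"}

# Hypothetical rows for illustration: (STORE_NUM, DISPLAY, UNITS, VISITS).
rows = [
    (389, 0, 20, 18),
    (389, 1, 30, 20),
    (2277, 0, 12, 12),
    (4259, 0, 10, 10),
    (4259, 1, 27, 18),
]


def units_per_visit_by_state(rows, store_state):
    """Return {(state, display_flag): total units / total visits}."""
    totals = defaultdict(lambda: [0, 0])  # key -> [units, visits]
    for store, display, units, visits in rows:
        key = (store_state[store], display)
        totals[key][0] += units
        totals[key][1] += visits
    return {key: u / v for key, (u, v) in totals.items() if v > 0}


result = units_per_visit_by_state(rows, store_state)
for (state, display), ratio in sorted(result.items()):
    print(state, display, round(ratio, 3))
```

Dividing the totals, rather than averaging per-row ratios, keeps low-traffic weeks from dominating the result. In Spark SQL, the same idea would be `sum(UNITS) / sum(VISITS)` grouped by `ADDRESS_STATE_PROV_CODE` and `DISPLAY`.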

In [64]:
# 3. Which products' prices would you lower to increase sales?

product_price_trans = spark.sql("""
    with BASE as (
                            select p.UPC
                                 , p.DESCRIPTION
                                 , p.CATEGORY
                                 , avg(t.PRICE) AS AVG_PRICE
                            from products p
                            left join transactions t
                            on t.UPC = p.UPC
                            group by p.UPC, p.DESCRIPTION, p.CATEGORY
    )
    , LOWER as (
                            select p.UPC
                                , p.DESCRIPTION
                                , p.CATEGORY
                                , sum(t.PRICE) AS SUM_LOWER
                                , avg(t.PRICE) AS LOWER_PRICE
                            from products p
                            left join BASE b
                            on b.UPC = p.UPC
                            left join transactions t
                            on t.UPC = p.UPC
                            and t.PRICE < b.AVG_PRICE
                            group by p.UPC, p.DESCRIPTION, p.CATEGORY
    ) 
    , HIGHER as (
                            select p.UPC
                                , p.DESCRIPTION
                                , p.CATEGORY
                                , sum(t.PRICE) AS SUM_HIGHER
                                , avg(t.PRICE) AS HIGHER_PRICE
                            from products p
                            left join BASE b
                            on b.UPC = p.UPC
                            left join transactions t
                            on t.UPC = p.UPC
                            and t.PRICE > b.AVG_PRICE
                            group by p.UPC, p.DESCRIPTION, p.CATEGORY
    ) 
    , AVG as (
                            select p.UPC
                                , p.DESCRIPTION
                                , p.CATEGORY
                                , sum(t.PRICE) AS SUM_AVG
                            from products p
                            left join BASE b
                            on b.UPC = p.UPC
                            left join transactions t
                            on t.UPC = p.UPC
                            and t.PRICE = b.AVG_PRICE
                            group by p.UPC, p.DESCRIPTION, p.CATEGORY
    ) 
    select b.UPC
         , b.DESCRIPTION
         , b.CATEGORY
         , b.AVG_PRICE
         , a.SUM_AVG
         , l.LOWER_PRICE
         , l.SUM_LOWER
         , h.HIGHER_PRICE
         , h.SUM_HIGHER
         , l.SUM_LOWER / h.SUM_HIGHER AS PCT
    from BASE b
    left join LOWER l 
    on l.UPC = b.UPC 
    left join HIGHER h
    on h.UPC = b.UPC
    left join AVG a
    on a.UPC = b.UPC
    where l.SUM_LOWER > h.SUM_HIGHER 
    order by b.UPC
""")

product_price_trans.show(100)

+-----------+--------------------+--------------------+------------------+-------+------------------+------------------+------------------+------------------+------------------+
|        UPC|         DESCRIPTION|            CATEGORY|         AVG_PRICE|SUM_AVG|       LOWER_PRICE|         SUM_LOWER|      HIGHER_PRICE|        SUM_HIGHER|               PCT|
+-----------+--------------------+--------------------+------------------+-------+------------------+------------------+------------------+------------------+------------------+
| 1111009477|PL MINI TWIST PRE...|          BAG SNACKS| 1.300309097001017|   null|1.1438495445194747| 7785.039999999545| 1.507803975058407| 7738.049999999744|  1.00607258934742|
| 1111009497|   PL PRETZEL STICKS|          BAG SNACKS|1.3023260869563715|   null|1.1464658457929486| 7821.189999999496|1.5092701440248644| 7754.629999999753| 1.008583259291513|
| 1111035398|PL BL MINT ANTSPT...|ORAL HYGIENE PROD...|3.1535704656228067|   null|2.9438728999269967|20150.810
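
The `SUM_AVG` column is null in the output above, plausibly because a shelf price with two decimals rarely equals the floating-point average exactly, so the `t.PRICE = b.AVG_PRICE` join matches no rows. A small sketch of the same three-way split in plain Python, using three `PRICE` values from the sample transactions output (they belong to different products and are pooled here purely for illustration):

```python
from statistics import mean

# PRICE values from three rows of the sample transactions output.
prices = [1.39, 1.39, 1.38]
average = mean(prices)

# Same partition as the LOWER, HIGHER and AVG subqueries.
below = [p for p in prices if p < average]
above = [p for p in prices if p > average]
equal = [p for p in prices if p == average]

print(average)
print(len(below), len(above), len(equal))
```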

In [17]:
# Transform product_size

spark.sql("""
    select p.UPC
    , p.PRODUCT_SIZE
    , split(product_size, ' ')[0] as VALUE
    , split(product_size, ' ')[1] as UNIT
    , case when split(product_size, ' ')[1] = 'OZ' 
           then split(product_size, ' ')[0] * 0.0296
           when split(product_size, ' ')[1] = 'ML'
           then split(product_size, ' ')[0] * 0.001
           else split(product_size, ' ')[0]
           end AS SIZE_LT
    from products p
""").show()

+----------+------------+-----+----+-------------------+
|       UPC|PRODUCT_SIZE|VALUE|UNIT|            SIZE_LT|
+----------+------------+-----+----+-------------------+
|1111009477|       15 OZ|   15|  OZ|              0.444|
|1111009497|       15 OZ|   15|  OZ|              0.444|
|1111009507|       15 OZ|   15|  OZ|              0.444|
|1111035398|      1.5 LT|  1.5|  LT|                1.5|
|1111038078|      500 ML|  500|  ML|                0.5|
|1111038080|      500 ML|  500|  ML|                0.5|
|1111085319|    12.25 OZ|12.25|  OZ|0.36260000000000003|
|1111085345|       20 OZ|   20|  OZ| 0.5920000000000001|
|1111085350|       18 OZ|   18|  OZ|             0.5328|
|1111087395|     32.7 OZ| 32.7|  OZ| 0.9679200000000001|
|1111087396|     30.5 OZ| 30.5|  OZ|             0.9028|
|1111087398|     29.6 OZ| 29.6|  OZ|            0.87616|
|1600027527|    12.25 OZ|12.25|  OZ|0.36260000000000003|
|1600027528|       18 OZ|   18|  OZ|             0.5328|
|1600027564|       12 OZ|   12|
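
The same conversion can be sketched in plain Python, which makes the unit handling easy to test in isolation. The function name `size_to_liters` and the lookup table are hypothetical; the factors are the ones used in the query above. Note the assumption that `OZ` means fluid ounces, which is questionable for dry goods such as pretzels and cereal, where ounces measure weight.

```python
# Conversion factors to liters; 0.0296 is the factor used in the query above.
# Assumption: "OZ" is treated as fluid ounces for every product.
LITERS_PER_UNIT = {"OZ": 0.0296, "ML": 0.001, "LT": 1.0}


def size_to_liters(product_size: str) -> float:
    """Convert a PRODUCT_SIZE string such as '15 OZ' to liters."""
    value_text, unit = product_size.split()
    factor = LITERS_PER_UNIT.get(unit.upper())
    if factor is None:
        raise ValueError(f"unknown unit: {unit!r}")
    return float(value_text) * factor


for size in ["15 OZ", "1.5 LT", "500 ML"]:
    print(size, round(size_to_liters(size), 4))
```

Raising on an unknown unit, instead of passing the value through as the `else` branch of the query does, surfaces unexpected size formats early.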