# Beer Sales, Preferences, and the Macroeconomy
### Data Engineering Platforms (Fall 2019) | The University of Chicago

## 0. Introduction

### 0.1. Executive summary 

Alcohol is one of the most popular purchases in the US, but does consumers’ love of beer vary with their economic condition? In this project, we seek to derive insights on alcohol consumption patterns with respect to changes in US economic metrics. 

Questions that this project can potentially answer:

- Do preferences for beer types change during recession? Do consumers favor particular beers, price points, or alcohol content?
- How does unemployment affect beer purchasing? Does it significantly impact the total amount purchased, or just shifts the product mix?

### 0.2. Business use case

This project can provide insight into consumer purchasing habits, which is highly desirable to a variety of businesses. Breweries can use this data to plan their production, such that they focus their production on the beer with the highest expected demand. Retailers and restaurants can use this data to plan their inventory, stocking particular products ahead of expected increases in demand.

### 0.3. Methodology

The order of operation is as follows: 

1. First, the schema (snowflake EER) of the final MySQL database is designed upon examining the data.  
2. The relevant data from the IRI dataset are combined in groups and read into dataframes in Python. 
3. Text columns that appear to be non-categorical are clustered in OpenRefine. 
4. The clustered results then are used as a dictionary to normalise said dataframes. 
5. The dataframes are then read into a local MySWL server in observance of foreign key constraints, and upon verification, is migrated to Google Cloud SQL. 
6. This then allows data access for visualisation (and statistical analysis in the future) by various stakeholders using Python, R, and Tableau. 

### 0.4. Data sources

- The IRI Academic Marketing Data Set (Bronnenberg, et al, 2012) - 130 GB unzipped - NDA required, access through The University of Chicago Office of Research and National Laboratories Research Computing Center 
- St. Louis Fed Federal Research Economic Data (FRED) - through FRED API

### 0.5 Prerequisites

System requirements: 50 GB of free drive space. 8 GB memory. Jupyter Lab/Notebook, OpenRefine 3.2, MySQL 8.0.18 server, MySQL Workbench 8.0, Google Cloud Platform, Cloud SQL, and Tableau.

The MySQL database size will be approximately **17 GB**. Check by running the following code in MySQL:  

SELECT table_schema 'database name',  
  sum( data_length + index_length ) / 1024 / 1024 /1024 'data Base Size in GB'  
FROM information_schema.TABLES  
GROUP BY table_schema;

MySQL Workbench DBMS connection read timeout interval to be set at >3600 seconds.

Section 4 requires an empty schema `beer` in MySQL 8. The code is provided in `section 4.0`. 

The following packages are also required and can be installed using `pip` or `conda`:  
`os`, `glob` (allows for UNIX-style pathname pattern expansion), `NumPy`, `pandas 0.25` , `sqlalchemy` (writes records stored in a DataFrame to a SQL database), `tqdm` (low overhead iterable progress bar), and `fredapi` (pulls data from St Louis Fed FRED API).

**IMPORTANT**: pandas version `0.24.+` is required as pandas has gained the ability to hold integer dtypes with missing values.

### 0.6. Sections in this notebook

The following sections in this notebook progress as follows: 

Section 1 explains the procedure to access the IRI dataset on UChicago Research Computing Center and documents the steps taken to extract the necessary files and directories pertinent to this project, given the limitations of the memory size of personal laptop computers. 

Section 2 provides an overview of the IRI dataset and its various dimensions, their limitations. 

Section 3 describes the fact-dimension schema in MySQL. 

Section 4 includes the code to create an blank schema on a local MySQL server.

Section 5-9 works with import, transformation, normalisation, and pushing data (product, store, sales, dates, and economic data) onto the local MySQL server. 

Section 10 details the steps taken to migrate the database from local server to Google Cloud SQL. 

Section 11 gives a quick run-down of Cloud SQL access for data visualisation and analysis in Tableau.

---

## 1. IRI Data extraction:

0. Sign NDA for the IRI Academic Database. 

1. Connect to RCC /project2: <https://rcc.uchicago.edu/docs/data-transfer/index.html> smb://midwaysmb.rcc.uchicago.edu/project2 Username: ADLOCAL\CNetID Password: CNet password Hostname: midwaysmb.rcc.uchicago.edu

2. Navigate to `/project2/databases/IRIData/`

3. Unzip `zYearXX` files and extract beer directory

4. Relocate contents of `BEER` directory into `YearXX` directory

5. Rename `zparsed stub files.zip` as `parsed stub files.zip`

6. Collect all BEER product attribute files `parsed stub files` into the directory "beer_attributes".

  - Renamed `prod_beer.xlsx` and `prod_beer_sz.xlsx` from the `parsed stub files` directory as `prod01_beer.xlsx` and `prod01_beer_sz.xlsx`.
  - Renamed prod_beer.xlsx from the `parsed stub files 2007` directory as `prod07_beer.xlsx`.
  
---


## 2. Overview of the `IRI dataset`:

Dataset size: 8 GB unzipped

Dataset range: January 1, 2001 (week 1114) to December 30, 2012 (1739).

Data sets are separated by year. In each year, there are the following files:

- `ADB Measure Definitions.doc` defines store measures
- `Delivery_Stores` defines stores included in the year's files
- `demos.csv` identifies the demographics of the panelists
- `IRI week translation.xls` defines the conversion of week numbers to dates.
- `panel_measure_definition.doc` defines panel measures. This file is slightly different for years 2008-2011.
- `Category_outlet_startweek_endweek` with no file extension contains store-week level data
- `Category_PANEL_outlet_startweek_endweek.dat` DAT file contains panel data at transaction level.

**TO DO** For years 1, 2, 6, 7, and 12, the `DEMOS.csv` files are located in the directory `demo trips external`. Move these into each YearXX directory.

Note that for 2001-2007, the outlet categories are:
- {DR: drug, GR: groceries, MA: mass}. 

For 2008-2011, the outlet categories are:
- {DK: drug, GK: groceries, MK: mass}.

### 2.1. Week numbers - `IRI WEEK Translation` file description:

- End date = (weekNumber - 400) * 7 + 31900
- Start date = (weekNumber - 400) * 7 + 31900 - 6

### 2.2. Sales data - `Category_outlet_startweek_endweek` file description:

The store data files are the largest files.

Both the store data and panel data files are keyed to the dimensional information (store, week, UPC fields, [panelist]).

Records within a file represent a transaction by store-week-upc (universal product code).

**Naming convention:** The naming convention for these is category name then outlet then start week and then end week, all separated by underscores, with no extension, so salted snacks drug data for the earliest year would be `saltsnck_drug_1114_1165`.

**Columns of interest:**

- IRI_Key FK: Delivery_Stores
- WEEK FK: IRI Week Translation.xls
- SY FK: UPC system code
- GW FK: UPC generation code
- VEND FK: UPC vendor code
- ITEM FK: UPC item code
- UNITS Units sold
- DOLLARS Amount (note below)

The dollars column reflects the retail price paid, on average, after retail features, displays and retail coupons. It does not include manufacturer coupons or any discount that might be applied by the retailer that is not applicable to the item. For example, if a retailer gave USD5 off if you purchased more than USD200, that discount is not applied. Sales taxes are not included.

The F column denotes whether there was a marketing feature within the store, such a small or large-sized ad. The D column denotes whether there was a marketing display of the product within the store.

### 2.3. `Delivery_Stores` file description:

The file contains each store "masked" using the sequence key as it's identifier across the various tables. This file also contains outlet, estimated acv, the market name so data can be aggregated by market, an open and close week, and finally a "chain" number representing a particular retailer. All the stores belonging to Chain8 are part of the same retailer that year.

**Columns of interest:**

- IRI_KEY: FK: masked store ID, **maybe different from year to year**. Cross-reference in Appendix 2 of the data dictionary. <- ignore this for now
- OU: drug/groceries/mass market -> into its own NF table EST_ACV: estimate of annualized sale in MILLIONS for the store across ALL categories
- Market_Name: 50 markets total -> into its own NF table

### 2.4. Product attributes (in directory "beer_attributes"):

`prod01_beer.xlsx` and prod01_beer_sz.xlsx for 2001-2006.<br>
`prod07_beer.xlsx` for 2007.<br>
`prod11_beer.xlsx` for 2008-2011.<br>
`prod12_beer` for 2012.

`prod01_beer_sz.xlsx` describes additional size attribute information. No size information was provided for 2007 onward.

**Columns of interest:**

- L2 Small category (domestic or import) -> into its own NF table
- L4 Vendor -> into its own NF table
- L5 Brand -> into its own NF table
- SY UPC system code
- GW UPC generation code
- VEND UPC vendor code
- ITEM UPC item code -> for each UPC item, generate a surrogate key
- VOL_EQ Volume equivalent := ounces / 192\. Denotes total beer per unit sold (e.g. total volume in bottle/can/4-pack/6-pack/case/keg) **TO DO:** figure out a way to determine volume of each individual package
- TYPE OF BEER/ALE **Admit PRODUCT TYPE if MISSING** -> into its own NF table
- PACKAGE Packaging (can/glass, single, box, carton, keg, etc...) -> into its own NF table
- FLAVOR/SCENT FLAVOR[FLAVOR = MISSING] <- NULL, -> into its own NF table

Note: Columns CALORIE LEVEL and COLOR have too many missing values to be useful for analysis.

### ~~2.5. Category_PANEL_outlet_startweek_endweek.dat~~:

Panel data is provided for two "BehaviorScan" markets, Eau Claire, Wisconsin and Pittsfield, Massachusetts.

Outside the scope of this project.

### ~~2.6. Panel trips~~:

These files represent the trips made by panelists who purchased at least one item.

Outside the scope of this project.

---


## 3. MySQL DDL

<img src= "beer_eer_diagram.png" width="700" />

---

## 4. Create schema in MySQL server

In MySQL Workbench, create the schema/EER the `beer` database on a local MySQL server using the DDL script `beer_ddl.sql`.

Note: When populating the database with data, it is **paramount** that the data for the tips of the snowflake is inputted first, and the fact tables (`sales`) and `econ` be inserted last. Otherwise, it would throw a `FOREIGN_KEY_CHECKS` error (ERROR 1452: Cannot add or update a child row: a foreign key constraint fails).

Note note: For each instance a MySQL table is populated, there will be a binary log generated. Since the SALES data is large, the corresponding binary log will also be large, taking up space in your storage up to 5GB at a time. To purge the binary logs, use the script `PURGE BINARY LOGS BEFORE '<DATETIME>';`.

---

## 5. Product data

### 5.1 Extraction of columns for clustering in OpenRefine

There are three columns from the `UPC` product table that require text clustering, namely, `flavor`, `packaging`, and `beer-type`.

The unique values in each of these three columns are extracted into three separate dataframes. In each dataframe, the column is duplicated with the following naming convention to preserve changes for future validation: `xxx_name` represent original names, `xxx_cat` represent categories post-clustering. These dataframes are stored as `.csv` files in the directory `"./OpenRefine_data/pre-openrefine/`. Files that have been clustered using OpenRefine are located in the directory `"./OpenRefine_data/pre-openrefine/` and have `-or` appended to their filenames. 

We first import the packages and settings required for the notebook:

In [41]:
import os
import glob
import numpy as np
import pandas as pd

from sqlalchemy import create_engine
from tqdm import tqdm
import time

from fredapi import Fred

# MySQL server credentials
engine = create_engine("mysql+pymysql://{user}:{pw}@localhost/{db}".format(user="root", pw="rootroot", db="beer"))

We read in the product tables:

In [57]:
prod_all_beer_df = pd.concat([pd.read_excel(f) for f in glob.glob("./IRI BEER DATASET/beer_attributes/prod*_beer.xls*")], ignore_index = True, sort=False)
print("The dimensions of the product tables combined is", prod_all_beer_df.shape)

The dimensions of the product tables combined is (56938, 23)


In [48]:
prod_all_beer_df["FLAVOR/SCENT"] = prod_all_beer_df["FLAVOR/SCENT"].replace("MISSING", "NO FLAVOR").replace("REGULAR", "NO FLAVOR")
prod_all_beer_df["PACKAGE"] = prod_all_beer_df["PACKAGE"].replace("MISSING", "UNKNOWN")
prod_all_beer_df["TYPE OF BEER/ALE"] = prod_all_beer_df["TYPE OF BEER/ALE"].replace("MISSING", "BEER")
prod_all_beer_df.head()

Unnamed: 0,L1,L2,L3,L4,L5,L9,Level,UPC,SY,GE,...,VOL_EQ,PRODUCT TYPE,TYPE OF BEER/ALE,PACKAGE,FLAVOR/SCENT,SIZE,CALORIE LEVEL,COLOR,*AG C=1+ CATEGORY 00004,*STUBSPEC 1416RC 00004
0,CATEGORY - BEER/ALE/ALCOHOLIC CID,DOMESTIC BEER/ALE (INC NON-ALCOH,ABC WINE & SPIRITS,ABC WINE & SPIRITS,ABC ALE,+ABCAL ALE BEER CAN 12OZ,9,00-01-85674-60002,0,1,...,0.0417,BEER,ALE,CAN,NO FLAVOR,MISSING,MISSING,MISSING,,
1,CATEGORY - BEER/ALE/ALCOHOLIC CID,DOMESTIC BEER/ALE (INC NON-ALCOH,ABC WINE & SPIRITS,ABC WINE & SPIRITS,ABC ALE,+ABCAL ALE BEER CAN 72OZ,9,00-01-85674-60001,0,1,...,0.25,BEER,ALE,CAN,NO FLAVOR,MISSING,MISSING,MISSING,,
2,CATEGORY - BEER/ALE/ALCOHOLIC CID,DOMESTIC BEER/ALE (INC NON-ALCOH,ABITA BREWING CO INC,ABITA BREWING CO INC,ABITA AMBER,+ABTAM LAGER BEER GB 12OZ,9,27-01-15502-01124,27,1,...,0.0417,BEER,LAGER,GLASS BOTTLE,NO FLAVOR,MISSING,MISSING,AMBER,,
3,CATEGORY - BEER/ALE/ALCOHOLIC CID,DOMESTIC BEER/ALE (INC NON-ALCOH,ABITA BREWING CO INC,ABITA BREWING CO INC,ABITA AMBER,+ABTAM LAGER BEER GB 12OZ,9,00-01-80020-00001,0,1,...,0.0417,BEER,LAGER,GLASS BOTTLE,NO FLAVOR,MISSING,MISSING,AMBER,,
4,CATEGORY - BEER/ALE/ALCOHOLIC CID,DOMESTIC BEER/ALE (INC NON-ALCOH,ABITA BREWING CO INC,ABITA BREWING CO INC,ABITA AMBER,+ABTAM LAGER BEER GBCRT 72OZ,9,00-01-80020-24221,0,1,...,0.25,BEER,LAGER,GLASS BOTTLE IN CRTN,NO FLAVOR,MISSING,MISSING,AMBER,,


In [49]:
# Unique flavors

flavor_df = prod_all_beer_df[["FLAVOR/SCENT"]].dropna().drop_duplicates()
flavor_df.rename(columns={"FLAVOR/SCENT": "flavor_name"}, inplace=True)
flavor_df["flavor_cat"] = flavor_df["flavor_name"]
flavor_df.reset_index(drop=True, inplace=True)
flavor_df.to_csv("./OpenRefine_data/pre-openrefine/flavor.csv", index=False)
flavor_df.head()

Unnamed: 0,flavor_name,flavor_cat
0,NO FLAVOR,NO FLAVOR
1,ASSORTED,ASSORTED
2,RASPBERRY,RASPBERRY
3,RUM,RUM
4,WATERMELON,WATERMELON


In [50]:
# Unique packaging

packaging_df = prod_all_beer_df[["PACKAGE"]].dropna().drop_duplicates()
packaging_df.rename(columns={"PACKAGE": "packaging_name"}, inplace=True)
packaging_df["packaging_cat"] = packaging_df["packaging_name"]
packaging_df.reset_index(drop=True, inplace=True)
packaging_df.to_csv("./OpenRefine_data/pre-openrefine/packaging.csv", index=False)
packaging_df.tail()

Unnamed: 0,packaging_name,packaging_cat
73,BOX,BOX
74,PLASTIC JUG,PLASTIC JUG
75,BOTTLE IN TOOLBOX,BOTTLE IN TOOLBOX
76,TALL CAN,TALL CAN
77,PLASTIC SECURED CAN,PLASTIC SECURED CAN


In [51]:
# Unique beer_type

beer_type_df = prod_all_beer_df[["TYPE OF BEER/ALE"]].dropna().drop_duplicates()
beer_type_df.rename(columns={"TYPE OF BEER/ALE": "beer_type_name"}, inplace=True)
beer_type_df["beer_type_cat"] = beer_type_df["beer_type_name"]
beer_type_df.reset_index(drop=True, inplace=True)
beer_type_df.to_csv("./OpenRefine_data/pre-openrefine/beer_type.csv", index=False)
beer_type_df.tail()

Unnamed: 0,beer_type_name,beer_type_cat
89,SCHWARZBIER,SCHWARZBIER
90,BANANA,BANANA
91,WHITE,WHITE
92,WHEAT LAGER,WHEAT LAGER
93,HEFEDUNKEL,HEFEDUNKEL


### 5.2. Normalisation

The clustered data on `beer-type`, `flavors`, and `packaging` are used to normalise the `UPC` product data. ID's of each unique category is added to each dataframe and dictionaries are created for future mapping to the `UPC` table:

In [52]:
flavor_df = pd.read_csv("./OpenRefine_data/post-openrefine/flavor-or.csv")
flavor_df = flavor_df.assign(flavor_id = flavor_df["flavor_cat"].astype('category').cat.codes).sort_values(by=['flavor_id'])
flavor_df.reset_index(drop=True, inplace=True)
flavor_dict = flavor_df.set_index("flavor_name")["flavor_id"].to_dict()

packaging_df = pd.read_csv("./OpenRefine_data/post-openrefine/packaging-or.csv")
packaging_df = packaging_df.assign(packaging_id = packaging_df["packaging_cat"].astype('category').cat.codes).sort_values(by=['packaging_id'])
packaging_df.reset_index(drop=True, inplace=True)
packaging_dict = packaging_df.set_index("packaging_name")["packaging_id"].to_dict()

beer_type_df = pd.read_csv("./OpenRefine_data/post-openrefine/beer_type-or.csv")
beer_type_df = beer_type_df.assign(beer_type_id = beer_type_df["beer_type_cat"].astype('category').cat.codes).sort_values(by=['beer_type_id'])
beer_type_df.reset_index(drop=True, inplace=True)
beer_type_dict = beer_type_df.set_index("beer_type_name")["beer_type_id"].to_dict()

Each vendor (brand) is also assigned a unique ID:

In [53]:
# Unique vendor

vendor_df = prod_all_beer_df[["L4"]].replace("MISSING", np.nan).replace("ALL OTHERS", np.nan).replace("PRIVATE LABEL", np.nan).dropna().drop_duplicates()
vendor_df.rename(columns={"L4": "vendor_name"}, inplace=True)
vendor_df["vendor_id"] = np.arange(1,1+len(vendor_df))
vendor_df.reset_index(drop=True, inplace=True)
vendor_dict = vendor_df.set_index("vendor_name")["vendor_id"].to_dict()

A dataframe of unique products (defined by UPC codes) is created.

In [54]:
prod_all_beer_unique_df = prod_all_beer_df.drop_duplicates(subset="UPC")

prod_all_beer_unique_df = prod_all_beer_unique_df[prod_all_beer_unique_df.L4 != "ALL OTHERS"]
prod_all_beer_unique_df = prod_all_beer_unique_df[prod_all_beer_unique_df.L4 != "PRIVATE LABEL"]
prod_all_beer_unique_df["domestic"] = [1 if x == "DOMESTIC BEER/ALE (INC NON-ALCOH" else 0 for x in prod_all_beer_unique_df["L2"]]
prod_all_beer_unique_df = prod_all_beer_unique_df[["UPC", "SY", "GE", "VEND", "ITEM", "domestic", "L4", "VOL_EQ", "TYPE OF BEER/ALE", "PACKAGE", "FLAVOR/SCENT"]]
prod_all_beer_unique_df.rename(columns={"L4": "vendor_id", "TYPE OF BEER/ALE": "beer_type_id", "PACKAGE": "packaging_id", "FLAVOR/SCENT": "flavor_id"}, inplace=True)

prod_all_beer_unique_df["vendor_id"] = prod_all_beer_unique_df["vendor_id"].map(vendor_dict)
prod_all_beer_unique_df["beer_type_id"] = prod_all_beer_unique_df["beer_type_id"].map(beer_type_dict)
prod_all_beer_unique_df["beer_type_id"] = prod_all_beer_unique_df["beer_type_id"].astype('Int64')
prod_all_beer_unique_df["packaging_id"] = prod_all_beer_unique_df["packaging_id"].map(packaging_dict)
prod_all_beer_unique_df["packaging_id"] = prod_all_beer_unique_df["packaging_id"].astype('Int64')
prod_all_beer_unique_df["flavor_id"] = prod_all_beer_unique_df["flavor_id"].map(flavor_dict)
prod_all_beer_unique_df["flavor_id"] = prod_all_beer_unique_df["flavor_id"].astype('Int64')

prod_all_beer_unique_df.reset_index(drop=True, inplace=True)
prod_all_beer_unique_df["UPC_id"] = np.arange(1,1+len(prod_all_beer_unique_df))
prod_all_beer_unique_df.reset_index(drop=True, inplace=True)
prod_all_beer_unique_df["total_vol_oz"] = (prod_all_beer_unique_df["VOL_EQ"]*192).round()
prod_all_beer_unique_df.tail()

Unnamed: 0,UPC,SY,GE,VEND,ITEM,domestic,vendor_id,VOL_EQ,beer_type_id,packaging_id,flavor_id,UPC_id,total_vol_oz
24308,00-01-82054-10990,0,1,82054,10990,0,512,0.0587,22,12,106,24309,11.0
24309,27-01-04200-03998,27,1,4200,3998,0,512,0.5868,22,18,106,24310,113.0
24310,00-02-08205-41044,0,2,8205,41044,0,512,0.25,22,13,106,24311,48.0
24311,06-01-13395-00007,6,1,13395,7,0,514,0.5868,7,18,106,24312,113.0
24312,00-01-82153-33109,0,1,82153,33109,0,516,0.0441,7,13,106,24313,8.0


UPC codes in `sales` tables are atomized with no leading zeros.  
Will need to normalise sales tables by replacing atomized UPC codes by surrogate UPC_id codes instead.  
Can't use UPC column in `prod_all_beer_unique_df` to make dictionary since values have leading zeros.  
Create df by concat-ing "SY", "GE", "VEND", "ITEM" columns with dash as separator.  
Create dict for use in Section `4.3`.  

In [14]:
atom_upc_upcid_df = prod_all_beer_unique_df[["UPC_id", "SY", "GE", "VEND", "ITEM"]]
concat_upc_atom = atom_upc_upcid_df[["SY", "GE", "VEND", "ITEM"]].apply(lambda row: '-'.join(row.values.astype(str)), axis=1)
atom_upc_upcid_df = pd.concat([atom_upc_upcid_df, concat_upc_atom], axis=1)
atom_upc_upcid_df.drop(["SY", "GE", "VEND", "ITEM"], axis = 1, inplace=True)
atom_upc_upcid_df.rename(columns = {0: "UPC_atom_concat"}, inplace=True)
atom_upc_upcid_dict = atom_upc_upcid_df.set_index("UPC_atom_concat")["UPC_id"].to_dict()

atom_upc_upcid_df.head()

Unnamed: 0,UPC_id,UPC_atom_concat
0,1,0-1-85674-60002
1,2,0-1-85674-60001
2,3,27-1-15502-1124
3,4,0-1-80020-1
4,5,0-1-80020-24221


### 5.3. Push to MySQL server

Before doing so, keep only unique categories and ID.

In [15]:
flavor_df.drop(["flavor_name"],axis=1, inplace=True)
flavor_df.drop_duplicates(inplace=True)
flavor_df.to_sql('flavor', con = engine, if_exists = 'append', chunksize = 1000, index = False)

packaging_df.drop(["packaging_name"],axis=1, inplace=True)
packaging_df.drop_duplicates(inplace=True)
packaging_df.to_sql('packaging', con = engine, if_exists = 'append', chunksize = 1000, index = False)

beer_type_df.drop(["beer_type_name"],axis=1, inplace=True)
beer_type_df.drop_duplicates(inplace=True)
beer_type_df.to_sql('beer_type', con = engine, if_exists = 'append', chunksize = 1000, index = False)

vendor_df.to_sql('vendor', con = engine, if_exists = 'append', chunksize = 1000, index = False)

prod_all_beer_unique_df.to_sql('upc', con = engine, if_exists = 'append', chunksize = 1000, index = False)

---

## 6. Store data

### 6.1 Import the store data

In [16]:
# Store (Delivery_Stores) tables

stores_all_df = pd.concat([pd.read_csv(f, sep="\t") for f in glob.glob("./IRI BEER DATASET/Year*/Delivery_Stores")], ignore_index = True, sort=False)
stores_all_df.columns = ["string"]
stores_all_df.head()

Unnamed: 0,string
0,200032 GR 28.11499 NEW YORK 1...
1,200059 GR 20.80499 PHILADELPHIA 1...
2,200171 GR 25.282 MILWAUKEE ...
3,200197 GR 16.616 PEORIA/SPRINGFLD. ...
4,200272 GR 10.91199 LOS ANGELES ...


The store data is vertically aligned but not tab-separated, so it is read into a dataframe as a string. Attributes are manually extracted:

In [17]:
# Split column by character location

stores_all_df["store_id"] = stores_all_df.string.str[0:7].astype(str).astype(int)
stores_all_df["outlet_cat_name"] = stores_all_df.string.str[8:10]
stores_all_df["market_name"] = stores_all_df.string.str[20:45]
stores_all_df["market_name"] = stores_all_df["market_name"].apply(lambda x: x.strip())

outlet_cat_convert_dict = {"DR": "drug", "GR": "groceries", "MA": "mass", "DK": "drug", "GK": "groceries", "MK": "mass"}
stores_all_df["outlet_cat_name"] = stores_all_df["outlet_cat_name"].map(outlet_cat_convert_dict)
stores_all_df.drop(["string"], axis = 1, inplace=True)
stores_all_df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
stores_all_df.drop_duplicates(subset="store_id", inplace=True)
stores_all_df.dtypes

store_id            int64
outlet_cat_name    object
market_name        object
dtype: object

In [18]:
stores_all_df.head()

Unnamed: 0,store_id,outlet_cat_name,market_name
0,200032,groceries,NEW YORK
1,200059,groceries,PHILADELPHIA
2,200171,groceries,MILWAUKEE
3,200197,groceries,PEORIA/SPRINGFLD.
4,200272,groceries,LOS ANGELES


### 6.2 Normalisation

Dictionaries for outlet_category (grocery store, drug store) and market (Los Angeles, Chicago, New York, etc) are created to normalise the outlet categories and markets. The market names are manually amended in a `.csv` such that it is readable by Tableau. 

In [19]:
# Unique outlet_cat

outlet_cat_df = stores_all_df[["outlet_cat_name"]].drop_duplicates()
outlet_cat_df["outlet_cat_id"] = np.arange(1,1+len(outlet_cat_df))
outlet_cat_df.reset_index(drop=True, inplace=True)
outlet_cat_dict = outlet_cat_df.set_index("outlet_cat_name")["outlet_cat_id"].to_dict()
outlet_cat_df.head()

Unnamed: 0,outlet_cat_name,outlet_cat_id
0,groceries,1
1,drug,2


In [20]:
# Unique market

market_df = stores_all_df[["market_name"]].drop_duplicates()
market_df["market_id"] = np.arange(1,1+len(market_df))
market_df.reset_index(drop=True, inplace=True)
market_df.to_csv("./OpenRefine_data/pre-openrefine/market.csv", index=False)
market_df.head()

Unnamed: 0,market_name,market_id
0,NEW YORK,1
1,PHILADELPHIA,2
2,MILWAUKEE,3
3,PEORIA/SPRINGFLD.,4
4,LOS ANGELES,5


In [21]:
market_df  = pd.read_csv("./OpenRefine_data/post-openrefine/market-or.csv")
market_dict = market_df.set_index("market_name")["market_id"].to_dict()

market_df.head()

Unnamed: 0,market_name,region,state,city,market_id
0,NEW YORK,,NEW YORK,NEW YORK CITY,1
1,PHILADELPHIA,,PENNSYLVANIA,PHILADELPHIA,2
2,MILWAUKEE,,WISCONSIN,MILWAUKEE,3
3,PEORIA/SPRINGFLD.,,ILLINOIS,PEORIA/SPRINGFLD.,4
4,LOS ANGELES,,CALIFORNIA,LOS ANGELES,5


In [22]:
stores_all_df["outlet_cat_name"] = stores_all_df["outlet_cat_name"].map(outlet_cat_dict)
stores_all_df["market_name"] = stores_all_df["market_name"].map(market_dict)
stores_all_df.rename(columns = {"outlet_cat_name": "outlet_cat_id", "market_name": "market_id"}, inplace=True)
stores_all_df.reset_index(drop=True, inplace=True)
stores_all_df.tail()

Unnamed: 0,store_id,outlet_cat_id,market_id
3579,933196,1,2
3580,8028829,2,3
3581,238337,1,33
3582,684577,1,2
3583,1086089,1,40


### 6.3. Push to MySQL server

In [23]:
outlet_cat_df.to_sql('outlet_cat', con = engine, if_exists = 'append', chunksize = 1000, index = False)
market_df.to_sql('market', con = engine, if_exists = 'append', chunksize = 1000, index = False)
stores_all_df.to_sql('store', con = engine, if_exists = 'append', chunksize = 1000, index = False)

---

## 7. Week table

Create a week table to convert IRI week codes to calendar date, refers to the Sunday of each week. 

In [42]:
week_index = pd.date_range(start='12/30/2000', end='01/01/2013', freq='W-MON')
week_df = week_index.to_frame(index=False)
week_df.columns = ["date"]
week_df["week_id"] = np.arange(1114,1114+len(week_df))
week_df.shape

(627, 2)

In [43]:
week_df.head()

Unnamed: 0,date,week_id
0,2001-01-01,1114
1,2001-01-08,1115
2,2001-01-15,1116
3,2001-01-22,1117
4,2001-01-29,1118


In [26]:
# Import into MySQL server

week_df.to_sql('week', con = engine, if_exists = 'append', chunksize = 1000, index = False)

---

## 8. Sales Data

Sales data are stored in `YearXX` directories with the naming convention: `beer_<OUTLET_CAT>_<START_WEEKID>_<END_WEEK_ID>`.

In [27]:
# List of all sales data and total size

sales_file_list = glob.glob("./IRI BEER DATASET/Year*/beer_????_????_????")

sales_files_size_GB = round(sum([os.stat(file).st_size for file in sales_file_list])/(1024**3),2)
print("There are", len(sales_file_list), "files and the total size of sales data is", sales_files_size_GB, "GB.")

There are 24 files and the total size of sales data is 6.93 GB.


Since files are big but each file has the same spacing format, for each sales file:
1. `read_table()` into dataframe
2. Split string into columns by character position (IRI_KEY, WEEK, SY, GE, VEND, ITEM, UNITS, DOLLARS)
3. Push to mySQL by `if_exists = 'append'` method

Estimated time for manipulating and importing sales data: 2.5 hours. System: macOS 10.15.1, Intel 7th gen Core i5 (I5-7267U), 8GB memory, Iris Plus Graphics 650.

In [40]:
sales_file_list[:6]

['./IRI BEER DATASET/Year9/beer_groc_1531_1582',
 './IRI BEER DATASET/Year9/beer_drug_1531_1582',
 './IRI BEER DATASET/Year7/beer_groc_1427_1478',
 './IRI BEER DATASET/Year7/beer_drug_1427_1478',
 './IRI BEER DATASET/Year1/beer_drug_1114_1165',
 './IRI BEER DATASET/Year1/beer_groc_1114_1165']

In [29]:
for series in tqdm(sales_file_list): 
    sales_each_df = pd.read_csv(series, sep="\t")
    sales_each_df.columns = ["string"]
    
    # split non-separated data (string) into columns
    sales_each_df["store_id"] = sales_each_df.string.str[0:7].astype(str).astype(int)
    sales_each_df["week_id"] = sales_each_df.string.str[8:12].astype(str).astype(int)
    sales_each_df["SY"] = sales_each_df.string.str[13:15].astype(str).astype(int)
    sales_each_df["GE"] = sales_each_df.string.str[16:18].astype(str).astype(int)
    sales_each_df["VEND"] = sales_each_df.string.str[19:24].astype(str).astype(int)
    sales_each_df["ITEM"] = sales_each_df.string.str[25:30].astype(str).astype(int)
    sales_each_df["UNITS"] = sales_each_df.string.str[31:36].astype(str).astype(int)
    sales_each_df["DOLLARS"] = sales_each_df.string.str[37:45].astype(str).astype(float)
    sales_each_df.drop(["string"], axis = 1, inplace=True)
    sales_each_df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
    concat_upc_atom = sales_each_df[["SY", "GE", "VEND", "ITEM"]].apply(lambda row: '-'.join(row.values.astype(str)), axis=1)
    sales_each_df = pd.concat([sales_each_df, concat_upc_atom], axis=1)
    sales_each_df.drop(["SY", "GE", "VEND", "ITEM"], axis = 1, inplace=True)
    sales_each_df.rename(columns = {0: "UPC_atom_concat"}, inplace=True)
    sales_each_df["upc_id"] = sales_each_df["UPC_atom_concat"].map(atom_upc_upcid_dict)
    # sales_each_df[sales_each_df.isna().any(axis=1)]
    # UPC code 0-1-11170-83511 does not exist in any of the `prod*_beer.xls*` tables so it's being dropped
    sales_each_df.dropna(inplace = True)
    sales_each_df.drop(["UPC_atom_concat"], axis = 1, inplace=True)
    sales_each_df["upc_id"] = sales_each_df["upc_id"].astype('Int64')
    
    # dump into MySQL
    #sales_each_df.to_sql('sales', con = engine, if_exists = 'append', chunksize = 1000, index = False)
    sales_each_df.to_sql('sales', con = engine, if_exists = 'append', chunksize = 1000, index = False)

100%|██████████| 24/24 [2:22:32<00:00, 433.04s/it]


---

## 9. Economic data

FRED API documentation: [https://research.stlouisfed.org/docs/api/fred/ ]  

### 9.1 State abbreviations data

**[IMPORTANT]** Downloaded from: http://www.whypad.com/wp-content/uploads/us_states.zip and placed in notebook directory.

In [44]:
state_code_df = pd.read_csv("us_states.csv")
state_code_df.columns = ["STATE", "state", "state_abbrev"]
state_code_df.drop(["STATE"], axis = 1, inplace=True)
state_code_df.head()

Unnamed: 0,state,state_abbrev
0,Alaska,AK
1,Arizona,AZ
2,Arkansas,AR
3,California,CA
4,Colorado,CO


### 9.2. FRED economic data

Economic series considered:  

Real Gross Domestic Product: `GDPC1` ※  
US Unemployment Rate: `UNRATE` †  
State unemployment rate: `state_code` + `UR`; use state abbreviations dataframe from section `9.1` †  
CPI (for All Urban Consumers: All Items in U.S. City Average): `CPIAUCSL` †  
Long-Term Government Bond Yields: 10-year: Main (Including Benchmark): `IRLTLT01USM156N` †  
S&P/Case-Shiller U.S. National Home Price Index: `CSUSHPISA` †  
NBER based Recession Indicator: `USRECP` †  
Smoothed U.S. Recession Probabilities: `RECPROUSM156N` †  
University of Michigan: Consumer Sentiment: `UMCSENT` †  
University of Michigan: Inflation Expectation: `MICH` †  
OECD Indicator for the United States: `CSCICP03USM665S` †  
Brent Crude: `POILBREUSDM` †  
West Texas Intermediate: `DCOILWTICO` †††  
Gold: `GOLDAMGBD228NLBM` †††   
St. Louis Fed Financial Stress Index: `STLFSI` ††  
Effective Federal Funds Rate: `FEDFUNDS` †  

Note: quarterly ※, monthly †, weekly ††, daily †††

We form the list of economic series to pull from FRED:

In [67]:
us_unemploy_api = ["UNRATE"]
state_unemploy_api_list = state_code_df['state_abbrev'].astype(str) + "UR"
state_unemploy_api_list = state_unemploy_api_list.tolist()
all_unemploy_api = us_unemploy_api + state_unemploy_api_list

other_api_list = ["IRLTLT01USM156N", "CSUSHPISA", "GDPC1", "CPIAUCSL", "USRECP", "RECPROUSM156N", "UMCSENT", "MICH", "CSCICP03USM665S", "POILBREUSDM", "DCOILWTICO", "GOLDAMGBD228NLBM", "STLFSI", "FEDFUNDS"]

all_api = other_api_list + all_unemploy_api

Upon setting up the API credentials, we pull data from FRED and nest it in DICTIONARY with format `{series_id : series_data_df}`. A dictionary is much preferred to dynamically creating objects through a loop since they are unnecessary, hard to create (use exec or globals()), and I can't use them dynamically anyway. But if you really want to, use `globals()`.

Sometimes the API reaches 504 Gateway Timeout error and yells at you. Just keep trying:

In [97]:
api_key = "<REDACTED KEY>"
fred = Fred(api_key=api_key)

dict_series_values = {series: fred.get_series_latest_release(series).to_frame() for series in all_api}

We clean the dataframes and drop all observations that are not between 2001 and 2012:

In [98]:
for series, df in dict_series_values.items():
    df.columns = [series]
    df.reset_index(level=0, inplace=True)
    df.rename(columns={"index": "date"}, inplace = True)
    pd.to_datetime(df['date'], format = "%Y-%m-%d")
    dict_series_values[series] = df.loc[(df['date'] >= "2001-01-01") & (df['date'] <= "2012-12-31")]

# Merge all economic data in dataframe
econ_df = pd.concat([df.set_index('date') for (series, df) in dict_series_values.items()], axis=1, join='outer').reset_index()

# Resample with monthly average
econ_df_monthly = econ_df.set_index('date').resample('W').mean().ffill()
econ_df_monthly.head(10)

Unnamed: 0_level_0,IRLTLT01USM156N,CSUSHPISA,GDPC1,CPIAUCSL,USRECP,RECPROUSM156N,UMCSENT,MICH,CSCICP03USM665S,POILBREUSDM,...,SDUR,TNUR,TXUR,UTUR,VTUR,VAUR,WAUR,WVUR,WIUR,WYUR
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2001-01-07,5.16,109.851,13222.69,175.6,0.0,18.04,94.7,3.0,100.966186,25.64,...,2.7,3.9,4.1,3.8,3.0,2.3,5.6,5.3,3.9,3.8
2001-01-14,5.16,109.851,13222.69,175.6,0.0,18.04,94.7,3.0,100.966186,25.64,...,2.7,3.9,4.1,3.8,3.0,2.3,5.6,5.3,3.9,3.8
2001-01-21,5.16,109.851,13222.69,175.6,0.0,18.04,94.7,3.0,100.966186,25.64,...,2.7,3.9,4.1,3.8,3.0,2.3,5.6,5.3,3.9,3.8
2001-01-28,5.16,109.851,13222.69,175.6,0.0,18.04,94.7,3.0,100.966186,25.64,...,2.7,3.9,4.1,3.8,3.0,2.3,5.6,5.3,3.9,3.8
2001-02-04,5.1,110.504,13222.69,176.0,0.0,21.44,90.6,2.8,100.593116,27.41,...,2.8,3.9,4.2,3.9,3.0,2.4,5.7,5.2,4.0,3.8
2001-02-11,5.1,110.504,13222.69,176.0,0.0,21.44,90.6,2.8,100.593116,27.41,...,2.8,3.9,4.2,3.9,3.0,2.4,5.7,5.2,4.0,3.8
2001-02-18,5.1,110.504,13222.69,176.0,0.0,21.44,90.6,2.8,100.593116,27.41,...,2.8,3.9,4.2,3.9,3.0,2.4,5.7,5.2,4.0,3.8
2001-02-25,5.1,110.504,13222.69,176.0,0.0,21.44,90.6,2.8,100.593116,27.41,...,2.8,3.9,4.2,3.9,3.0,2.4,5.7,5.2,4.0,3.8
2001-03-04,4.89,111.111,13222.69,176.1,1.0,25.54,91.5,2.8,100.432111,24.4,...,2.9,4.0,4.3,4.0,3.1,2.6,5.8,5.1,4.2,3.8
2001-03-11,4.89,111.111,13222.69,176.1,1.0,25.54,91.5,2.8,100.432111,24.4,...,2.9,4.0,4.3,4.0,3.1,2.6,5.8,5.1,4.2,3.8


In [99]:
econ_df_monthly['date'] = econ_df_monthly.index
econ_df_monthly.reset_index(level=0, drop=True, inplace=True)
econ_df_monthly.head()

# Merge IRI week number ID's with economic dataframe 
econ_df_monthly = pd.merge_asof(econ_df_monthly, week_df, direction='nearest')
econ_df_monthly.rename(columns={"week_id": "econ_week_id"}, inplace = True)
econ_df_monthly.tail()

Unnamed: 0,IRLTLT01USM156N,CSUSHPISA,GDPC1,CPIAUCSL,USRECP,RECPROUSM156N,UMCSENT,MICH,CSCICP03USM665S,POILBREUSDM,...,TXUR,UTUR,VTUR,VAUR,WAUR,WVUR,WIUR,WYUR,date,econ_week_id
622,1.72,145.506,16239.138,231.221,0.0,0.08,72.9,3.2,98.813582,109.64,...,6.5,5.0,4.6,5.8,7.4,7.4,6.9,5.1,2012-12-09,1737
623,1.72,145.506,16239.138,231.221,0.0,0.08,72.9,3.2,98.813582,109.64,...,6.5,5.0,4.6,5.8,7.4,7.4,6.9,5.1,2012-12-16,1738
624,1.72,145.506,16239.138,231.221,0.0,0.08,72.9,3.2,98.813582,109.64,...,6.5,5.0,4.6,5.8,7.4,7.4,6.9,5.1,2012-12-23,1739
625,1.72,145.506,16239.138,231.221,0.0,0.08,72.9,3.2,98.813582,109.64,...,6.5,5.0,4.6,5.8,7.4,7.4,6.9,5.1,2012-12-30,1740
626,1.72,145.506,16239.138,231.221,0.0,0.08,72.9,3.2,98.813582,109.64,...,6.5,5.0,4.6,5.8,7.4,7.4,6.9,5.1,2013-01-06,1740


In [100]:
# Import into MySQL server

econ_df_monthly.to_sql('econ', con = engine, if_exists = 'replace', chunksize = 1000, index = False)

---

## 10. Integration with Google Cloud Platforms and Google Cloud SQL Server

1. Create GCP project: `depa-final-project-beer-2019`. ID: `directed-smoke-258516`.

2. Create GCP VM instance: `beer-vm`. Allows full acces to all Cloud APIs. apt-get installed `git`.

3. Create bucket: `depa-bucket-of-white-claws`. With `uniform` access control to ensure uniform access to all objects in bucket using bucket-level permissions. 

4. Create MySQL Cloud instance: `depa-cloud-beer`. No IP's of personal machines added as "Authorized network". Options: vCPUs=4, Memory=15GB, SSD storage=10 GB flexible, MySQL==5.7. 

5. Create empty database in Cloud: `beer`.

6. Create database dump file with `mysqldump`. The dump file should be 4.81 GB in size:

In [93]:
!mysqldump --databases beer -h 127.0.0.1 -uroot -p --hex-blob --skip-triggers --single-transaction --set-gtid-purged=OFF --default-character-set=utf8mb4 > beer_dump.sql



7. Import dump file into `beer-cloud-sql.beer`. The full process will take approximately **one hour**. There will be an "unknown error" in your Notifications center about 20 minutes into the import process, but the circle to the left of the name of the instance is still spinning. It is actually not an error but a warning, and it does not imply that the import has failed. The the warning message is `<Use unique_checks=0 which is non deterministic>`. View the log file here: [https://console.cloud.google.com/logs/]. 

Note: mysqldump was preferred over direct migration through MySQL Workbench due to internet speed limitation. The `beer` database has a size of 17 GB, whilst the dump file has a size of 3 GB. I am trading off the time required for migration and upload with the time required to reindex and rebuild the database on GCP. 


--- 

## 11. Dataviz

This project uses Tableau as dataviz. Upon granting permissions by registering public IP addresses, personal machines would be able to connect to the Cloud SQL instance `cloud-sql-beer` through Tableau, Python, and MySQL. Currently the owner pays for the cost of data requests. In the future, this will admit a shared-cost model for each user/collaborator. 


---

## 12. Google Cloud Computing (progressus ad infinitum)

With the goal of expediting the data manipulation process, we turn to Google Cloud Platforms, on which we create a virtual machine with 4 vCPU cores and 15 GB memory. The raw IRI data, in addition to the data files after processed by OpenRefine/Excel, is hosted in a GCP Bucket. 

I wrote a shell script `gcp_vm_init.sh` which (i) installs the necessary Linux software, `pip`, and `Python3 packages`, (ii) pulls the respository from GitHub, with contains the OpenRefine files, Python scripts, and shell scripts, as well as (iii) downloads the raw IRI data from a GCP bucket where it is hosted. The script `gcp_finalProjectNotebook_2.py` takes the IRI dataset and the OpenRefine files as inputs, manipulates the data, and pushes it onto Google Cloud SQL server. 

--- 
## 99. Convert this notebook into `.py`

In [101]:
!jupyter nbconvert --to script finalProjectNotebook.ipynb

[NbConvertApp] Converting notebook finalProjectNotebook.ipynb to script
[NbConvertApp] Writing 32995 bytes to finalProjectNotebook.py


---
#### End of this notebook