# Data Lake for Liquor Sales in Iowa, US
### Data Engineering Capstone Project

### Exploratory Data Analysis on Staging Files

* **All data used in this notebook is a sample of the full dataset. The full dataset (~9 million rows) is too large to ship with the project and will be placed in an S3 bucket. The sample datasets are in the data subfolder of this project.**

In [1]:
# Do all imports and installs here
import pandas as pd
import glob
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as func

### 1. Liquor Sales Dataset

* Location of sample data: /data/liquor_sales/Iowa_Liquor_Sales_sample.csv
* Location of full data: S3 bucket

In [127]:
# Read in the data here
df = pd.read_csv("data/liquor_sales/Iowa_Liquor_Sales_sample.csv")

#### Shape of dataset

In [123]:
df.shape

(318098, 24)

#### Columns and Data Types

Findings: Date column needs to be parsed as Date

In [124]:
df.dtypes

Invoice/Item Number       object
Date                      object
Store Number               int64
Store Name                object
Address                   object
City                      object
Zip Code                 float64
Store Location            object
County Number            float64
County                    object
Category                 float64
Category Name             object
Vendor Number            float64
Vendor Name               object
Item Number                int64
Item Description          object
Pack                       int64
Bottle Volume (ml)         int64
State Bottle Cost        float64
State Bottle Retail      float64
Bottles Sold               int64
Sale (Dollars)           float64
Volume Sold (Liters)     float64
Volume Sold (Gallons)    float64
dtype: object

#### Sample Data

Findings:
1. Store Location is a point string of the form POINT (longitude latitude); it needs to be split into separate longitude and latitude columns
2. Column names need to be converted to lowercase, with spaces replaced by _

In [125]:
df.head()

Unnamed: 0,Invoice/Item Number,Date,Store Number,Store Name,Address,City,Zip Code,Store Location,County Number,County,...,Item Number,Item Description,Pack,Bottle Volume (ml),State Bottle Cost,State Bottle Retail,Bottles Sold,Sale (Dollars),Volume Sold (Liters),Volume Sold (Gallons)
0,INV-23056100024,2019-11-06,2500,Hy-Vee Food Store #1 / Ames,3800 W Lincoln Way,Ames,50010.0,,85.0,STORY,...,11773,Black Velvet,48,200,1.56,2.34,6,14.04,1.2,0.31
1,INV-22949500007,2019-11-01,5694,Flashmart #104,1704 S. Story Street,Boone,50036.0,POINT (-93.879597 42.037102),8.0,BOONE,...,48105,Hennessy VS,12,375,10.74,16.11,4,64.44,1.5,0.39
2,INV-22961300004,2019-11-01,5072,Pep Stop,"901, E Washington St",Mount Pleasant,52641.0,POINT (-91.541171 40.964231),44.0,HENRY,...,87643,Familia Camarena Silver,12,750,12.5,18.75,12,225.0,9.0,2.37
3,INV-22969800183,2019-11-01,2560,Hy-Vee Food Store / Marion,3600 Business Hwy 151 East,Marion,52302.0,,57.0,LINN,...,74740,Slow and Low,6,750,12.17,18.26,6,118.56,4.5,1.18
4,INV-22975100014,2019-11-01,2524,Hy-Vee Food Store / Dubuque,3500 Dodge St,Dubuque,52001.0,,31.0,DUBUQUE,...,11299,Crown Royal,44,200,5.0,7.5,6,45.0,1.2,0.31
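The split described in the findings can be sketched in plain Python, independent of pandas or Spark. This is a minimal illustration, assuming every populated value follows the `POINT (longitude latitude)` pattern seen in the sample rows; the function name `parse_store_location` is hypothetical and not part of the notebook.

```python
import re
from typing import Optional, Tuple

# Point string as it appears in the sample rows: "POINT (<longitude> <latitude>)".
_POINT_RE = re.compile(r"^POINT \((-?\d+(?:\.\d+)?) (-?\d+(?:\.\d+)?)\)$")


def parse_store_location(value) -> Optional[Tuple[float, float]]:
    """Return (longitude, latitude), or None when the value is missing or malformed."""
    if not isinstance(value, str):
        # Covers None and the float NaN that pandas uses for empty cells.
        return None
    match = _POINT_RE.match(value.strip())
    if match is None:
        return None
    longitude, latitude = (float(part) for part in match.groups())
    return longitude, latitude


print(parse_store_location("POINT (-93.879597 42.037102)"))
print(parse_store_location(None))
```

Returning `None` for missing values keeps the null Store Location rows intact rather than dropping them.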


#### Check for Missing Values in Important Columns

Findings:
1. Store Location has many null values; can be left as it is
2. Category Name has null values; can be left as it is

In [7]:
df.info(null_counts=True)  # on pandas >= 2.0 use show_counts=True instead

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 318098 entries, 0 to 318097
Data columns (total 24 columns):
Invoice/Item Number      318098 non-null object
Date                     318098 non-null object
Store Number             318098 non-null int64
Store Name               318098 non-null object
Address                  318047 non-null object
City                     318047 non-null object
Zip Code                 318047 non-null float64
Store Location           289993 non-null object
County Number            318047 non-null float64
County                   318047 non-null object
Category                 317607 non-null float64
Category Name            317607 non-null object
Vendor Number            318097 non-null float64
Vendor Name              318097 non-null object
Item Number              318098 non-null int64
Item Description         318098 non-null object
Pack                     318098 non-null int64
Bottle Volume (ml)       318098 non-null int64
State Bottle Cost        

#### Statistics on Numeric Columns

Findings:
1. Data looks fairly clean
2. No outliers in the fields of important metrics (e.g. Sale (Dollars))

In [8]:
df.describe()

Unnamed: 0,Store Number,Zip Code,County Number,Category,Vendor Number,Item Number,Pack,Bottle Volume (ml),State Bottle Cost,State Bottle Retail,Bottles Sold,Sale (Dollars),Volume Sold (Liters),Volume Sold (Gallons)
count,318098.0,318047.0,318047.0,317607.0,318097.0,318098.0,318098.0,318098.0,318098.0,318098.0,318098.0,318098.0,318098.0,318098.0
mean,3896.399349,51260.268363,57.415608,1062031.0,267.637412,49796.019758,12.340936,831.715723,10.53039,15.799934,11.66349,154.393225,9.367886,2.46925
std,1147.152595,986.983704,27.335788,122740.4,138.049481,75652.18968,8.078051,520.280153,9.325243,14.001589,31.985178,502.89275,38.97711,10.296882
min,2106.0,50002.0,1.0,1011100.0,35.0,159.0,1.0,20.0,0.89,1.34,1.0,1.34,0.02,0.0
25%,2624.0,50316.0,32.0,1012100.0,130.0,26823.0,6.0,375.0,5.54,8.31,3.0,35.5,1.5,0.39
50%,3934.0,51101.0,62.0,1031200.0,260.0,38177.0,12.0,750.0,8.26,12.38,6.0,78.66,4.5,1.18
75%,4969.0,52302.0,77.0,1062500.0,389.0,64866.0,12.0,1000.0,13.0,19.5,12.0,150.96,10.5,2.77
max,9042.0,57222.0,99.0,1901200.0,978.0,999292.0,48.0,6000.0,1749.12,2623.68,3780.0,43885.8,6615.0,1747.49
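What counts as an outlier depends on the rule applied. As one possible cross-check, the sketch below applies Tukey's fences (1.5 times the interquartile range) to the Sale (Dollars) quartiles copied from the table above. The choice of this rule is an assumption, not something the notebook itself uses, and a value beyond a fence is only a candidate for review: a large order can be perfectly valid.

```python
def tukey_fences(q1: float, q3: float, k: float = 1.5) -> tuple:
    """Return the (lower, upper) fences; values outside them are flagged for review."""
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# Quartiles and maximum of "Sale (Dollars)" copied from the describe() output above.
lower, upper = tukey_fences(q1=35.5, q3=150.96)
sale_max = 43885.8

print(f"fences: ({lower:.2f}, {upper:.2f})")
print("max is above the upper fence:", sale_max > upper)
```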


#### Check on Unique Values in Categorical Fields

Findings:

Category Name:
1. 'Cocktails / RTD', 'Cocktails /RTD' are the same category; need to standardise
2. 'American Cordials & Liqueur', 'American Cordials & Liqueurs' are the same; need to standardise
3. 'American Vodka', 'American Vodkas' are the same; need to standardise
4. 'Imported Cordials & Liqueur', 'Imported Cordials & Liqueurs' are the same; need to standardise
5. 'Imported Distilled Spirit Specialty', 'Imported Distilled Spirits Specialty' are the same; need to standardise
6. 'Imported Vodka', 'Imported Vodkas' are the same; need to standardise
7. 'Temporary &  Specialty Packages', 'Temporary & Specialty Packages' are the same; need to standardise

County:
1. Values are in mixed case (e.g. 'BLACK HAWK' and 'Black Hawk'); need to convert all of them to Proper Case

City:
1. Data is mostly clean; the output below lists both 'Clear Lake' and 'ClearLake'
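The Category Name findings amount to a lookup from each variant spelling to one canonical spelling. A minimal plain-Python sketch follows; which spelling of each pair is kept follows the Spark cleansing step later in this notebook, and the names `CATEGORY_FIXES` and `standardise_category` are hypothetical.

```python
# Variant spelling -> canonical spelling, one entry per pair listed in the findings.
CATEGORY_FIXES = {
    "Cocktails /RTD": "Cocktails / RTD",
    "American Cordials & Liqueur": "American Cordials & Liqueurs",
    "American Vodkas": "American Vodka",
    "Imported Cordials & Liqueur": "Imported Cordials & Liqueurs",
    "Imported Distilled Spirits Specialty": "Imported Distilled Spirit Specialty",
    "Imported Vodkas": "Imported Vodka",
    "Temporary &  Specialty Packages": "Temporary & Specialty Packages",
}


def standardise_category(name):
    """Map a known variant to its canonical name; leave anything else (including None) unchanged."""
    return CATEGORY_FIXES.get(name, name)


print(standardise_category("Cocktails /RTD"))
print(standardise_category("Mezcal"))
```

Keeping the pairs in one dictionary means a newly discovered variant needs only one extra entry.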

In [14]:
df['Category Name'].sort_values().unique().tolist()

['100% Agave Tequila',
 'Aged Dark Rum',
 'American Brandies',
 'American Cordials & Liqueur',
 'American Cordials & Liqueurs',
 'American Distilled Spirit Specialty',
 'American Dry Gins',
 'American Flavored Vodka',
 'American Schnapps',
 'American Sloe Gins',
 'American Vodka',
 'American Vodkas',
 'Blended Whiskies',
 'Bottled in Bond Bourbon',
 'Canadian Whiskies',
 'Cocktails / RTD',
 'Cocktails /RTD',
 'Coffee Liqueurs',
 'Corn Whiskies',
 'Cream Liqueurs',
 'Flavored Gin',
 'Flavored Rum',
 'Gold Rum',
 'Imported Brandies',
 'Imported Cordials & Liqueur',
 'Imported Cordials & Liqueurs',
 'Imported Distilled Spirit Specialty',
 'Imported Distilled Spirits Specialty',
 'Imported Dry Gins',
 'Imported Flavored Vodka',
 'Imported Schnapps',
 'Imported Vodka',
 'Imported Vodkas',
 'Iowa Distillery Whiskies',
 'Irish Whiskies',
 'Mezcal',
 'Mixto Tequila',
 'Neutral Grain Spirits',
 'Neutral Grain Spirits Flavored',
 'Scotch Whiskies',
 'Single Barrel Bourbon Whiskies',
 'Single Mal

In [44]:
df['County'].sort_values().unique().tolist()

['ADAIR',
 'ADAMS',
 'ALLAMAKEE',
 'APPANOOSE',
 'AUDUBON',
 'BENTON',
 'BLACK HAWK',
 'BOONE',
 'BREMER',
 'BUCHANAN',
 'BUENA VIST',
 'BUTLER',
 'Black Hawk',
 'CALHOUN',
 'CARROLL',
 'CASS',
 'CEDAR',
 'CERRO GORD',
 'CHEROKEE',
 'CHICKASAW',
 'CLARKE',
 'CLAY',
 'CLAYTON',
 'CLINTON',
 'CRAWFORD',
 'DALLAS',
 'DAVIS',
 'DECATUR',
 'DELAWARE',
 'DES MOINES',
 'DICKINSON',
 'DUBUQUE',
 'Dallas',
 'Delaware',
 'Des Moines',
 'EMMET',
 'FAYETTE',
 'FLOYD',
 'FRANKLIN',
 'FREMONT',
 'GREENE',
 'GRUNDY',
 'GUTHRIE',
 'HAMILTON',
 'HANCOCK',
 'HARDIN',
 'HARRISON',
 'HENRY',
 'HOWARD',
 'HUMBOLDT',
 'Hancock',
 'Hardin',
 'Henry',
 'IDA',
 'IOWA',
 'Iowa',
 'JACKSON',
 'JASPER',
 'JEFFERSON',
 'JOHNSON',
 'JONES',
 'Jackson',
 'KEOKUK',
 'KOSSUTH',
 'LEE',
 'LINN',
 'LOUISA',
 'LUCAS',
 'LYON',
 'Linn',
 'MADISON',
 'MAHASKA',
 'MARION',
 'MARSHALL',
 'MILLS',
 'MITCHELL',
 'MONONA',
 'MONROE',
 'MONTGOMERY',
 'MUSCATINE',
 'Madison',
 'Marion',
 'Marshall',
 'OBRIEN',
 'OSCEOLA',
 'PAGE'

In [45]:
df['City'].sort_values().unique().tolist()

['Ackley',
 'Adair',
 'Adel',
 'Afton',
 'Akron',
 'Albert City',
 'Albia',
 'Albion',
 'Alburnett',
 'Alden',
 'Algona',
 'Allison',
 'Alta',
 'Alton',
 'Altoona',
 'Amana',
 'Ames',
 'Anamosa',
 'Anita',
 'Ankeny',
 'Anthon',
 'Aplington',
 'Arlington',
 'Armstrong',
 'Arnolds Park',
 'Atkins',
 'Atlantic',
 'Audubon',
 'Aurelia',
 'Avoca',
 'Baldwin',
 'Bancroft',
 'Baxter',
 'Bedford',
 'Belle Plaine',
 'Bellevue',
 'Belmond',
 'Bettendorf',
 'Bevington',
 'Blairstown',
 'Bloomfield',
 'Blue Grass',
 'Bondurant',
 'Boone',
 'Britt',
 'Brooklyn',
 'Buffalo',
 'Buffalo Center',
 'Burlington',
 'Calmar',
 'Camanche',
 'Carlisle',
 'Carroll',
 'Carter Lake',
 'Cascade',
 'Cedar Falls',
 'Cedar Rapids',
 'Center Point',
 'Centerville',
 'Central City',
 'Chariton',
 'Charles City',
 'Cherokee',
 'Clarence',
 'Clarinda',
 'Clarion',
 'Clarksville',
 'Clear Lake',
 'ClearLake',
 'Clermont',
 'Clinton',
 'Clive',
 'Coggon',
 'Colfax',
 'Colo',
 'Columbus Junction',
 'Conrad',
 'Coon Rapids

-----------------------------------------------------------------

### 2. Holidays Dataset

* Location of sample data: /data/holidays/usholidays.json
* Location of full data: S3 bucket

In [16]:
# Read in the data here
df_holidays = pd.read_json("data/holidays/usholidays.json")

#### Shape of dataset

In [17]:
df_holidays.shape

(485, 2)

#### Columns and Data Types

Findings:
1. Column types are clean

In [19]:
df_holidays.dtypes

Date       datetime64[ns]
Holiday            object
dtype: object

#### Sample Data

Findings:
1. Column names need to be transformed into lowercase

In [20]:
df_holidays.head()

Unnamed: 0,Date,Holiday
0,2010-12-31,New Year's Day
1,2011-01-17,"Birthday of Martin Luther King, Jr."
2,2011-02-21,Washington's Birthday
3,2011-05-30,Memorial Day
4,2011-04-07,Independence Day


#### Check for Missing Values in Important Columns

Findings:
1. No null values; dataset is clean

In [21]:
df_holidays.info(null_counts=True)  # on pandas >= 2.0 use show_counts=True instead

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 485 entries, 0 to 484
Data columns (total 2 columns):
Date       485 non-null datetime64[ns]
Holiday    485 non-null object
dtypes: datetime64[ns](1), object(1)
memory usage: 7.7+ KB


#### Check on Unique Values in Categorical Fields

Findings:
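1. Some holidays appear twice because of differing apostrophe characters (' vs ’) or naming ('Birthday of Martin Luther King, Jr.' vs 'Martin Luther King, Jr. Day'); this is not a problem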

In [22]:
df_holidays['Holiday'].sort_values().unique().tolist()

['Birthday of Martin Luther King, Jr.',
 'Christmas Day',
 'Columbus Day',
 'Independence Day',
 'Labor Day',
 'Martin Luther King, Jr. Day',
 'Memorial Day',
 "New Year's Day",
 'New Year’s Day',
 'Thanksgiving Day',
 'Veterans Day',
 "Washington's Birthday",
 'Washington’s Birthday']
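If distinct holiday names ever matter (for example when grouping by holiday), the apostrophe variants can be collapsed with a one-line normalisation. This is an optional sketch and not a step the notebook performs. The function name is hypothetical, and the two Martin Luther King, Jr. spellings are deliberately left untouched because merging them is a naming decision rather than a character fix.

```python
def normalise_holiday_name(name: str) -> str:
    """Replace the typographic apostrophe (U+2019) with the ASCII one and trim whitespace."""
    return name.replace("\u2019", "'").strip()


names = [
    "New Year's Day",
    "New Year\u2019s Day",
    "Washington's Birthday",
    "Washington\u2019s Birthday",
]
unique_names = sorted({normalise_holiday_name(n) for n in names})
print(unique_names)
```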

___________________________________________________

### 3. Weather Dataset

* Location of sample data: /data/weather/*.csv
* Location of full data: S3 bucket

In [61]:
# Read in the data here
all_files = glob.glob("data/weather/*.csv")

li = []

for filename in all_files:
    df_weather = pd.read_csv(filename)
    li.append(df_weather)

df_weather = pd.concat(li, axis=0, ignore_index=True)

#### Shape of dataset

In [62]:
df_weather.shape

(4752, 7)

#### Columns and Data Types

Findings:
1. Column types are clean

In [82]:
df_weather.dtypes

County                    object
State                     object
Average Temperature      float64
Latitude (generated)     float64
Longitude (generated)    float64
Year                       int64
Month                      int64
dtype: object

#### Sample Data

Findings:
1. Column names need to be transformed into lowercase and spaces or brackets to be replaced with _

In [83]:
df_weather.head()

Unnamed: 0,County,State,Average Temperature,Latitude (generated),Longitude (generated),Year,Month
0,Wright,Iowa,22.0,42.7343,-93.7351,2017,12
1,Worth,Iowa,18.44,43.3787,-93.2608,2017,12
2,Woodbury,Iowa,23.52,42.3866,-96.0407,2017,12
3,Winneshiek,Iowa,22.98,43.2932,-91.8426,2017,12
4,Winnebago,Iowa,20.29,43.3803,-93.7342,2017,12
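The renaming rule in the findings (lowercase, with spaces and brackets replaced by _) can be written as a small helper. This is a sketch under the assumption that any run of non-alphanumeric characters should become a single underscore; `to_snake_case` is a hypothetical name. The Spark step later in the notebook renames columns by hand and picks some different names (for example climate_temp), so this only illustrates the mechanical part of the rule.

```python
import re


def to_snake_case(column_name: str) -> str:
    """Lowercase the name and collapse each run of non-alphanumeric characters into one underscore."""
    collapsed = re.sub(r"[^0-9a-zA-Z]+", "_", column_name.strip())
    return collapsed.strip("_").lower()


for original in ["County", "Average Temperature", "Latitude (generated)"]:
    print(original, "->", to_snake_case(original))
```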


#### Statistics on Numeric Columns

Findings:
1. Data looks clean
2. No outliers in the fields of important metrics (e.g. Average Temperature)

In [88]:
df_weather.describe()

Unnamed: 0,Average Temperature,Latitude (generated),Longitude (generated),Year,Month
count,3564.0,3564.0,3564.0,3564.0,3564.0
mean,49.198751,42.02832,-93.467433,2017.0,6.5
std,19.339754,0.845857,1.552874,0.816611,3.452537
min,10.06,40.5953,-96.2455,2016.0,1.0
25%,31.3625,41.3323,-94.7,2016.0,3.75
50%,50.4,42.0384,-93.4652,2017.0,6.5
75%,68.7325,42.7359,-92.1791,2018.0,9.25
max,79.19,43.3805,-90.5324,2018.0,12.0


#### Check for Missing Values in Important Columns

Findings:
1. No null values; dataset is clean

In [85]:
df_weather.info(null_counts=True)  # on pandas >= 2.0 use show_counts=True instead

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3564 entries, 0 to 3563
Data columns (total 7 columns):
County                   3564 non-null object
State                    3564 non-null object
Average Temperature      3564 non-null float64
Latitude (generated)     3564 non-null float64
Longitude (generated)    3564 non-null float64
Year                     3564 non-null int64
Month                    3564 non-null int64
dtypes: float64(3), int64(2), object(2)
memory usage: 195.0+ KB


#### Check on Unique Values in Categorical Fields

Findings:
1. Data is clean

In [86]:
df_weather['County'].sort_values().unique().tolist()

['Adair',
 'Adams',
 'Allamakee',
 'Appanoose',
 'Audubon',
 'Benton',
 'Black Hawk',
 'Boone',
 'Bremer',
 'Buchanan',
 'Buena Vista',
 'Butler',
 'Calhoun',
 'Carroll',
 'Cass',
 'Cedar',
 'Cerro Gordo',
 'Cherokee',
 'Chickasaw',
 'Clarke',
 'Clay',
 'Clayton',
 'Clinton',
 'Crawford',
 'Dallas',
 'Davis',
 'Decatur',
 'Delaware',
 'Des Moines',
 'Dickinson',
 'Dubuque',
 'Emmet',
 'Fayette',
 'Floyd',
 'Franklin',
 'Fremont',
 'Greene',
 'Grundy',
 'Guthrie',
 'Hamilton',
 'Hancock',
 'Hardin',
 'Harrison',
 'Henry',
 'Howard',
 'Humboldt',
 'Ida',
 'Iowa',
 'Jackson',
 'Jasper',
 'Jefferson',
 'Johnson',
 'Jones',
 'Keokuk',
 'Kossuth',
 'Lee',
 'Linn',
 'Louisa',
 'Lucas',
 'Lyon',
 'Madison',
 'Mahaska',
 'Marion',
 'Marshall',
 'Mills',
 'Mitchell',
 'Monona',
 'Monroe',
 'Montgomery',
 'Muscatine',
 'Obrien',
 'Osceola',
 'Page',
 'Palo Alto',
 'Plymouth',
 'Pocahontas',
 'Polk',
 'Pottawattamie',
 'Poweshiek',
 'Ringgold',
 'Sac',
 'Scott',
 'Shelby',
 'Sioux',
 'Story',
 'Ta

In [87]:
df_weather['State'].sort_values().unique().tolist()

['Iowa']

---------

### Data Cleansing and Transformation Steps using Spark

In [2]:
#initiate spark session
spark = SparkSession.builder.getOrCreate()

#### 1. Cleanse Liquor Sales Dataset

In [7]:
#define the schema for the liquor sales file; the field names double as the cleaned (lowercase, underscore-separated) column names

liquor_sales_schema = StructType(
    [
        StructField('invoice_number', StringType(), False),
        StructField('sales_date', DateType(), True),
        StructField('store_number', DoubleType(), True),
        StructField('store_name', StringType(), True),
        StructField('address', StringType(), True),
        StructField('city', StringType(), True),
        StructField('zipcode', IntegerType(), True),
        StructField('store_location', StringType(), True),
        StructField('county_number', DoubleType(), True),
        StructField('county', StringType(), True),
        StructField('category', DoubleType(), True),
        StructField('category_name', StringType(), True),
        StructField('vendor_number', DoubleType(), True),
        StructField('vendor_name', StringType(), True),
        StructField('item_number', DoubleType(), True),
        StructField('description', StringType(), True),
        StructField('pack', IntegerType(), True),
        StructField('bottle_volume', IntegerType(), True),
        StructField('item_cost_price', DecimalType(10,2), True),
        StructField('item_retail_price', DecimalType(10,2), True),
        StructField('bottles_sold', IntegerType(), True),
        StructField('sales_usd', DecimalType(10,2), True),
        StructField('volume_sold_litres', DecimalType(10,2), True),
        StructField('volume_sold_gallons', DecimalType(10,2), True)
    ]
)

In [8]:
#read data in; data file should not have headers
df_spark = spark\
            .read\
            .option("header","false")\
            .schema(liquor_sales_schema)\
            .option("dateFormat", "dd/MM/yy")\
            .csv("file:///home/workspace/data/liquor_sales/Iowa_Liquor_Sales_sample_noheaders.csv")

df_spark.show(2)

+---------------+----------+------------+--------------------+--------------------+-----+-------+--------------------+-------------+------+---------+-----------------+-------------+------------------+-----------+------------+----+-------------+---------------+-----------------+------------+---------+------------------+-------------------+
| invoice_number|sales_date|store_number|          store_name|             address| city|zipcode|      store_location|county_number|county| category|    category_name|vendor_number|       vendor_name|item_number| description|pack|bottle_volume|item_cost_price|item_retail_price|bottles_sold|sales_usd|volume_sold_litres|volume_sold_gallons|
+---------------+----------+------------+--------------------+--------------------+-----+-------+--------------------+-------------+------+---------+-----------------+-------------+------------------+-----------+------------+----+-------------+---------------+-----------------+------------+---------+-----------------

In [9]:
# split Store Location, e.g. "POINT (-93.879597 42.037102)", into longitude and latitude
split_col = func.split(df_spark['store_location'], " ")
# raw strings avoid Python's invalid-escape warning for the regex patterns
df_spark = df_spark.withColumn('latitude', func.regexp_replace(split_col.getItem(2), r"\)", ''))
df_spark = df_spark.withColumn('longitude', func.regexp_replace(split_col.getItem(1), r"\(", ''))
df_spark.show(2)

+---------------+----------+------------+--------------------+--------------------+-----+-------+--------------------+-------------+------+---------+-----------------+-------------+------------------+-----------+------------+----+-------------+---------------+-----------------+------------+---------+------------------+-------------------+---------+----------+
| invoice_number|sales_date|store_number|          store_name|             address| city|zipcode|      store_location|county_number|county| category|    category_name|vendor_number|       vendor_name|item_number| description|pack|bottle_volume|item_cost_price|item_retail_price|bottles_sold|sales_usd|volume_sold_litres|volume_sold_gallons| latitude| longitude|
+---------------+----------+------------+--------------------+--------------------+-----+-------+--------------------+-------------+------+---------+-----------------+-------------+------------------+-----------+------------+----+-------------+---------------+----------------

In [10]:
#standardise variant spellings in the category_name field
df_spark = df_spark.withColumn('category_name', 
                               func.when(df_spark['category_name'] == "Cocktails /RTD" , "Cocktails / RTD")
                               .when(df_spark['category_name'] == "American Cordials & Liqueur" , "American Cordials & Liqueurs")
                               .when(df_spark['category_name'] == "American Vodkas" , "American Vodka")
                               .when(df_spark['category_name'] == "Imported Cordials & Liqueur" , "Imported Cordials & Liqueurs")
                               .when(df_spark['category_name'] == "Imported Distilled Spirits Specialty" , "Imported Distilled Spirit Specialty")
                               .when(df_spark['category_name'] == "Imported Vodkas" , "Imported Vodka")
                               .when(df_spark['category_name'] == "Temporary &  Specialty Packages" , "Temporary & Specialty Packages")
                               .otherwise(df_spark['category_name']))

#convert county into proper case
df_spark = df_spark.withColumn('county', func.initcap('county'))
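Proper-casing matters because the county name is later used to match sales rows to weather rows. The plain-Python sketch below mimics the word-by-word capitalisation and checks the result against a small set of county names taken from the weather output above. The helper name `proper_case` and the three-name set are illustrative only. It also shows that names which appear truncated in the sales sample, such as 'BUENA VIST', would presumably still fail to match after casing is fixed.

```python
def proper_case(county: str) -> str:
    """Capitalise each whitespace-separated word, e.g. 'BLACK HAWK' -> 'Black Hawk'."""
    return " ".join(word.capitalize() for word in county.split())


# Illustrative subset of the county names listed in the weather dataset.
weather_counties = {"Black Hawk", "Buena Vista", "Obrien"}

for raw in ["BLACK HAWK", "Black Hawk", "OBRIEN", "BUENA VIST"]:
    cleaned = proper_case(raw)
    print(f"{raw!r} -> {cleaned!r}, matches a weather county: {cleaned in weather_counties}")
```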

#### 2. Cleanse Holidays Dataset

In [11]:
#read data in
df_holidays_spark = spark.read\
                    .option("multiline","true")\
                    .json("file:///home/workspace/data/holidays/usholidays.json")

df_holidays_spark.printSchema()
df_holidays_spark.show(2)

root
 |-- Date: string (nullable = true)
 |-- Holiday: string (nullable = true)

+--------+--------------------+
|    Date|             Holiday|
+--------+--------------------+
|31/12/10|      New Year's Day|
| 17/1/11|Birthday of Marti...|
+--------+--------------------+
only showing top 2 rows



In [12]:
#cast Date field as date format
df_holidays_spark = df_holidays_spark.withColumn("Date", func.to_date("Date", "dd/MM/yy"))
df_holidays_spark.show(2)

+----------+--------------------+
|      Date|             Holiday|
+----------+--------------------+
|2010-12-31|      New Year's Day|
|2011-01-17|Birthday of Marti...|
+----------+--------------------+
only showing top 2 rows
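Stating the format explicitly matters for day-first strings like these. The sketch below uses Python's `datetime.strptime` with `%d/%m/%y`, the counterpart of the `dd/MM/yy` pattern used above, and shows how the same digits read month-first give a different date. The value `"4/7/11"` is a hypothetical example, not a value confirmed to be in the file. The pandas preview earlier in this notebook lists Independence Day as 2011-04-07, which would be consistent with such a value being read month-first, but that is an inference.

```python
from datetime import date, datetime


def parse_day_first(text: str) -> date:
    """Parse a day/month/two-digit-year string such as '17/1/11'."""
    return datetime.strptime(text, "%d/%m/%y").date()


print(parse_day_first("31/12/10"))
print(parse_day_first("17/1/11"))

# Hypothetical ambiguous value: both day and month are 12 or less.
ambiguous = "4/7/11"
day_first = parse_day_first(ambiguous)
month_first = datetime.strptime(ambiguous, "%m/%d/%y").date()
print("day-first:", day_first, "| month-first:", month_first)
```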



In [13]:
#rename columns to lowercase
df_holidays_spark = df_holidays_spark.withColumnRenamed('Date','date')
df_holidays_spark = df_holidays_spark.withColumnRenamed('Holiday','holiday_name')
df_holidays_spark.printSchema()

root
 |-- date: date (nullable = true)
 |-- holiday_name: string (nullable = true)



#### 3. Cleanse Weather Dataset

In [14]:
#read data in
df_weather_spark = spark.read\
                    .option("header","true")\
                    .option("inferSchema", "true")\
                    .csv("file:///home/workspace/data/weather")

df_weather_spark.printSchema()
df_weather_spark.show(5)

root
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Average Temperature: double (nullable = true)
 |-- Latitude (generated): double (nullable = true)
 |-- Longitude (generated): double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)

+----------+-----+-------------------+--------------------+---------------------+----+-----+
|    County|State|Average Temperature|Latitude (generated)|Longitude (generated)|Year|Month|
+----------+-----+-------------------+--------------------+---------------------+----+-----+
|    Wright| Iowa|              46.68|             42.7343|             -93.7351|2019|   10|
|     Worth| Iowa|              43.18|             43.3787|             -93.2608|2019|   10|
|  Woodbury| Iowa|              47.18|             42.3866|             -96.0407|2019|   10|
|Winneshiek| Iowa|              48.65|             43.2932|             -91.8426|2019|   10|
| Winnebago| Iowa|              44.55

In [15]:
#rename columns
df_weather_spark = df_weather_spark.withColumnRenamed('County','county')
df_weather_spark = df_weather_spark.withColumnRenamed('State','state')
df_weather_spark = df_weather_spark.withColumnRenamed('Average Temperature','climate_temp')
df_weather_spark = df_weather_spark.withColumnRenamed('Latitude (generated)','latitude_generated')
df_weather_spark = df_weather_spark.withColumnRenamed('Longitude (generated)','longitude_generated')
df_weather_spark = df_weather_spark.withColumnRenamed('Year','year')
df_weather_spark = df_weather_spark.withColumnRenamed('Month','month')

df_weather_spark.printSchema()

root
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- climate_temp: double (nullable = true)
 |-- latitude_generated: double (nullable = true)
 |-- longitude_generated: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



#### 4. Transformations into Fact and Dimension Table

In [16]:
df_spark.createOrReplaceTempView("df_spark")
df_holidays_spark.createOrReplaceTempView("df_holidays_spark")
df_weather_spark.createOrReplaceTempView("df_weather_spark")

#items dimension table
items = spark.sql("""
    SELECT DISTINCT
            item_number,
            description,
            category_name,
            bottle_volume,
            pack
    FROM df_spark
""")

print("items")
items.show(2)

#vendors dimension table
vendors = spark.sql("""
    SELECT DISTINCT
            vendor_number,
            vendor_name
    FROM df_spark
""")

print("vendors")
vendors.show(2)

#counties dimension table
counties = spark.sql("""
    SELECT DISTINCT
            county_number,
            county
    FROM df_spark
""")

print("counties")
counties.show(2)

#stores dimension table
stores = spark.sql("""
    SELECT DISTINCT
            store_number,
            store_name,
            address,
            city,
            zipcode,
            latitude,
            longitude
    FROM df_spark
""")

print("stores")
stores.show(2)

#time dimension table
time = spark.sql("""
    SELECT DISTINCT
            sales.sales_date,
            day(sales.sales_date) as day,
            weekofyear(sales.sales_date) as week,
            month(sales.sales_date) as month,
            year(sales.sales_date) as year,
            dayofweek(sales.sales_date) as weekday,
            case
                when holidays.holiday_name is null then False
                else True
            end as is_holiday,
            holidays.holiday_name
    FROM df_spark sales
    LEFT JOIN df_holidays_spark holidays
    ON sales.sales_date = holidays.date
""")

print("time")
time.where("is_holiday = True").show(5)

#liquor_sales fact table
liquor_sales = spark.sql("""
    SELECT
            sales.invoice_number,
            sales.sales_date,
            sales.store_number,
            sales.county_number,
            sales.item_number,
            sales.vendor_number,
            sales.bottles_sold,
            sales.volume_sold_litres,
            sales.item_cost_price,
            sales.item_retail_price,
            sales.sales_usd,
            weather.climate_temp
    FROM df_spark sales
    LEFT JOIN df_weather_spark weather
    ON year(sales.sales_date) = weather.year
    AND month(sales.sales_date) = weather.month
    AND sales.county = weather.county
""")

print("liquor_sales")
liquor_sales.show(2)

items
+-----------+-------------------+------------------+-------------+----+
|item_number|        description|     category_name|bottle_volume|pack|
+-----------+-------------------+------------------+-------------+----+
|    88602.0|Solana Agave Blanco|100% Agave Tequila|          750|   6|
|    43036.0|       Bacardi Gold|          Gold Rum|          750|  12|
+-----------+-------------------+------------------+-------------+----+
only showing top 2 rows

vendors
+-------------+--------------------+
|vendor_number|         vendor_name|
+-------------+--------------------+
|        971.0|Hood River Distil...|
|        395.0|             PROXIMO|
+-------------+--------------------+
only showing top 2 rows

counties
+-------------+-------+
|county_number| county|
+-------------+-------+
|         71.0| Obrien|
|         13.0|Calhoun|
+-------------+-------+
only showing top 2 rows

stores
+------------+--------------------+--------------------+------+-------+---------+---------+
|stor

#### 5. Data Quality Checks

##### a. Count Check

In [98]:
list_of_errors = []

if items.count() < 1:
    list_of_errors.append("items table count has no records")
if vendors.count() < 1:
    list_of_errors.append("vendors table count has no records")
if counties.count() < 1:
    list_of_errors.append("counties table count has no records")
if stores.count() < 1:
    list_of_errors.append("stores table count has no records")
if time.count() < 1:
    list_of_errors.append("time table count has no records")
if liquor_sales.count() < 1:
    list_of_errors.append("liquor_sales table count has no records")
    
if len(list_of_errors) >= 1:
    for error in list_of_errors:
        print(error)
else:
    print("No issues with count checks")

No issues with count checks
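The repeated `if` blocks above follow one pattern, so the check can also be expressed as a function over a mapping of table name to row count. This is a sketch of that refactoring. The function name and the counts below are hypothetical; in the notebook the counts would come from calls such as `items.count()`.

```python
def find_empty_tables(row_counts: dict) -> list:
    """Return one message per table whose row count is below 1."""
    return [
        f"{name} table has no records"
        for name, count in row_counts.items()
        if count < 1
    ]


# Hypothetical counts for illustration only.
row_counts = {"items": 120, "vendors": 35, "stores": 0}

errors = find_empty_tables(row_counts)
print("\n".join(errors) if errors else "No issues with count checks")
```

Adding a table to the check then only requires one more dictionary entry.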


##### b. Null checks

In [99]:
list_of_errors = []

if liquor_sales.where("invoice_number is null").count() > 0:
    list_of_errors.append("null records detected in invoice_number field of liquor_sales table")

if len(list_of_errors) >= 1:
    for error in list_of_errors:
        print(error)
else:
    print("No issues with null checks")

No issues with null checks


----------

### Sample Queries and Results

**Which liquor items are most popular (by number of bottles sold)?**

In [100]:
items.createOrReplaceTempView("items")
liquor_sales.createOrReplaceTempView("liquor_sales")

spark.sql("""
    SELECT 
        items.description,
        sum(liquor_sales.bottles_sold) as total_bottles_sold
    FROM liquor_sales LEFT JOIN items
    ON liquor_sales.item_number = items.item_number
    GROUP BY items.description
    ORDER BY total_bottles_sold desc
""").show(5, truncate=False)

+------------------------------+------------------+
|description                   |total_bottles_sold|
+------------------------------+------------------+
|Fireball Cinnamon Whiskey     |339978            |
|Black Velvet                  |167793            |
|Titos Handmade Vodka          |128591            |
|Hawkeye Vodka                 |100932            |
|Captain Morgan Original Spiced|89291             |
+------------------------------+------------------+
only showing top 5 rows



**Which counties have the stores that spend the most on liquor purchases?**

In [101]:
counties.createOrReplaceTempView("counties")
liquor_sales.createOrReplaceTempView("liquor_sales")

spark.sql("""
    SELECT
        counties.county,
        sum(liquor_sales.sales_usd) as total_spend
    FROM liquor_sales LEFT JOIN counties
    ON liquor_sales.county_number = counties.county_number
    GROUP BY county
    ORDER BY total_spend desc
""").show(5, truncate=False)

+----------+-----------+
|county    |total_spend|
+----------+-----------+
|Polk      |10866959.85|
|Linn      |4461406.27 |
|Scott     |3497930.90 |
|Johnson   |3036210.04 |
|Black Hawk|2791777.42 |
+----------+-----------+
only showing top 5 rows



In [23]:
#stop spark session
spark.stop()