# 2019 Liquor Sales in Iowa
### Data Engineering Capstone Project

#### Project Summary
We are going to look at publicly available spirits purchase information for Iowa Class “E” liquor licensees in 2019. We're going to create several dimension tables, as well as a sales summary table that can be fed into a visualization tool like Tableau so executives can look at sales trends.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
!pip install s3fs


In [57]:
import pandas as pd
import boto3
import json
import configparser

from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

import s3fs
import sqlalchemy

### Step 1: Scope the Project and Gather Data

#### Scope 
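We will combine the 2019 Iowa liquor sales records with zip code level median income and population data. Spark (through Spark SQL) is used to read and clean the raw CSV files and to build two dimension tables (products and stores) and a monthly sales summary table. The tables are staged as CSV files in S3 and loaded into an Amazon Redshift cluster, where they can be queried with SQL or connected to a BI tool such as Tableau.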

#### Describe and Gather Data 
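We use two data sets:
* **Iowa Liquor Sales** from the State of Iowa (https://data.iowa.gov/Sales-Distribution/Iowa-Liquor-Sales/m3tr-qhgy): one row per invoice line item, with the date, the store (number, name, address, city, zip code, county), the product (category, vendor, item number and description, bottle volume) and the sale amounts (bottles sold, sale in dollars, volume sold). The extract used here has 2,380,346 rows and 24 columns.
* **Median income and population by zip code** from the University of Michigan (https://www.psc.isr.umich.edu/dis/census/Features/tract2zip/): the median income, mean income and population of each zip code.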

In [5]:
os.environ['AWS_ACCESS_KEY_ID']='XXX'
os.environ['AWS_SECRET_ACCESS_KEY']='XXX'

In [6]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [40]:
### 2019 liquor sales data.  This may take a few seconds.
### https://data.iowa.gov/Sales-Distribution/Iowa-Liquor-Sales/m3tr-qhgy


### NOTE! Since I couldn't submit with the raw csv, I zip

In [7]:
df = spark.read.csv('/home/workspace/data/Iowa_Liquor_Sales.csv', header=True)

In [8]:
df.createOrReplaceTempView("sales")

In [28]:
spark.sql("""
    SELECT *
    FROM sales
    limit 5
""").show()

+-------------------+----------+------------+--------------------+--------------------+------------+--------+--------------------+-------------+-------+--------+-----------------+-------------+--------------------+-----------+--------------------+----+------------------+-----------------+-------------------+------------+--------------+--------------------+---------------------+
|Invoice/Item Number|      Date|Store Number|          Store Name|             Address|        City|Zip Code|      Store Location|County Number| County|Category|    Category Name|Vendor Number|         Vendor Name|Item Number|    Item Description|Pack|Bottle Volume (ml)|State Bottle Cost|State Bottle Retail|Bottles Sold|Sale (Dollars)|Volume Sold (Liters)|Volume Sold (Gallons)|
+-------------------+----------+------------+--------------------+--------------------+------------+--------+--------------------+-------------+-------+--------+-----------------+-------------+--------------------+-----------+------------

In [10]:
print((df.count(), len(df.columns)))

(2380346, 24)


In [60]:
### Median income and population data by zip code.  From here:
### https://www.psc.isr.umich.edu/dis/census/Features/tract2zip/

zip_df = spark.read.csv('/home/workspace/data/zip_median.csv', header=True)
zip_df = (
         zip_df
         .withColumn('Pop', regexp_replace('Pop', ',', ''))
         .withColumn('Median', regexp_replace('Median', ',', ''))
     )
zip_df.createOrReplaceTempView("zip_median")

In [61]:
spark.sql("""
    SELECT *
    FROM zip_median
    limit 5
""").show()

+----+------+------+-----+
| Zip|Median|  Mean|  Pop|
+----+------+------+-----+
|1001| 56663|66,688|16445|
|1002| 49853|75,063|28069|
|1003| 28462|35,121| 8491|
|1005| 75423|82,442| 4798|
|1007| 79076|85,802|12962|
+----+------+------+-----+



In [63]:
zip_df.describe().show()

+-------+-----------------+-----------------+------------------+------------------+
|summary|              Zip|           Median|              Mean|               Pop|
+-------+-----------------+-----------------+------------------+------------------+
|  count|            32634|            32634|             32634|             32634|
|   mean|49875.28075013789|50938.20524606239|            536.55| 9192.768186553902|
| stddev|27382.47649773278|20356.27230539215|254.20080645033366|13416.240474315491|
|    min|            10001|           100000|                 .|                 1|
|    max|            99929|            99994|            99,986|              9997|
+-------+-----------------+-----------------+------------------+------------------+
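Every column in zip_df is still a string, so describe() compares values as text: the Median min of 100000 and max of 99994 are in lexicographic order, not numeric order, and the Mean column still contains thousands separators and a '.' placeholder. The sketch below shows a parsing rule in plain Python with a hypothetical helper, `parse_census_number`. In Spark the same idea would be `regexp_replace` followed by `.cast('int')`, assuming Spark's non-ANSI cast behavior, where an unparseable string such as '.' becomes null.

```python
def parse_census_number(raw):
    """Convert a census text field such as '66,688' to an int.

    Hypothetical helper: strips thousands separators and treats '.'
    or an empty string as a missing value (None).
    """
    cleaned = raw.replace(",", "").strip()
    if cleaned in ("", "."):
        return None
    return int(cleaned)


# Values taken from the outputs above. As strings, '100000' sorts
# before '99994', which is why min > max in the Median column.
medians = ["56663", "100000", "99994"]
print(min(medians), max(medians))  # text order: 100000 99994

numeric = [parse_census_number(m) for m in medians]
print(min(numeric), max(numeric))  # numeric order: 56663 100000

print(parse_census_number("66,688"), parse_census_number("."))  # 66688 None
```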



### Step 2: Explore and Assess the Data
#### Explore the Data 
We check for data quality issues such as missing values and duplicate keys.

#### Cleaning Steps
We're going to look at both the sales dataframe and the zip code dataframe to find null or blank values and duplicate keys that could cause problems when the two are joined.

In [32]:
### Check the zip_median table to see if there are duplicate zip codes
### to make sure we don't have problems during the join. 

spark.sql("""
    SELECT count(*),
    count(distinct Zip)
    FROM zip_median
""").show()

+--------+-------------------+
|count(1)|count(DISTINCT Zip)|
+--------+-------------------+
|   32634|              32634|
+--------+-------------------+



The zip code dataframe has no null values (every column has a count of 32,634 in the summary above) and all zip codes are unique, which is great! Joining sales to it on Zip will not duplicate any rows.

In [40]:
### check to see if Zip Code value is null or blank
spark.sql("""
    SELECT count(*)
    FROM sales
    where `Zip Code`='' or `Zip Code` is null
""").show()

+--------+
|count(1)|
+--------+
|    4764|
+--------+



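4,764 sales rows have a null or blank Zip Code. These rows will not match anything in the zip code table, so their median income and population will be null after the join.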

In [41]:
### check which stores have a null or blank Address
spark.sql("""
    SELECT `Store Name`,
    count(*)
    FROM sales
    where Address='' or Address is null
    group by 1
    order by `Store Name`
""").show()

+--------------------+--------+
|          Store Name|count(1)|
+--------------------+--------+
|       A to Z Liquor|      89|
|   Adventureland Inn|      14|
|B and B EAST / Wa...|      45|
|    Burlington Shell|      39|
|CVS Pharmacy #101...|      13|
|Casey's General S...|      70|
|Casey's General S...|      31|
|Casey's General S...|      69|
|Casey's General S...|      36|
|Casey's General S...|      23|
|Chuck's Sportsman...|      96|
|Clear Lake Payles...|      63|
|Fareway Stores #0...|      54|
|Fareway Stores #4...|     110|
|Fareway Stores #7...|     178|
|     Flashmart  #105|      11|
|Flashmart #103/An...|      23|
|       Freeman Foods|      73|
|Hometown Foods / ...|     147|
|    Hy-Vee / Corydon|      29|
+--------------------+--------+
only showing top 20 rows



Some stores have null values for address, zip code, etc., but that's fine. As long as we know that some stores won't join to the zip code demographics table, we're okay.

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
1) products table - this is a dimension table of all the liquors sold in 2019.

2) stores table - this is a dimension table of all the stores that sold liquor in 2019. For market research purposes, it includes the population and median income of each store's zip code.

3) sales_summary - this is the total retail sales by month for 2019. It is created so the data visualization team can feed it into Tableau.

#### 3.2 Mapping Out Data Pipelines
1) First, I'm going to build the two dimension tables and the sales summary table as Spark dataframes using Spark SQL

2) Then I'm going to convert each Spark dataframe into a pandas dataframe

3) I will create three empty tables in Redshift

4) Finally, I'm going to write each pandas dataframe from 2) to a CSV file in S3 and load it into the matching empty table from 3) with Redshift's COPY command
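Steps 3 and 4 run the same statements for every table: empty it, then COPY the staged CSV from S3. A minimal sketch of a hypothetical helper, `build_copy_sql`, that generates those statements. The bucket paths and role ARN in the usage are placeholders, not the project's real values. The CSVs are written with `header=False` because a plain `COPY ... CSV` would otherwise load the header line as a data row (Redshift's `IGNOREHEADER 1` option is the alternative).

```python
def build_copy_sql(table, s3_key, iam_role_arn):
    """Return SQL that empties `table` and reloads it from a headerless CSV in S3.

    Hypothetical helper mirroring the DELETE + COPY cells in this notebook.
    `s3_key` is 'bucket/path/file.csv' without the s3:// prefix.
    """
    return (
        f"DELETE FROM {table};\n"
        f"COPY {table}\n"
        f"FROM 's3://{s3_key}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "CSV;"
    )


# Placeholder values for illustration only.
ROLE_ARN = "arn:aws:iam::123456789012:role/exampleRole"
tables = {
    "products": "example-bucket/capstone/pandas_products.csv",
    "stores": "example-bucket/capstone/pandas_stores.csv",
    "sales_summary": "example-bucket/capstone/pandas_sales.csv",
}
for table, key in tables.items():
    print(build_copy_sql(table, key, ROLE_ARN))
```

Generating the SQL from one function keeps the three load cells consistent; each string can be passed to `con.execute(...)` as the notebook does.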

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

# Create Spark dataframes using SparkSQL

In [87]:
products=spark.sql("""
    SELECT
    Category,
    `Category Name`,
    `Vendor Number`,
    `Vendor Name`,
    `Item Number`,
    `Item Description`,
    `Bottle Volume (ml)`
    FROM sales
    where Category is not null
    group by 
    Category,
    `Category Name`,
    `Vendor Number`,
    `Vendor Name`,
    `Item Number`,
    `Item Description`,
    `Bottle Volume (ml)`
""")

In [64]:
stores=spark.sql("""
    SELECT
    `Store Number`,
    `Store Name`,
    Address,
    City,
    a.`Zip Code`,
    Median,
    Pop
    FROM sales a 
    left join zip_median b
    on a.`Zip Code` =b.Zip
    --where Address is not null 
    group by
    `Store Number`,
    `Store Name`,
    Address,
    City,
    a.`Zip Code`,
    Median,
    Pop
""")

In [38]:
sales_summary=spark.sql("""
    SELECT
    month(TO_DATE(CAST(UNIX_TIMESTAMP(Date, 'MM/dd/yyyy') AS TIMESTAMP))) as mnth,
    sum(`Sale (Dollars)`)
    FROM sales
    group by 
    month(TO_DATE(CAST(UNIX_TIMESTAMP(Date, 'MM/dd/yyyy') AS TIMESTAMP)))
""")

# Then convert them into Pandas dataframes

In [88]:
%%time
pandas_products = products.toPandas()

CPU times: user 33.6 ms, sys: 4.84 ms, total: 38.4 ms
Wall time: 18 s


In [53]:
# exported to csv to check data
#pandas_products.to_csv('sample.csv', index=False, header=True)  

In [65]:
%%time
pandas_stores = stores.toPandas()

CPU times: user 22 ms, sys: 1.52 ms, total: 23.5 ms
Wall time: 17.8 s


In [39]:
pandas_sales_summary = sales_summary.toPandas()

# The next several cells create a Redshift cluster using boto3

In [15]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole
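dwh.cfg itself is not shown. Judging from the keys the cell reads, it needs an `[AWS]` section with KEY and SECRET and a `[DWH]` section with the cluster settings. A sketch of that assumed layout, parsed from a string with `configparser`. The DWH values mirror the table above; the credentials are placeholders.

```python
import configparser

# Assumed layout of dwh.cfg, reconstructed from the config.get() calls above.
# All secrets are placeholders.
SAMPLE_CFG = """
[AWS]
KEY = <your-access-key-id>
SECRET = <your-secret-access-key>

[DWH]
DWH_CLUSTER_TYPE = multi-node
DWH_NUM_NODES = 4
DWH_NODE_TYPE = dc2.large
DWH_CLUSTER_IDENTIFIER = dwhCluster
DWH_DB = dwh
DWH_DB_USER = dwhuser
DWH_DB_PASSWORD = <your-db-password>
DWH_PORT = 5439
DWH_IAM_ROLE_NAME = dwhRole
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# configparser returns strings, which is why the notebook wraps
# DWH_NUM_NODES and DWH_PORT in int() before passing them to boto3.
print(config.get("DWH", "DWH_NODE_TYPE"))       # dc2.large
print(config.getint("DWH", "DWH_NUM_NODES"))    # 4
```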


In [18]:
ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

In [19]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::945626161665:role/dwhRole


In [20]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


In [21]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-c5a636bd
7,NumberOfNodes,4


In [23]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::945626161665:role/dwhRole


In [13]:
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-080cb2e80b66538df')


In [13]:
%load_ext sql

In [24]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'
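The connection string is built with plain string formatting, which breaks if the password contains characters that are special in URLs, such as `@`, `:` or `/`. A sketch of a hypothetical helper that URL-encodes the user and password with the standard library's `urllib.parse.quote_plus`; the credentials and host below are placeholders.

```python
from urllib.parse import quote_plus


def redshift_url(user, password, host, port, db):
    """Build a postgresql:// URL, escaping characters that are special in URLs."""
    return "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, db
    )


# Placeholder credentials and host for illustration.
url = redshift_url("dwhuser", "p@ss:w/rd", "example.us-west-2.redshift.amazonaws.com", 5439, "dwh")
print(url)
# postgresql://dwhuser:p%40ss%3Aw%2Frd@example.us-west-2.redshift.amazonaws.com:5439/dwh
```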

# Create empty tables

In [89]:
%%sql
DROP TABLE IF EXISTS products;
CREATE TABLE IF NOT EXISTS products (
Category int,
Category_Name varchar,
Vendor_Number int,
Vendor_Name	varchar,
Item_Number int,
Item_Description varchar,
Bottle_Volume_ml int
);

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [44]:
%%sql
--drop table stores;
CREATE TABLE IF NOT EXISTS stores (
Store_Number int,
Store_Name varchar,
Address varchar,
City varchar,
Zip_Code int,
Median_income int,
Population int
);

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

In [40]:
%%sql
--drop table sales_summary;
CREATE TABLE IF NOT EXISTS sales_summary (
month int,
sales float
);

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

# Insert data into empty tables

In [90]:
%%time

con = sqlalchemy.create_engine(conn_string)

s3 = s3fs.S3FileSystem(anon=False)
filename = 'dend-buket-fujimoto2/capstone/pandas_products.csv'
with s3.open(filename, 'w') as f:
    pandas_products.to_csv(f, index=False, header=False)


con.execute("""
    DELETE products;
    COPY products
    from 's3://%s'
    iam_role 'arn:aws:iam::945626161665:role/dwhRole'
    csv;""" % filename)

CPU times: user 41.9 ms, sys: 13.1 ms, total: 55 ms
Wall time: 6.16 s


In [66]:
%%time

filename = 'dend-buket-fujimoto2/capstone/pandas_stores.csv'
with s3.open(filename, 'w') as f:
    pandas_stores.to_csv(f, index=False, header=False)


con.execute("""
    DELETE stores;
    COPY stores
    from 's3://%s'
    iam_role 'arn:aws:iam::945626161665:role/dwhRole'
    csv;""" % filename)

CPU times: user 33.5 ms, sys: 2.92 ms, total: 36.4 ms
Wall time: 10.3 s


In [41]:
%%time

filename = 'dend-buket-fujimoto2/capstone/pandas_sales.csv'
with s3.open(filename, 'w') as f:
    pandas_sales_summary.to_csv(f, index=False, header=False)


con.execute("""
    DELETE sales_summary;
    COPY sales_summary
    from 's3://%s'
    iam_role 'arn:aws:iam::945626161665:role/dwhRole'
    csv;""" % filename)

CPU times: user 22.9 ms, sys: 4.48 ms, total: 27.4 ms
Wall time: 11.5 s


#### 4.2 Data Quality Checks
To make sure the pipeline ran as expected, we run these checks against the Redshift tables:
 * Key checks: count the rows per item_number in products and per store_number in stores to find keys that appear more than once
 * Spot checks: inspect the rows behind a duplicated key to see which columns differ
 * Completeness checks: confirm that sales_summary has one row for each month of 2019
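The checks are written as SQL against Redshift. The same ideas as small, hypothetical Python helpers that could run on query results: `rows_per_key_histogram` reproduces the "rows per key" distribution, and `missing_months` reports months absent from sales_summary. The sample inputs are illustrative.

```python
from collections import Counter


def rows_per_key_histogram(keys):
    """Map 'rows per key' -> 'number of keys', like the it_ct queries."""
    per_key = Counter(keys)
    return dict(sorted(Counter(per_key.values()).items()))


def missing_months(months):
    """Return the months of the year (1-12) that have no row."""
    return sorted(set(range(1, 13)) - set(months))


# Illustrative inputs: key 101 appears twice, 202 and 303 once each.
print(rows_per_key_histogram([101, 101, 202, 303]))  # {1: 2, 2: 1}
print(missing_months(range(1, 13)))                  # []
print(missing_months([1, 2, 3]))                     # [4, 5, ..., 12]
```

In a scheduled pipeline, a non-empty result from `missing_months` would be a reason to stop before publishing the table.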
 
Run Quality Checks

In [None]:
# Checking products table; below is a query to check rows per unique item_number

In [100]:
%%sql
select it_ct, count(*) from (
select 
item_number,
count(*) as it_ct
from products
group by 1
) a group by 1
order by 1

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
3 rows affected.


it_ct,count
1,3331
2,573
3,37


In [None]:
%%sql
select 
item_number,
count(*) as it_ct
from products
group by 1
having count(*)=2 and count(distinct bottle_volume_ml)=1

In [99]:
%%sql
select 
*
from products 
where item_number=59159
limit 10
;

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
2 rows affected.


category,category_name,vendor_number,vendo_name,item_number,item_description,bottle_volume_ml
1071100,Cocktails /RTD,395,PROXIMO,59159,1800 Ultimate Raspberry Margarita,1750
1071100,Cocktails /RTD,395,PROXIMO,59159,1800 Ultimate Raspberry,1750


There seem to be some item_numbers that have more than one row. Some have a different item_description and others have a different bottle_volume_ml. Since I am not an SME, I will keep all the records for now.

In [101]:
# Checking stores table; below is a query to check rows per unique store_number

In [103]:
%%sql
select 
count(*),
count(distinct store_number)
from stores 
;

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count,count_1
1826,1748


In [104]:
%%sql
select it_ct, count(*) from (
select 
store_number,
count(*) as it_ct
from stores
group by 1
) a group by 1
order by 1

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
4 rows affected.


it_ct,count
1,1686
2,47
3,14
4,1


In [111]:
%%sql
select 
store_number,
count(*) as it_ct
from stores
group by 1
having count(*)=2

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
47 rows affected.


store_number,it_ct
5711,2
5774,2
5709,2
4779,2
4859,2
4947,2
5106,2
5245,2
4533,2
2585,2


In [112]:
%%sql
select 
*
from stores 
where store_number=5774
limit 10
;

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
2 rows affected.


store_number,store_name,address,city,zip_code,median_income,population
5774,Casey's General Store #2185 / Manchester,908 W Main St,Manchester,52057,46013,8222
5774,Casey's General Store #2185 / Manchester,1305 W Commercial St,Manchester,52057,46013,8222


Again, there seem to be some store_numbers that have more than one row (1,826 rows for 1,748 distinct store numbers). Some have different addresses and others have a different store_name. Since I am not an SME, I will keep all the records for now.
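Before deciding whether duplicates should be merged, it helps to see which columns actually differ for each duplicated key. A small hypothetical helper, run on the two rows shown for store_number 5774:

```python
def differing_columns(rows):
    """Given rows that share a key, return the column names whose values differ."""
    columns = rows[0].keys()
    return [c for c in columns if len({row[c] for row in rows}) > 1]


# The two rows returned for store_number 5774 above.
store_5774 = [
    {"store_number": 5774, "store_name": "Casey's General Store #2185 / Manchester",
     "address": "908 W Main St", "city": "Manchester", "zip_code": 52057,
     "median_income": 46013, "population": 8222},
    {"store_number": 5774, "store_name": "Casey's General Store #2185 / Manchester",
     "address": "1305 W Commercial St", "city": "Manchester", "zip_code": 52057,
     "median_income": 46013, "population": 8222},
]
print(differing_columns(store_5774))  # ['address']
```

Applied to the two rows for item_number 59159, the same helper would report only item_description.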

In [121]:
# Check the summary table

In [114]:
%%sql
select * From sales_summary order by 1

 * postgresql://dwhuser:***@dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com:5439/dwh
12 rows affected.


month,sales
1,23466782.5899997
2,24641910.4899993
3,24982728.3599986
4,27788607.85
5,32756773.2699958
6,28877030.9599999
7,30967399.6899979
8,28424263.2899975
9,28078111.230002
10,34448213.3499975


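Great! The query returned 12 rows, so all 12 months are present, and the sales numbers look reasonable.

An insight is that sales are higher after the first quarter: every month from April through October is above $27.7M, compared with $23.5M to $25.0M in January through March. Sales are especially high in the summer (May and July) and in Q4 (October has the highest total of the months listed above), when there are many holidays and events where people tend to drink more.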
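A quick check of the seasonal pattern using the ten monthly totals listed in the sales_summary output (November and December are not in the displayed output, so Q4 is left out): the average month in Q2 and Q3 is roughly $5M higher than the average month in Q1.

```python
# Monthly totals copied from the sales_summary query output (Jan-Oct 2019).
monthly = {
    1: 23466782.5899997, 2: 24641910.4899993, 3: 24982728.3599986,
    4: 27788607.85, 5: 32756773.2699958, 6: 28877030.9599999,
    7: 30967399.6899979, 8: 28424263.2899975, 9: 28078111.230002,
    10: 34448213.3499975,
}

quarters = {"Q1": (1, 2, 3), "Q2": (4, 5, 6), "Q3": (7, 8, 9)}
for name, months in quarters.items():
    avg = sum(monthly[m] for m in months) / len(months)
    print(f"{name}: average ${avg / 1e6:.1f}M per month")
# Q1: average $24.4M per month
# Q2: average $29.8M per month
# Q3: average $29.2M per month
```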

#### 4.3 Data dictionary 
Below is the data dictionary for the data model. The products and stores columns, and the sales behind sales_summary, come from the Iowa Liquor Sales data set; median_income and population come from the University of Michigan zip code data set.

**Products table**

category - category code

category_name - category name of product

vendor_number - vendor id

vendor_name - name of company that produces the product

item_number - item id

item_description - name of product

bottle_volume_ml - bottle volume in milliliters

**Stores table**

store_number - unique store id

store_name - name of store

address - address

city - city

zip_code - zip code of store

median_income - median income of zip code

population - population of zip code

**Sales_Summary table**

month - month of 2019 (1 to 12)

sales - total sales in dollars for the month (sum of Sale (Dollars))

This table is meant to be fed into a BI tool like Tableau so executives and decision makers can see high-level metrics.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**Tools**

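Spark and Redshift were selected because the sales CSV has more than 2.3 million rows (2,380,346 rows and 24 columns). Spark made it fast to read the records and crunch the numbers, and Redshift stores the resulting tables in a SQL database that analysts and BI tools like Tableau can query.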

**Update Frequency**

The sales data is updated by the State of Iowa every month, so the pipeline should be rerun monthly, after each source update.

**Scenarios**

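If the data increased by 100x, I would run Spark on an AWS EMR cluster instead of a single machine, connect to the cluster through an SSH tunnel, and write the tables from Spark directly to S3 rather than collecting them with toPandas(), which pulls every row into the driver's memory.

If the data needed to be updated every day by 7AM (assuming the source would be updated every day), I would use Airflow to schedule the summary job and the load into Redshift so that both finish before 7AM.

If the data needed to be accessed by 100+ people, I would still put it in Redshift, since more people are familiar with SQL.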

# Delete Clusters and connections

In [118]:
#### CAREFUL!!
#-- Running this cell deletes the Redshift cluster created above
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh',
  'Endpoint': {'Address': 'dwhcluster.chuxjgcjz9kt.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2020, 1, 31, 19, 2, 51, 632000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-2dfbb66b',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-c5a636bd',
  'AvailabilityZone': 'us-west-2c',
  'PreferredMaintenanceWindow': 'sun:08:00-sun:08:30',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessibl

In [122]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,deleting
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,{'Port': 5439}
6,VpcId,vpc-c5a636bd
7,NumberOfNodes,4


In [120]:
spark.stop()