
- Step 3: Define the Data Model
  - Map out the conceptual data model and explain why you chose that model
  - List the steps necessary to pipeline the data into the chosen data model
- Step 4: Run ETL to Model the Data
  - Create the data pipelines and the data model
  - Include a data dictionary
  - Run data quality checks to ensure the pipeline ran as expected
    - Integrity constraints on the relational database (e.g., unique key, data type, etc.)
    - Unit tests for the scripts to ensure they are doing the right thing
    - Source/count checks to ensure completeness
- Step 5: Complete Project Write Up
  - What's the goal? What queries will you want to run? How would Spark or Airflow be incorporated? Why did you choose the model you chose?
  - Clearly state the rationale for the choice of tools and technologies for the project.
  - Document the steps of the process.
  - Propose how often the data should be updated and why.
  - Post your write-up and final data model in a GitHub repo.
  - Include a description of how you would approach the problem differently under the following scenarios:
    - If the data was increased by 100x.
    - If the pipelines were run on a daily basis by 7am.
    - If the database needed to be accessed by 100+ people.
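
As a rough illustration of the source/count checks listed under Step 4, the sketch below compares row counts recorded at the source with counts reported by the target. The function name `find_count_mismatches` and the sample numbers are hypothetical and not part of the project code.

```python
def find_count_mismatches(source_counts, target_counts):
    """Return {table: (source_rows, target_rows)} for every table whose counts differ.

    A table missing on the target side is reported with a target count of None.
    """
    mismatches = {}
    for table, source_rows in source_counts.items():
        target_rows = target_counts.get(table)
        if target_rows != source_rows:
            mismatches[table] = (source_rows, target_rows)
    return mismatches


# Hypothetical counts: rows written to the CSV files vs. rows reported by SELECT COUNT(*)
source = {"gins": 100, "tonics": 20, "garnishes": 15}
target = {"gins": 100, "tonics": 19}
print(find_count_mismatches(source, target))
```

An empty result means every source table arrived with the expected number of rows.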

# The aim of this project

The aim of this project is to reverse engineer a private API and use the collected data to build a data warehouse (DWH) on Redshift. I bring the API data into a normalized form and populate the purchases table with dummy data generated with Faker, based on the existing tables.


# Reverse-engineering the API

The only comprehensive dataset on gins I was able to find sits behind the private API of an iOS/Android app. The following is a high-level description of how I obtained the base URL, the API structure and the API key.
- The simplest approach is to use an Android emulator; I used the one that ships with Android Studio. Another option would be mitmproxy, but because of Android's strict Certificate Authority management it is finicky to set up with a system certificate on an Android emulator.
- I downloaded the target app's APK and installed it on the emulated device.
- I installed ADB and added the platform-tools folder to my PATH variable.
- I installed HTTP Toolkit, selected "Android device via ADB" as the traffic source, and followed the setup steps in the emulator.
- With that in place, the HTTP requests coming from the emulator are visible in HTTP Toolkit.
- All that is left is to find the GET request I am after, which I did by clicking on a new item on the main page of the application. The resulting API key and URL structure are what I use in my code.
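
To make the last step concrete, here is a minimal sketch of how a captured request can be turned into a reusable URL template. The template, host and key below are placeholders (the real values live in `config.cfg` and are not shown). The only thing taken from the notebook is that the template has two positional slots, filled with the item id and the API key.

```python
# Hypothetical template: the real base URL and key are read from config.cfg.
BASE_URL_TEMPLATE = "https://api.example.com/v1/items/{}?api_key={}"


def build_item_url(template, item_id, api_key):
    """Fill the item id and API key into the captured URL template."""
    return template.format(item_id, api_key)


print(build_item_url(BASE_URL_TEMPLATE, 42, "PLACEHOLDER_KEY"))
```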


In [1]:
import pandas as pd
import numpy as np
import requests
import time
import json
import os
import boto3
from requests.exceptions import HTTPError
import sql_statements
import configparser

# Variables
config = configparser.ConfigParser()
config.read('config.cfg')

baseURL = config.get("REQUESTS", "baseURL")
api_key = config.get("REQUESTS", "api_key")
headers = {
    'User-Agent': config.get("REQUESTS", "headers_user_agent"),
    'From': config.get("REQUESTS", "headers_from")
}

raw_folder = config.get("FOLDER", "raw_folder")
normalized_folder = config.get("FOLDER", "normalized_folder")

#Identifiers & Credentials
DB_NAME=config.get("CLUSTER", "DB_NAME")
CLUSTER_ID=config.get("CLUSTER", "CLUSTER_ID")
DB_USER=config.get("CLUSTER", "DB_USER")
DB_PASSWORD=config.get("CLUSTER", "DB_PASSWORD")
CLUSTER_TYPE=config.get("CLUSTER", "CLUSTER_TYPE")
NODE_TYPE=config.get("CLUSTER", "NODE_TYPE")
NUMBER_OF_NODES=config.get("CLUSTER", "NUMBER_OF_NODES")
DB_PORT=config.get("CLUSTER", "DB_PORT")

ROLE_NAME = config.get("IAM_ROLE", "ROLE_NAME")
KEY = config.get("AWS", "KEY")
SECRET = config.get("AWS", "SECRET")
REGION = config.get("AWS", "REGION")

s3_client = boto3.client("s3",
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )
redshift = boto3.client('redshift',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )
iam = boto3.client('iam',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )
ec2 = boto3.resource('ec2',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )
redshift_data = boto3.client('redshift-data',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )
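
For reference, the cell above expects a `config.cfg` with the sections and option names used in the `config.get()` calls. The sketch below shows one possible layout, parsed from a string so it can be tried without touching the real file. Every value is a placeholder, and the variable names `EXAMPLE_CONFIG` and `example` are made up for this illustration.

```python
import configparser

# Placeholder values only; section and option names mirror the config.get() calls above.
EXAMPLE_CONFIG = """
[REQUESTS]
baseURL = https://api.example.com/v1/items/{}?api_key={}
api_key = PLACEHOLDER_KEY
headers_user_agent = example-agent/1.0
headers_from = someone@example.com

[FOLDER]
raw_folder = raw_data
normalized_folder = normalized_data

[CLUSTER]
DB_NAME = example_db
CLUSTER_ID = example-cluster
DB_USER = example_user
DB_PASSWORD = PLACEHOLDER_PASSWORD
CLUSTER_TYPE = PLACEHOLDER_CLUSTER_TYPE
NODE_TYPE = PLACEHOLDER_NODE_TYPE
NUMBER_OF_NODES = 1
DB_PORT = 5439

[IAM_ROLE]
ROLE_NAME = EXAMPLE_ROLE

[AWS]
KEY = PLACEHOLDER_ACCESS_KEY
SECRET = PLACEHOLDER_SECRET
REGION = PLACEHOLDER_REGION
"""

example = configparser.ConfigParser()
example.read_string(EXAMPLE_CONFIG)

# Every value comes back as a string, so numeric options need an explicit cast.
print(example.get("CLUSTER", "DB_NAME"), example.getint("CLUSTER", "NUMBER_OF_NODES"))
```

Keeping credentials in a file like this only helps if the file itself stays out of version control.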

# Pull the data from API

In [None]:
# Get highest ID number from ginventory_short.json
# A version of the dataset is available from one of the first calls the app makes when establishing connection,
# with a reduced set of columns. The largest id num (8770) comes from this file (ginventory_short.json).
with open('/'.join([raw_folder, 'ginventory_short.json']), 'r', encoding='utf-8') as file:
    file_json = json.load(file)

df = pd.DataFrame(file_json)
df['id'] = pd.to_numeric(df['id'])
# Take the maximum directly instead of relying on 'id' being the first column
largest_id = int(df['id'].max())
print(largest_id)

In [None]:
# Looping through all the requests, save responses to local and S3
response_collection = []
for i in range(1,largest_id+1):
    url = baseURL.format(i, api_key)
    try:
        response = requests.get(url, headers = headers)
        response.raise_for_status()
        print(i)
        response_collection.append(response.json())
        time.sleep(0.1)
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:
        print(f'Other error occurred: {err}')
    else:
        print('Success!')
    
with open('/'.join([raw_folder, 'data_full.json']), 'w', encoding='utf-8') as f:
    json.dump(response_collection, f, ensure_ascii=False, indent=4)
print("Responses collected!")

# Upload raw file to s3
s3_client.upload_file('/'.join([raw_folder, 'data_full.json']), "ginventory-bucket", '/'.join([raw_folder, 'data_full.json']))

In [2]:
# Read from local file
with open('/'.join([raw_folder, 'data_full.json']), 'r', encoding='utf-8') as file:
    file_json = json.load(file)

# Data quality issues

- Country and abv sometimes contained two values separated by "/"; in those cases the first value is kept.
- Columns that should hold integers are cast to integer.
- Country names did not always match ISO naming; in such cases the country name was replaced with the ISO name.
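
The rules above can be sketched as two small pure-Python helpers, which makes them easy to unit test outside pandas. The helper names `parse_abv` and `first_country` are hypothetical, and apart from the Switzerland example the sample inputs are invented.

```python
def parse_abv(raw):
    """Convert a raw ABV string to a float, or None if it cannot be parsed.

    Keeps the first value of "a / b", drops "%" signs, and treats a
    decimal comma as a decimal point.
    """
    if raw is None:
        return None
    first = str(raw).split("/")[0]
    cleaned = first.replace("%", "").replace(",", ".").strip()
    try:
        return float(cleaned)
    except ValueError:
        return None


def first_country(raw):
    """Keep only the first country of an "A / B" value."""
    if raw is None:
        return None
    return raw.split("/")[0].strip()


print(parse_abv("40 / 43"), parse_abv("37,5%"), parse_abv("n/a"))
print(first_country("Switzerland / United States"))
```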

## Data cleaning

In [3]:
df = pd.json_normalize(file_json)
df = df.drop(['direct_purchase_url', 'user_rating', 'in_wishlist', 'in_cabinet', 'purchase_links.data',
              'perfect_tonics.data','perfect_garnishes.data', 'perfect_gins.data', 'description.content', 
              'description.google_translation', 'description.original_content'], axis=1)


# Correct abv values where format is ##/##, first number is taken
#df['abv'] = df[df["abv"].str.contains("/", na=False)].abv.str.split('/').str.get(0).str.strip()
df['abv'] = np.where(df["abv"].str.contains("/", na=False), df['abv'].str.split('/').str.get(0).str.strip(), df['abv'])

# Correct values with % in them, change commas to dots
df['abv'] = df['abv'].replace('%','', regex=True).replace(',','.', regex=True)

# Handling numeric columns to adhere to correct data types in downstream pipeline
df['abv'] = pd.to_numeric(arg=df['abv'] ,errors='coerce')
df['average_rating'] = df['average_rating'].apply(pd.to_numeric)

# Fill NA with 0, so we can downcast all values to integer
df['rating_count'] = df['rating_count'].fillna(0)
df['rating_count'] = pd.to_numeric(arg=df['rating_count'],downcast='integer')

# Correcting country column.
# Handling the case where country format = country / country (eg.: Switzerland / United States)
#df['country'] = df[df['country'].str.contains('/', na=False)].country.str.split('/').str.get(0).str.strip()
df['country'] = np.where(df['country'].str.contains('/', na=False), df['country'].str.split('/').str.get(0).str.strip(), df['country'])

# Country names that do not adhere to ISO country naming standards are replaced
country_corrections = {
    'Vietnam': 'Viet Nam',
    'Russia' : 'Russian Federation',
    'Taiwan' : 'Taiwan, Province of China',
    'U.S. Virgin Islands' : 'Virgin Islands, U.s.',
    'Hong Kong SAR China' : 'Hong Kong',
    'Unknown or Invalid Region' : None,
    '' : None,
    }

for i in country_corrections:
    df['country'] = np.where(df.country == i, country_corrections[i], df['country'])

df['country'] = np.where(df.producer == 'Little Brown Dog Spirits', 'United Kingdom', df['country'])

df_garnish = df[df.type == 'garnish']
df_garnish = df_garnish[['id', 'type', 'name']]

# We drop the remaining rows without valid country values.
df = df[df.country.notna()]

df_gin = df[df.type == 'gin']
df_tonic = df[df.type == 'tonic']

## Save normalized tables to local and S3

In [4]:
os.makedirs(normalized_folder, exist_ok=True)
df_gin.to_csv('/'.join([normalized_folder, 'gins.csv']), sep=';', index=False)
df_garnish.to_csv('/'.join([normalized_folder, 'garnishes.csv']), sep=';', index=False)
df_tonic.to_csv('/'.join([normalized_folder, 'tonics.csv']), sep=';', index=False)

s3_client.upload_file('/'.join([normalized_folder, 'gins.csv']), "ginventory-bucket", '/'.join([normalized_folder, 'gins.csv']))
s3_client.upload_file('/'.join([normalized_folder, 'garnishes.csv']), "ginventory-bucket", '/'.join([normalized_folder, 'garnishes.csv']))
s3_client.upload_file('/'.join([normalized_folder, 'tonics.csv']), "ginventory-bucket", '/'.join([normalized_folder, 'tonics.csv']))


## Extract relationships between gins, garnishes and tonics

In [None]:
# These tables will form our many-to-many translation tables in our model

df = pd.DataFrame(file_json)
df = df[df.type == 'gin']

# Prepare dataframe with the gin->perfect tonic relationship
df["perfect_tonics"] = df["perfect_tonics"].str["data"]
df_perfect_tonics = df.explode("perfect_tonics")
df_perfect_tonics = pd.concat(
    [
        df_perfect_tonics,
        df_perfect_tonics.pop("perfect_tonics").apply(pd.Series).add_prefix("perfect_tonics_"),
    ], axis=1)

# Select the necessary columns, drop NA, change data types
# Chained on a fresh selection to avoid an in-place dropna on a slice (SettingWithCopyWarning)
df_perfect_tonics = df_perfect_tonics[['id', 'perfect_tonics_id']].dropna().astype(int)

# Save
df_perfect_tonics.to_csv('/'.join([normalized_folder, 'perfect_tonics.csv']), sep=';', index=False)
s3_client.upload_file('/'.join([normalized_folder, 'perfect_tonics.csv']), "ginventory-bucket", '/'.join([normalized_folder, 'perfect_tonics.csv']))

# Prepare dataframe with the gin->perfect garnish relationship
df["perfect_garnishes"] = df["perfect_garnishes"].str["data"]
df_perfect_garnishes = df.explode("perfect_garnishes")
df_perfect_garnishes = pd.concat(
    [
        df_perfect_garnishes,
        df_perfect_garnishes.pop("perfect_garnishes")
        .apply(pd.Series)
        .add_prefix("perfect_garnishes_"),
    ], axis=1)

# Select the necessary columns, drop NA, change data types
# Chained on a fresh selection to avoid an in-place dropna on a slice (SettingWithCopyWarning)
df_perfect_garnishes = df_perfect_garnishes[['id', 'perfect_garnishes_id']].dropna().astype(int)

# Save
df_perfect_garnishes.to_csv('/'.join([normalized_folder, 'perfect_garnishes.csv']), sep=';', index=False)
s3_client.upload_file('/'.join([normalized_folder, 'perfect_garnishes.csv']), "ginventory-bucket", '/'.join([normalized_folder, 'perfect_garnishes.csv']))
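
As a cross-check on the explode-based extraction above, the same gin-to-item pairs can be derived with plain Python. This is only a sketch: `extract_pairs` and the sample records are hypothetical, and the records mimic only the `id`, `type` and nested `data` fields that the cell above relies on.

```python
def extract_pairs(records, relation):
    """Return (gin_id, related_id) tuples for one nested relation, e.g. "perfect_tonics"."""
    pairs = []
    for record in records:
        if record.get("type") != "gin":
            continue
        # Missing or empty relations simply produce no pairs
        related = (record.get(relation) or {}).get("data") or []
        for item in related:
            pairs.append((int(record["id"]), int(item["id"])))
    return pairs


# Hypothetical records shaped like the API responses used above
sample = [
    {"id": 1, "type": "gin", "perfect_tonics": {"data": [{"id": 10}, {"id": 11}]}},
    {"id": 2, "type": "gin", "perfect_tonics": {"data": []}},
    {"id": 3, "type": "tonic", "perfect_tonics": {"data": [{"id": 12}]}},
]
print(extract_pairs(sample, "perfect_tonics"))
```

Each tuple corresponds to one row of the many-to-many translation table.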

# Exploring our main dataset


##  General counts

In [None]:
print(f'Total number of gins in our dataset: {df_gin.id.count()}')
print(f'Total number of garnishes in our dataset: {df_garnish.id.count()}')
print(f'Total number of tonics in our dataset: {df_tonic.id.count()}')

print(f'{df_perfect_garnishes.nunique().id} Gins have at least one garnish indicated as a perfect match, {df_perfect_garnishes.perfect_garnishes_id.count()} perfect matches in total.')
print(f'{df_perfect_tonics.nunique().id} Gins have at least one tonic indicated as a perfect match, {df_perfect_tonics.perfect_tonics_id.count()} perfect matches in total.')


In [None]:
# Best rated gins
df_gin_explore = df_gin[["name", "average_rating"]]
df_gin_explore.sort_values("average_rating", ascending=False).head(3)

In [None]:
# Gin average rating histogram
df_gin.hist(column='average_rating' ,bins=10)

In [None]:
# Most rated gins 
df_gin_explore = df_gin[["name", "rating_count"]]
df_gin_explore.sort_values("rating_count", ascending=False).head(3)

In [None]:
# Gin ratings count histogram
# Most gins have 0-20 ratings
df_gin.hist(column='rating_count' ,bins=100, range=[0, 1000])

In [None]:
# Best rated producers
df_gin_explore = df_gin[["producer", "average_rating"]]
df_gin_explore.groupby('producer').mean().sort_values("average_rating",ascending=False).head(3)

In [None]:
# Producer average ratings histogram
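# Select the numeric column first; recent pandas versions raise on mean() over text columns
df_gin.groupby('producer')[['average_rating']].mean().hist(column='average_rating', bins=10)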

In [None]:
# Countries with the most gins
df_gin.groupby('country').nunique().id.sort_values(ascending=False).head(5)

In [None]:
# Countries with the most gin producers
df_gin.groupby(['country']).nunique().producer.sort_values(ascending=False).head(5)

# AWS

### IAM

In [5]:
# Create IAM role so we can access S3 from Redshift
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)

# Attach S3 access policy to our new role

iam.attach_role_policy(RoleName=ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=ROLE_NAME)['Role']['Arn']

print(roleArn)



1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name DWH_ROLE already exists.
1.3 Get the IAM role ARN
arn:aws:iam::752709659342:role/DWH_ROLE


### Redshift

In [8]:
# Create redshift cluster
try:
    response = redshift.create_cluster(
        ClusterType= CLUSTER_TYPE,
        NodeType=NODE_TYPE,
        NumberOfNodes=int(NUMBER_OF_NODES),

        #Identifiers & Credentials
        DBName=DB_NAME,
        ClusterIdentifier=CLUSTER_ID,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)



An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


In [9]:
# Prettify redshift cluster properties
def prettyRedshiftProps(props):
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])


myClusterProps = redshift.describe_clusters(ClusterIdentifier=CLUSTER_ID)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,ginventorydwh
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,ginventory_auto
5,Endpoint,{'Address': 'ginventorydwh.czo11jameqnn.eu-cen...
6,VpcId,vpc-01c2590b227c9dbaf
7,NumberOfNodes,1


In [10]:
# Set a VPC security group rule to authorize ingress to the cluster's VPC Security Group
# Extract cluster info

vpc_security_group_id = myClusterProps["VpcSecurityGroups"][0]["VpcSecurityGroupId"]
print(vpc_security_group_id)
try:
    # Extract security group for the VPC
    vpc_sg = ec2.SecurityGroup(id = vpc_security_group_id)
    
    # Authorize connection to the VPC
    vpc_sg.authorize_ingress(
        GroupName = vpc_sg.group_name,
        CidrIp = "0.0.0.0/0",
        IpProtocol = "TCP",
        FromPort = 5439,
        ToPort = 5439
    )
    print("Ingress to the VPC authorized")
    
except Exception as e:
    
    # Check if the error is a duplication error
    if "InvalidPermission.Duplicate" in str(e):
        print("Rule requested already exists")
    else:
        print(e)

sg-0d5e137166258a0e9
Rule requested already exists


# Load data to Redshift

In [16]:
# Drop and create all tables

# Use the configured identifiers rather than hardcoded values, as in the cells below
response = redshift_data.batch_execute_statement(
        ClusterIdentifier=CLUSTER_ID,
        Database=DB_NAME,
        DbUser=DB_USER,
        Sqls=["SELECT * FROM gins"]
        )


ValidationException: An error occurred (ValidationException) when calling the BatchExecuteStatement operation: Redshift endpoint doesn't exist in this region.

In [None]:
# Load tables from S3

file_names = ['garnishes', 'gins', 'perfect_garnishes', 'perfect_tonics', 'tonics']

sqls = [f"""
        COPY {name}
        FROM 's3://ginventory-bucket/normalized_data/{name}.csv'
        IAM_ROLE '{roleArn}' 
        FORMAT AS csv
        IGNOREHEADER 1
        delimiter ';'
    """ for name in file_names]

response = redshift_data.batch_execute_statement(
        ClusterIdentifier=CLUSTER_ID,
        Database=DB_NAME,
        DbUser=DB_USER,
        Sqls=sqls
    )

# Load countries data from SQL inserts
response = redshift_data.execute_statement(
        ClusterIdentifier=CLUSTER_ID,
        Database=DB_NAME,
        DbUser=DB_USER,
        Sql=sql_statements.INSERT_INTO_COUNTRIES
    )

In [None]:
# Function to return pandas df from redshift response Id

def redshift_get_statement_result_to_dataframe(response_id):
    '''
    Returns a pandas dataframe for the statement Id taken from the response object
    of the redshift-data execute_statement method. If the statement has no result set,
    a message is returned instead.
    response = redshift_data.execute_statement(...)
    response_id = response['Id']
    '''
    # Time out after 30 seconds without a FINISHED, FAILED or ABORTED status
    timeout = time.time() + 30

    while time.time() < timeout:
        describe_obj = redshift_data.describe_statement(Id=response_id)
        status = describe_obj['Status']
        if status == 'FINISHED':
            if not describe_obj['HasResultSet']:
                return 'Statement has no result set. Call with sql statement that produces a result set.'
            # Only the first page of results is read here (no NextToken handling)
            statement_result = redshift_data.get_statement_result(Id=response_id)
            records = statement_result['Records']
            if len(records) == 0:
                return 'Result set contains 0 rows.'
            columns = [col['name'] for col in statement_result['ColumnMetadata']]
            # Each field is a one-entry dict such as {'stringValue': 'x'} or {'isNull': True}
            rows = [
                [None if field.get('isNull') else next(iter(field.values())) for field in record]
                for record in records
            ]
            return pd.DataFrame(rows, columns=columns)
        elif status in ('FAILED', 'ABORTED'):
            raise ValueError('Query status: ' + status + '\n' + describe_obj.get('Error', ''))
        else:
            time.sleep(1)
    raise TimeoutError('Timeout limit exceeded.')



# Data checks, further data modeling

In [None]:
# Check if all country names adhere to the country table.
# We expect to see 0 rows returned

sql_statement = '''
    SELECT * FROM
    (
        SELECT * FROM gins 
        LEFT JOIN countries 
        ON gins.country = countries.nicename 
        WHERE countries.id is Null
    ) 
        AS country_mismatches;
'''

response = redshift_data.execute_statement(
        ClusterIdentifier=CLUSTER_ID,
        Database=DB_NAME,
        DbUser=DB_USER,
        Sql=sql_statement
    )


redshift_get_statement_result_to_dataframe(response['Id'])
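
The check above is an anti-join: a LEFT JOIN followed by a filter on rows where the right side is NULL. The sketch below reproduces the pattern locally with SQLite as a stand-in for Redshift, so it can be tried without a cluster. The table contents are invented and the column lists are trimmed to what the check needs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE countries (id INTEGER PRIMARY KEY, nicename TEXT);
    CREATE TABLE gins (id INTEGER PRIMARY KEY, name TEXT, country TEXT);
    INSERT INTO countries VALUES (1, 'United Kingdom'), (2, 'Viet Nam');
    INSERT INTO gins VALUES (1, 'Gin A', 'United Kingdom'),
                            (2, 'Gin B', 'Vietnam'),
                            (3, 'Gin C', 'Viet Nam');
""")

# Anti-join: keep only gins whose country has no match in the countries table.
mismatches = conn.execute("""
    SELECT gins.id, gins.name, gins.country
    FROM gins
    LEFT JOIN countries ON gins.country = countries.nicename
    WHERE countries.id IS NULL
""").fetchall()
print(mismatches)
conn.close()
```

Here only the row spelled 'Vietnam' is reported, which is the kind of mismatch the country corrections in the cleaning step are meant to remove.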

In [None]:
# Add country_id column to tonics and gins tables, delete country column

sqls = []

sqls.append('''
    ALTER TABLE tonics
    ADD country_id smallint;
''')
sqls.append('''
    UPDATE tonics SET country_id = countries.id FROM countries WHERE tonics.country = countries.nicename
''')
sqls.append('''
    ALTER TABLE tonics DROP country
''')

sqls.append('''
    ALTER TABLE gins
    ADD country_id smallint;
''')
sqls.append('''
    UPDATE gins SET country_id = countries.id FROM countries WHERE gins.country = countries.nicename
''')
sqls.append('''
    ALTER TABLE gins DROP country
''')

response = redshift_data.batch_execute_statement(
    ClusterIdentifier=CLUSTER_ID,
    Database=DB_NAME,
    DbUser=DB_USER,
    Sqls=sqls,
    )

redshift_get_statement_result_to_dataframe(response['Id'])
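
When replacing the country name with a `country_id`, each row must look up the id of its own country, so the update has to be correlated with the row being updated. The sketch below shows that idea locally with SQLite as a stand-in for Redshift. It uses a correlated subquery instead of Redshift's `UPDATE ... FROM` form, and the table contents are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE countries (id INTEGER PRIMARY KEY, nicename TEXT);
    CREATE TABLE tonics (id INTEGER PRIMARY KEY, name TEXT, country TEXT);
    INSERT INTO countries VALUES (1, 'Germany'), (2, 'Spain');
    INSERT INTO tonics VALUES (1, 'Tonic A', 'Spain'), (2, 'Tonic B', 'Germany');
    ALTER TABLE tonics ADD COLUMN country_id INTEGER;
""")

# Correlated subquery: each tonic row looks up the id of its own country.
conn.execute("""
    UPDATE tonics
    SET country_id = (SELECT id FROM countries WHERE countries.nicename = tonics.country)
""")
rows = conn.execute("SELECT id, country, country_id FROM tonics ORDER BY id").fetchall()
print(rows)
conn.close()
```

A quick look at a few rows like this, before dropping the `country` column, confirms that the ids line up with the original names.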

# Delete Redshift cluster

In [None]:
redshift.delete_cluster( ClusterIdentifier=CLUSTER_ID,  SkipFinalClusterSnapshot=True)

In [None]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=CLUSTER_ID)['Clusters'][0]
print(myClusterProps)

# Delete role

In [None]:
iam.detach_role_policy(RoleName=ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=ROLE_NAME)