# Extract, Transform, Load (ETL)
---
The purpose of this Jupyter Notebook is to extract data and storing it in a SQLite database within this package. The codes below show the steps in this process:
- Data pulling ['Starbucks', 'Think Coffee', 'Joe Coffee', 'Gregorys Coffee', 'Birch Coffee']
- ETL - for only needed data that will be used to store in the database
- Configure database
    - Delete (if exists) and create a new database coffee_chains.sqlite and the table
    - Load data into the database

In [1]:
# Import dependencies
from api_key import api_key
from jsonschema import validate
import json
import requests
import pandas as pd

In [2]:
# Set up Yelp API constants
API_HOST = 'https://api.yelp.com/v3/businesses/search'
HEADERS = {
    'Authorization': 'bearer %s' % api_key
}

# Schema for comparing in checking before extraction
schema = {
    'alias': '',
    'categories': [],
    'coordinates': {},
    'display_phone': '',
    'distance': 0.00,
    'id': 'string',
    'image_url': '',
    'is_closed': True,
    'location': {},
    'name': '',
    'phone': '',
    'price': '',
    'rating': 0.0,
    'review_count': 0,
    'transaction': [],
    'url': ''
}

### Custom functions for the ETL steps

In [3]:
# Simple request function for bussiness search endpoint from Yelp API
def request(term = '', loc = ''):
    data = []
    
    for offset in range(0, 200, 50):
        params = {
            'term': term.replace(' ', '+'),
            'location': loc.replace(' ', '+'),
            'limit': 50,
            'offset': offset
        }

        # Send the request
        response = requests.get(API_HOST, headers = HEADERS, params = params)

        # Verify the response and return None if error returned else return the json data
        if response.status_code == 200:
            data += response.json()['businesses']
        else:
            return None
    
    return data

# Function to verify the predefined schema on what we should be expecting before extracting
def verify_schema(data = None):
    
    # Verify the object entered before extraction
    if data == None:
        return False
    elif not isinstance(data, dict):
        return False
    else:
        try:
            validate(instance=data, schema=schema)
            return True
        except:
            return False

# Return the json to DF for cleanining
def json_to_dataframe(data = None):
    try:
        if not verify_schema(data[0]):
            return pd.DataFrame({'error': ["{'error': 'SCHEMA VALIDATION ERROR'}"]})
        else:
            return pd.DataFrame(data)
    except TypeError:
        return pd.DataFrame({'error': ["{'error': 'OBJECT INPUT ERROR'}"]})

# Function to extract the id, name, price, rating, review_count, location (address 1, address 2, address 3, city, 
# state, zip_code), coordinates (latitude and longtitude), and phone into a DataFrame
def cleaned_yelp_dataframe(df, name):
    # Create a copy of the df to work with
    clean_df = df.copy()
    
    # Sometimes random result return not matching the criteria, filter those out by the name
    key_search = [name.lower()]
    clean_df['name'] = clean_df.name.apply(lambda x: x.lower())
    clean_df = clean_df.loc[clean_df['name'].isin(key_search)]
    
    # Normalizing the coordinates and location columns with nested dictionary
    clean_df[['latitude', 'longitude']] = pd.json_normalize(clean_df['coordinates'])
    clean_df[[
        'address1', 
        'address2', 
        'address3', 
        'city', 
        'zip', 
        'country', 
        'state', 
        'display_address'
    ]] = pd.json_normalize(clean_df['location'])
    
    # Drop off the columns no longer needed
    clean_df = clean_df.drop(columns = [
        'alias', 'image_url', 'is_closed', 'url', 'categories', 'coordinates', 'transactions', 'location',
        'phone', 'distance', 'display_address'        
    ])
    
    # Add price point column, fill na, and convert to int
    clean_df['price'] = clean_df['price'].fillna('')
    clean_df['price_point'] = clean_df['price'].str.len()
    clean_df['price_point'] = clean_df['price_point'].fillna(0)
    clean_df['price_point'] = clean_df['price_point'].astype('int')
    
    # Reorganize the df for easy viewing
    clean_df = clean_df[[
        'id', 'name', 'review_count', 'rating', 'price', 'price_point', 'display_phone', 'address1', 'address2', 'address3',
        'city', 'state', 'zip', 'country', 'latitude', 'longitude'
    ]]
    
    # Filter out just stores found within New York or Brooklyn cities
    key_search = ['New York', 'Brooklyn']
    clean_df = clean_df.loc[clean_df['city'].isin(key_search)]
    
    # Replace any "None" values from address2 and address3 to ""
    clean_df['address2'] = clean_df['address2'].fillna('')
    clean_df['address3'] = clean_df['address3'].fillna('')
    
    # Proper casing for the name and city
    clean_df['name'] = clean_df['name'].str.title()

    # Take only rows that do not have nan for coordinates
    clean_df = clean_df[clean_df['latitude'].notna()]

    return clean_df

### Perform the ETL into the database

In [4]:
# Using for loop to pull the requests, clean, and then combine to export out into one single json file and add to database
coffee_chains = ['Starbucks', 'Dunkin\' Donuts', 'Tim Hortons', 'Think Coffee', 'Joe Coffee', 'Gregorys Coffee', 'Birch Coffee']
dfs = []

# Loop through to append the output cleaned df of each coffee chain for the merging using the defined functions from above
for shop in coffee_chains:
    coffee_data = request(shop, 'nyc')
    coffee_data = json_to_dataframe(coffee_data)
    cleaned_coffee_data = cleaned_yelp_dataframe(coffee_data, shop)
    dfs.append(cleaned_coffee_data)

# Now perform the merge
df_merged = pd.concat(dfs)

# Export out into csv and json in addition to adding it to a SQLite database
df_merged.to_csv('../static/dataset/merged.csv', index = False)
df_merged.to_json('../static/dataset/merged.json', orient = 'records', indent = 4)

In [5]:
# Quick check/reveview the merged DF
df_merged.head()

Unnamed: 0,id,name,review_count,rating,price,price_point,display_phone,address1,address2,address3,city,state,zip,country,latitude,longitude
0,qcnoyytlFIuqlcjDXkXJiw,Starbucks,80,2.5,$$,2,(718) 855-0856,67 Main St,,,Brooklyn,NY,11201,US,40.702754,-73.990884
1,60agfQbky4cX8BEApyltIA,Starbucks,36,2.0,$,1,(646) 699-9983,375 Pearl St,,,New York,NY,10038,US,40.71102,-74.00091
3,C0EKm9V9QI2R47QkRWA5aQ,Starbucks,44,3.0,$,1,(929) 955-1841,99 Wall St,,,New York,NY,10005,US,40.704901,-74.007244
4,OuNYQaqEJjkBHRdnC-4HOw,Starbucks,40,2.5,$$,2,(212) 509-9709,100 William St,,,New York,NY,10038,US,40.708413,-74.007387
5,mEMPhPK6dSgy5eXS-kYHLg,Starbucks,68,3.0,$$,2,(718) 243-0455,134 Montague St,,,Brooklyn,NY,11201,US,40.694582,-73.993978


In [6]:
# Import dependencies for handling the database
from os import path, remove
from sqlalchemy import create_engine, text
from sqlalchemy import Column, Integer, String, Float
from sqlalchemy.orm import declarative_base, Session

In [7]:
# Setup the db path
db_path = '../coffee_chains.sqlite'

# Delete the existing database if it exists
if path.exists(db_path):
    remove(db_path)

In [9]:
# Setup the engine with the database and base
engine = create_engine(f'sqlite:///{db_path}')
Base = declarative_base()

# Setup the table to be created if the database is being created
class Shop(Base):
    __tablename__ = 'shops'    
    id = Column(Integer, primary_key = True)
    name = Column(String(200))
    review_count = Column(Integer)
    rating = Column(Float)
    price = Column(String(10))
    price_point = Column(Integer)
    display_phone = Column(String(50))
    address1 = Column(String(200))
    address2 = Column(String(200))
    address3 = Column(String(200))
    city = Column(String(200))
    state = Column(String(100))
    zip = Column(String(50))
    country = Column(String(100))
    latitude = Column(Float)
    longitude = Column(Float)
    
    def __init__(self, name):
        self.name = name

# Create the SQLite database and the table
Base.metadata.create_all(engine)

In [10]:
# Check the table to see if it has been created
print(Base.metadata.tables.keys())

dict_keys(['shops'])


In [11]:
# Append the DF to the shop table created
df_merged.to_sql(name = 'shop',con = engine, if_exists = 'append', index = False)

205

In [12]:
# Quick check to see if things got appended correctly
session = Session(bind = engine)

<sqlalchemy.engine.cursor.CursorResult at 0x133f84bc3d0>

In [13]:
session.execute(text('SELECT * FROM shop')).fetchone()

('qcnoyytlFIuqlcjDXkXJiw', 'Starbucks', 80, 2.5, '$$', 2, '(718) 855-0856', '67 Main St', '', '', 'Brooklyn', 'NY', '11201', 'US', 40.70275444, -73.99088374)

In [14]:
# Close out of the session and engine
session.close()
engine.dispose()