In [1]:
# Import the dependencies.

# ETL
# ---
import pandas as pd
import numpy as np
import requests
import io
from pathlib import Path
import datetime as dt

# DB Backend
# -----------
from sqlalchemy import create_engine, event, Engine, ForeignKey, Column, Integer, Float, String, DateTime
from sqlalchemy.orm import sessionmaker, declarative_base, relationship
from sqlalchemy.ext.automap import automap_base

# Flask API Hosting
# -----------------
from flask import Flask, jsonify, request, abort

In [2]:
# ETL
# ----
# -----

# EXTRACTION (LIGHT TRANSFORMATION PERFORMED DURING EXTRACTION)
#--------------------------------------------------------------

# API Call/Avoiding API if file already exists --> DF creation

rawFile = Path('../csv_files/raw/dohmh_original.csv')
if (Path.exists(rawFile)):
    main_df = pd.read_csv(rawFile)
else:
    # Build select statement with aliases
    q_select = (
        'camis AS id,'
        'dba AS name,'
        'boro AS borough,'
        'cuisine_description AS cuisine,'
        'inspection_date,'
        'latitude AS lat,'
        'longitude AS lng'
    )

    # Build filters for date (within 2 years) and no nulls for cuisine, lat, or lng
    dateLimit = (dt.datetime.now() - dt.timedelta(days = 2 * 365)).isoformat()
    filter_dt = f'inspection_date > "{dateLimit}"'
    notNull = 'IS NOT NULL'
    filter_NA = \
        f'cuisine {notNull} AND lat {notNull} AND lng {notNull}'
    
    # Init full filters for API call with limit
    q_where = f'{filter_dt} AND {filter_NA}'
    q_limit = 200000
    
    # Base URL
    url = 'https://data.cityofnewyork.us/resource/43nn-pn8j.csv'

    # Parameters to send with API Call
    params = {
        '$select': q_select,
        '$where': q_where,
        '$limit': q_limit
    }
    # API Call itself using socrata (SODA) querying
    response = requests.get(url, params)

    # Using io.StringIO to create pseudo CSV file for export and reading
    csv = io.StringIO(response.content.decode('utf-8'))
    main_df = pd.read_csv(csv)

    # MINOR LOADING OF ORIGINAL DATASET RETRIEVED
    main_df.to_csv(rawFile, header = True, index = False)


# TRANSFORMATION
# --------------

# Correcting date type --> datetime (doesn't need times or tz info)
main_df['inspection_date'] = pd.to_datetime(main_df['inspection_date'])

# Groupy by to resolve outdated records (grab most recent ones only)
uniqueLocs = main_df.groupby('id')['inspection_date'].max().reset_index(drop = False)
main_df = uniqueLocs.merge(main_df, how = 'left').copy()

# Multiple most recent records per id so drop exact duplicates
main_df = main_df.drop_duplicates(keep = 'last')

# Reorder to correct columns
main_df = main_df[
    ['id', 'name', 'borough', 'cuisine', 'inspection_date', 'lat', 'lng']
].reset_index(drop = True)

cleanFile = Path('../csv_files/clean/dohmh_clean.csv')
main_df.to_csv(cleanFile, header = True, index = False)



In [None]:


# Removing Fast Food Restaurants from DF
fastFood_df = pd.read_csv('/Users/neelagarwal/Desktop/Datafiniti_Fast_Food_Restaurants.csv')
fastFood_list = [*fastFood_df['name'].unique()]

main_df = main_df.loc[~main_df['name'].isin(fastFood_list)]


# Data Normalization

# Normalizing borough name
boros = ['Manhattan', 'Bronx', 'Brooklyn', 'Queens', 'Staten Island']
boro_dict = {boro : f'B{num}' for num, boro in enumerate(boros, start = 1)}

# DF to hold new borough table
boro_df = pd.DataFrame(
    {
        'borough_id': boro_dict.values(),
        'borough': boro_dict.keys()
    }
)

# Mapping borough names to new id
main_df['borough'] = main_df['borough'].map(boro_dict)


# Normalizing cuisine type and dropping eroneous types
dropList = [
    'Bakery Products/Desserts', 'Sandwiches', 'Frozen Desserts', 'Hotdogs', 'Donuts', 'Other',
    'Coffee/Tea', 'Seafood', 'Bottled Beverages', 'Hamburgers', 'Chicken', 
    'Bagels/Pretzels', 'Pancakes/Waffles', 'Vegetarian', 'Juice, Smoothies, Fruit Salads',
    'Fusion', 'Salads', 'Sandwiches/Salads/Mixed Buffet', 'Hotdogs/Pretzels', 
    'Vegan', 'Californian', 'Soups/Salads/Sandwiches', 'Soups', 'Fruits/Vegetables', 
    'Nuts/Confectionary', 'Not Listed/Not Applicable', 'Chimichurri'
]
main_df = main_df[~main_df['cuisine'].isin(dropList)]

cuisines = main_df['cuisine'].unique()
cuisines_dict = {cuisine : f'C{id}' for id, cuisine in enumerate(cuisines, start = 1)}

# DF to hold new cuisine table
cuisine_df = pd.DataFrame(
    {
        'cuisine_id': cuisines_dict.values(),
        'cuisine': cuisines_dict.keys()
    }
)

# Mapping cuisine types to new ID
main_df['cuisine'] = main_df['cuisine'].map(cuisines_dict)

# Renaming for Normalization
main_df = main_df.rename(
    columns = {
        'borough': 'borough_id',
        'cuisine': 'cuisine_id'
    }
)

In [None]:
# SQLITE BACKEND DEV

# Create ORM base var
Base = declarative_base()

# Secondary ref table for boroughs
class Boroughs(Base):
    __tablename__ = 'boroughs'

    borough_id = Column(String, primary_key = True)
    borough = Column(String, nullable = False)

# Secondary ref table for cuisines
class Cuisines(Base):
    __tablename__ = 'cuisines'

    cuisine_id = Column(String, primary_key = True)
    cuisine = Column(String, nullable = False)

# Main Table (Restaurant)
class Restaurants(Base):
    __tablename__ = 'restaurants'

    id = Column(Integer, primary_key = True)
    name = Column(String, nullable = False, unique = True)
    borough_id = Column(String, ForeignKey('boroughs.borough_id'), nullable = False)
    cuisine_id = Column(String, ForeignKey('cuisines.cuisine_id'), nullable = False)
    inspection_date = \
        Column(DateTime, nullable = False)
    lat = Column(Float, nullable = False)
    lng = Column(Float, nullable = False)

    borough = relationship('boroughs', backref = 'restaurants')
    cuisine = relationship('cuisines', backref = 'restaurants')

    def __repr__(self):
        f'<RestaurantTable(id={self.id}, name="{self.name}")>'


# Create engine, bind sessions to it, and create tables
engine = create_engine('sqlite:///courier.sqlite')

@event.listens_for(Engine, 'connect')
def enforce_sqlite_fks(dbapi, conn_record):
    cursor = dbapi.cursor()
    cursor.execute('PRAGMA foreign_keys=ON;')
    cursor.close()

Session = sessionmaker(bind = engine)
Base.metadata.create_all(engine)