# Building ETL Pipeline
---

### Installing necessary Python packages
---

In [None]:
pip install --upgrade sodapy

In [None]:
pip install --upgrade db-dtypes

In [None]:
pip install --upgrade pyarrow

In [None]:
pip install --upgrade google-cloud-bigquery

### Part 1: Setting up 311 Calls NYC Open Data variables
---

In [None]:
# import libraries
import pandas as pd
import numpy as np
from sodapy import Socrata
from google.cloud import bigquery
from google.oauth2 import service_account
# suppressing warnings
# NOTE(review): this silences every warning for the whole session, including
# pandas deprecation / SettingWithCopy warnings that later cells may raise —
# consider narrowing the filter to specific categories
import warnings
warnings.filterwarnings('ignore')

In [None]:
# setting up host name for the API endpoint
# (the NYC Open Data domain the Socrata client connects to)
data_url = 'data.cityofnewyork.us'

In [None]:
# setting up the 311 call data set at the API endpoint
# (Socrata data set identifier passed to every client.get call)
data_set = 'erm2-nwe9'

In [None]:
# Setting up the App Token, which you created in Week 6.
# The token is read from the environment instead of being committed to source
# control: set NYC_OPEN_DATA_APP_TOKEN before starting the kernel.
# NOTE(review): the token previously hard-coded here has been exposed and
# should be regenerated.
# Without a token the API still answers, but with stricter throttling.
import os
app_token = os.environ.get("NYC_OPEN_DATA_APP_TOKEN") or None
if app_token is None:
    print("NYC_OPEN_DATA_APP_TOKEN is not set; requests will be made without an app token")

In [None]:
# creates the client that points to the API endpoint
# timeout = 200 is the sodapy per-request timeout in seconds
nyc_open_data_client = Socrata(data_url, app_token, timeout = 200)
print(f"nyc open data client name is: {nyc_open_data_client}")
print(f"nyc open data client data type is: {type(nyc_open_data_client)}")

### Setting up Google BigQuery variables
---

In [None]:
# CHANGE THIS TO YOUR FILE PATH
# path to the Google Cloud service-account JSON key used to build the
# BigQuery credentials in the next cell
key_path = r'cis9440-340717-2ea1188a1979.json'

In [None]:
# run this cell without changing anything to setup your credentials
# load the service-account key with the cloud-platform scope and create a
# BigQuery client bound to the project named in that key
credentials = service_account.Credentials.from_service_account_file(key_path,
                                                                    scopes=["https://www.googleapis.com/auth/cloud-platform"],)
bigquery_client = bigquery.Client(credentials = credentials,
                                 project = credentials.project_id)

print(f"bigquery client name is: {bigquery_client}")
print(f"bigquery client data type is: {type(bigquery_client)}")

In [None]:
# fully qualified BigQuery dataset ("project.dataset") that tables are loaded into
dataset_id = 'cis9440-340717.final_project_etl'
# also accept the "project:dataset" form by normalizing ':' to '.'
dataset_id = dataset_id.replace(':', '.')
print(f"your dataset_id is: {dataset_id}")

### Extracting Data From 311 Calls Dataset
---

In [None]:
# Get the total number of records in the entire data set
# (the client returns a list holding one dict with a 'COUNT' entry)
total_record_count = nyc_open_data_client.get(data_set, select = "COUNT(*)")
print(f"total records in {data_set}: {total_record_count}")

In [None]:
# Get the total number of records in target data set
# created_date carries a time of day, so the upper bound is exclusive on the
# following day; '<= 2022-04-30' would only keep calls made exactly at midnight
# on the last day
target_record_count = nyc_open_data_client.get(data_set,
                                               where = "created_date >= '2021-01-01' and created_date < '2022-05-01'",
                                               select = "COUNT(*)")
print(f"target records in {data_set}: {target_record_count}")

In [None]:
def pull_data_in_chunks(target_record_count,
                        where = "created_date >= '2021-01-01' and created_date < '2022-05-01'",
                        chunk_size = 20000):
    """Page through the current ``data_set`` and return the rows as a DataFrame.

    Reads the module-level ``nyc_open_data_client`` and ``data_set`` defined in
    earlier cells.

    Parameters
    ----------
    target_record_count : list of dict
        Result of the ``COUNT(*)`` query (``[{'COUNT': ...}]``). It is only used
        to report a mismatch; paging stops when the API returns a short page.
    where : str or None, optional
        SoQL filter applied to every page. The default upper bound is exclusive
        on 2022-05-01 so every call made during 2022-04-30 is included.
    chunk_size : int, optional
        Number of rows requested per API call (default 20000).

    Returns
    -------
    pandas.DataFrame
        One row per record returned by the API.

    Raises
    ------
    ValueError
        If ``chunk_size`` is not positive.
    """
    # measure time this function takes
    import time
    start_time = time.time()

    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    expected_count = int(target_record_count[0]['COUNT'])
    results = []
    offset = 0

    while True:
        # ordering by the system row id keeps pages stable, so rows are neither
        # repeated nor skipped between offset/limit requests
        query = {"order": ":id", "offset": offset, "limit": chunk_size}
        if where:
            query["where"] = where

        # fetch the set of records starting at 'offset'
        page = nyc_open_data_client.get(data_set, **query)
        results.extend(page)
        offset += chunk_size

        # a page shorter than chunk_size means there are no more records
        if len(page) < chunk_size:
            break

    if len(results) != expected_count:
        print(f"note: fetched {len(results)} records, count query reported {expected_count}")

    # convert the list into a pandas data frame
    data = pd.DataFrame.from_records(results)

    end_time = time.time()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {data.shape}")
    return data

data = pull_data_in_chunks(target_record_count)

### Profiling data
---

In [None]:
# listing columns in dataframe
# (inspect which fields the API returned before choosing columns to drop)
data.columns

In [None]:
# dropping location column
# NOTE(review): presumably dropped because it holds nested values that the
# profiling step cannot hash/count — confirm
data.drop(["location"], axis = 1, inplace = True)

In [None]:
# create and run a function to create data profiling dataframe

def create_data_profiling_df(data):
    """Summarize every column of *data* in a profiling DataFrame.

    Produces one row per column with its dtype, number of distinct values,
    number of duplicated non-null values, null and non-null counts, and the
    share of nulls. A column whose distinct values cannot be counted (e.g.
    unhashable values) is reported with a message, and only its name and
    dtype are filled in.

    Parameters
    ----------
    data : pandas.DataFrame
        Table to profile.

    Returns
    -------
    pandas.DataFrame
        Profile sorted by ``unique_values`` then ``non_null_values``, both
        descending.
    """
    profile_columns = ["column_name",
                       "column_type",
                       "unique_values",
                       "duplicate_values",
                       "null_values",
                       "non_null_values",
                       "percent_null"]

    # collect one dict per column and build the DataFrame once at the end;
    # DataFrame.append was removed in pandas 2.0
    rows = []
    for column in data.columns:
        series = data[column]
        info_dict = {"column_name": column,
                     "column_type": series.dtypes}

        try:
            row_count = series.shape[0]
            null_values = series.isna().sum()
            non_null_values = row_count - null_values
            info_dict["unique_values"] = len(series.unique())
            # nunique() ignores NaN, so a null is not counted as a distinct
            # non-null value when computing duplicates
            info_dict["duplicate_values"] = non_null_values - series.nunique(dropna = True)
            info_dict["null_values"] = null_values
            info_dict["non_null_values"] = non_null_values
            # an empty column has no defined null share
            info_dict["percent_null"] = round(null_values / row_count, 3) if row_count else float("nan")

        except (TypeError, AttributeError):
            # TypeError: unhashable values (e.g. dicts) in unique()/nunique();
            # AttributeError: a duplicated column label selects a DataFrame
            print(f"unable to read column: {column}, you may want to drop this column")

        rows.append(info_dict)

    data_profiling_df = pd.DataFrame(rows, columns = profile_columns)
    data_profiling_df.sort_values(by = ['unique_values', "non_null_values"],
                                  ascending = [False, False],
                                  inplace=True)

    print(f"data profiling complete, shape of df: {data_profiling_df.shape}")
    return data_profiling_df

# profile the extracted 311 data
data_profiling_df = create_data_profiling_df(data)

In [None]:
# view your data profiling dataframe
# (one row per column, most distinct values first)
data_profiling_df

### Data Cleaning
---

In [None]:
# Run this to look at a list of your columns
# (prints column names, non-null counts and dtypes)
data.info()

In [None]:
# ACTION REQUIRED
# edit the drop_columns list below to include all the columns you would like to drop
# then, run this cell to drop columns

drop_columns = ["unique_key",
               "closed_date",
               "street_name",
               "location_type", 
               "incident_zip",
               "incident_address", 
               "cross_street_1",
               "cross_street_2",
               "intersection_street_1",
               "intersection_street_2",
               "landmark",
               "address_type",
               "city", 
               "bbl",
               "x_coordinate_state_plane",
               "y_coordinate_state_plane",
               "open_data_channel_type",
               "park_facility_name",
               "park_borough",
               "latitude",
               "longitude", 
               "facility_type", 
               "bridge_highway_name",
               "bridge_highway_segment",
               "bridge_highway_direction",
               "taxi_company_borough",
               "taxi_pick_up_location",
               "road_ramp",
               "vehicle_type",
               "due_date"]

# drop only the columns that are present and report the missing ones, instead
# of hiding every possible error behind a bare except
for column in drop_columns:
    if column not in data.columns:
        print(f"unable to drop {column}")
data.drop(columns = [column for column in drop_columns if column in data.columns],
          inplace = True)

print(f"columns left in dataframe: {data.columns}")

In [None]:
# find number of duplicate rows
# duplicated() returns a boolean mask; summing it counts the duplicates
# without materializing a filtered copy of the frame
print(f"number of duplicate rows: {data.duplicated().sum()}")

In [None]:
# drop duplicate rows based on entire row (the first occurrence is kept)
data = data.drop_duplicates(keep = 'first')

# Or, based on a subset of rows, uncomment below and adjust accordingly
## data = data.drop_duplicates(subset = ["subset column"], keep = 'first')
## data = data.drop_duplicates(subset = ["subset column 1", "subset column 2"], keep = 'first')

print(f"number of rows after duplicates dropped: {len(data)}")

In [None]:
# drop NaN rows based on entire row
# note: this removes every row with a missing value in ANY remaining column
data = data.dropna()
print(f"number of rows after NaN dropped: {len(data)}")

### Creating Location Dimension (dim_location)
---

In [None]:
# first, copy the entire table
# (a copy, so subsetting the dimension below leaves `data` untouched)
dim_location = data.copy()

In [None]:
# list the columns available for the location dimension
dim_location.columns

In [None]:
# second, subset for only the wanted columns in the dimension
dim_location = dim_location[["community_board",
                             "borough"]]

# remove community boards whose name contains "Unspecified"; na=True makes a
# missing community_board count as a match, so such rows are removed as well
dim_location = dim_location[~dim_location["community_board"].str.contains("Unspecified", regex = False, na = True)]

In [None]:
# third, drop duplicate rows in dimension
dim_location = dim_location.drop_duplicates(subset = ["community_board"], keep = 'first')

# order alphabetically by borough, then community board, and renumber the
# index; each community_board appears once, so the ordering has no ties
dim_location = (dim_location
                .sort_values(['borough', 'community_board'])
                .reset_index(drop = True))

In [None]:
# fourth, add location_id as a surrogate key
# ids are consecutive integers starting at 1000, assigned in the current row order
dim_location.insert(0, 'location_id', range(1000, 1000 + len(dim_location)))
dim_location.head()

In [None]:
# fifth, add the location_id to the data table
# a left join keeps every call; calls whose community board was filtered out
# of the dimension receive NaN for location_id
location_keys = dim_location[['community_board', 'location_id']]
data = data.merge(location_keys, on = 'community_board', how = 'left')

data.head(2)

In [None]:
# drop NaN rows based on entire row
# after the left merge, calls whose community_board is not in dim_location
# (e.g. the "Unspecified" boards filtered out above) have NaN location_id
# and are removed here
data = data.dropna()

### Creating Complaint Dimension (dim_complaint) 
---

In [None]:
# first, copy the entire table
# (a copy, so subsetting the dimension below leaves `data` untouched)
dim_complaint = data.copy()

In [None]:
# list the columns available for the complaint dimension
dim_complaint.columns

In [None]:
# second, subset for only the wanted columns in the dimension
# (descriptor is the detail under the broader complaint_type)
dim_complaint = dim_complaint[["descriptor", 
                               "complaint_type"]]

In [None]:
# third, drop duplicate rows in dimension
# NOTE(review): de-duplicating on descriptor alone keeps only the first
# complaint_type seen for each descriptor. If one descriptor occurs under
# several complaint types, the others are lost, and the merge on descriptor
# below attributes every such call to that first type. Consider
# de-duplicating (and merging) on ["descriptor", "complaint_type"].
dim_complaint = dim_complaint.drop_duplicates(subset = ["descriptor"], keep = 'first')
dim_complaint = dim_complaint.reset_index(drop = True)
dim_complaint.head()

In [None]:
# fourth, add complaint_id as a surrogate key
# ids are consecutive integers starting at 10, assigned in the current row order
dim_complaint.insert(0, 'complaint_id', range(10, 10 + len(dim_complaint)))
dim_complaint.head()

In [None]:
# fifth, add the complaint_id to the Fact table
# left join on the shared descriptor column
complaint_keys = dim_complaint[['descriptor', 'complaint_id']]
data = data.merge(complaint_keys, on = 'descriptor', how = 'left')

data.head(2)

In [None]:
# drop NaN rows based on entire row
# (dim_complaint was built from this same data, so every descriptor should
# match and no rows are expected to be removed here)
data = data.dropna()
# # converting new column to int object type
# data["complaint_id"] = data["complaint_id"].astype(int)

### Creating Date Dimension (dim_date) 
---

In [None]:
## ACTION REQUIRED: update the start and end date at the bottom of the sql_query variable to fit needs

# One row per calendar day between the two dates (both inclusive).
# date_id is 'YYYYMMDD' text, the same format used for the fact table's date_id.
# week_day: '%w' gives 0 (Sunday) to 6.
# fiscal_qtr: '%Q' is the calendar quarter 1-4 — NOTE(review): adjust if a
# non-calendar fiscal quarter is intended.
sql_query = """
            SELECT
              FORMAT_DATE('%Y%m%d', d) AS date_id,
              d AS full_date,
              FORMAT_DATE('%w', d) AS week_day,
              FORMAT_DATE('%A', d) AS day_name,
              FORMAT_DATE('%B', d) AS month_name,
              FORMAT_DATE('%Q', d) AS fiscal_qtr,
              FORMAT_DATE('%Y', d) AS year
            FROM
              UNNEST(GENERATE_DATE_ARRAY('2021-01-01', '2022-04-30', INTERVAL 1 DAY)) AS d
            ORDER BY d
            """

# store extracted data in new dataframe
dim_date = bigquery_client.query(sql_query).to_dataframe()

# validate that > 0 rows have been extracted and return dataframe
if len(dim_date) > 0:
    print(f"date dimension created successfully, shape of dimension: {dim_date.shape}")
else:
    print("date dimension FAILED")

In [None]:
# create date_id column in the Fact Table
# parse the whole column at once (vectorized, rather than one to_datetime call
# per row) and format as YYYYMMDD text to match dim_date.date_id
data['date_id'] = pd.to_datetime(data['created_date']).dt.strftime("%Y%m%d")

In [None]:
# drop NaN rows based on entire row
# (date_id is derived from created_date, which earlier dropna calls already
# made non-null, so no rows are expected to be removed here)
data = data.dropna()
# # converting new column to int object type
# dim_date["date_id"] = dim_date["date_id"].astype(int)
# data["date_id"] = data["date_id"].astype(int)

### Creating Request Status Dimension (dim_request_status)
---

In [None]:
# first, copy the entire table
# (a copy, so subsetting the dimension below leaves `data` untouched)
dim_request_status = data.copy()

In [None]:
# list the columns available for the request status dimension
dim_request_status.columns

In [None]:
# second, subset for only the wanted columns in the dimension
dim_request_status = dim_request_status[["status", 
                                         "resolution_description",
                                         "resolution_action_updated_date"]]

In [None]:
# third, drop duplicate rows in dimension
# NOTE(review): only the first resolution_description and
# resolution_action_updated_date seen for each status are kept, so those
# columns describe one arbitrary call rather than the status itself
dim_request_status = dim_request_status.drop_duplicates(subset = ["status"], keep = 'first')
dim_request_status = dim_request_status.reset_index(drop = True)
dim_request_status.head()

In [None]:
# fourth, add status_id as a surrogate key
# ids are consecutive integers starting at 10, assigned in the current row order
dim_request_status.insert(0, 'status_id', range(10, 10 + len(dim_request_status)))
dim_request_status.head()

In [None]:
# fifth, add the status_id to the Fact table
# left join on the shared status column
status_keys = dim_request_status[['status', 'status_id']]
data = data.merge(status_keys, on = 'status', how = 'left')

data.head(2)

In [None]:
# drop NaN rows based on entire row
# (dim_request_status comes from this same data, so every status should match)
data = data.dropna()
print(f"number of rows after NaN dropped: {len(data)}")
# converting new column to int object type
data["status_id"] = data["status_id"].astype(int)

### Creating Agency Dimension (dim_agency)
---

In [None]:
# first, copy the entire table
# (a copy, so subsetting the dimension below leaves `data` untouched)
dim_agency = data.copy()

In [None]:
# list the columns available for the agency dimension
dim_agency.columns

In [None]:
# second, subset for only the wanted columns in the dimension
# (agency code plus its descriptive name)
dim_agency = dim_agency[["agency", 
                         "agency_name"]]

In [None]:
# third, drop duplicate rows in dimension
# (one row per agency; the first agency_name seen for it is kept)
dim_agency = dim_agency.drop_duplicates(subset = ["agency"], keep = 'first')
dim_agency = dim_agency.reset_index(drop = True)
dim_agency.head()

In [None]:
# fourth, add agency_id as a surrogate key
# ids are consecutive integers starting at 10, assigned in the current row order
dim_agency.insert(0, 'agency_id', range(10, 10 + len(dim_agency)))
dim_agency.head()

In [None]:
# fifth, add the agency_id to the Fact table
# left join on the shared agency column
agency_keys = dim_agency[['agency', 'agency_id']]
data = data.merge(agency_keys, on = 'agency', how = 'left')

data.head(2)

In [None]:
# # drop NaN rows based on entire row
# data = data.dropna()
# print(f"number of rows after NaN dropped: {len(data)}")
# # converting new column to int object type
# data["agency_id"] = data["agency_id"].astype(int)

### Creating fct_311_calls
---

In [None]:
# Creating 311 Fact Table

# creating a copy of the data table
# (the key columns are selected from it in a later cell)
fct_311_calls = data.copy()

In [None]:
# histogram of days_open
# fct_311_calls["borough"].hist(bins = 5)

In [None]:
# take a subset of fact_table for only the needed columns: which are keys and measures
# (as selected, the table holds only the five dimension keys; one row per call)
fct_311_calls = fct_311_calls[["date_id",
                               "complaint_id",
                               "location_id",
                               "status_id",
                               "agency_id"]]

fct_311_calls.head()

In [None]:
# drop NaN rows based on entire row
fct_311_calls = fct_311_calls.dropna()
# report the fact table's own row count (not the source `data` table's)
print(f"number of rows after NaN dropped: {len(fct_311_calls)}")
# converting location_id to int: the left merge in the location step can leave
# it as a float column
fct_311_calls["location_id"] = fct_311_calls["location_id"].astype(int)

### Delivering Fact and Dimensions to Data Warehouse (BigQuery)
---

In [None]:
# create a function to load dataframes to BigQuery

def load_table_to_bigquery(df,
                          table_name,
                          dataset_id):
    """Load *df* into the BigQuery table ``{dataset_id}.{table_name}``.

    Uses the module-level ``bigquery_client``. Any existing table contents are
    replaced (``WRITE_TRUNCATE``). The call blocks until the load job finishes,
    so a failed load raises here instead of being silently ignored.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to upload.
    table_name : str
        Destination table name inside the dataset.
    dataset_id : str
        Fully qualified dataset id, ``"project.dataset"``.

    Returns
    -------
    google.cloud.bigquery.LoadJob
        The completed load job.
    """
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    # replace the table on every run so the notebook can be re-executed
    job_config.write_disposition = "WRITE_TRUNCATE"

    upload_table_name = f"{dataset_id}.{table_name}"

    load_job = bigquery_client.load_table_from_dataframe(df,
                                                         upload_table_name,
                                                         job_config = job_config)

    print(f"Starting job {load_job}")

    # wait for the job; raises if BigQuery rejects the load
    load_job.result()
    print(f"Loaded {load_job.output_rows} rows into {upload_table_name}")
    return load_job

In [None]:
# # loading location dimension to BigQuery

# load_table_to_bigquery(df = dim_location,
#                       table_name = "dim_location",
#                       dataset_id = dataset_id)

In [None]:
# loading date dimension to BigQuery
# (replaces any existing dim_date table in the dataset)

load_table_to_bigquery(df = dim_date,
                      table_name = "dim_date",
                      dataset_id = dataset_id)

In [None]:
# loading complaint dimension to BigQuery
# (replaces any existing dim_complaint table in the dataset)

load_table_to_bigquery(df = dim_complaint,
                      table_name = "dim_complaint",
                      dataset_id = dataset_id)

In [None]:
# loading request status dimension to BigQuery
# (replaces any existing dim_request_status table in the dataset)

load_table_to_bigquery(df = dim_request_status,
                      table_name = "dim_request_status",
                      dataset_id = dataset_id)

In [None]:
# loading agency dimension to BigQuery
# (replaces any existing dim_agency table in the dataset)

load_table_to_bigquery(df = dim_agency,
                      table_name = "dim_agency",
                      dataset_id = dataset_id)

In [None]:
# loading 311 calls fact to BigQuery
# (replaces any existing fct_311_calls table in the dataset)

load_table_to_bigquery(df = fct_311_calls,
                      table_name = "fct_311_calls",
                      dataset_id = dataset_id)

### Part 2: Setting up Community District NYC Open Data variables
---

In [None]:
# setting up the community district data set at the API endpoint
# (replaces the 311 data set id used in Part 1)
data_set = 'jp9i-3b7y'

### Extracting Data From Community District Dataset
---

In [None]:
# Get the total number of records in the entire data set
# (the client returns a list holding one dict with a 'COUNT' entry)
total_record_count = nyc_open_data_client.get(data_set, select = "COUNT(*)")
print(f"total records in {data_set}: {total_record_count}")

In [None]:
# Get the total number of records in target data set
# (no filter here, so this equals the total count above)
target_record_count = nyc_open_data_client.get(data_set,
                                               select = "COUNT(*)")
print(f"target records in {data_set}: {target_record_count}")

In [None]:
def pull_data_in_chunks(target_record_count, where = None, chunk_size = 200000):
    """Page through the current ``data_set`` and return the rows as a DataFrame.

    Reads the module-level ``nyc_open_data_client`` and ``data_set`` defined in
    earlier cells.

    Parameters
    ----------
    target_record_count : list of dict
        Result of the ``COUNT(*)`` query (``[{'COUNT': ...}]``). It is only used
        to report a mismatch; paging stops when the API returns a short page.
    where : str or None, optional
        Optional SoQL filter applied to every page (no filter by default).
    chunk_size : int, optional
        Number of rows requested per API call (default 200000).

    Returns
    -------
    pandas.DataFrame
        One row per record returned by the API.

    Raises
    ------
    ValueError
        If ``chunk_size`` is not positive.
    """
    # measure time this function takes
    import time
    start_time = time.time()

    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    expected_count = int(target_record_count[0]['COUNT'])
    results = []
    offset = 0

    while True:
        # ordering by the system row id keeps pages stable, so rows are neither
        # repeated nor skipped between offset/limit requests
        query = {"order": ":id", "offset": offset, "limit": chunk_size}
        if where:
            query["where"] = where

        # fetch the set of records starting at 'offset'
        page = nyc_open_data_client.get(data_set, **query)
        results.extend(page)
        offset += chunk_size

        # a page shorter than chunk_size means there are no more records
        if len(page) < chunk_size:
            break

    if len(results) != expected_count:
        print(f"note: fetched {len(results)} records, count query reported {expected_count}")

    # convert the list into a pandas data frame
    data2 = pd.DataFrame.from_records(results)

    end_time = time.time()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {data2.shape}")
    return data2

# extract the community district data set
data2 = pull_data_in_chunks(target_record_count)

### Profiling Data
---

In [None]:
# listing columns in dataframe
# (inspect which fields the API returned before choosing columns to drop)
data2.columns

In [None]:
# create and run a function to create data profiling dataframe

def create_data_profiling_df(data2):
    """Summarize every column of *data2* in a profiling DataFrame.

    Produces one row per column with its dtype, number of distinct values,
    number of duplicated non-null values, null and non-null counts, and the
    share of nulls. A column whose distinct values cannot be counted (e.g.
    unhashable values) is reported with a message, and only its name and
    dtype are filled in.

    Parameters
    ----------
    data2 : pandas.DataFrame
        Table to profile.

    Returns
    -------
    pandas.DataFrame
        Profile sorted by ``unique_values`` then ``non_null_values``, both
        descending.
    """
    profile_columns = ["column_name",
                       "column_type",
                       "unique_values",
                       "duplicate_values",
                       "null_values",
                       "non_null_values",
                       "percent_null"]

    # collect one dict per column and build the DataFrame once at the end;
    # DataFrame.append was removed in pandas 2.0
    rows = []
    for column in data2.columns:
        series = data2[column]
        info_dict = {"column_name": column,
                     "column_type": series.dtypes}

        try:
            row_count = series.shape[0]
            null_values = series.isna().sum()
            non_null_values = row_count - null_values
            info_dict["unique_values"] = len(series.unique())
            # nunique() ignores NaN, so a null is not counted as a distinct
            # non-null value when computing duplicates
            info_dict["duplicate_values"] = non_null_values - series.nunique(dropna = True)
            info_dict["null_values"] = null_values
            info_dict["non_null_values"] = non_null_values
            # an empty column has no defined null share
            info_dict["percent_null"] = round(null_values / row_count, 3) if row_count else float("nan")

        except (TypeError, AttributeError):
            # TypeError: unhashable values (e.g. dicts) in unique()/nunique();
            # AttributeError: a duplicated column label selects a DataFrame
            print(f"unable to read column: {column}, you may want to drop this column")

        rows.append(info_dict)

    data_profiling_df = pd.DataFrame(rows, columns = profile_columns)
    data_profiling_df.sort_values(by = ['unique_values', "non_null_values"],
                                  ascending = [False, False],
                                  inplace=True)

    print(f"data profiling complete, shape of df: {data_profiling_df.shape}")
    return data_profiling_df

# profile the extracted community district data
data_profiling_df = create_data_profiling_df(data2)

In [None]:
# view your data profiling dataframe
# (one row per column, most distinct values first)
data_profiling_df

### Data Cleaning
---

In [None]:
# Run this to look at a list of your columns
# (prints column names, non-null counts and dtypes)
data2.info()

In [None]:
# ACTION REQUIRED
# edit the drop_columns list below to include all the columns you would like to drop
# then, run this cell to drop columns

drop_columns = ["shape_leng",
                "shape_area"]

# drop only the columns that are present and report the missing ones, instead
# of hiding every possible error behind a bare except
for column in drop_columns:
    if column not in data2.columns:
        print(f"unable to drop {column}")
data2.drop(columns = [column for column in drop_columns if column in data2.columns],
           inplace = True)

print(f"columns left in dataframe: {data2.columns}")

### Creating Location Dimension (dim_location)
---

In [None]:
# first, copy the entire table
# (this rebuilds dim_location from the community district data, replacing
# the Part 1 version)
dim_location = data2.copy()

In [None]:
# list the columns available for the location dimension
dim_location.columns

In [None]:
# convert boro_cd to int so borough_name can compare it numerically
# (presumably the API returns it as text — confirm)
dim_location["boro_cd"] = dim_location["boro_cd"].astype(int)

In [None]:
# assign borough name to community district number
def borough_name(boro_cd):
    """Return the borough name for a community district code.

    Codes 100-199 map to Manhattan, 200-299 to Bronx, 300-399 to Brooklyn,
    400-499 to Queens and 500-599 to Staten Island (bounds inclusive).
    Any other value maps to None.
    """
    borough_ranges = (
        (100, 199, "Manhattan"),
        (200, 299, "Bronx"),
        (300, 399, "Brooklyn"),
        (400, 499, "Queens"),
        (500, 599, "Staten Island"),
    )
    for low, high, name in borough_ranges:
        if low <= boro_cd <= high:
            return name
    return None
    
# create a new column based on condition
# (borough_name yields None for codes outside 100-599)
dim_location['borough'] = dim_location['boro_cd'].apply(borough_name)

# display the dataframe
dim_location

In [None]:
# sort dataframe by borough in alphabetical order, then by district code within
# each borough, so the location_id assigned next is reproducible between runs
# NOTE(review): fct_311_calls received its location_id in Part 1 from a
# separate numbering of community_board values; the two numberings only agree
# if both sources list the same districts in the same order. Joining on a
# shared district key would be safer.
dim_location = (dim_location
                .sort_values(['borough', 'boro_cd'])
                .reset_index(drop = True))

In [None]:
# add location_id as a surrogate key
# ids are consecutive integers starting at 1000, assigned in the current row order
dim_location.insert(0, "location_id", range(1000, 1000 + len(dim_location)))

In [None]:
# give data2.boro_cd the same int type as dim_location.boro_cd so the merge
# below matches keys; location_id is already integer from range()
data2["boro_cd"] = data2["boro_cd"].astype(int) 
dim_location["location_id"] = dim_location["location_id"].astype(int)

In [None]:
# adding location_id to the data table
# left join on the shared boro_cd column
district_keys = dim_location[['boro_cd', 'location_id']]
data2 = data2.merge(district_keys, on = 'boro_cd', how = 'left')

data2.head(2)

In [None]:
# keep only the columns that are loaded to BigQuery as dim_location
dim_location = dim_location[["location_id",
                             "boro_cd",
                             "borough"]]

### Creating fct_community_district
---

In [None]:
# Creating Community District Fact Table

# creating an explicit copy of the needed columns, so the later boro_cd
# conversion modifies this table only and does not run into pandas'
# chained-assignment (SettingWithCopy) handling on a slice of data2
fct_community_district = data2[["location_id",
                               "boro_cd"]].copy()

In [None]:
# store boro_cd as text in the fact table
fct_community_district["boro_cd"] = fct_community_district["boro_cd"].astype(str)

### Delivering Fact and Dimensions to Data Warehouse (BigQuery)
---

In [None]:
# create a function to load dataframes to BigQuery

def load_table_to_bigquery(df,
                          table_name,
                          dataset_id):
    """Load *df* into the BigQuery table ``{dataset_id}.{table_name}``.

    Uses the module-level ``bigquery_client``. Any existing table contents are
    replaced (``WRITE_TRUNCATE``). The call blocks until the load job finishes,
    so a failed load raises here instead of being silently ignored.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to upload.
    table_name : str
        Destination table name inside the dataset.
    dataset_id : str
        Fully qualified dataset id, ``"project.dataset"``.

    Returns
    -------
    google.cloud.bigquery.LoadJob
        The completed load job.
    """
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    # replace the table on every run so the notebook can be re-executed
    job_config.write_disposition = "WRITE_TRUNCATE"

    upload_table_name = f"{dataset_id}.{table_name}"

    load_job = bigquery_client.load_table_from_dataframe(df,
                                                         upload_table_name,
                                                         job_config = job_config)

    print(f"Starting job {load_job}")

    # wait for the job; raises if BigQuery rejects the load
    load_job.result()
    print(f"Loaded {load_job.output_rows} rows into {upload_table_name}")
    return load_job

In [None]:
# loading location dimension to BigQuery
# (replaces any existing dim_location table in the dataset)

load_table_to_bigquery(df = dim_location,
                      table_name = "dim_location",
                      dataset_id = dataset_id)

In [None]:
# loading community district fact to BigQuery
# (replaces any existing fct_community_district table in the dataset)

load_table_to_bigquery(df = fct_community_district,
                      table_name = "fct_community_district",
                      dataset_id = dataset_id)