## STEP 1:

In [None]:
pip install --upgrade sodapy

In [None]:
pip install --upgrade db-dtypes

In [None]:
pip install --upgrade pyarrow

In [None]:
pip install --upgrade google-cloud-bigquery

In [1]:
# import libraries
import pandas as pd
import numpy as np
from sodapy import Socrata
from google.cloud import bigquery
from google.oauth2 import service_account

## STEP 2:

### drinking water fact and dimensions

##### step 2.1

In [2]:
# setup the host name for the API endpoint 
data_url = 'data.cityofnewyork.us'

In [3]:
# https://data.cityofnewyork.us/resource/bkwf-xfky.json
drinking_water = 'bkwf-xfky'

In [4]:
# Setup App Token
app_token = '68xD3x8f4S30r1Df8zIDK2VZ4'

In [5]:
# setup Socrata client that connects python to NYC Open Data
# create the client that points to the API endpoint
nyc_open_data_client = Socrata(data_url, app_token, timeout = 200)
print(f"nyc open data client name is: {nyc_open_data_client}")
print(f"nyc open data client data type is: {type(nyc_open_data_client)}")

nyc open data client name is: <sodapy.socrata.Socrata object at 0x7f91d2ebc310>
nyc open data client data type is: <class 'sodapy.socrata.Socrata'>


##### step 2.2

In [6]:
key_path = r'cis9440-340719-37a24169420f.json'

In [7]:
#setup credentials
credentials = service_account.Credentials.from_service_account_file(key_path,
                                                                    scopes=["https://www.googleapis.com/auth/cloud-platform"],)
bigquery_client = bigquery.Client(credentials = credentials,
                                 project = credentials.project_id)

print(f"bigquery client name is: {bigquery_client}")
print(f"bigquery client data type is: {type(bigquery_client)}")

bigquery client name is: <google.cloud.bigquery.client.Client object at 0x7f91d2ebc5e0>
bigquery client data type is: <class 'google.cloud.bigquery.client.Client'>


In [8]:
dataset_id = 'cis9440-340719.etl_dataset' 

dataset_id = dataset_id.replace(':', '.')
print(f"your dataset_id is: {dataset_id}")

your dataset_id is: cis9440-340719.etl_dataset


## STEP 3

In [9]:
# Get the total number of records in the entire data set
total_record_count = nyc_open_data_client.get(drinking_water, select = "COUNT(*)")
print(f"total records in {drinking_water}: {total_record_count}")

total records in bkwf-xfky: [{'COUNT': '112219'}]


In [10]:
# Get the total number of records in target data set

target_record_count = nyc_open_data_client.get(drinking_water,
                                               where = "sample_date > '2019-01-01'",
                                               select= "COUNT(*)")
print(f"target records in {drinking_water}: {target_record_count}")

target records in bkwf-xfky: [{'COUNT': '48786'}]


In [11]:
#loop through target data set to pull all rows in chunks 

def pull_data_in_chunks(target_record_count):
    
    # measure time this function takes
    import time
    start_time = time.time()

    start = 0             # start at 0
    chunk_size = 2000     # fetch 2000 rows at a time
    results = []          # empty out our result list
    record_count = target_record_count

    while True:

        # fetch the set of records starting at 'start'
        results.extend(nyc_open_data_client.get(drinking_water,
                                                where = "sample_date > '2019-01-01'",
                                                offset = start,
                                                limit = chunk_size))

        # update the starting record number
        start = start + chunk_size

        # if we have fetched all of the records (we have reached record_count), exit loop
        if (start > int(record_count[0]['COUNT'])):
            break

    # convert the list into a pandas data frame
    drinking_water_data = pd.DataFrame.from_records(results)

    end_time = time.time()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {drinking_water_data.shape}")
    return drinking_water_data

drinking_water_data = pull_data_in_chunks(target_record_count)

function took 7.0 seconds
the shape of your dataframe is: (48786, 10)


In [12]:
drinking_water_data.head()

Unnamed: 0,sample_number,sample_date,sample_time,sample_site,sample_class,residual_free_chlorine_mg_l,turbidity_ntu,fluoride_mg_l,coliform_quanti_tray_mpn_100ml,e_coli_quanti_tray_mpn_100ml
0,201900113,2019-01-02T00:00:00.000,09:39,31650,Compliance,0.31,0.55,0.68,<1,<1
1,201900114,2019-01-02T00:00:00.000,08:59,33550,Compliance,0.25,0.52,0.68,<1,<1
2,201900115,2019-01-02T00:00:00.000,10:36,37450,Compliance,0.4,0.75,0.71,<1,<1
3,201900116,2019-01-02T00:00:00.000,11:21,3SC26,Operational,0.64,<0.10,0.7,<1,<1
4,201900119,2019-01-02T00:00:00.000,11:23,50200,Operational,0.77,0.66,,<1,<1


## STEP 4

### data profiling

In [13]:
# what are the columns in dataframe?
drinking_water_data.columns

Index(['sample_number', 'sample_date', 'sample_time', 'sample_site',
       'sample_class', 'residual_free_chlorine_mg_l', 'turbidity_ntu',
       'fluoride_mg_l', 'coliform_quanti_tray_mpn_100ml',
       'e_coli_quanti_tray_mpn_100ml'],
      dtype='object')

In [14]:
# create and run a function to ceate data profiling dataframe

def create_data_profiling_df(data):
    
    # create an empty dataframe to gather information about each column
    data_profiling_df = pd.DataFrame(columns = ["column_name",
                                                "column_type",
                                                "unique_values",
                                                "duplicate_values",
                                                "null_values",
                                                "non_null_values",
                                                "percent_null"])

    # loop through each column to add rows to the data_profiling_df dataframe
    for column in data.columns:

        info_dict = {}

        try:
            info_dict["column_name"] = column
            info_dict["column_type"] = data[column].dtypes
            info_dict["unique_values"] = len(data[column].unique())
            info_dict["duplicate_values"] = (data[column].shape[0] - data[column].isna().sum()) - len(data[column].unique())
            info_dict["null_values"] = data[column].isna().sum()
            info_dict["non_null_values"] = data[column].shape[0] - data[column].isna().sum()
            info_dict["percent_null"] = round((data[column].isna().sum()) / (data[column].shape[0]), 3)

        except:
            print(f"unable to read column: {column}, you may want to drop this column")

        data_profiling_df = data_profiling_df.append(info_dict, ignore_index=True)

    data_profiling_df.sort_values(by = ['unique_values', "non_null_values"],
                                  ascending = [False, False],
                                  inplace=True)
    
    return data_profiling_df

drinking_water_data_profiling_df = create_data_profiling_df(drinking_water_data)

In [15]:
drinking_water_data_profiling_df

Unnamed: 0,column_name,column_type,unique_values,duplicate_values,null_values,non_null_values,percent_null
0,sample_number,object,48786,0,0,48786,0.0
1,sample_date,object,1155,47631,0,48786,0.0
2,sample_time,object,471,48315,0,48786,0.0
3,sample_site,object,393,48393,0,48786,0.0
6,turbidity_ntu,object,208,48578,0,48786,0.0
5,residual_free_chlorine_mg_l,object,140,48644,2,48784,0.0
7,fluoride_mg_l,object,47,6378,42361,6425,0.868
8,coliform_quanti_tray_mpn_100ml,object,40,48733,13,48773,0.0
4,sample_class,object,4,48782,0,48786,0.0
9,e_coli_quanti_tray_mpn_100ml,object,4,48769,13,48773,0.0


In [16]:
drinking_water_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 48786 entries, 0 to 48785
Data columns (total 10 columns):
 #   Column                          Non-Null Count  Dtype 
---  ------                          --------------  ----- 
 0   sample_number                   48786 non-null  object
 1   sample_date                     48786 non-null  object
 2   sample_time                     48786 non-null  object
 3   sample_site                     48786 non-null  object
 4   sample_class                    48786 non-null  object
 5   residual_free_chlorine_mg_l     48784 non-null  object
 6   turbidity_ntu                   48786 non-null  object
 7   fluoride_mg_l                   6425 non-null   object
 8   coliform_quanti_tray_mpn_100ml  48773 non-null  object
 9   e_coli_quanti_tray_mpn_100ml    48773 non-null  object
dtypes: object(10)
memory usage: 3.7+ MB


In [17]:
drop_columns = ['sample_class','sample_site','coliform_quanti_tray_mpn_100ml','fluoride_mg_l']

for column in drop_columns:
    try:
        drinking_water_data.drop(column, axis = 1, inplace = True)
    except:
        print(f"unable to drop {column}")

print(f"columns left in dataframe: {drinking_water_data.columns}")

columns left in dataframe: Index(['sample_number', 'sample_date', 'sample_time',
       'residual_free_chlorine_mg_l', 'turbidity_ntu',
       'e_coli_quanti_tray_mpn_100ml'],
      dtype='object')


In [18]:
print(f"number of duplicate rows: {len(drinking_water_data[drinking_water_data.duplicated()])}")

number of duplicate rows: 0


### change data type

In [19]:
drinking_water_data['turbidity_ntu'].unique()

array(['0.55', '0.52', '0.75', '<0.10', '0.66', '0.65', '0.63', '0.71',
       '0.86', '0.81', '0.73', '0.46', '0.83', '0.72', '0.67', '0.68',
       '0.7', '0.74', '0.6', '0.78', '0.25', '0.82', '0.57', '0.58',
       '0.64', '0.49', '0.89', '0.61', '0.47', '0.76', '0.4', '0.43',
       '0.54', '0.8', '0.69', '0.56', '0.59', '0.18', '0.88', '0.15',
       '0.38', '0.11', '0.62', '1.23', '0.77', '0.87', '0.79', '0.1',
       '0.44', '0.51', '0.9', '0.5', '0.85', '0.96', '0.14', '0.48',
       '1.22', '0.13', '0.45', '0.39', '0.34', '0.31', '0.12', '0.35',
       '0.3', '0.27', '0.36', '0.41', '1.12', '0.26', '0.33', '0.37',
       '0.32', '0.22', '0.92', '0.24', '0.42', '1.72', '0.53', '0.23',
       '0.21', '0.19', '0.28', '0.29', '0.84', '1.24', '1.02', '1.08',
       '1.03', '1.11', '0.99', '1', '1.01', '1.07', '0.97', '1.05',
       '1.14', '0.95', '0.91', '0.93', '0.94', '0.98', '0.16', '0.2',
       '2.33', '0.17', '1.2', '1.06', '1.13', '1.09', '1.04', '1.1',
       '1.18', '1.1

In [20]:
drinking_water_data.loc[drinking_water_data['turbidity_ntu']=='<0.10','turbidity_ntu']=0

In [21]:
drinking_water_data['residual_free_chlorine_mg_l'].unique()

array(['0.31', '0.25', '0.4', '0.64', '0.77', '0.82', '0.65', '0.81',
       '0.83', '0.93', '0.86', '0.54', '0.89', '0.76', '0.36', '0.37',
       '0.45', '0.49', '0.63', '0.34', '0.73', '0.67', '0.57', '0.58',
       '0.69', '0.68', '0.7', '0.87', '0.75', '0.55', '0.71', '0.26',
       '0.51', '0.44', '0.62', '0.59', '0.72', '0.5', '0.38', '0.66',
       '0.88', '0.6', '0.56', '0.96', '0.52', '0.9', '0.78', '0.91',
       '0.39', '0.42', '0.2', '0.48', '0.97', '0.74', '0.28', '0.61',
       '0.23', '0.46', '0.79', '0.84', '0.92', '0.8', '0.3', '0.33',
       '0.85', '0.35', '0.47', '0.53', '0.32', '0.29', '0.27', '0.94',
       '1.02', '0.24', '0.41', '0.13', '0.99', '0.43', '0.19', '1.01',
       '0.21', '0.22', '0.12', '0.09', '0.95', '0.16', '0.15', '0.98',
       '0.1', '0.18', '0.14', '0.06', '0.11', '0.02', '0.05', '0.17', '0',
       '0.04', '0.07', '1', '0.08', '1.04', '1.03', '0.03', '1.11',
       '1.09', '1.07', '1.05', '1.12', '1.1', '1.2', '1.15', '1.16',
       '1.06', 

In [22]:
drinking_water_data['e_coli_quanti_tray_mpn_100ml'].unique()

array(['<1', '1', nan, '2'], dtype=object)

In [23]:
drinking_water_data.loc[drinking_water_data['e_coli_quanti_tray_mpn_100ml']=='<1','e_coli_quanti_tray_mpn_100ml']=0

In [24]:
drinking_water_data = drinking_water_data.astype({'turbidity_ntu':'float',
                                                              'residual_free_chlorine_mg_l':'float',
                                                              'e_coli_quanti_tray_mpn_100ml':'float'})

In [25]:
drinking_water_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 48786 entries, 0 to 48785
Data columns (total 6 columns):
 #   Column                        Non-Null Count  Dtype  
---  ------                        --------------  -----  
 0   sample_number                 48786 non-null  object 
 1   sample_date                   48786 non-null  object 
 2   sample_time                   48786 non-null  object 
 3   residual_free_chlorine_mg_l   48784 non-null  float64
 4   turbidity_ntu                 48786 non-null  float64
 5   e_coli_quanti_tray_mpn_100ml  48773 non-null  float64
dtypes: float64(3), object(3)
memory usage: 2.2+ MB


## STEP 5

### date dimension

In [26]:
#FORMAT_DATE('%Q', d) as fiscal_qtr,
sql_query = """
            SELECT
              CONCAT (FORMAT_DATE("%Y",d),FORMAT_DATE("%m",d),FORMAT_DATE("%d",d)) as date_id,
              d AS full_date,
              FORMAT_DATE('%w', d) AS week_day,
              FORMAT_DATE('%A', d) AS day_name,
              FORMAT_DATE('%B', d) as month_name,
              FORMAT_DATE('%Y', d) AS year,
            FROM (
              SELECT
                *
              FROM
                UNNEST(GENERATE_DATE_ARRAY('2019-01-01', '2022-03-01', INTERVAL 1 DAY)) AS d )
            """

# store extracted data in new dataframe
date_dim = bigquery_client.query(sql_query).to_dataframe()

# validate that > 0 rows have been extracted and return dataframe
if len(date_dim) > 0:
    print(f"date dimension created successfully, shape of dimension: {date_dim.shape}")
else:
    print("date dimension FAILED")

date dimension created successfully, shape of dimension: (1156, 6)


In [27]:
# create date_id column in the Fact Table
drinking_water_data['date_id'] = drinking_water_data['sample_date'].apply(lambda x: pd.to_datetime(x).strftime("%Y%m%d"))

### sample_dim

In [28]:
sample_dim = drinking_water_data.copy()

In [29]:
sample_dim.columns

Index(['sample_number', 'sample_date', 'sample_time',
       'residual_free_chlorine_mg_l', 'turbidity_ntu',
       'e_coli_quanti_tray_mpn_100ml', 'date_id'],
      dtype='object')

In [30]:
#subset for only the wanted columns in the dimension
sample_dim = sample_dim[["sample_number"]]

In [31]:
sample_dim.count()

sample_number    48786
dtype: int64

In [32]:
#drop duplicate rows in dimension
sample_dim = sample_dim.drop_duplicates(subset = ['sample_number'], keep = 'first')
sample_dim = sample_dim.reset_index(drop = True)
sample_dim.head()

Unnamed: 0,sample_number
0,201900113
1,201900114
2,201900115
3,201900116
4,201900119


In [33]:
sample_dim.count()

sample_number    48786
dtype: int64

In [34]:
#add sample_id as a surrogate key
sample_dim.insert(0, 'sample_id', range(1000, 1000 + len(sample_dim)))
sample_dim.head()

Unnamed: 0,sample_id,sample_number
0,1000,201900113
1,1001,201900114
2,1002,201900115
3,1003,201900116
4,1004,201900119


In [35]:
#add the sample_id to the Fact table
drinking_water_data = drinking_water_data.merge(sample_dim[['sample_number', 'sample_id']],
                  left_on = 'sample_number',
                  right_on = 'sample_number',
                  how = 'left')

drinking_water_data.head(2)

Unnamed: 0,sample_number,sample_date,sample_time,residual_free_chlorine_mg_l,turbidity_ntu,e_coli_quanti_tray_mpn_100ml,date_id,sample_id
0,201900113,2019-01-02T00:00:00.000,09:39,0.31,0.55,0.0,20190102,1000
1,201900114,2019-01-02T00:00:00.000,08:59,0.25,0.52,0.0,20190102,1001


### creating facts

In [36]:
drinking_water_fact_table = drinking_water_data.copy()

In [37]:
drinking_water_fact_table.columns

Index(['sample_number', 'sample_date', 'sample_time',
       'residual_free_chlorine_mg_l', 'turbidity_ntu',
       'e_coli_quanti_tray_mpn_100ml', 'date_id', 'sample_id'],
      dtype='object')

In [38]:
drinking_water_fact_table = drinking_water_fact_table[["date_id",
                         "sample_id",
                         "residual_free_chlorine_mg_l",
                        "turbidity_ntu",
                        "e_coli_quanti_tray_mpn_100ml"]]

drinking_water_fact_table.head()

Unnamed: 0,date_id,sample_id,residual_free_chlorine_mg_l,turbidity_ntu,e_coli_quanti_tray_mpn_100ml
0,20190102,1000,0.31,0.55,0.0
1,20190102,1001,0.25,0.52,0.0
2,20190102,1002,0.4,0.75,0.0
3,20190102,1003,0.64,0.0,0.0
4,20190102,1004,0.77,0.66,0.0


## STEP 6

### Deliver Facts and Dimensions to Data Warehouse (BigQuery)

In [39]:
# create a function to load dataframes to BigQuery

def load_table_to_bigquery(df,
                          table_name,
                          dataset_id):

    dataset_id = dataset_id #change 301800 to match your project id

    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.write_disposition = "WRITE_TRUNCATE"

    upload_table_name = f"{dataset_id}.{table_name}"
    
    load_job = bigquery_client.load_table_from_dataframe(df,
                                                upload_table_name,
                                                job_config = job_config)
        
    print(f"Starting job {load_job}")

In [40]:
# load date dimension to BigQuery

load_table_to_bigquery(df = date_dim,
                      table_name = "date_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-340719, location=US, id=575e9661-c167-47da-82b8-d18907d31827>


In [41]:
#load sample dimension to BigQuery
load_table_to_bigquery(df = sample_dim,
                      table_name = "sample_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-340719, location=US, id=dd4110c5-1963-4464-b1c3-17c39959bfa0>


In [42]:
#load fact table to BigQuery
load_table_to_bigquery(df = drinking_water_fact_table,
                      table_name = "drinking_water_fact",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-340719, location=US, id=7947253a-1d68-40e4-a091-11640586b597>


### Tap Water ETL

## STEP 1

In [43]:
#https://data.cityofnewyork.us/resource/k5us-nav4.json
tap_water = 'k5us-nav4'

## STEP 2

In [44]:
total_record_count = nyc_open_data_client.get(tap_water, select = "COUNT(*)")
print(f"total records in {tap_water}: {total_record_count}")

total records in k5us-nav4: [{'COUNT': '26113'}]


In [45]:
target_record_count = nyc_open_data_client.get(tap_water,
                                               where = "date_received > '2019-01-01'",
                                               select= "COUNT(*)")
print(f"target records in {tap_water}: {target_record_count}")

target records in k5us-nav4: [{'COUNT': '11339'}]


In [46]:
def pull_data_in_chunks(target_record_count):
    
    # measure time this function takes
    import time
    start_time = time.time()

    start = 0             # start at 0
    chunk_size = 2000     # fetch 2000 rows at a time
    results = []          # empty out our result list
    record_count = target_record_count

    while True:

        # fetch the set of records starting at 'start'
        results.extend(nyc_open_data_client.get(tap_water,
                                                where = "date_received > '2019-01-01'",
                                                offset = start,
                                                limit = chunk_size))

        # update the starting record number
        start = start + chunk_size

        # if we have fetched all of the records (we have reached record_count), exit loop
        if (start > int(record_count[0]['COUNT'])):
            break

    # convert the list into a pandas data frame
    tap_water_data = pd.DataFrame.from_records(results)

    end_time = time.time()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {tap_water_data.shape}")
    return tap_water_data

tap_water_data = pull_data_in_chunks(target_record_count)

function took 1.7 seconds
the shape of your dataframe is: (11339, 11)


## STEP 3

In [47]:
tap_water_data.head()

Unnamed: 0,kit_id,borough,zipcode,date_collected,date_received,lead_first_draw_mg_l,lead_1_2_minute_flush_mg_l,copper_first_draw_mg_l,copper_1_2_minute_flush_mg_l,lead_5_minute_flush_mg_l,copper_5_minute_flush_mg_l
0,18256330,BROOKLYN,11215,2018-12-30T00:00:00.000,2019-01-02T00:00:00.000,0.0,0,0.243,0.032,,
1,18256275,NEW YORK,10024,2018-12-31T00:00:00.000,2019-01-02T00:00:00.000,0.0,0,0.031,0.02,,
2,18255738,NEW YORK,10028,2018-12-29T00:00:00.000,2019-01-02T00:00:00.000,0.0,0,0.166,0.067,,
3,18256310,BRONX,10462,2018-12-29T00:00:00.000,2019-01-02T00:00:00.000,0.0,0,0.275,0.073,,
4,18256200,BRONX,10469,2018-12-30T00:00:00.000,2019-01-02T00:00:00.000,0.001,0,0.314,0.191,,


In [48]:
tap_water_data.columns

Index(['kit_id', 'borough', 'zipcode', 'date_collected', 'date_received',
       'lead_first_draw_mg_l', 'lead_1_2_minute_flush_mg_l',
       'copper_first_draw_mg_l', 'copper_1_2_minute_flush_mg_l',
       'lead_5_minute_flush_mg_l', 'copper_5_minute_flush_mg_l'],
      dtype='object')

In [49]:
tap_water_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11339 entries, 0 to 11338
Data columns (total 11 columns):
 #   Column                        Non-Null Count  Dtype 
---  ------                        --------------  ----- 
 0   kit_id                        11339 non-null  object
 1   borough                       11339 non-null  object
 2   zipcode                       11339 non-null  object
 3   date_collected                11339 non-null  object
 4   date_received                 11339 non-null  object
 5   lead_first_draw_mg_l          11339 non-null  object
 6   lead_1_2_minute_flush_mg_l    11333 non-null  object
 7   copper_first_draw_mg_l        11339 non-null  object
 8   copper_1_2_minute_flush_mg_l  11333 non-null  object
 9   lead_5_minute_flush_mg_l      250 non-null    object
 10  copper_5_minute_flush_mg_l    250 non-null    object
dtypes: object(11)
memory usage: 974.6+ KB


### clean data

In [50]:
tap_water_data['lead_5_minute_flush_mg_l'].isna().sum()

11089

In [51]:
tap_water_data['copper_5_minute_flush_mg_l'].isna().sum()

11089

In [52]:
drop_columns = ['date_collected','lead_5_minute_flush_mg_l','copper_5_minute_flush_mg_l']

for column in drop_columns:
    try:
        tap_water_data.drop(column, axis = 1, inplace = True)
    except:
        print(f"unable to drop {column}")

print(f"columns left in dataframe: {tap_water_data.columns}")

columns left in dataframe: Index(['kit_id', 'borough', 'zipcode', 'date_received', 'lead_first_draw_mg_l',
       'lead_1_2_minute_flush_mg_l', 'copper_first_draw_mg_l',
       'copper_1_2_minute_flush_mg_l'],
      dtype='object')


In [53]:
print(f"number of duplicate rows: {len(tap_water_data[tap_water_data.duplicated()])}")

number of duplicate rows: 0


In [54]:
tap_water_data.columns

Index(['kit_id', 'borough', 'zipcode', 'date_received', 'lead_first_draw_mg_l',
       'lead_1_2_minute_flush_mg_l', 'copper_first_draw_mg_l',
       'copper_1_2_minute_flush_mg_l'],
      dtype='object')

### change data type

In [55]:
tap_water_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11339 entries, 0 to 11338
Data columns (total 8 columns):
 #   Column                        Non-Null Count  Dtype 
---  ------                        --------------  ----- 
 0   kit_id                        11339 non-null  object
 1   borough                       11339 non-null  object
 2   zipcode                       11339 non-null  object
 3   date_received                 11339 non-null  object
 4   lead_first_draw_mg_l          11339 non-null  object
 5   lead_1_2_minute_flush_mg_l    11333 non-null  object
 6   copper_first_draw_mg_l        11339 non-null  object
 7   copper_1_2_minute_flush_mg_l  11333 non-null  object
dtypes: object(8)
memory usage: 708.8+ KB


In [56]:
tap_water_data = tap_water_data.astype({'lead_first_draw_mg_l':'float',
                                                              'lead_1_2_minute_flush_mg_l':'float',
                                                              'copper_first_draw_mg_l':'float',
                                        'copper_1_2_minute_flush_mg_l':'float'})

## STEP 4

### date dimension

In [57]:
tap_water_data['date_id'] = tap_water_data['date_received'].apply(lambda x: pd.to_datetime(x).strftime("%Y%m%d"))

### location dimension

In [58]:
location_dim = tap_water_data.copy()

In [59]:
location_dim.columns

Index(['kit_id', 'borough', 'zipcode', 'date_received', 'lead_first_draw_mg_l',
       'lead_1_2_minute_flush_mg_l', 'copper_first_draw_mg_l',
       'copper_1_2_minute_flush_mg_l', 'date_id'],
      dtype='object')

In [60]:
location_dim = location_dim[["borough",
                             "zipcode"]]

In [61]:
location_dim.head()

Unnamed: 0,borough,zipcode
0,BROOKLYN,11215
1,NEW YORK,10024
2,NEW YORK,10028
3,BRONX,10462
4,BRONX,10469


In [62]:
location_dim = location_dim.drop_duplicates(subset = ["zipcode"], keep = 'first')
location_dim = location_dim.reset_index(drop = True)
location_dim.head()

Unnamed: 0,borough,zipcode
0,BROOKLYN,11215
1,NEW YORK,10024
2,NEW YORK,10028
3,BRONX,10462
4,BRONX,10469


In [63]:
location_dim.insert(0, 'location_id', range(5000, 5000 + len(location_dim)))
location_dim.head()

Unnamed: 0,location_id,borough,zipcode
0,5000,BROOKLYN,11215
1,5001,NEW YORK,10024
2,5002,NEW YORK,10028
3,5003,BRONX,10462
4,5004,BRONX,10469


In [64]:
#add the location_id to the data table
tap_water_data = tap_water_data.merge(location_dim[['zipcode', 'location_id']],
                  left_on = 'zipcode',
                  right_on = 'zipcode',
                  how = 'left')

tap_water_data.head(2)

Unnamed: 0,kit_id,borough,zipcode,date_received,lead_first_draw_mg_l,lead_1_2_minute_flush_mg_l,copper_first_draw_mg_l,copper_1_2_minute_flush_mg_l,date_id,location_id
0,18256330,BROOKLYN,11215,2019-01-02T00:00:00.000,0.0,0.0,0.243,0.032,20190102,5000
1,18256275,NEW YORK,10024,2019-01-02T00:00:00.000,0.0,0.0,0.031,0.02,20190102,5001


### kit dimension

In [65]:
kit_dim = tap_water_data.copy()

In [66]:
kit_dim.columns

Index(['kit_id', 'borough', 'zipcode', 'date_received', 'lead_first_draw_mg_l',
       'lead_1_2_minute_flush_mg_l', 'copper_first_draw_mg_l',
       'copper_1_2_minute_flush_mg_l', 'date_id', 'location_id'],
      dtype='object')

In [67]:
kit_dim = kit_dim[["kit_id"]]

In [68]:
kit_dim.count()

kit_id    11339
dtype: int64

In [69]:
# third, drop duplicate rows in dimension
kit_dim = kit_dim.drop_duplicates(subset = ['kit_id'], keep = 'first')
kit_dim = kit_dim.reset_index(drop = True)
kit_dim.head()

Unnamed: 0,kit_id
0,18256330
1,18256275
2,18255738
3,18256310
4,18256200


In [70]:
kit_dim.count()

kit_id    11339
dtype: int64

In [71]:
kit_dim.insert(0, 'kit_key', range(2000, 2000 + len(kit_dim)))
kit_dim.head()

Unnamed: 0,kit_key,kit_id
0,2000,18256330
1,2001,18256275
2,2002,18255738
3,2003,18256310
4,2004,18256200


In [72]:
tap_water_data = tap_water_data.merge(kit_dim[['kit_id', 'kit_key']],
                  left_on = 'kit_id',
                  right_on = 'kit_id',
                  how = 'left')

tap_water_data.head(2)

Unnamed: 0,kit_id,borough,zipcode,date_received,lead_first_draw_mg_l,lead_1_2_minute_flush_mg_l,copper_first_draw_mg_l,copper_1_2_minute_flush_mg_l,date_id,location_id,kit_key
0,18256330,BROOKLYN,11215,2019-01-02T00:00:00.000,0.0,0.0,0.243,0.032,20190102,5000,2000
1,18256275,NEW YORK,10024,2019-01-02T00:00:00.000,0.0,0.0,0.031,0.02,20190102,5001,2001


### Fact table for tap water

In [73]:
tap_water_fact_table = tap_water_data.copy()

In [74]:
tap_water_fact_table = tap_water_fact_table[["date_id",
                         "kit_key","location_id",
                         "lead_first_draw_mg_l",
                        "lead_1_2_minute_flush_mg_l",
                        "copper_first_draw_mg_l","copper_1_2_minute_flush_mg_l"]]

tap_water_fact_table.head()

Unnamed: 0,date_id,kit_key,location_id,lead_first_draw_mg_l,lead_1_2_minute_flush_mg_l,copper_first_draw_mg_l,copper_1_2_minute_flush_mg_l
0,20190102,2000,5000,0.0,0.0,0.243,0.032
1,20190102,2001,5001,0.0,0.0,0.031,0.02
2,20190102,2002,5002,0.0,0.0,0.166,0.067
3,20190102,2003,5003,0.0,0.0,0.275,0.073
4,20190102,2004,5004,0.001,0.0,0.314,0.191


## STEP 6

## Deliver Facts and Dimensions to Data Warehouse (BigQuery)

In [75]:
load_table_to_bigquery(df = location_dim,
                      table_name = "location_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-340719, location=US, id=43e4953a-824f-4438-bf14-8c7eed40fbfa>


In [76]:
load_table_to_bigquery(df = kit_dim,
                      table_name = "kit_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-340719, location=US, id=0b058463-88ff-4d81-97c9-1241a84c78db>


In [77]:
load_table_to_bigquery(df = tap_water_fact_table,
                      table_name = "tap_water_fact",
                      dataset_id = dataset_id)

Starting job LoadJob<project=cis9440-340719, location=US, id=e6d49187-e2df-44a7-b8f2-510beac35208>
