# Building an ETL Pipeline

## Step 0: Install the required python packages

In [None]:
pip install -- upgrade sodapy

In [None]:
pip install -- upgrade pyarrow

In [None]:
pip install -- upgrade db -- dtypes

In [None]:
pip install -- upgrade google-cloud-bigquery

## Step 1: Setup your NYC Open Data variables

In [186]:
import pandas as pd
import numpy as np
from sodapy import Socrata
from google.cloud import bigquery
from google.oauth2 import service_account

In [187]:
data_url = 'data.cityofnewyork.us'

In [188]:
data_set = 'kpav-sd4t'

In [189]:
app_token = 'XMuS5g30KznEWHWAwsUzavCyh'

In [190]:
nyc_open_data_job = Socrata(data_url,app_token,timeout = 200)
print(f"nyc open data job is:{nyc_open_data_job}")
print(f"nyc open data job data type is:{type(nyc_open_data_job)}")

nyc open data job is:<sodapy.socrata.Socrata object at 0x7febbde17650>
nyc open data job data type is:<class 'sodapy.socrata.Socrata'>


In [191]:
test_result = nyc_open_data_job.get(data_set,limit = 100)

In [192]:
test_result_pd = pd.DataFrame.from_records(test_result)

In [193]:
test_result_pd.head()

Unnamed: 0,job_id,agency,posting_type,number_of_positions,business_title,civil_service_title,title_classification,title_code_no,level,job_category,...,additional_information,to_apply,residency_requirement,posting_date,posting_updated,process_date,preferred_skills,hours_shift,work_location_1,post_until
0,424339,DEPT OF HEALTH/MENTAL HYGIENE,External,1,"Triage Nurse, Bureau of Public Health Clinics",PUBLIC HEALTH NURSE,Competitive-1,51011,03,Health,...,**IMPORTANT NOTES TO ALL CANDIDATES: Please ...,Apply online with a cover letter to https://a1...,New York City Residency is not required for th...,2022-03-01T00:00:00.000,2022-04-05T00:00:00.000,2022-04-12T00:00:00.000,,,,
1,379094,NYC EMPLOYEES RETIREMENT SYS,External,1,CERTIFIED IT DEVELOPER (APPLICATIONS),CERT IT DEVELOPER (APP),Competitive-1,13643,02,"Technology, Data & Innovation",...,,"TO APPLY FOR CONSIDERATION, PLEASE FORWARD A C...",New York City Residency is not required for th...,2019-01-07T00:00:00.000,2019-01-07T00:00:00.000,2022-04-12T00:00:00.000,Â·\tExperience with testing /deployment tools ...,,,
2,520417,NYC HOUSING AUTHORITY,External,1,Chief of Settlement,EXECUTIVE AGENCY COUNSEL,Non-Competitive-5,95005,M2,Legal Affairs,...,1. Resume and cover letter must also include ...,Click the Apply Now button.,NYCHA has no residency requirements.,2022-03-28T00:00:00.000,2022-03-28T00:00:00.000,2022-04-12T00:00:00.000,1. Thorough knowledge of tort substantive and...,,,
3,233549,NYC EMPLOYEES RETIREMENT SYS,External,1,"CERTIFIED IT ADMINISTRATOR (LAN/WAN), LEVEL 4",CERTIFIED IT ADMINISTRATOR (LA,Competitive-1,13652,04,Information Technology & Telecommunications,...,,Click the 'apply now' button to apply. Please...,New York City Residency is not required for th...,2016-03-01T00:00:00.000,2016-03-01T00:00:00.000,2022-04-12T00:00:00.000,"Minimum 5 years of experience planning, design...",,,
4,510256,HUMAN RIGHTS COMMISSION,External,5,Associate Human Rights Specialist,ASSOCIATE HUMAN RIGHTS SPECIAL,Competitive-1,55038,01,Constituent Services & Community Programs,...,,For City employees: Go to Employee Self-Servic...,New York City residency is generally required ...,2021-12-16T00:00:00.000,2021-12-16T00:00:00.000,2022-04-12T00:00:00.000,â¢\tStrong oral and written communication ski...,": DAY, 9-5; ON OCCASION, CANDIDATES MAY BE REQ...","22 Reade St, Ny",


## Step 2: Setup your Google BigQuery variables

In [194]:
key_path = r'/Users/qiyuliu/Downloads/cis-9440-340322-89438e181e3f.json'

In [195]:
credentials = service_account.Credentials.from_service_account_file(key_path,
                                                                    scopes=["https://www.googleapis.com/auth/cloud-platform"],)
bigquery_client = bigquery.Client(credentials = credentials,
                                 project = credentials.project_id)

print(f"bigquery client name is: {bigquery_client}")
print(f"bigquery client data type is: {type(bigquery_client)}")

bigquery client name is: <google.cloud.bigquery.client.Client object at 0x7febd6fddc90>
bigquery client data type is: <class 'google.cloud.bigquery.client.Client'>


In [196]:
dataset_id = 'cis-9440-340322.etl_nycjobs_2022'   # PASTE THIS DATASET ID FROM ABOVE STEPS

dataset_id = dataset_id.replace(':', '.')
print(f"your dataset_id is: {dataset_id}")

your dataset_id is: cis-9440-340322.etl_nycjobs_2022


## Step 3: Extract data

In [197]:
total_record_count = nyc_open_data_job.get(data_set, select = "COUNT(*)")
print(f"total records in {data_set}: {total_record_count}")

total records in kpav-sd4t: [{'COUNT': '3773'}]


In [198]:
target_record_count = nyc_open_data_job.get(data_set, where = "posting_date > '2020-02-29'",select = "COUNT(*)")
print(f"target records in {data_set}: {target_record_count}")

target records in kpav-sd4t: [{'COUNT': '3576'}]


In [199]:
def pull_data_in_chunks(target_record_count):
    
    # measure time this function takes
    import time
    start_time = time.time()

    start = 0             # start at 0
    chunk_size = 2000     # fetch 2000 rows at a time
    results = []          # empty out our result list
    record_count = target_record_count

    while True:

        # fetch the set of records starting at 'start'
        results.extend(nyc_open_data_job.get(data_set,
                                             where = "posting_date > '2020-02-29'",
                                             offset = start,
                                             limit = chunk_size))

        # update the starting record number
        start = start + chunk_size

        # if we have fetched all of the records (we have reached record_count), exit loop
        if (start > int(record_count[0]['COUNT'])):
            break

    # convert the list into a pandas data frame
    data = pd.DataFrame.from_records(results)

    end_time = time.time()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {data.shape}")
    return data

data = pull_data_in_chunks(target_record_count)

function took 2.4 seconds
the shape of your dataframe is: (3576, 29)


In [200]:
data.head(2)

Unnamed: 0,job_id,agency,posting_type,number_of_positions,business_title,civil_service_title,title_classification,title_code_no,level,job_category,...,additional_information,to_apply,residency_requirement,posting_date,posting_updated,process_date,preferred_skills,hours_shift,work_location_1,post_until
0,424339,DEPT OF HEALTH/MENTAL HYGIENE,External,1,"Triage Nurse, Bureau of Public Health Clinics",PUBLIC HEALTH NURSE,Competitive-1,51011,03,Health,...,**IMPORTANT NOTES TO ALL CANDIDATES: Please ...,Apply online with a cover letter to https://a1...,New York City Residency is not required for th...,2022-03-01T00:00:00.000,2022-04-05T00:00:00.000,2022-04-12T00:00:00.000,,,,
1,520417,NYC HOUSING AUTHORITY,External,1,Chief of Settlement,EXECUTIVE AGENCY COUNSEL,Non-Competitive-5,95005,M2,Legal Affairs,...,1. Resume and cover letter must also include ...,Click the Apply Now button.,NYCHA has no residency requirements.,2022-03-28T00:00:00.000,2022-03-28T00:00:00.000,2022-04-12T00:00:00.000,1. Thorough knowledge of tort substantive and...,,,


## Step 4: Data Profiling

In [201]:
data.columns

Index(['job_id', 'agency', 'posting_type', 'number_of_positions',
       'business_title', 'civil_service_title', 'title_classification',
       'title_code_no', 'level', 'job_category',
       'full_time_part_time_indicator', 'career_level', 'salary_range_from',
       'salary_range_to', 'salary_frequency', 'work_location',
       'division_work_unit', 'job_description', 'minimum_qual_requirements',
       'additional_information', 'to_apply', 'residency_requirement',
       'posting_date', 'posting_updated', 'process_date', 'preferred_skills',
       'hours_shift', 'work_location_1', 'post_until'],
      dtype='object')

In [202]:
def create_data_profiling_df(data):
    
    # create an empty dataframe to gather information about each column
    data_profiling_df = pd.DataFrame(columns = ["column_name",
                                                "column_type",
                                                "unique_values",
                                                "duplicate_values",
                                                "null_values",
                                                "non_null_values",
                                                "percent_null"])

    # loop through each column to add rows to the data_profiling_df dataframe
    for column in data.columns:

        info_dict = {}

        try:
            info_dict["column_name"] = column
            info_dict["column_type"] = data[column].dtypes
            info_dict["unique_values"] = len(data[column].unique())
            info_dict["duplicate_values"] = (data[column].shape[0] - data[column].isna().sum()) - len(data[column].unique())
            info_dict["null_values"] = data[column].isna().sum()
            info_dict["non_null_values"] = data[column].shape[0] - data[column].isna().sum()
            info_dict["percent_null"] = round((data[column].isna().sum()) / (data[column].shape[0]), 3)

        except:
            print(f"unable to read column: {column}, you may want to drop this column")

        data_profiling_df = data_profiling_df.append(info_dict, ignore_index=True)

    data_profiling_df.sort_values(by = ['unique_values', "non_null_values"],
                                  ascending = [False, False],
                                  inplace=True)
    
    print(f"data profiling complete, shape of df:{data_profiling_df.shape}")
    return data_profiling_df

data_profiling_df = create_data_profiling_df(data)

data profiling complete, shape of df:(29, 7)


In [203]:
data_profiling_df

Unnamed: 0,column_name,column_type,unique_values,duplicate_values,null_values,non_null_values,percent_null
0,job_id,object,1935,1641,0,3576,0.0
17,job_description,object,1840,1736,0,3576,0.0
25,preferred_skills,object,1486,1709,381,3195,0.107
4,business_title,object,1454,2122,0,3576,0.0
20,to_apply,object,1004,2572,0,3576,0.0
16,division_work_unit,object,822,2754,0,3576,0.0
19,additional_information,object,781,1460,1335,2241,0.373
13,salary_range_to,object,515,3061,0,3576,0.0
12,salary_range_from,object,386,3190,0,3576,0.0
18,minimum_qual_requirements,object,368,3183,25,3551,0.007


## Step 5: Data Cleansing

In [204]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3576 entries, 0 to 3575
Data columns (total 29 columns):
 #   Column                         Non-Null Count  Dtype 
---  ------                         --------------  ----- 
 0   job_id                         3576 non-null   object
 1   agency                         3576 non-null   object
 2   posting_type                   3576 non-null   object
 3   number_of_positions            3576 non-null   object
 4   business_title                 3576 non-null   object
 5   civil_service_title            3576 non-null   object
 6   title_classification           3576 non-null   object
 7   title_code_no                  3576 non-null   object
 8   level                          3576 non-null   object
 9   job_category                   3576 non-null   object
 10  full_time_part_time_indicator  3423 non-null   object
 11  career_level                   3576 non-null   object
 12  salary_range_from              3576 non-null   object
 13  sal

In [205]:
data.columns

Index(['job_id', 'agency', 'posting_type', 'number_of_positions',
       'business_title', 'civil_service_title', 'title_classification',
       'title_code_no', 'level', 'job_category',
       'full_time_part_time_indicator', 'career_level', 'salary_range_from',
       'salary_range_to', 'salary_frequency', 'work_location',
       'division_work_unit', 'job_description', 'minimum_qual_requirements',
       'additional_information', 'to_apply', 'residency_requirement',
       'posting_date', 'posting_updated', 'process_date', 'preferred_skills',
       'hours_shift', 'work_location_1', 'post_until'],
      dtype='object')

In [206]:
drop_columns = ["job_id",
               "title_code_no",
               "civil_service_title",
               "title_classification",
               "level",
               "salary_frequency",
               "job_description",
               "hours_shift",
               "work_location_1",
               "posting_updated",
               "process_date",
               "post_until"]
for column in drop_columns:
    try:
        data.drop(column, axis = 1, inplace = True)
    except:
        print(f"unable to drop{column}")
print(f"columns left in dataframe: {data.columns}")

columns left in dataframe: Index(['agency', 'posting_type', 'number_of_positions', 'business_title',
       'job_category', 'full_time_part_time_indicator', 'career_level',
       'salary_range_from', 'salary_range_to', 'work_location',
       'division_work_unit', 'minimum_qual_requirements',
       'additional_information', 'to_apply', 'residency_requirement',
       'posting_date', 'preferred_skills'],
      dtype='object')


In [207]:
print(f"number of duplicate rows: {len(data[data.duplicated()])}")

number of duplicate rows: 98


In [208]:
data = data.drop_duplicates(keep = 'first')
print(f"number of rows after duplicates dropped: {len(data)}")

number of rows after duplicates dropped: 3478


## Step 4: Create Agency Dimension

In [209]:
agency_dim = data.copy()

In [210]:
agency_dim.columns

Index(['agency', 'posting_type', 'number_of_positions', 'business_title',
       'job_category', 'full_time_part_time_indicator', 'career_level',
       'salary_range_from', 'salary_range_to', 'work_location',
       'division_work_unit', 'minimum_qual_requirements',
       'additional_information', 'to_apply', 'residency_requirement',
       'posting_date', 'preferred_skills'],
      dtype='object')

In [211]:
agency_dim = agency_dim[["agency",
                        "posting_type",
                        "division_work_unit",
                        "additional_information"]]

In [212]:
agency_dim.head()

Unnamed: 0,agency,posting_type,division_work_unit,additional_information
0,DEPT OF HEALTH/MENTAL HYGIENE,External,PHC Administration,**IMPORTANT NOTES TO ALL CANDIDATES: Please ...
1,NYC HOUSING AUTHORITY,External,Law Dept-Litigation,1. Resume and cover letter must also include ...
2,HUMAN RIGHTS COMMISSION,External,Comm.Rel.Bureau-Central,
3,DEPARTMENT OF BUILDINGS,Internal,Dev Hub-Full Service,"If selected, candidates must provide a transcr..."
4,DEPARTMENT OF CORRECTION,External,Information Systems-Admin,


In [213]:
agency_dim = agency_dim.drop_duplicates(subset = ["agency"], keep = 'first')
agency_dim = agency_dim.reset_index(drop = True)
agency_dim.head()

Unnamed: 0,agency,posting_type,division_work_unit,additional_information
0,DEPT OF HEALTH/MENTAL HYGIENE,External,PHC Administration,**IMPORTANT NOTES TO ALL CANDIDATES: Please ...
1,NYC HOUSING AUTHORITY,External,Law Dept-Litigation,1. Resume and cover letter must also include ...
2,HUMAN RIGHTS COMMISSION,External,Comm.Rel.Bureau-Central,
3,DEPARTMENT OF BUILDINGS,Internal,Dev Hub-Full Service,"If selected, candidates must provide a transcr..."
4,DEPARTMENT OF CORRECTION,External,Information Systems-Admin,


In [214]:
agency_dim.insert(0,'agency_id', range(10,10+len(agency_dim)))
agency_dim.head()

Unnamed: 0,agency_id,agency,posting_type,division_work_unit,additional_information
0,10,DEPT OF HEALTH/MENTAL HYGIENE,External,PHC Administration,**IMPORTANT NOTES TO ALL CANDIDATES: Please ...
1,11,NYC HOUSING AUTHORITY,External,Law Dept-Litigation,1. Resume and cover letter must also include ...
2,12,HUMAN RIGHTS COMMISSION,External,Comm.Rel.Bureau-Central,
3,13,DEPARTMENT OF BUILDINGS,Internal,Dev Hub-Full Service,"If selected, candidates must provide a transcr..."
4,14,DEPARTMENT OF CORRECTION,External,Information Systems-Admin,


In [215]:
data = data.merge(agency_dim[["agency","agency_id"]],
                 left_on = "agency",
                 right_on = "agency",
                 how = "left")
data.head(2)

Unnamed: 0,agency,posting_type,number_of_positions,business_title,job_category,full_time_part_time_indicator,career_level,salary_range_from,salary_range_to,work_location,division_work_unit,minimum_qual_requirements,additional_information,to_apply,residency_requirement,posting_date,preferred_skills,agency_id
0,DEPT OF HEALTH/MENTAL HYGIENE,External,1,"Triage Nurse, Bureau of Public Health Clinics",Health,F,Experienced (non-manager),84252,84252,NYC - All Boroughs,PHC Administration,1. A Bachelorâs of Science degree in Nursing...,**IMPORTANT NOTES TO ALL CANDIDATES: Please ...,Apply online with a cover letter to https://a1...,New York City Residency is not required for th...,2022-03-01T00:00:00.000,,10
1,NYC HOUSING AUTHORITY,External,1,Chief of Settlement,Legal Affairs,F,Manager,105000,125000,Law-Tort Division,Law Dept-Litigation,Admission to the New York State Bar; and four ...,1. Resume and cover letter must also include ...,Click the Apply Now button.,NYCHA has no residency requirements.,2022-03-28T00:00:00.000,1. Thorough knowledge of tort substantive and...,11


## Step 5: Create Job Category Dimension

In [216]:
job_category_dim = data.copy()

In [217]:
job_category_dim.columns

Index(['agency', 'posting_type', 'number_of_positions', 'business_title',
       'job_category', 'full_time_part_time_indicator', 'career_level',
       'salary_range_from', 'salary_range_to', 'work_location',
       'division_work_unit', 'minimum_qual_requirements',
       'additional_information', 'to_apply', 'residency_requirement',
       'posting_date', 'preferred_skills', 'agency_id'],
      dtype='object')

In [218]:
job_category_dim = job_category_dim[["job_category",
                                    "career_level",
                                    "work_location"]]

In [219]:
job_category_dim.head()

Unnamed: 0,job_category,career_level,work_location
0,Health,Experienced (non-manager),NYC - All Boroughs
1,Legal Affairs,Manager,Law-Tort Division
2,Constituent Services & Community Programs,Experienced (non-manager),"22 Reade St, Ny"
3,"Engineering, Architecture, & Planning",Student,"80 Centre St., N.Y."
4,"Technology, Data & Innovation",Experienced (non-manager),75-20 Astoria Blvd


In [220]:
job_category_dim =job_category_dim.drop_duplicates(subset = ["job_category"], keep = 'first')
job_category_dim = job_category_dim.reset_index(drop = True)
job_category_dim.head()

Unnamed: 0,job_category,career_level,work_location
0,Health,Experienced (non-manager),NYC - All Boroughs
1,Legal Affairs,Manager,Law-Tort Division
2,Constituent Services & Community Programs,Experienced (non-manager),"22 Reade St, Ny"
3,"Engineering, Architecture, & Planning",Student,"80 Centre St., N.Y."
4,"Technology, Data & Innovation",Experienced (non-manager),75-20 Astoria Blvd


In [221]:
job_category_dim.insert(0,'job_category_id', range(100,100+len(job_category_dim)))
job_category_dim.head()

Unnamed: 0,job_category_id,job_category,career_level,work_location
0,100,Health,Experienced (non-manager),NYC - All Boroughs
1,101,Legal Affairs,Manager,Law-Tort Division
2,102,Constituent Services & Community Programs,Experienced (non-manager),"22 Reade St, Ny"
3,103,"Engineering, Architecture, & Planning",Student,"80 Centre St., N.Y."
4,104,"Technology, Data & Innovation",Experienced (non-manager),75-20 Astoria Blvd


In [222]:
data = data.merge(job_category_dim[['job_category', 'job_category_id']],
                  left_on = 'job_category',
                  right_on = 'job_category',
                  how = 'left')

data.head(2)

Unnamed: 0,agency,posting_type,number_of_positions,business_title,job_category,full_time_part_time_indicator,career_level,salary_range_from,salary_range_to,work_location,division_work_unit,minimum_qual_requirements,additional_information,to_apply,residency_requirement,posting_date,preferred_skills,agency_id,job_category_id
0,DEPT OF HEALTH/MENTAL HYGIENE,External,1,"Triage Nurse, Bureau of Public Health Clinics",Health,F,Experienced (non-manager),84252,84252,NYC - All Boroughs,PHC Administration,1. A Bachelorâs of Science degree in Nursing...,**IMPORTANT NOTES TO ALL CANDIDATES: Please ...,Apply online with a cover letter to https://a1...,New York City Residency is not required for th...,2022-03-01T00:00:00.000,,10,100
1,NYC HOUSING AUTHORITY,External,1,Chief of Settlement,Legal Affairs,F,Manager,105000,125000,Law-Tort Division,Law Dept-Litigation,Admission to the New York State Bar; and four ...,1. Resume and cover letter must also include ...,Click the Apply Now button.,NYCHA has no residency requirements.,2022-03-28T00:00:00.000,1. Thorough knowledge of tort substantive and...,11,101


## Step 6: Create Requirement Dimension

In [223]:
requirement_dim = data.copy()

In [224]:
requirement_dim.columns

Index(['agency', 'posting_type', 'number_of_positions', 'business_title',
       'job_category', 'full_time_part_time_indicator', 'career_level',
       'salary_range_from', 'salary_range_to', 'work_location',
       'division_work_unit', 'minimum_qual_requirements',
       'additional_information', 'to_apply', 'residency_requirement',
       'posting_date', 'preferred_skills', 'agency_id', 'job_category_id'],
      dtype='object')

In [225]:
requirement_dim = requirement_dim[["minimum_qual_requirements",
                                  "preferred_skills",
                                  "residency_requirement",
                                  "full_time_part_time_indicator"]]

In [226]:
requirement_dim =requirement_dim.drop_duplicates(subset = ["minimum_qual_requirements"], keep = 'first')
requirement_dim = requirement_dim.reset_index(drop = True)
requirement_dim.head()

Unnamed: 0,minimum_qual_requirements,preferred_skills,residency_requirement,full_time_part_time_indicator
0,1. A Bachelorâs of Science degree in Nursing...,,New York City Residency is not required for th...,F
1,Admission to the New York State Bar; and four ...,1. Thorough knowledge of tort substantive and...,NYCHA has no residency requirements.,F
2,1. A baccalaureate degree from an accredited c...,â¢\tStrong oral and written communication ski...,New York City residency is generally required ...,F
3,As of June of the Program year the prospective...,"Majors: Electrical Engineering, or related fie...",This internship position reports in-person and...,F
4,"1. For Assignment Level I (only physical, bio...",â¢\tDemonstrable mastery of R (preferred) and...,New York City residency is generally required ...,F


In [227]:
requirement_dim.insert(0,"requirement_id", range(50,50+len(requirement_dim)))
requirement_dim.head()

Unnamed: 0,requirement_id,minimum_qual_requirements,preferred_skills,residency_requirement,full_time_part_time_indicator
0,50,1. A Bachelorâs of Science degree in Nursing...,,New York City Residency is not required for th...,F
1,51,Admission to the New York State Bar; and four ...,1. Thorough knowledge of tort substantive and...,NYCHA has no residency requirements.,F
2,52,1. A baccalaureate degree from an accredited c...,â¢\tStrong oral and written communication ski...,New York City residency is generally required ...,F
3,53,As of June of the Program year the prospective...,"Majors: Electrical Engineering, or related fie...",This internship position reports in-person and...,F
4,54,"1. For Assignment Level I (only physical, bio...",â¢\tDemonstrable mastery of R (preferred) and...,New York City residency is generally required ...,F


In [228]:
data = data.merge(requirement_dim[['minimum_qual_requirements', 'requirement_id']],
                  left_on = 'minimum_qual_requirements',
                  right_on = 'minimum_qual_requirements',
                  how = 'left')

data.head(2)

Unnamed: 0,agency,posting_type,number_of_positions,business_title,job_category,full_time_part_time_indicator,career_level,salary_range_from,salary_range_to,work_location,division_work_unit,minimum_qual_requirements,additional_information,to_apply,residency_requirement,posting_date,preferred_skills,agency_id,job_category_id,requirement_id
0,DEPT OF HEALTH/MENTAL HYGIENE,External,1,"Triage Nurse, Bureau of Public Health Clinics",Health,F,Experienced (non-manager),84252,84252,NYC - All Boroughs,PHC Administration,1. A Bachelorâs of Science degree in Nursing...,**IMPORTANT NOTES TO ALL CANDIDATES: Please ...,Apply online with a cover letter to https://a1...,New York City Residency is not required for th...,2022-03-01T00:00:00.000,,10,100,50
1,NYC HOUSING AUTHORITY,External,1,Chief of Settlement,Legal Affairs,F,Manager,105000,125000,Law-Tort Division,Law Dept-Litigation,Admission to the New York State Bar; and four ...,1. Resume and cover letter must also include ...,Click the Apply Now button.,NYCHA has no residency requirements.,2022-03-28T00:00:00.000,1. Thorough knowledge of tort substantive and...,11,101,51


## Step 7: Create Application Dimension

In [229]:
application_dim = data.copy()

In [230]:
application_dim.columns

Index(['agency', 'posting_type', 'number_of_positions', 'business_title',
       'job_category', 'full_time_part_time_indicator', 'career_level',
       'salary_range_from', 'salary_range_to', 'work_location',
       'division_work_unit', 'minimum_qual_requirements',
       'additional_information', 'to_apply', 'residency_requirement',
       'posting_date', 'preferred_skills', 'agency_id', 'job_category_id',
       'requirement_id'],
      dtype='object')

In [231]:
application_dim = application_dim[["to_apply",
                                  "business_title"]]

In [232]:
application_dim =application_dim.drop_duplicates(subset = ["to_apply"], keep = 'first')
application_dim = application_dim.reset_index(drop = True)
application_dim.head()

Unnamed: 0,to_apply,business_title
0,Apply online with a cover letter to https://a1...,"Triage Nurse, Bureau of Public Health Clinics"
1,Click the Apply Now button.,Chief of Settlement
2,For City employees: Go to Employee Self-Servic...,Associate Human Rights Specialist
3,For Non-City/External Candidates: Visit the Ex...,Electrical Plan Examination Intern
4,For City employees: Go to Employee Self-Servic...,Director of Data Analytics & Research


In [233]:
application_dim.insert(0,"application_id", range(1000,1000+len(application_dim)))
application_dim.head()

Unnamed: 0,application_id,to_apply,business_title
0,1000,Apply online with a cover letter to https://a1...,"Triage Nurse, Bureau of Public Health Clinics"
1,1001,Click the Apply Now button.,Chief of Settlement
2,1002,For City employees: Go to Employee Self-Servic...,Associate Human Rights Specialist
3,1003,For Non-City/External Candidates: Visit the Ex...,Electrical Plan Examination Intern
4,1004,For City employees: Go to Employee Self-Servic...,Director of Data Analytics & Research


In [234]:
data = data.merge(application_dim[['to_apply', 'application_id']],
                  left_on = 'to_apply',
                  right_on = 'to_apply',
                  how = 'left')

data.head(2)

Unnamed: 0,agency,posting_type,number_of_positions,business_title,job_category,full_time_part_time_indicator,career_level,salary_range_from,salary_range_to,work_location,...,minimum_qual_requirements,additional_information,to_apply,residency_requirement,posting_date,preferred_skills,agency_id,job_category_id,requirement_id,application_id
0,DEPT OF HEALTH/MENTAL HYGIENE,External,1,"Triage Nurse, Bureau of Public Health Clinics",Health,F,Experienced (non-manager),84252,84252,NYC - All Boroughs,...,1. A Bachelorâs of Science degree in Nursing...,**IMPORTANT NOTES TO ALL CANDIDATES: Please ...,Apply online with a cover letter to https://a1...,New York City Residency is not required for th...,2022-03-01T00:00:00.000,,10,100,50,1000
1,NYC HOUSING AUTHORITY,External,1,Chief of Settlement,Legal Affairs,F,Manager,105000,125000,Law-Tort Division,...,Admission to the New York State Bar; and four ...,1. Resume and cover letter must also include ...,Click the Apply Now button.,NYCHA has no residency requirements.,2022-03-28T00:00:00.000,1. Thorough knowledge of tort substantive and...,11,101,51,1001


## Step 8: Create Date Dimension

In [235]:
# first, create a BigQuery client to connect to BigQuery
from google.cloud import bigquery
from google.oauth2 import service_account

key_path = r'/Users/qiyuliu/Downloads/cis-9440-340322-89438e181e3f.json' # must edit to your credentials json file location
credentials = service_account.Credentials.from_service_account_file(key_path,
                                                                    scopes=["https://www.googleapis.com/auth/cloud-platform"],)
client = bigquery.Client(credentials = credentials,
                         project = credentials.project_id)

In [236]:
print(client)

<google.cloud.bigquery.client.Client object at 0x7febbe02e0d0>


In [237]:
sql_query = """
            SELECT
              CONCAT (FORMAT_DATE("%Y",d),FORMAT_DATE("%m",d),FORMAT_DATE("%d",d)) as date_id,
              d AS full_date,
              FORMAT_DATE('%w', d) AS week_day,
              FORMAT_DATE('%A', d) AS day_name,
              EXTRACT(DAY FROM d) AS year_day,
              EXTRACT(WEEK FROM d) AS week,
              EXTRACT(WEEK FROM d) AS year_week,
              EXTRACT(MONTH FROM d) AS month,
              FORMAT_DATE('%B', d) as month_name,
              FORMAT_DATE('%Q', d) as fiscal_qtr,
              EXTRACT(YEAR FROM d) AS year,
              (CASE WHEN FORMAT_DATE('%A', d) IN ('Sunday', 'Saturday') THEN 0 ELSE 1 END) AS day_is_weekday,
            FROM (
              SELECT
                *
              FROM
                UNNEST(GENERATE_DATE_ARRAY('2020-03-15', '2022-02-18', INTERVAL 1 DAY)) AS d )
            """

# store extracted data in new dataframe
date_dim = client.query(sql_query).to_dataframe()

# validate that >0 stories have been extracted and return dataframe
if len(date_dim) > 0:
    print("date dimension created")
else:
    print("date dimension FAILED")

date dimension created


In [238]:
date_dim.head()

Unnamed: 0,date_id,full_date,week_day,day_name,year_day,week,year_week,month,month_name,fiscal_qtr,year,day_is_weekday
0,20200315,2020-03-15,0,Sunday,15,11,11,3,March,1,2020,0
1,20200316,2020-03-16,1,Monday,16,11,11,3,March,1,2020,1
2,20200317,2020-03-17,2,Tuesday,17,11,11,3,March,1,2020,1
3,20200318,2020-03-18,3,Wednesday,18,11,11,3,March,1,2020,1
4,20200319,2020-03-19,4,Thursday,19,11,11,3,March,1,2020,1


In [239]:
data['date_id'] = data['posting_date'].apply(lambda x: pd.to_datetime(x).strftime("%Y%m%d"))

## Step 9: Creating Job Vacancy Fact table

In [240]:
data.columns

Index(['agency', 'posting_type', 'number_of_positions', 'business_title',
       'job_category', 'full_time_part_time_indicator', 'career_level',
       'salary_range_from', 'salary_range_to', 'work_location',
       'division_work_unit', 'minimum_qual_requirements',
       'additional_information', 'to_apply', 'residency_requirement',
       'posting_date', 'preferred_skills', 'agency_id', 'job_category_id',
       'requirement_id', 'application_id', 'date_id'],
      dtype='object')

In [241]:
data = data[["date_id",
            "agency_id",
            "job_category_id",
            "requirement_id",
            "number_of_positions",
            "salary_range_from",
            "salary_range_to"]]

data.head()

Unnamed: 0,date_id,agency_id,job_category_id,requirement_id,number_of_positions,salary_range_from,salary_range_to
0,20220301,10,100,50,1,84252,84252.0
1,20220328,11,101,51,1,105000,125000.0
2,20211216,12,102,52,5,58449,67216.0
3,20220405,13,103,53,1,15,17.5
4,20210727,14,104,54,1,94283,108426.0


In [242]:
data["salary_range_from"] = pd.to_numeric(data["salary_range_from"])

In [243]:
data["salary_range_to"] = pd.to_numeric(data["salary_range_to"])

In [244]:
data["salary"] = data[["salary_range_from","salary_range_to"]].mean(axis = 1)

In [245]:
data.head()

Unnamed: 0,date_id,agency_id,job_category_id,requirement_id,number_of_positions,salary_range_from,salary_range_to,salary
0,20220301,10,100,50,1,84252.0,84252.0,84252.0
1,20220328,11,101,51,1,105000.0,125000.0,115000.0
2,20211216,12,102,52,5,58449.0,67216.0,62832.5
3,20220405,13,103,53,1,15.0,17.5,16.25
4,20210727,14,104,54,1,94283.0,108426.0,101354.5


In [246]:
data.drop(["salary_range_from","salary_range_to"],axis = 1)

Unnamed: 0,date_id,agency_id,job_category_id,requirement_id,number_of_positions,salary
0,20220301,10,100,50,1,84252.00
1,20220328,11,101,51,1,115000.00
2,20211216,12,102,52,5,62832.50
3,20220405,13,103,53,1,16.25
4,20210727,14,104,54,1,101354.50
...,...,...,...,...,...,...
3473,20220304,21,119,64,1,17.70
3474,20210202,44,104,110,1,102627.00
3475,20220331,16,103,56,1,80248.50
3476,20211213,45,104,71,1,107500.00


## Step 10: Setup your second NYC Open Data variables

In [247]:
data2_url = 'data.cityofnewyork.us'

In [248]:
data2_set = 'rc75-m7u3'

In [249]:
app_token = 'XMuS5g30KznEWHWAwsUzavCyh'

In [250]:
covid = Socrata(data2_url,app_token,timeout = 200)

In [251]:
test_results2 = covid.get(data2_set, limit = 100)

In [252]:
test_results2_df = pd.DataFrame.from_records(test_results2)

In [253]:
test_results2_df.head(2)

Unnamed: 0,date_of_interest,case_count,probable_case_count,hospitalized_count,death_count,death_count_probable,case_count_7day_avg,all_case_count_7day_avg,hosp_count_7day_avg,death_count_7day_avg,...,si_probable_case_count,si_hospitalized_count,si_death_count,si_probable_death_count,si_case_count_7day_avg,si_all_case_count_7day_avg,si_hospitalized_count_7day_avg,si_death_count_7day_avg,si_all_death_count_7day_avg,incomplete
0,2020-02-29T00:00:00.000,1,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,2020-03-01T00:00:00.000,0,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [254]:
total_record_count2 = covid.get(data2_set, select = "COUNT(*)")
print(f"total records in {data2_set}: {total_record_count2}")

total records in rc75-m7u3: [{'COUNT': '773'}]


In [257]:
target_record_count2 = covid.get(data2_set,
                                 where = "date_of_interest > '2020-02-29'",
                                 select= "COUNT(*)")
print(f"target records in {data2_set}: {target_record_count2}")

target records in rc75-m7u3: [{'COUNT': '772'}]


In [258]:
import time
start_time = time.time()

start = 0             # start at 0
chunk_size = 2000     # fetch 2000 rows at a time
results2 = []          # empty out our result list
record_count2 = target_record_count2

while True:
    
    # fetch the set of records starting at 'start'
    results2.extend(covid.get(data2_set,
                              where = "date_of_interest > '2020-02-29'",
                              offset = start,
                              limit = chunk_size))
    
    # update the starting record number
    start = start + chunk_size
    
    # if we have fetched all of the records (we have reached record_count), exit loop
    if (start > int(record_count2[0]['COUNT'])):
        break
        
# convert the list into a pandas data frame
data2 = pd.DataFrame.from_records(results2)

end_time = time.time()
print(f"loop to {round(end_time - start_time, 1)} seconds")

data2.info()

loop to 0.6 seconds
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 772 entries, 0 to 771
Data columns (total 62 columns):
 #   Column                          Non-Null Count  Dtype 
---  ------                          --------------  ----- 
 0   date_of_interest                772 non-null    object
 1   case_count                      772 non-null    object
 2   probable_case_count             772 non-null    object
 3   hospitalized_count              772 non-null    object
 4   death_count                     772 non-null    object
 5   death_count_probable            772 non-null    object
 6   case_count_7day_avg             772 non-null    object
 7   all_case_count_7day_avg         772 non-null    object
 8   hosp_count_7day_avg             772 non-null    object
 9   death_count_7day_avg            772 non-null    object
 10  all_death_count_7day_avg        772 non-null    object
 11  bx_case_count                   772 non-null    object
 12  bx_probable_case_count        

In [259]:
data2.head(2)

Unnamed: 0,date_of_interest,case_count,probable_case_count,hospitalized_count,death_count,death_count_probable,case_count_7day_avg,all_case_count_7day_avg,hosp_count_7day_avg,death_count_7day_avg,...,si_probable_case_count,si_hospitalized_count,si_death_count,si_probable_death_count,si_case_count_7day_avg,si_all_case_count_7day_avg,si_hospitalized_count_7day_avg,si_death_count_7day_avg,si_all_death_count_7day_avg,incomplete
0,2020-03-01T00:00:00.000,0,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,2020-03-02T00:00:00.000,0,0,2,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


## Step 11: Covid Data Profiling

In [260]:
data2.columns

Index(['date_of_interest', 'case_count', 'probable_case_count',
       'hospitalized_count', 'death_count', 'death_count_probable',
       'case_count_7day_avg', 'all_case_count_7day_avg', 'hosp_count_7day_avg',
       'death_count_7day_avg', 'all_death_count_7day_avg', 'bx_case_count',
       'bx_probable_case_count', 'bx_hospitalized_count', 'bx_death_count',
       'bx_probable_death_count', 'bx_case_count_7day_avg',
       'bx_all_case_count_7day_avg', 'bx_hospitalized_count_7day_avg',
       'bx_death_count_7day_avg', 'bx_all_death_count_7day_avg',
       'bk_case_count', 'bk_probable_case_count', 'bk_hospitalized_count',
       'bk_death_count', 'bk_probable_death_count', 'bk_case_count_7day_avg',
       'bk_all_case_count_7day_avg', 'bk_hospitalized_count_7day_avg',
       'bk_death_count_7day_avg', 'bk_all_death_count_7day_avg',
       'mn_case_count', 'mn_probable_case_count', 'mn_hospitalized_count',
       'mn_death_count', 'mn_probable_death_count', 'mn_case_count_7day_avg'

In [261]:
data2_profiling_df = pd.DataFrame(columns = ["column_name",
                                            "column_type",
                                            "unique_values",
                                            "duplicate_values",
                                            "null_values",
                                            "non_null_values",
                                            "percent_null"])

In [262]:
for column in data2.columns:
    
    info_dict = {}
    
    try:
        info_dict["column_name"] = column
        info_dict["column_type"] = data2[column].dtypes
        info_dict["unique_values"] = len(data2[column].unique())
        info_dict["duplicate_values"] = (data2[column].shape[0] - data2[column].isna().sum()) - len(data2[column].unique())
        info_dict["null_values"] = data2[column].isna().sum()
        info_dict["non_null_values"] = data2[column].shape[0] - data2[column].isna().sum()
        info_dict["percent_null"] = round((data2[column].isna().sum()) / (data2[column].shape[0]), 3)
        
    except:
        print(f"unable to read column: {column}")
    
    data2_profiling_df = data2_profiling_df.append(info_dict, ignore_index=True)
    
data2_profiling_df.sort_values(by = ['unique_values', "non_null_values"],
                              ascending = [False, False],
                              inplace=True)

In [263]:
data2_profiling_df

Unnamed: 0,column_name,column_type,unique_values,duplicate_values,null_values,non_null_values,percent_null
0,date_of_interest,object,772,0,0,772,0.0
1,case_count,object,684,88,0,772,0.0
7,all_case_count_7day_avg,object,673,99,0,772,0.0
6,case_count_7day_avg,object,657,115,0,772,0.0
27,bk_all_case_count_7day_avg,object,567,205,0,772,0.0
...,...,...,...,...,...,...,...
54,si_death_count,object,28,744,0,772,0.0
35,mn_probable_death_count,object,25,747,0,772,0.0
59,si_death_count_7day_avg,object,24,748,0,772,0.0
55,si_probable_death_count,object,10,762,0,772,0.0


In [264]:
drop_columns2 = ['probable_case_count',
                'death_count_probable',
                'case_count_7day_avg', 
                'all_case_count_7day_avg', 
                'hosp_count_7day_avg',
                'death_count_7day_avg', 
                'all_death_count_7day_avg', 
                'bx_case_count',
                'bx_probable_case_count', 
                'bx_hospitalized_count', 
                'bx_death_count',
                'bx_probable_death_count', 
                'bx_case_count_7day_avg',
                'bx_all_case_count_7day_avg', 
                'bx_hospitalized_count_7day_avg',
                'bx_death_count_7day_avg', 
                'bx_all_death_count_7day_avg',
                'bk_case_count', 
                'bk_probable_case_count', 
                'bk_hospitalized_count',
                'bk_death_count', 
                'bk_probable_death_count', 
                'bk_case_count_7day_avg',
                'bk_all_case_count_7day_avg', 
                'bk_hospitalized_count_7day_avg',
                'bk_death_count_7day_avg', 
                'bk_all_death_count_7day_avg',
                'mn_case_count', 
                'mn_probable_case_count', 
                'mn_hospitalized_count',
                'mn_death_count', 
                'mn_probable_death_count', 
                'mn_case_count_7day_avg',
                'mn_all_case_count_7day_avg', 
                'mn_hospitalized_count_7day_avg',
                'mn_death_count_7day_avg', 
                'mn_all_death_count_7day_avg',
                'qn_case_count', 
                'qn_probable_case_count', 
                'qn_hospitalized_count',
                'qn_death_count', 
                'qn_probable_death_count',
                'qn_case_count_7day_avg',
                'qn_all_case_count_7day_avg', 
                'qn_hospitalized_count_7day_avg',
                'qn_death_count_7day_avg', 
                'qn_all_death_count_7day_avg',
                'si_case_count', 
                'si_probable_case_count', 
                'si_hospitalized_count',
                'si_death_count', 
                'si_probable_death_count', 
                'si_case_count_7day_avg',
                'si_all_case_count_7day_avg', 
                'si_hospitalized_count_7day_avg',
                'si_death_count_7day_avg', 
                'si_all_death_count_7day_avg', 
                'incomplete']

for column in drop_columns2:
    try:
        data2.drop(column, axis = 1, inplace = True)
    except:
        print(f"unable to drop {column}")

data2.columns

Index(['date_of_interest', 'case_count', 'hospitalized_count', 'death_count'], dtype='object')

In [265]:
data2[data2.duplicated()]
print(f"number of duplicate rows: {len(data2[data2.duplicated()])}")

number of duplicate rows: 0


## Step 12: Creating Covid Date dimension

In [266]:
date2_dim = client.query(sql_query).to_dataframe()

# validate that >0 stories have been extracted and return dataframe
if len(date2_dim) > 0:
    print("date dimension created")
else:
    print("date dimension FAILED")

date dimension created


In [267]:
date2_dim.head()

Unnamed: 0,date_id,full_date,week_day,day_name,year_day,week,year_week,month,month_name,fiscal_qtr,year,day_is_weekday
0,20200315,2020-03-15,0,Sunday,15,11,11,3,March,1,2020,0
1,20200316,2020-03-16,1,Monday,16,11,11,3,March,1,2020,1
2,20200317,2020-03-17,2,Tuesday,17,11,11,3,March,1,2020,1
3,20200318,2020-03-18,3,Wednesday,18,11,11,3,March,1,2020,1
4,20200319,2020-03-19,4,Thursday,19,11,11,3,March,1,2020,1


In [268]:
data2['date_id'] = data2['date_of_interest'].apply(lambda x: pd.to_datetime(x).strftime("%Y%m%d"))

## Step 13: Creating Covid Fact(s)

In [269]:
data2.columns

Index(['date_of_interest', 'case_count', 'hospitalized_count', 'death_count',
       'date_id'],
      dtype='object')

In [270]:
data2 = data2[["date_id",
            "case_count",
            "hospitalized_count",
            "death_count"]]

data2.head()

Unnamed: 0,date_id,case_count,hospitalized_count,death_count
0,20200301,0,1,0
1,20200302,0,2,0
2,20200303,1,7,0
3,20200304,5,2,0
4,20200305,3,14,0


## Step 14: Deliver Job Fact and Dimensions to Data Warehouse (BigQuery)

In [271]:
def load_table_to_bigquery(df, table_name):
    
    dataset_id = 'cis-9440-340322.etl_nycjobs_2022'
    
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.write_disposition = "WRITE_TRUNCATE"
    
    upload_table_name = f"cis-9440-340322.etl_nycjobs_2022.{table_name}"
    
    load_job = client.load_table_from_dataframe(df,
                                               upload_table_name,
                                               job_config = job_config)
    
    print(f"starting job {load_job}")

In [272]:
load_table_to_bigquery(df = agency_dim,
                      table_name = "agency_dim")

starting job LoadJob<project=cis-9440-340322, location=US, id=6700cf43-6f5c-48f3-9d15-a4705bdb4cd2>


In [273]:
load_table_to_bigquery(df = requirement_dim,
                      table_name = "requirement_dim")

starting job LoadJob<project=cis-9440-340322, location=US, id=d20a8430-03b0-4ea2-b099-1b3e59a05a65>


In [276]:
load_table_to_bigquery(df = job_category_dim,
                      table_name = "job_category_dim")

starting job LoadJob<project=cis-9440-340322, location=US, id=e014e473-6707-4dc8-b1b9-82c404ddc036>


In [277]:
load_table_to_bigquery(df = application_dim,
                      table_name = "application_dim")

starting job LoadJob<project=cis-9440-340322, location=US, id=e0b3f967-5c40-4800-bc14-66bc78147d4b>


In [278]:
load_table_to_bigquery(df = date_dim,
                      table_name = "date_dim")

starting job LoadJob<project=cis-9440-340322, location=US, id=5f8068ae-c70c-497c-a912-4792fd3e3d97>


In [279]:
load_table_to_bigquery(df = data,
                      table_name = "job_vacancy_fact")

starting job LoadJob<project=cis-9440-340322, location=US, id=c4370025-2a20-4252-940f-a2e4157149b3>


In [280]:
def load_table_to_bigquery(df, table_name):
    
    dataset_id = 'cis-9440-340322.etl_covid_2022'
    
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.write_disposition = "WRITE_TRUNCATE"
    
    upload_table_name = f"cis-9440-340322.etl_covid_2022.{table_name}"
    
    load_job = client.load_table_from_dataframe(df,
                                               upload_table_name,
                                               job_config = job_config)
    
    print(f"starting job {load_job}")

In [281]:
load_table_to_bigquery(df = data2,
                      table_name = "Covid_fact")

starting job LoadJob<project=cis-9440-340322, location=US, id=da7187e9-c1bb-4524-8ae9-e8cc96ac4c8a>


In [282]:
load_table_to_bigquery(df = date2_dim,
                      table_name = "date_dim")

starting job LoadJob<project=cis-9440-340322, location=US, id=994d81bf-f68e-4245-9cc9-01543989419f>
