# Milestone 3

#### **Note:** My partner's GCP coupon had expired, and the coupon turned out to apply only to my project, so I had to figure out how to reload the tables from the dataset in my partner's project into my own project.

In [1]:
import pandas as pd
import pandas_gbq
from google.cloud import bigquery
%reload_ext google.cloud.bigquery

In [7]:
!pip install apache-beam[gcp]



## 1. Identify all the tables from your refined datasets which contain data that needs to be cleansed. 

#### Occupation Table:
##### <b>Change employment_start_date and employment_end_date to "YYYY-MM-DD" format and convert them to the DATE data type.</b>
##### The following queries show that some values are not in "YYYY-MM-DD" format (for example, YYYY.MM.DD and MM/DD/YYYY).

In [2]:
%%bigquery
-- Sample a few start dates written with dots (e.g. 2014.6.4) instead of YYYY-MM-DD.
SELECT
  employment_start_date
FROM
  H_1B_refined.Occupation
WHERE
  employment_start_date LIKE "%.%"
LIMIT 5

Unnamed: 0,employment_start_date
0,2014.6.4
1,2014.5.15
2,2014.9.12
3,2014.8.1
4,2014.9.15


In [3]:
%%bigquery
-- Sample a few start dates written with slashes (e.g. 10/14/2014, i.e. MM/DD/YYYY).
SELECT
  employment_start_date
FROM
  H_1B_refined.Occupation
WHERE
  employment_start_date LIKE "%/%"
LIMIT 5

Unnamed: 0,employment_start_date
0,10/14/2014
1,05/27/2015
2,04/12/2013
3,08/01/2014
4,02/20/2015


##### <b>Change 123456 format to 12-3456 format in soc_code</b>
##### Since the Ownership table only has codes of length 7, we drop all soc_codes in other formats, such as 12-3456.00 or 30. <br>(The query below shows that the Ownership table only has codes of length 7.)

In [1]:
%%bigquery
-- Verify that every Ownership occ_code is 7 characters long (the length of a
-- 12-3456 code). An empty result means the check passes.
-- LENGTH(NULL) != 7 evaluates to NULL rather than TRUE, so NULL codes would
-- silently pass a bare length test; check for them explicitly.
SELECT occ_code
FROM H_1B_refined.Ownership
WHERE occ_code IS NULL
   OR LENGTH(occ_code) != 7
LIMIT 10

Unnamed: 0,occ_code


##### Some soc_codes have irregular formats, such as 12-3456.00, 30, 12.3456.00, and even a job title (Commputer System Analyst).

In [2]:
%%bigquery
-- Sample soc_codes whose length differs from the 7 characters of a 12-3456 code.
-- (NULL soc_codes are not returned: LENGTH(NULL) != 7 is NULL, not TRUE.)
SELECT
  soc_code
FROM
  H_1B_refined.Occupation
WHERE
  LENGTH(soc_code) != 7
LIMIT 5

Unnamed: 0,soc_code
0,15-1031.00
1,25-1032.00
2,25-2031.00
3,17-2131.00
4,25-4012.00


##### <b>In addition, there are duplicate rows</b>
##### Duplicate count: 636801 (the number of distinct rows that appear more than once, not the total number of extra copies)

In [12]:
%%bigquery
-- Count the distinct Occupation rows that occur more than once.
-- BigQuery only accepts `SELECT *` in a grouped query when every column is in
-- GROUP BY, so each group is one full distinct row and n_copies is its multiplicity.
SELECT COUNT(*)
FROM (
  SELECT *, COUNT(*) AS n_copies
  FROM H_1B_refined.Occupation
  GROUP BY
    job_title, employer_name, employer_city,
    employment_start_date, employment_end_date,
    soc_code, soc_title,
    prevailing_wage_YR, pw_wage_level, pw_wage_source,
    pw_wage_source_year, pw_wage_source_other,
    worksite_city, worksite_country, worksite_state, worksite_postal_code
  HAVING n_copies > 1
);

Unnamed: 0,f0_
0,636801


#### <b>Application Table:</b>
##### There are some duplicate rows
##### Duplicate count: 519528 (the number of distinct rows that appear more than once, not the total number of extra copies)

In [11]:
%%bigquery
-- Count the distinct Application rows that occur more than once.
-- Every column must appear in GROUP BY for `SELECT *` to be valid, so each
-- group is one full distinct row. (DECESION_DATE is the column's spelling in the table.)
SELECT COUNT(*)
FROM (
  SELECT *, COUNT(*) AS n_copies
  FROM H_1B_refined.Application
  GROUP BY
    CASE_NUMBER, CASE_STATUS, CASE_SUBMITTED, DECESION_DATE, VISA_CLASS,
    employer_name, employer_city
  HAVING n_copies > 1
);

Unnamed: 0,f0_
0,519528


#### <b> Employer Table: </b>
##### There are some duplicate rows
##### Duplicate count: 136947 (the number of distinct rows that appear more than once, not the total number of extra copies)

In [10]:
%%bigquery
-- Count the distinct Employer rows that occur more than once.
-- Every column must appear in GROUP BY for `SELECT *` to be valid, so each
-- group is one full distinct row and n_copies is its multiplicity.
SELECT COUNT(*)
FROM (
  SELECT *, COUNT(*) AS n_copies
  FROM H_1B_refined.Employer
  GROUP BY
    employer_name, employer_address, employer_city, employer_state,
    employer_postal_code, employer_country, employer_province,
    h_1b_dependent, willful_violator
  HAVING n_copies > 1
);

Unnamed: 0,f0_
0,136947


#### <b> Ownership Table: </b>
##### There are some duplicate rows
##### Duplicate count: 1350 (the number of distinct rows that appear more than once, not the total number of extra copies)

In [13]:
%%bigquery
-- Count the distinct Ownership rows that occur more than once.
-- Every column must appear in GROUP BY for `SELECT *` to be valid, so each
-- group is one full distinct row and n_copies is its multiplicity.
SELECT COUNT(*)
FROM (
  SELECT *, COUNT(*) AS n_copies
  FROM H_1B_refined.Ownership
  GROUP BY
    occ_code, occ_title, ownership, naics_title, grp,
    tot_emp, emp_prse, h_mean, a_mean, mean_prse,
    a_pct10, a_pct25, a_median, a_pct75, a_pct90
  HAVING n_copies > 1
);

Unnamed: 0,f0_
0,1350


## 2. Write a Beam pipeline that normalizes the data from the table and creates a new table with the cleansed data

In [10]:
# Run the Occupation normalization Beam pipeline in occupation_beam.py (script not shown here).
# Its log prints each input date next to its converted value.
# Presumably it writes H_1B_refined.Occupation_Beam, which is checked in section 3; confirm against the script.
# NOTE(review): the logged outputs are unpadded (e.g. 2017-7-28), not the YYYY-MM-DD
# target stated above; confirm whether the script should zero-pad months and days.
%run occupation_beam.py

input start_date: 2017.7.28, output start_date: 2017-7-28
input end_date: 2020.7.27, output end_date: 2020-7-27
input start_date: 2017.9.15, output start_date: 2017-9-15
input end_date: 2020.9.14, output end_date: 2020-9-14
input start_date: 2017.8.1, output start_date: 2017-8-1
input end_date: 2019.7.31, output end_date: 2019-7-31
input start_date: 2017.1.1, output start_date: 2017-1-1
input end_date: 2020.1.1, output end_date: 2020-1-1
input start_date: 2016.12.21, output start_date: 2016-12-21
input end_date: 2019.12.21, output end_date: 2019-12-21
input start_date: 2017.4.15, output start_date: 2017-4-15
input end_date: 2020.4.14, output end_date: 2020-4-14
input start_date: 2017.9.28, output start_date: 2017-9-28
input end_date: 2020.9.27, output end_date: 2020-9-27
input start_date: 2017.7.26, output start_date: 2017-7-26
input end_date: 2020.7.25, output end_date: 2020-7-25
input start_date: 2016.4.27, output start_date: 2016-4-27
input end_date: 2019.4.26, output end_date: 2019

### The subsample above doesn't contain any soc_code that needs to be transformed, so we tested our soc_code transformation code separately below. <br> Please note that the output below and the Occ_CodeTest table in BQ exist only to show that the code works.

In [15]:
# Demonstration run of the soc_code transformation in occupation_beamCodeTest.py (script not shown here).
# Its log shows 6-digit codes such as 112022 rewritten as 11-2022.
# Its output table (Occ_CodeTest in BigQuery) exists only to demonstrate the transformation.
%run occupation_beamCodeTest.py

input soc_code: 112022, output soc_code: 11-2022
input soc_code: 111011, output soc_code: 11-1011
input soc_code: 132051, output soc_code: 13-2051
input soc_code: 132051, output soc_code: 13-2051
input soc_code: 132051, output soc_code: 13-2051
input soc_code: 132051, output soc_code: 13-2051
input soc_code: 132051, output soc_code: 13-2051
input soc_code: 131081, output soc_code: 13-1081
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 151131, output soc_code: 15-1131
input soc_code: 1511

### For the Application, Employer, and Ownership tables, we used a LIMIT of 100 rows.

#### <b>Application</b>
#### Delete duplicate rows

In [12]:
# Remove duplicate Application rows using the Beam pipeline in application_beam.py (script not shown here).
# Presumably it writes H_1B_refined.Application_Beam; confirm against the script.
# NOTE(review): this run was limited to 100 input rows, so the section 3 duplicate
# check only covers that sample, not the full table.
%run application_beam.py

#### <b>Employer</b>
#### Delete duplicate rows

In [13]:
# Remove duplicate Employer rows using the Beam pipeline in employer_beam.py (script not shown here).
# Presumably it writes H_1B_refined.Employer_Beam; confirm against the script.
# NOTE(review): this run was limited to 100 input rows, so the section 3 duplicate
# check only covers that sample, not the full table.
%run employer_beam.py

#### <b>Ownership</b>
#### Delete duplicate rows

In [14]:
# Remove duplicate Ownership rows using the Beam pipeline in ownership_beam.py (script not shown here).
# Presumably it writes H_1B_refined.Ownership_Beam; confirm against the script.
# NOTE(review): this run was limited to 100 input rows, so the section 3 duplicate
# check only covers that sample, not the full table.
%run ownership_beam.py

## 3. Verify that the BigQuery output tables from the previous step contain a valid primary key.
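#### If the output tables are child tables, they should also have a foreign key.<br>Note: the queries below check only that no row is duplicated across all of the listed columns. That is necessary for a valid primary key, but it does not by itself establish one.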

#### <b>Occupation_Beam Table:</b>
##### Duplicate count: 0 

In [1]:
%%bigquery
-- Count the distinct Occupation_Beam rows that occur more than once.
-- Expected to return 0 if the Beam pipeline removed every duplicate row.
SELECT COUNT(*)
FROM (
  SELECT *, COUNT(*) AS n_copies
  FROM H_1B_refined.Occupation_Beam
  GROUP BY
    job_title, employer_name, employer_city,
    employment_start_date, employment_end_date,
    soc_code, soc_title,
    prevailing_wage_YR, pw_wage_level, pw_wage_source,
    pw_wage_source_year, pw_wage_source_other,
    worksite_city, worksite_country, worksite_state, worksite_postal_code
  HAVING n_copies > 1
);

Unnamed: 0,f0_
0,0


#### <b>Application_Beam Table:</b>
##### Duplicate count: 0

In [1]:
%%bigquery
-- Count the distinct Application_Beam rows that occur more than once.
-- Expected to return 0 if the Beam pipeline removed every duplicate row.
SELECT COUNT(*)
FROM (
  SELECT *, COUNT(*) AS n_copies
  FROM H_1B_refined.Application_Beam
  GROUP BY
    CASE_NUMBER, CASE_STATUS, CASE_SUBMITTED, DECESION_DATE, VISA_CLASS,
    employer_name, employer_city
  HAVING n_copies > 1
);

Unnamed: 0,f0_
0,0


#### <b> Employer_Beam Table: </b>
##### Duplicate count: 0 

In [2]:
%%bigquery
-- Count the distinct Employer_Beam rows that occur more than once.
-- Expected to return 0 if the Beam pipeline removed every duplicate row.
SELECT COUNT(*)
FROM (
  SELECT *, COUNT(*) AS n_copies
  FROM H_1B_refined.Employer_Beam
  GROUP BY
    employer_name, employer_address, employer_city, employer_state,
    employer_postal_code, employer_country, employer_province,
    h_1b_dependent, willful_violator
  HAVING n_copies > 1
);

Unnamed: 0,f0_
0,0


#### <b> Ownership_Beam Table: </b>
##### Duplicate count: 0 

In [3]:
%%bigquery
SELECT COUNT(*) 
FROM
(SELECT  *, COUNT(*) AS count
FROM H_1B_refined.Ownership_Beam           
GROUP BY occ_code, occ_title, ownership, naics_title, grp, tot_emp, emp_prse, h_mean, a_mean, mean_prse, a_pct10, a_pct25, a_median, a_pct75, a_pct90  
HAVING count > 1);  

Unnamed: 0,f0_
0,0
