# ETL

This ETL (Extract, Transform, Load) file will help me analyze visa petition data from [Kaggle](https://www.kaggle.com/datasets/nsharan/h-1b-visa?resource=download), with the end goal of **finding lucrative petition demographics**. I will analyze my CSV data for a brief overview and then route it through SQL to perform a more robust analysis (see my .sql file for all queries). In the future, I will connect my SQL database to an API in order to query as my data changes over time (with new entries).

1. Load Data into SQL
2. EDA: Exploratory Data Analysis
3. Vizualization and API

### Import libraries and database

In [1]:
import pandas as pd
from getpass import getpass
import pymysql
import sqlalchemy as alch
import re

In [2]:
# This dataset includes over 3,000,000 petitions!
# use "nrows=100000" to read in only the first 100,000 visa requests. 
df = pd.read_csv("input/h1b_visas_2011-2016.csv")
df.drop("Unnamed: 0",axis=1, inplace=True)
df.sample(5)

Unnamed: 0,CASE_STATUS,EMPLOYER_NAME,SOC_NAME,JOB_TITLE,FULL_TIME_POSITION,PREVAILING_WAGE,YEAR,WORKSITE,lon,lat
2938634,CERTIFIED,"CSC CONSULTING, INC.","Software Developers, Applications","TECHNICAL DEVELOPER, STAFF CONSULTANT",Y,70034.0,2011.0,"WALTHAM, MASSACHUSETTS",-71.235611,42.376485
642030,CERTIFIED,KEARNY COUNTY HOSPITAL,MEDICAL AND CLINICAL LABORATORY TECHNOLOGISTS,MEDICAL LABORATORY SCIENTIST,N,46633.6,2016.0,"LAKIN, KANSAS",,
2928561,DENIED,"EMPYREAN CAPITAL PARTNERS, LP","Software Developers, Systems Software",SENIOR SOFTWARE ENGINEER,Y,120869.0,2011.0,"LOS ANGELES, CALIFORNIA",-118.243685,34.052234
1908789,CERTIFIED,DELOITTE CONSULTING LLP,"Software Developers, Systems Software",CONSULTANT,Y,69742.0,2013.0,"IRVINE, CALIFORNIA",-117.794694,33.683947
1820919,CERTIFIED,"CIRCULAR EDGE, LLC",Computer Programmers,PROGRAMMER ANALYST,Y,53914.0,2013.0,"SOMERSET, NEW JERSEY",-74.488487,40.497604


### Understand and format my database before importing to SQL.

In [3]:
print(f"CASE_STATUS: \n {df['CASE_STATUS'].unique()} \n\nSOC_NAME: \n {df['SOC_NAME'].unique()} \n\nFULL_TIME_POSITION: \n {df['FULL_TIME_POSITION'].unique()} \n\nPREVAILING_WAGE: \n {df['PREVAILING_WAGE'].unique()} \n\nYEAR: \n {df['YEAR'].unique()} \n\nWORKSITE: \n {df['WORKSITE'].unique()}")

CASE_STATUS: 
 ['CERTIFIED-WITHDRAWN' 'WITHDRAWN' 'CERTIFIED' 'DENIED' 'REJECTED'
 'INVALIDATED' 'PENDING QUALITY AND COMPLIANCE REVIEW - UNASSIGNED' nan] 

SOC_NAME: 
 ['BIOCHEMISTS AND BIOPHYSICISTS' 'CHIEF EXECUTIVES' 'FINANCIAL MANAGERS'
 ... 'Tree Trimmers and Pruners'
 'Excavating and Loading Machine and Dragline Operat'
 'Earth Drillers, Except Oil and Gas'] 

FULL_TIME_POSITION: 
 ['N' 'Y' nan] 

PREVAILING_WAGE: 
 [3.6067000e+04 2.4267400e+05 1.9306600e+05 ... 3.3621300e+05 1.3000080e+05
 1.3701792e+08] 

YEAR: 
 [2016. 2015. 2014. 2013. 2012. 2011.   nan] 

WORKSITE: 
 ['ANN ARBOR, MICHIGAN' 'PLANO, TEXAS' 'JERSEY CITY, NEW JERSEY' ...
 'CLINTON, NEW JERSEY' 'OWINGS MILL, MARYLAND' 'ALTANTA, GEORGIA']


In [4]:
print(f"UNIQUE FIELDS: \n {df['SOC_NAME'].nunique()}\n\nUNIQUE FIELDS w/ POP > 100: \n [SEE MySQL QUERY]\n\nFIELD POPULATIONS: \n {df['SOC_NAME'].value_counts()}")

UNIQUE FIELDS: 
 2132

UNIQUE FIELDS w/ POP > 100: 
 [SEE MySQL QUERY]

FIELD POPULATIONS: 
 Computer Systems Analysts                          291170
Computer Programmers                               226574
SOFTWARE DEVELOPERS, APPLICATIONS                  221783
COMPUTER SYSTEMS ANALYSTS                          215353
Software Developers, Applications                  192933
                                                    ...  
ELEMENTARY SCHOOL TEACHERS, EXCEPT SPECIAL EDU          1
27-3031                                                 1
HEALTH PROFESSIONALS AND TECHNICIANS, ALL OTHER         1
15-1132                                                 1
Earth Drillers, Except Oil and Gas                      1
Name: SOC_NAME, Length: 2132, dtype: int64


### Split WORKSITE into CITY and STATE for further analysis

In [5]:
df.loc[0]

CASE_STATUS                     CERTIFIED-WITHDRAWN
EMPLOYER_NAME                UNIVERSITY OF MICHIGAN
SOC_NAME              BIOCHEMISTS AND BIOPHYSICISTS
JOB_TITLE              POSTDOCTORAL RESEARCH FELLOW
FULL_TIME_POSITION                                N
PREVAILING_WAGE                             36067.0
YEAR                                         2016.0
WORKSITE                        ANN ARBOR, MICHIGAN
lon                                      -83.743038
lat                                       42.280826
Name: 0, dtype: object

In [6]:
# add column CITY
df[['CITY','STATE']] = df['WORKSITE'].str.split(", ", expand=True)

In [7]:
# check if function worked
print(f"CITY: {df['CITY'][0]}\n\nSTATE: {df['STATE'][0]}")

CITY: ANN ARBOR

STATE: MICHIGAN


### Turn all double-quotes into single-quotes for SQL parsing

In [8]:
#for row in df['EMPLOYER_NAME'].loc[23109:23115]:
    #row.strip('\"')
    #print(row)
#    row = row.replace('"','')
#df['EMPLOYER_NAME'] = df['EMPLOYER_NAME'].replace('"','')
#df.replace(to_replace=r'"', value='', regex=True)
#print(f"POST-DROP: {df.loc[23111]['EMPLOYER_NAME']}")

### Check for and remove missing values
Remove missing values if there are not too many, otherwise reformat the column in question.

In [9]:
# analyze missing values (to drop)
print('MISSING VALUES, PRE-DROP:\n\n')
for (columnName, columnData) in df.items():
    print(f"Missing values in {columnName}: {columnData.isnull().sum()} out of {len(df)}")

#df['FULL_TIME_POSITION'] = df['FULL_TIME_POSITION'].dropna(inplace=True)
df.dropna(subset=['EMPLOYER_NAME','JOB_TITLE','FULL_TIME_POSITION','PREVAILING_WAGE'], inplace=True)
df.fillna({'SOC_NAME': 'NA'})

# analyze missing values (after dropping)
print('MISSING VALUES, POST-DROP:\n\n')
for (columnName, columnData) in df.items():
    print(f"Missing values IN {columnName}: {columnData.isnull().sum()}")

MISSING VALUES, PRE-DROP:


Missing values in CASE_STATUS: 13 out of 3002458
Missing values in EMPLOYER_NAME: 59 out of 3002458
Missing values in SOC_NAME: 17734 out of 3002458
Missing values in JOB_TITLE: 43 out of 3002458
Missing values in FULL_TIME_POSITION: 15 out of 3002458
Missing values in PREVAILING_WAGE: 85 out of 3002458
Missing values in YEAR: 13 out of 3002458
Missing values in WORKSITE: 0 out of 3002458
Missing values in lon: 107242 out of 3002458
Missing values in lat: 107242 out of 3002458
Missing values in CITY: 0 out of 3002458
Missing values in STATE: 0 out of 3002458
MISSING VALUES, POST-DROP:


Missing values IN CASE_STATUS: 0
Missing values IN EMPLOYER_NAME: 0
Missing values IN SOC_NAME: 17702
Missing values IN JOB_TITLE: 0
Missing values IN FULL_TIME_POSITION: 0
Missing values IN PREVAILING_WAGE: 0
Missing values IN YEAR: 0
Missing values IN WORKSITE: 0
Missing values IN lon: 107220
Missing values IN lat: 107220
Missing values IN CITY: 0
Missing values IN STATE: 0

### Format FULL_TIME_POSITION into boolean values

In [10]:
df['FULL_TIME_POSITION'] = df['FULL_TIME_POSITION'].map({'Y': 1, 'N': 0}) # changing from Y to 1 (not 'True') for SQL syntax
df['FULL_TIME_POSITION'] = df['FULL_TIME_POSITION'].astype('int')
type(df['FULL_TIME_POSITION'][0])

numpy.int64

In [11]:
for val in df['FULL_TIME_POSITION']:
    if val == 1:
        pass
    elif val == 0:
        pass
    else:
        print(val)

### Length
Getting a general idea of column length for SQL database creation. I will make the max column size a little bit larger than the greatest current column for potential future entries.

In [12]:
print(f"FIELD: {len('BUSINESS OPERATIONS SPECIALISTS, ALL OTHER')}\n\nEMPLOYER: {len('THE CHICAGO ATHENAEUM: CENTER FOR ARCHITECTURE, DESIGN & URBAN STUDIES')}\n\nCITY: {len('VAN BUREN CHARTER TOWNSHIP')}\n\nSTATE: {len('DISTRICT OF COLUMBIA')}")


FIELD: 42

EMPLOYER: 70

CITY: 26

STATE: 20


### Potential discoveries:
- most lucrative field, state.
- growth in visa requests y/y.
- check if full-time visas are more lucrative.

# 1. Load Data into SQL

### Create SQL schema
This step is done in MySQL Workbench:
1. Build the database in MySQL Workbench.
2. Build the tables with efficient datatypes.
3. Populate the tables through Python (see below).

Alternatively, I could have drawn an EER diagram and reverse engineered that diagram to get an automatically generated code for the table.

### MySQL reverse engineering code

In [13]:
"""
-- setup the database
CREATE DATABASE us_immigration;
USE us_immigration;

-- create first table
DROP TABLE IF EXISTS visa;
CREATE TABLE visa (
    -- add an idex when new tables are added to the database (unnecessary now)
    status ENUM('CERTIFIED-WITHDRAWN', 'WITHDRAWN', 'CERTIFIED', 'DENIED',
        'REJECTED', 'INVALIDATED', 'PENDING REVIEW - UNASSIGNED', 'nan'),
    field VARCHAR(64),
    job VARCHAR(64),
    employer VARCHAR(64),
    full_time bool,
    wage INT,
    year YEAR,
    city VARCHAR(32),
    state VARCHAR(16)
);
""";

### Connect to SQL (using SQLalchemy)

In [14]:
# establish connection
password = getpass("Insert your password here: ")
dbName = "us_immigration"
connectionData = f"mysql+pymysql://root:{password}@localhost/{dbName}"
engine = alch.create_engine(connectionData)

Insert your password here: ········


In [15]:
# this function should return an empty string, given that data has not been entered into the table yet.
list(engine.execute("SELECT * FROM visa"))

[]

### Populate SQL

In [16]:
def check (table, string):
    ''' Ensure duplicate data is not inserted into SQL using defensive programming '''
    
    if table == "visa":
        query = list(engine.execute(f"SELECT name FROM visa WHERE name = '{string}';"))
        if len(query) > 0:
            return True
        else:
            return False
        
    if table == "demographic":
        query = list(engine.execute(f"SELECT name FROM demographic WHERE name = '{string}';"))
        if len(query) > 0:
            return True
        else:
            return False

In [17]:
# Test the check function.
#check("visa", "John")

In [18]:
def insertVisa (status, field, job, employer, full_time, wage, year, city, state):
    ''' Insert data into SQL table VISA '''
    
    """ NOT USING UNIQUE CHECK SINCE DATA DOES NOT HAVE UNIQUE IDS (LIKE 'NAME')
    if check("visa", string):
        return "It already exists"
    else:
    """
    
    #txtSQL = 'INSERT INTO visa (status, field, job, employer, full_time, wage, year, city, state) VALUES ('+status+', '+field+', '+job+', '+employer+', '+full_time+', '+wage+', '+year+', '+city+', '+state+');'
    #txtSQL = f'INSERT INTO visa (status, field, job, employer, full_time, wage, year, city, state) VALUES ("{status}", "{field}", "{job}", "{employer}", "{full_time}", "{wage}", "{year}", "{city}", "{state}");'
    engine.execute(f'INSERT INTO visa (status, field, job, employer, full_time, wage, year, city, state) VALUES ("{status}", "{field}", "{job}", "{employer}", "{full_time}", "{wage}", "{year}", "{city}", "{state}");')

In [19]:
# Test the insert function.
#insertVisa ("test")
#check("visa", "John")

In [None]:
# Insert all data into SQL: 3 million rows; CASE_STATUS, SOC_NAME, PREVAILING_WAGE, YEAR.
i=0 # error counter
for index, row in df.iterrows():
    if index%30000==0:
        print(f'{index/3000000}%  ')
    try:
        insertVisa(row["CASE_STATUS"], row["SOC_NAME"], row["JOB_TITLE"], row["EMPLOYER_NAME"], row["FULL_TIME_POSITION"], row["PREVAILING_WAGE"], row["YEAR"], row["CITY"], row["STATE"])
    except:
        # debug code: print(f'Error at index {index}\n{row}\n\n')
        i+=1
print(f'{i} rows have been dropped while inserting data into SQL, out of {len(df)}')

### Import statistics:
- duration: 45 minutes
- errors: 169 / 3,002,311 (0.005%)

To resolve over 90% of the errors in the import I will delve deeper into 'defensive programming' to remove any possible injection errors.

### Export reformatted CSV

In [23]:
df.to_csv("output/h1b_visas_2011-2016.csv")

# 2. EDA: Exploratory Data Analysis
See MySQL Workbench (or screenshots attached to readme file) for full query list.

### Most Lucrative Field
1. what is the aggregate WAGE for each FIELD?
2. Group petitions with similar fields.
3. Filter for only approved.

In [24]:
'''
TWO ALTERNATIVE QUERIES
I chose to use the first query, based on population size, as the 
binary decision to apply for a visa is made at a much lower wage 
threshold.

-- QUERY 1: Most lucrative field, by population (preferred)
SELECT SUM(wage) AS total_wage, COUNT(field) AS num_in_field, field
FROM visa
WHERE status = 'CERTIFIED-WITHDRAWN' OR status = 'CERTIFIED'
GROUP BY field
HAVING COUNT(field) > 2 -- filter out misc fields (usually mistypes)
ORDER BY num_in_field DESC;

-- QUERY 2: Most lucrative field, by total wage
SELECT SUM(wage) AS total_wage, COUNT(field) AS num_in_field, field
FROM visa
WHERE status = 'CERTIFIED-WITHDRAWN' OR status = 'CERTIFIED'
GROUP BY field
HAVING COUNT(field) > 2 -- filter out misc fields (usually mistypes)
ORDER BY total_wage DESC;
'''

# query-to-csv
sql_query = pd.read_sql_query('''
                                SELECT SUM(wage) AS total_wage, COUNT(field) AS num_in_field, field
                                FROM visa
                                WHERE status = 'CERTIFIED-WITHDRAWN' OR status = 'CERTIFIED'
                                GROUP BY field
                                HAVING COUNT(field) > 2 -- filter out misc fields (usually mistypes)
                                ORDER BY num_in_field DESC;
                              ''',connectionData)

query_df = pd.DataFrame(sql_query) # query to dataframe
query_df.to_csv (r'output/field_pop.csv', index = False) # dataframe to csv

### Most Lucrative State
1. what is the aggregate WAGE for each STATE? 
2. Group petitions with similar state names.
3. Filter for only approved.

In [25]:
sql_query = pd.read_sql_query('''
                                -- Most lucrative state, by population
                                SELECT SUM(wage) AS total_wage, COUNT(state) AS num_in_state, state
                                FROM visa
                                WHERE status = 'CERTIFIED-WITHDRAWN' OR status = 'CERTIFIED'
                                GROUP BY state
                                ORDER BY num_in_state DESC;
                              ''',connectionData)

query_df = pd.DataFrame(sql_query) # query to dataframe
query_df.to_csv (r'output/state_pop.csv', index = False) # dataframe to csv

### Visa Request Growth
1. How have visa petitions grown or shrunk over the years? 
2. Group petitions by year.
3. Filter once for only approved, and do not filter – both are relevant.

In [26]:
sql_query = pd.read_sql_query('''
                                SELECT year FROM visa GROUP BY year;
                              ''',connectionData)

query_df = pd.DataFrame(sql_query) # query to dataframe
query_df.to_csv (r'output/petition_pop.csv', index = False) # dataframe to csv

### Visa Popularity Between Full and Part-Time
1. Are more petitions made for Full-Time positions? 
2. Group petitions by FULL_TIME status.
3. Filter for only approved.

In [27]:
sql_query = pd.read_sql_query('''
                                SELECT COUNT(full_time) as total_petitions, CASE 
                                    WHEN full_time = 1 
                                    THEN 'Y' ELSE 'N' 
                                END AS full_time
                                FROM visa
                                GROUP BY full_time;
                              ''',connectionData)

query_df = pd.DataFrame(sql_query) # query to dataframe
query_df.to_csv (r'output/employment_type_pop.csv', index = False) # dataframe to csv

### Merge all CSVs into one master file, for ease of access

In terminal, I used the command: 
_cat *.csv >combined.csv_

## A full list of queries are included in the [.sql](https://github.com/VeniceHartwell/immigration_analysis/blob/main/us_immigration.sql) file. See the [readme](https://github.com/VeniceHartwell/immigration_analysis/blob/main/README.md) for an explanation of the queries used.

# 3. Vizualization and API

For quick readability, I plan to add visualization options in the future.

For quick accessibility, I plan to add flask API accessibility. This means my code can be run from the web, if necessary. I currently use Python and/or SQL to query for data.