#### Import required packages

In [49]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
import re
import numpy as np

#### Create configs

In [50]:
# Local-mode Spark configuration; getOrCreate returns an already-running
# SparkContext / SQLContext instead of failing when the cell is re-run.
conf = SparkConf().setAppName("Project_BigData").setMaster("local")
sc = SparkContext.getOrCreate(conf=conf)
sqlcon = SQLContext.getOrCreate(sc)

#### Some global variables

In [66]:
# Matches a double-quoted run containing ", " (e.g. "ACME, INC") so that the
# comma can be collapsed before the line is naively split on commas.
# The lowercase range is a-z: the previous `a-x` silently excluded 'y' and 'z'.
pat1 = re.compile(r'"([a-zA-Z0-9. ,]+), ([a-zA-Z0-9. ,]+)"')
# Matches a quoted run made only of commas (e.g. ","); not used by any visible code.
pat2 = re.compile(r'"([,]+)"')
# Columns kept for modelling.
keep = [
    'CASE_STATUS', 'EMPLOYER_NAME', 'EMPLOYER_STATE',
    'AGENT_REPRESENTING_EMPLOYER', 'JOB_TITLE', 'SOC_NAME', 'NAICS_CODE', 'TOTAL_WORKERS',
    'NEW_EMPLOYMENT', 'CONTINUED_EMPLOYMENT', 'CHANGE_PREVIOUS_EMPLOYMENT',
    'NEW_CONCURRENT_EMPLOYMENT', 'CHANGE_EMPLOYER', 'AMENDED_PETITION', 'FULL_TIME_POSITION',
    'PREVAILING_WAGE', 'H1B_DEPENDENT', 'SUPPORT_H1B', 'WORKSITE_STATE',
]
# TODO: decide how to use the date column 'CASE_SUBMITTED' and add it to `keep` once decided.
# Maps each column to be string-indexed to the name of its new indexed ("_C") column.
categorical = {"CASE_STATUS": "CASE_STATUS_C", "EMPLOYER_NAME": "EMPLOYER_NAME_C",
               "EMPLOYER_STATE": "EMPLOYER_STATE_C",
               "AGENT_REPRESENTING_EMPLOYER": "AGENT_REPRESENTING_EMPLOYER_C",
               "JOB_TITLE": "JOB_TITLE_C", "SOC_NAME": "SOC_NAME_C", "NAICS_CODE": "NAICS_CODE_C",
               "FULL_TIME_POSITION": "FULL_TIME_POSITION_C", "H1B_DEPENDENT": "H1B_DEPENDENT_C",
               "WORKSITE_STATE": "WORKSITE_STATE_C", "SUPPORT_H1B": "SUPPORT_H1B_C"}
# Label column(s); placed first in the selected columns so the label is row[0].
target = ["CASE_STATUS_C"]

In [52]:
def replaceCommaWithinQuotes(line, pattern=None):
    '''
    Remove ", " separators inside double-quoted text so the line can be split on commas.

    Each pass replaces the ", " captured between the two groups of every
    non-overlapping match with a single space; passes repeat until the pattern
    no longer matches, so quoted text with several commas is fully cleaned.

    line    -- the raw text line.
    pattern -- compiled regex with two groups around the ", " to collapse; the
               replacement is always '"\\1 \\2"'. Defaults to the module-level pat1.
               NOTE: the substitution must reduce the number of matches, or the
               loop will not terminate.
    Returns the cleaned line (unchanged when nothing matches).
    '''
    if pattern is None:
        pattern = pat1
    # Iterate rather than recurse: recursion grew one stack frame per pass and
    # scanned the line twice per pass (findall + sub); subn does both in one scan.
    while True:
        line, replaced = pattern.subn(r'"\1 \2"', line)
        if replaced == 0:
            return line

In [53]:
# Number of pay periods per year for each non-yearly PW_UNIT_OF_PAY value.
_PAY_PERIODS_PER_YEAR = {
    'Hour': 40 * 52,   # 40 hrs/week, 52 weeks/yr
    'Week': 52,        # 52 weeks/yr
    'Bi-Weekly': 26,   # 52 weeks/yr, hence 26 bi-weeks
    'Month': 12,       # 12 months/yr
}


def yearlyWage(data_dict):
    '''
    Convert data_dict['PREVAILING_WAGE'] to a yearly amount, in place.

    The multiplier is chosen from data_dict['PW_UNIT_OF_PAY']. Any unit not
    listed in _PAY_PERIODS_PER_YEAR (e.g. 'Year', or a missing unit) leaves the
    wage unchanged.

    Wage values produced by createRow are strings, so they are parsed as
    floats before multiplying; multiplying the string itself would repeat it
    (e.g. '25.0' * 2080). The yearly result is stored back as a string so the
    column keeps a single type across rows. A wage that is not a valid number
    (e.g. an empty field) is left untouched. Numeric wages are multiplied directly.

    Returns the same (mutated) data_dict.
    '''
    periods = _PAY_PERIODS_PER_YEAR.get(data_dict.get('PW_UNIT_OF_PAY'))
    if periods is None:
        return data_dict

    wage = data_dict['PREVAILING_WAGE']
    if isinstance(wage, (int, float)):
        data_dict['PREVAILING_WAGE'] = wage * periods
        return data_dict
    try:
        yearly = float(wage) * periods
    except ValueError:
        # Best effort: keep an unparseable wage rather than failing the whole job.
        return data_dict
    data_dict['PREVAILING_WAGE'] = str(yearly)
    return data_dict

In [54]:
def createRow(line, headers):
    '''
    Parse one CSV text line into a dict mapping each header to its field value.

    The line is cleaned with replaceCommaWithinQuotes and then split naively on ",".
    Before reading the value for a header, a field consisting of a lone '"'
    (left over from a quoted comma such as ",") is skipped. The unnamed header
    ("") is stored under "S_NO" as an int. The finished dict is passed through
    yearlyWage and returned.

    Raises IndexError if the line yields fewer fields than there are headers.
    '''
    fields = replaceCommaWithinQuotes(line).split(",")
    row = {}
    pos = 0
    for name in headers:
        # Step over a stray quote fragment produced by splitting a quoted comma.
        if fields[pos] == '"':
            pos += 1
        value = fields[pos]
        if name == "":
            row["S_NO"] = int(value)
        else:
            row[name] = value
        pos += 1
    return yearlyWage(row)

In [55]:
def convertToCategorical(columns, dataframe):
    '''
    Add a StringIndexer-encoded copy of each listed column to the dataframe.

    columns   -- dict mapping an existing column name to the name of the new
                 indexed column to create.
    dataframe -- the DataFrame to extend.
    Returns the DataFrame with one indexed column added per dict entry. A
    separate StringIndexer is fitted for each column.
    '''
    result = dataframe
    for source_col, indexed_col in columns.items():
        indexer = StringIndexer(inputCol=source_col, outputCol=indexed_col)
        result = indexer.fit(result).transform(result)
    return result


#### Read the data from HDFS

In [56]:
# RDD of raw text lines from the FY17 H-1B disclosure CSV (header line included).
h1b_data = sc.textFile(
    "hdfs://quickstart.cloudera:8020/user/cloudera/Project/H-1B_Disclosure_Data_FY17.csv"
)

#### Extract headers from the data.
 

In [57]:
# The first line holds the column names; drop every line identical to it.
headers_string = h1b_data.take(1)[0]
h1b_data = h1b_data.filter(lambda raw_line: raw_line != headers_string)
headers = headers_string.split(",")

#### Parse each line into a Row, then create a DataFrame from the rows using the SQL context

In [58]:
# Parse every line into a Row (one field per header), then build and cache a DataFrame.
h1_data_map = h1b_data.map(lambda raw_line: Row(**createRow(raw_line, headers)))
h1b_data_frame = sqlcon.createDataFrame(h1_data_map)
h1b_data_frame = h1b_data_frame.cache()

#### Filter the data by visa type; we need only H-1B visas.

In [59]:
# Keep only rows whose VISA_CLASS is exactly 'H-1B' (filter and where are aliases).
h1b_data_frame = h1b_data_frame.filter(h1b_data_frame.VISA_CLASS == 'H-1B')

#### Take a small subset and convert to pandas just to show the data.

In [60]:
# Small preview: rows with serial number S_NO below 5, displayed as a pandas frame.
h1b_data_frame_0 = h1b_data_frame.filter(h1b_data_frame.S_NO < 5)
h1b_data_frame_0.toPandas()

Unnamed: 0,AGENT_ATTORNEY_CITY,AGENT_ATTORNEY_NAME,AGENT_ATTORNEY_STATE,AGENT_REPRESENTING_EMPLOYER,AMENDED_PETITION,CASE_NUMBER,CASE_STATUS,CASE_SUBMITTED,CHANGE_EMPLOYER,CHANGE_PREVIOUS_EMPLOYMENT,...,TOTAL_WORKERS,VISA_CLASS,WAGE_RATE_OF_PAY_FROM,WAGE_RATE_OF_PAY_TO,WAGE_UNIT_OF_PAY,WILLFUL_VIOLATOR,WORKSITE_CITY,WORKSITE_COUNTY,WORKSITE_POSTAL_CODE,WORKSITE_STATE
0,NEW YORK,"""ELLSWORTH CHAD""",NY,Y,0,I-200-16055-173457,CERTIFIED-WITHDRAWN,2016-02-24,0,0,...,1,H-1B,65811.0,67320.0,Year,N,RIVERWOODS,LAKE,60015,IL
1,NEW YORK,"""ELLSWORTH CHAD""",NY,Y,0,I-200-16064-557834,CERTIFIED-WITHDRAWN,2016-03-04,0,0,...,1,H-1B,53000.0,57200.0,Year,N,RIVERWOODS,LAKE,60015,IL
2,WASHINGTON,"""BURKE KAREN""",DC,Y,0,I-200-16063-996093,CERTIFIED-WITHDRAWN,2016-03-10,0,0,...,2,H-1B,77000.0,0.0,Year,N,WASHINGTON,,20007,DC
3,,"""",,N,0,I-200-16272-196340,WITHDRAWN,2016-09-28,0,0,...,1,H-1B,102000.0,0.0,Year,N,JERSEY CITY,HUDSON,7302,NJ
4,ATLANTA,"""SCOFIELD EILEEN""",GA,Y,0,I-200-15053-636744,CERTIFIED-WITHDRAWN,2015-02-22,1,0,...,1,H-1B,132500.0,0.0,Year,N,NEW YORK,NEW YORK,10036,NY


#### Convert required columns to categorical variables.

In [64]:
# Add an indexed "_C" column for every column listed in `categorical`.
h1b_data_frame_1 = convertToCategorical(columns=categorical, dataframe=h1b_data_frame)

#### Keep the columns we are going to work with in a new DataFrame. The original columns that were converted to categorical are dropped, since we need only their categorical versions.

In [67]:
# Non-categorical kept columns followed by the indexed versions of the
# categorical ones; the original string columns are dropped.
# list(...) is required on Python 3, where dict.values() returns a view that
# cannot be concatenated to a list.
keeps = [x for x in keep if x not in categorical] + list(categorical.values())
# Move the target column(s) to the front so the label is the first column.
keeps = target + [x for x in keeps if x not in target]
h1b_data_frame_2 = h1b_data_frame_1[keeps]
h1b_data_frame_2.take(2)

[Row(CASE_STATUS_C=1.0, TOTAL_WORKERS=u'1', NEW_EMPLOYMENT=u'1', CONTINUED_EMPLOYMENT=u'0', CHANGE_PREVIOUS_EMPLOYMENT=u'0', NEW_CONCURRENT_EMPLOYMENT=u'0', CHANGE_EMPLOYER=u'0', AMENDED_PETITION=u'0', PREVAILING_WAGE=u'59197.0', WORKSITE_STATE_C=4.0, NAICS_CODE_C=35.0, SOC_NAME_C=1.0, H1B_DEPENDENT_C=0.0, AGENT_REPRESENTING_EMPLOYER_C=0.0, EMPLOYER_STATE_C=3.0, EMPLOYER_NAME_C=149.0, SUPPORT_H1B_C=0.0, FULL_TIME_POSITION_C=0.0, JOB_TITLE_C=57382.0),
 Row(CASE_STATUS_C=1.0, TOTAL_WORKERS=u'1', NEW_EMPLOYMENT=u'1', CONTINUED_EMPLOYMENT=u'0', CHANGE_PREVIOUS_EMPLOYMENT=u'0', NEW_CONCURRENT_EMPLOYMENT=u'0', CHANGE_EMPLOYER=u'0', AMENDED_PETITION=u'0', PREVAILING_WAGE=u'49800.0', WORKSITE_STATE_C=4.0, NAICS_CODE_C=35.0, SOC_NAME_C=10.0, H1B_DEPENDENT_C=0.0, AGENT_REPRESENTING_EMPLOYER_C=0.0, EMPLOYER_STATE_C=3.0, EMPLOYER_NAME_C=4198.0, SUPPORT_H1B_C=0.0, FULL_TIME_POSITION_C=0.0, JOB_TITLE_C=105.0)]

#### Convert the DataFrame to an RDD so that we can build LabeledPoints, which are required for modelling the data.

In [68]:
# Row -> LabeledPoint: the first column is the label, the remaining columns are features.
# NOTE(review): several feature columns are strings (e.g. TOTAL_WORKERS); this relies
# on LabeledPoint coercing them to floats -- confirm no empty values reach this point.
h1b_rdd = h1b_data_frame_2.rdd
h1b_labledPoints = h1b_rdd.map(lambda row: LabeledPoint(row[0], row[1:]))
h1b_labledPoints.take(2)

[LabeledPoint(1.0, [1.0,1.0,0.0,0.0,0.0,0.0,0.0,59197.0,4.0,35.0,1.0,0.0,0.0,3.0,149.0,0.0,0.0,57382.0]),
 LabeledPoint(1.0, [1.0,1.0,0.0,0.0,0.0,0.0,0.0,49800.0,4.0,35.0,10.0,0.0,0.0,3.0,4198.0,0.0,0.0,105.0])]