In [103]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark import SparkContext, SparkConf

# Create (or reuse) the Spark session used by every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("Fake-job-prediction")
    .getOrCreate()
)

In [104]:
%%html
<style>
/* Disable line wrapping in cell output so wide df.show() tables keep one row per line. */
div.output_area pre {
    white-space: pre;
}
</style>

In [105]:
# Load the raw postings with a header row; both the quote and the escape
# character are set to the double quote.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("escape", "\"")
    .option("quote", "\"")
    .load("fake_job_postings.csv", sep=",")
)

df.show(1)

+------+----------------+----------------+----------+------------+--------------------+--------------------+--------------------+--------+-------------+----------------+-------------+---------------+-------------------+------------------+--------+---------+----------+
|job_id|           title|        location|department|salary_range|     company_profile|         description|        requirements|benefits|telecommuting|has_company_logo|has_questions|employment_type|required_experience|required_education|industry| function|fraudulent|
+------+----------------+----------------+----------+------------+--------------------+--------------------+--------------------+--------+-------------+----------------+-------------+---------------+-------------------+------------------+--------+---------+----------+
|     1|Marketing Intern|US, NY, New York| Marketing|        null|We're Food52, and...|Food52, a fast-gr...|Experience with c...|    null|            0|               1|            0|          

In [106]:
# Schema inference was not enabled on load, so every column (including the 0/1 flags) is a string.
df.dtypes

[('job_id', 'string'),
 ('title', 'string'),
 ('location', 'string'),
 ('department', 'string'),
 ('salary_range', 'string'),
 ('company_profile', 'string'),
 ('description', 'string'),
 ('requirements', 'string'),
 ('benefits', 'string'),
 ('telecommuting', 'string'),
 ('has_company_logo', 'string'),
 ('has_questions', 'string'),
 ('employment_type', 'string'),
 ('required_experience', 'string'),
 ('required_education', 'string'),
 ('industry', 'string'),
 ('function', 'string'),
 ('fraudulent', 'string')]

In [107]:
# Lower-case every column in one projection; each column keeps its name and position.
df = df.select([F.lower(name).alias(name) for name in df.columns])

df.show(1)

+------+----------------+----------------+----------+------------+--------------------+--------------------+--------------------+--------+-------------+----------------+-------------+---------------+-------------------+------------------+--------+---------+----------+
|job_id|           title|        location|department|salary_range|     company_profile|         description|        requirements|benefits|telecommuting|has_company_logo|has_questions|employment_type|required_experience|required_education|industry| function|fraudulent|
+------+----------------+----------------+----------+------------+--------------------+--------------------+--------------------+--------+-------------+----------------+-------------+---------------+-------------------+------------------+--------+---------+----------+
|     1|marketing intern|us, ny, new york| marketing|        null|we're food52, and...|food52, a fast-gr...|experience with c...|    null|            0|               1|            0|          

### We divide the 'location' column into 'country' and 'city'

In [108]:
from pyspark.sql.types import StringType

def get_country(location):
    """Return the text before the first comma of ``location``.

    The input is stripped of surrounding whitespace first. ``None`` is passed
    through unchanged; a string without a comma is returned whole.
    """
    if location is None:
        return None
    # Everything up to the first comma is the leading field.
    head, _, _ = location.strip().partition(",")
    return head

def get_city(location):
    """Return the second comma-separated field of ``location``, or ``None``.

    The field is returned with surrounding whitespace removed, so
    ``"us, ny, new york"`` yields ``"ny"`` rather than ``" ny"``.

    Returns ``None`` when ``location`` is ``None``, when it has no second
    field, or when that field is empty or whitespace only (``"us,,"``,
    ``"us, , "``).
    """
    if location is None:
        return None
    fields = location.strip().split(",")
    if len(fields) < 2:
        return None
    # Strip so values compare equal regardless of spacing after the comma;
    # an empty result means the field carried no information.
    second = fields[1].strip()
    return second or None

# Wrap the location parsers as Spark UDFs that return string columns.
udf_country = F.udf(get_country, returnType=StringType())
udf_city = F.udf(get_city, returnType=StringType())


In [109]:
# Derive `country` and `city` from `location`, then discard the source column.
# The location strings are too irregular to extract more than the first two
# comma-separated fields reliably.
df = (
    df.withColumn("country", udf_country(F.col("location")))
    .withColumn("city", udf_city(F.col("location")))
    .drop("location")
)

df.show(1)

+------+----------------+----------+------------+--------------------+--------------------+--------------------+--------+-------------+----------------+-------------+---------------+-------------------+------------------+--------+---------+----------+-------+----+
|job_id|           title|department|salary_range|     company_profile|         description|        requirements|benefits|telecommuting|has_company_logo|has_questions|employment_type|required_experience|required_education|industry| function|fraudulent|country|city|
+------+----------------+----------+------------+--------------------+--------------------+--------------------+--------+-------------+----------------+-------------+---------------+-------------------+------------------+--------+---------+----------+-------+----+
|     1|marketing intern| marketing|        null|we're food52, and...|food52, a fast-gr...|experience with c...|    null|            0|               1|            0|          other|         internship|   

### Since the values in the salary_range column may be expressed in different currencies, we look up the currency of each country and convert the salary to USD. If no currency or exchange rate is available for a row, its converted salary is set to null.

In [110]:
# Number of distinct values in the derived `country` column.
df.select("country").distinct().count()

91

In [111]:
from bs4 import BeautifulSoup
import requests

# Page that is scraped below for country codes and currency codes.
URL = "https://unece.org/fileadmin/DAM/cefact/recommendations/bkup_htm/cocucod.htm"

# The timeout keeps the cell from hanging on a stalled connection, and
# raise_for_status() stops an HTTP error page from being parsed as if it
# were the code table.
r = requests.get(URL, timeout=30)
r.raise_for_status()

# NOTE(review): no parser is named, so bs4 uses whichever one is installed;
# pin one explicitly once the scrape has been validated against it.
soup = BeautifulSoup(r.content)

In [112]:
# All candidate cells: <font> tags carrying exactly these attributes.
info = soup.find_all("font", attrs={'color':'#000000','size':'1','face':'Verdana'})

# Text of the first 1526 matching tags; only these are scanned for codes.
cell_texts = [info[pos].text for pos in range(1526)]

# Two-character cells are taken as country codes.
country_codes = [text.lower() for text in cell_texts if len(text) == 2]

# Three-character, upper-case, non-numeric cells are taken as currency codes.
currency_codes = [
    text.lower()
    for text in cell_texts
    if len(text) == 3 and not text.isnumeric() and text.isupper()
]

# Three countries have no official currency in this table, so they are dropped
# to keep the two lists aligned.
remove_list = ['gs','ps','aq']
country_codes = [code for code in country_codes if code not in remove_list]

# Pair the codes by position; zip() stops at the shorter of the two lists.
CC = list(zip(country_codes, currency_codes))

len(CC)

250

In [113]:
def get_currencycode(country):
    """Return the currency code paired with ``country`` in ``CC``.

    ``CC`` is scanned in order and the first matching entry wins. Returns
    ``None`` when no entry matches (including when ``country`` is ``None``).
    """
    for code, currency in CC:
        if code == country:
            return currency
    return None
    

# Expose the lookup to Spark as a string-returning UDF.
udf_currency = F.udf(get_currencycode, returnType=StringType())

# Attach each row's currency code, looked up from its country.
df = df.withColumn('salary_currency', udf_currency(F.col('country')))

In [114]:
from pyspark.sql.types import IntegerType

def get_minRange(salary):
    """Return the lower bound of a ``"min-max"`` salary string as an int.

    The part before the first ``-`` is parsed. Returns ``None`` when
    ``salary`` is ``None`` or that part is not a valid integer.
    """
    if salary is None:
        return None
    lower = salary.strip().split("-")[0]
    try:
        return int(lower)
    except ValueError:
        return None

def get_maxRange(salary):
    """Return the upper bound of a ``"min-max"`` salary string as an int.

    With no ``-`` in the string the single value serves as the upper bound;
    otherwise the second ``-``-separated part is parsed. Returns ``None``
    when ``salary`` is ``None`` or the chosen part is not a valid integer.
    """
    if salary is None:
        return None
    parts = salary.strip().split("-")
    upper = parts[0] if len(parts) == 1 else parts[1]
    try:
        return int(upper)
    except ValueError:
        return None

# Integer-returning UDFs for the two ends of the salary range.
udf_minRange = F.udf(get_minRange, returnType=IntegerType())
udf_maxRange = F.udf(get_maxRange, returnType=IntegerType())

# Split `salary_range` into numeric bounds, then drop the raw text column.
df = (
    df.withColumn('salary_min', udf_minRange(F.col('salary_range')))
    .withColumn('salary_max', udf_maxRange(F.col('salary_range')))
    .drop('salary_range')
)

df.show(5)

+------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-------------+----------------+-------------+---------------+-------------------+------------------+--------------------+--------------------+----------+-------+----+---------------+----------+----------+
|job_id|               title|department|     company_profile|         description|        requirements|            benefits|telecommuting|has_company_logo|has_questions|employment_type|required_experience|required_education|            industry|            function|fraudulent|country|city|salary_currency|salary_min|salary_max|
+------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-------------+----------------+-------------+---------------+-------------------+------------------+--------------------+--------------------+----------+-------+----+---------------+----------+----------+
|     1|    m

In [115]:
URL_CURRENCIES = 'https://api.exchangerate-api.com/v4/latest/USD'

# The timeout keeps the cell from hanging on a stalled connection, and
# raise_for_status() surfaces an HTTP error here instead of as a confusing
# KeyError on 'rates' further down.
response = requests.get(URL_CURRENCIES, timeout=30)
response.raise_for_status()
rates = response.json()['rates']

# Lower-case the keys so they match the lower-cased codes in `salary_currency`.
# NOTE(review): presumably each rate is units of that currency per 1 USD - confirm against the API docs.
curr = {k.lower(): v for k, v in rates.items()}

In [116]:
def get_usd(number, currency):
    """Convert ``number`` from ``currency`` using the rates in ``curr``.

    Args:
        number: Amount to convert, or ``None``.
        currency: Key into ``curr``, or ``None``.

    Returns:
        ``int(number / curr[currency])``, or ``None`` when either argument is
        ``None`` or ``currency`` has no entry in ``curr``.
    """
    # Test membership on the dict itself: building list(curr.keys()) for
    # every row is a needless linear scan.
    if number is None or currency is None or currency not in curr:
        return None
    return int(number / curr[currency])

udf_usd = F.udf(get_usd, returnType=IntegerType())

# Both bounds are overwritten in place, so re-running this cell would apply
# the conversion a second time.
for bound in ('salary_min', 'salary_max'):
    df = df.withColumn(bound, udf_usd(F.col(bound), F.col('salary_currency')))

### Text processing

In [127]:
import re
from pyspark.ml.feature import StopWordsRemover

def re_links(text):
    """Replace URLs and very long tokens in ``text`` with a single space.

    Returns ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    # Raw string: the pattern contains `\(` and `\)`, which are invalid escape
    # sequences in a normal string literal and make Python emit a warning.
    text = re.sub(
        r'http[s]?://(?:[a-zA-Z-#]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+',
        " ",
        text,
    )
    # Drop words of 20 or more characters: they are most likely links that
    # the pattern above did not catch.
    return re.sub(r'\b\w{20,}\b', ' ', text)

def re_xa0(text):
    """Replace non-breaking spaces in ``text`` with ordinary spaces.

    Both forms are handled: the literal four-character sequence ``\\xa0``
    (backslash, ``x``, ``a``, ``0``) and the actual U+00A0 character.
    Returns ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    # '\\xa0' is the escaped text form; '\xa0' is the real no-break space,
    # which the original replacement never matched.
    return text.replace('\\xa0', ' ').replace('\xa0', ' ')

def re_punctuations(text):
    """Replace each character that is not a digit, ``-``, ``a``-``z`` or ``'`` with a space.

    Upper-case letters are replaced as well, so the input is expected to be
    lower-cased already. Returns ``None`` when ``text`` is ``None``.
    """
    return None if text is None else re.sub("[^0-9-a-z']", " ", text)

def re_space(text):
    """Collapse every run of whitespace in ``text`` into a single space.

    Leading and trailing runs are collapsed as well, not removed. Returns
    ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    # Raw string: "\s" is an invalid escape sequence in a normal string
    # literal and makes Python emit a warning.
    return re.sub(r"\s+", " ", text)

def text_cleaning(text):
    """Run ``text`` through the cleaning steps in a fixed order.

    Order: ``re_links``, ``re_xa0``, ``re_punctuations``, ``re_space``.
    Each step passes ``None`` through, so a ``None`` input yields ``None``.
    """
    for step in (re_links, re_xa0, re_punctuations, re_space):
        text = step(text)
    return text

udf_cleaning = F.udf(text_cleaning, returnType=StringType())

# Free-text columns that go through the cleaning pipeline.
text_columns = ['title','department','company_profile','description','requirements','benefits','industry']

# Each column is replaced in place by its cleaned version.
for name in text_columns:
    df = df.withColumn(name, udf_cleaning(F.col(name)))

In [128]:
# Preview the DataFrame after text cleaning.
df.show()

+------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-------------+----------------+-------------+---------------+-------------------+--------------------+--------------------+--------------------+----------+-------+----+---------------+----------+----------+
|job_id|               title|department|     company_profile|         description|        requirements|            benefits|telecommuting|has_company_logo|has_questions|employment_type|required_experience|  required_education|            industry|            function|fraudulent|country|city|salary_currency|salary_min|salary_max|
+------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-------------+----------------+-------------+---------------+-------------------+--------------------+--------------------+--------------------+----------+-------+----+---------------+----------+----------+
|     1

### Apply One-hot-encoding to some columns 

In [100]:
# Categorical columns to be one-hot encoded.
ohe_cols = [
    'country',
    'city',
    'salary_currency',
    'required_experience',
    'required_education',
    'employment_type',
    'industry',
    'function',
]

# Print each column's number of distinct values to see how wide its encoding will be.
for name in ohe_cols:
    n_distinct = df.select(name).distinct().count()
    print(f' {name}:{n_distinct}')

 country:91
 city:325
 salary_currency:78
 required_experience:8
 required_education:14
 employment_type:6
 industry:132
 function:38


In [101]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# For each categorical column: index the strings, one-hot encode the index
# into `<column>_vec`, then drop both the index and the original column.
for name in ohe_cols:
    index_col = f"{name}_index"
    indexer = StringIndexer(inputCol=name, outputCol=index_col).setHandleInvalid("keep")
    indexed = indexer.fit(df).transform(df)
    encoder = OneHotEncoder(dropLast=False, inputCol=index_col, outputCol=f"{name}_vec")
    df = encoder.fit(indexed).transform(indexed).drop(index_col, name)


In [102]:
# Preview the encoded DataFrame: each categorical column is now a `<column>_vec` vector.
df.show(3)

+------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-------------+----------------+-------------+----------+----------+----------+--------------+-----------------+-------------------+-----------------------+----------------------+-------------------+-----------------+---------------+
|job_id|               title|department|     company_profile|         description|        requirements|            benefits|telecommuting|has_company_logo|has_questions|fraudulent|salary_min|salary_max|   country_vec|         city_vec|salary_currency_vec|required_experience_vec|required_education_vec|employment_type_vec|     industry_vec|   function_vec|
+------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-------------+----------------+-------------+----------+----------+----------+--------------+-----------------+-------------------+-----------------------+-------