In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os

os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = "python"

# setting up spark
spark = SparkSession \
    .builder \
    .appName("Pyspark Hilti") \
    .config('spark.master', 'local[10]') \
    .config("spark.cores.max", "5") \
    .config("spark.driver.memory", "15g") \
    .getOrCreate()

In [2]:
spark

In [17]:
### Pre-processing functions for resume_txt
# NLTK toolkit
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk.corpus import wordnet, stopwords
from nltk import pos_tag

# basic pre-processing utilities
import os
from lxml import etree
import string
import regex as re

translate_table = dict((ord(char), None) for char in string.punctuation)

def remove_html_tags(string):
    parser = etree.HTMLParser()
    tree = etree.fromstring(string, parser)
    parsed_txt = etree.tostring(tree, encoding='unicode', method='text')
    parsed_txt = " ".join(parsed_txt.split())
    
    return parsed_txt

def remove_punctuation(string):
    return string.translate(translate_table)

def remove_stopwords(tokens):
    from nltk.corpus import stopwords
    """
        Filters all stopwords and turn the words into lowercase
    """
    stop_words = set(stopwords.words('english'))
    filtered_sentence = [token.lower() for token in tokens if not token.lower() in stop_words]
    
    return filtered_sentence

def remove_urls(tokens):
    return [w for w in tokens if not w.startswith("http://")]

def tokenize_string(string):
    from nltk.tokenize import word_tokenize
    
    return word_tokenize(string)

def pos_tagging(tokens):
    from nltk import pos_tag
    """
        Receive: a list of string tokens
        Detect the token's part of speech
    """

    pos_tags = pos_tag(tokens)
    return pos_tags

def lemmatize_token(tokens):
    from nltk.stem import WordNetLemmatizer
    from nltk.corpus import wordnet
    
    """
        Tokenize the string
        Lemmatize the string tokens 
        Return lemmatized string
    """
    lemmatizer = WordNetLemmatizer()
    lemmatized_tokens = []
    pos_tokens = pos_tagging(tokens)

    # convert POS tag into usable format of WordNet
    for token, pos in pos_tokens:
        pos = pos
        if pos.startswith('J'):
            pos = wordnet.ADJ

        elif pos.startswith('V'):
            pos = wordnet.VERB
        
        elif pos.startswith('N'):
            pos = wordnet.NOUN

        elif pos.startswith('R'):
            pos = wordnet.ADV

        else:
            # append as it is if don't have POS tagged
            lemmatized_tokens.append(token)
            continue

        lemmatized = lemmatizer.lemmatize(token, pos=pos)
        lemmatized_tokens.append(lemmatized)

    return lemmatized_tokens

def preprocess(string):
    text = remove_html_tags(string)
    text = remove_punctuation(text)
    
    tokens = tokenize_string(text)
    tokens = remove_urls(tokens)
    tokens = remove_stopwords(tokens)
    tokens = lemmatize_token(tokens)

    return " ".join(tokens)


In [5]:
### Read file from raw txt files

import pandas as pd

# Read file from raw text files
def read_file(identifier):
    with open(f'data\\{identifier}.txt') as corpus_reader:
        resume = corpus_reader.read()
        
    with open(f'data\\{identifier}.lab') as title_reader:
        job_title = title_reader.read()
        
    return resume, job_title

resume_corpus = []
job_titles = []

for i in range(1, 29783 + 1):
    resume, job_title = read_file(f'{i:0>5}')
    
    resume_corpus.append(resume)
    job_titles.append(job_title)
    
df = pd.DataFrame({ 'resume_txt': resume_corpus, 'job_title': job_titles })
df.head(5)

Unnamed: 0,resume_txt,job_title
0,"Database Administrator <span class=""hl"">Databa...",Database_Administrator
1,"Database Administrator <span class=""hl"">Databa...",Database_Administrator
2,Oracle Database Administrator Oracle <span cla...,Database_Administrator
3,Amazon Redshift Administrator and ETL Develope...,Database_Administrator
4,Scrum Master Scrum Master Scrum Master Richmon...,Database_Administrator


In [6]:
df['id'] = df.index + 1

# expand the job_title column into multiple rows
df['job_title'] = df['job_title'].str.split('\n')
df = df.explode('job_title')

# one hot encode the results into the main df
encoded = pd.get_dummies(df['job_title']).drop([''], axis=1)
df[encoded.columns] = encoded
df[encoded.columns] = df[encoded.columns].replace({ True: 1, False: 0 })

# merge one hot encoded into the main dataframe
main_df = df.groupby('id')[encoded.columns].max().reset_index().copy(deep=True)

# drop all duplicates of original duplicates after exploding method
df1 = df[['id', 'resume_txt']].drop_duplicates()

# merge 2 dataframes 
main_df = pd.merge(main_df, df1, how='left', on='id')
main_df = main_df.drop(['id'], axis=1) # no need for id anymore

  df[encoded.columns] = df[encoded.columns].replace({ True: 1, False: 0 })


In [29]:
main_df.head(5)

Unnamed: 0,Database_Administrator,Front_End_Developer,Java_Developer,Network_Administrator,Project_manager,Python_Developer,Security_Analyst,Software_Developer,Systems_Administrator,Web_Developer,resume_txt
0,1,0,0,0,0,0,0,0,0,0,"Database Administrator <span class=""hl"">Databa..."
1,1,0,0,0,0,0,0,0,0,0,"Database Administrator <span class=""hl"">Databa..."
2,1,0,0,0,0,0,0,0,0,0,Oracle Database Administrator Oracle <span cla...
3,1,0,0,0,0,0,0,0,0,0,Amazon Redshift Administrator and ETL Develope...
4,1,0,0,0,0,0,0,0,0,0,Scrum Master Scrum Master Scrum Master Richmon...


In [18]:
df = spark.createDataFrame(main_df)
df.printSchema()

root
 |-- Database_Administrator: long (nullable = true)
 |-- Front_End_Developer: long (nullable = true)
 |-- Java_Developer: long (nullable = true)
 |-- Network_Administrator: long (nullable = true)
 |-- Project_manager: long (nullable = true)
 |-- Python_Developer: long (nullable = true)
 |-- Security_Analyst: long (nullable = true)
 |-- Software_Developer: long (nullable = true)
 |-- Systems_Administrator: long (nullable = true)
 |-- Web_Developer: long (nullable = true)
 |-- resume_txt: string (nullable = true)



In [19]:
# reference: https://stackoverflow.com/questions/33768967/spark-how-can-evenly-distribute-my-records-in-all-partition

N = 128 # number of partitions

evenly_partitioned = (
    df.rdd
        .zipWithIndex()
        .map(lambda p: (p[1], p[0]))
        .partitionBy(N, lambda p: p)
        .values()
)

In [20]:
# convert to Dataframe object
df = evenly_partitioned.toDF(main_df.columns.tolist())

In [21]:
df.show(5)

+----------------------+-------------------+--------------+---------------------+---------------+----------------+----------------+------------------+---------------------+-------------+--------------------+
|Database_Administrator|Front_End_Developer|Java_Developer|Network_Administrator|Project_manager|Python_Developer|Security_Analyst|Software_Developer|Systems_Administrator|Web_Developer|          resume_txt|
+----------------------+-------------------+--------------+---------------------+---------------+----------------+----------------+------------------+---------------------+-------------+--------------------+
|                     1|                  0|             0|                    0|              0|               0|               0|                 0|                    0|            0|Database Administ...|
|                     1|                  0|             0|                    0|              0|               0|               0|                 0|                  

In [22]:
# check number of records in each partition
df.withColumn("partitionId", spark_partition_id() 
                      ).groupBy("partitionId").count().show() 

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0|  233|
|          1|  233|
|          2|  233|
|          3|  233|
|          4|  233|
|          5|  233|
|          6|  233|
|          7|  233|
|          8|  233|
|          9|  233|
|         10|  233|
|         11|  233|
|         12|  233|
|         13|  233|
|         14|  233|
|         15|  233|
|         16|  233|
|         17|  233|
|         18|  233|
|         19|  233|
+-----------+-----+
only showing top 20 rows



In [23]:
df.select('resume_txt').take(1)

[Row(resume_txt='Database Administrator <span class="hl">Database</span> <span class="hl">Administrator</span> Database Administrator - Family Private Care LLC Lawrenceville, GA A self-motivated Production SQL Server Database Administrator who possesses\xa0 strong analytical and problem solving skills. My experience includes SQL Server\xa0 2005, 2008 and 2012, 2014, SSIS, as well as clustering, mirroring, and high\xa0 availability solutions in OLTP environments. I am proficient in database backup,\xa0 recovery, performance tuning, maintenance tasks, security, and consolidation.\xa0 I am confident that I would make a beneficial addition to any company. Over the\xa0 course of my career thus far, I have designed databases to fit a variety of needs,\xa0 successfully ensured the security of those databases, problem-solved in order to meet\xa0 both back-end and front-end needs, installed and tested new versions database\xa0 management systems, customized and installed applications and meticu

In [24]:
# plan the preprocessing pipeline
preprocessed_df = df.withColumn('resume_txt', udf(preprocess, StringType())(col('resume_txt')))
preprocessed_df = preprocessed_df.withColumn('resume_txt', regexp_replace('resume_txt', '([R|r]2)|(\\xa0 \*?)|(•)', ''))

# execute the plan
preprocessed_df = spark.createDataFrame(preprocessed_df.collect())

In [25]:
preprocessed_df.select('resume_txt').take(5)

[Row(resume_txt='database administrator database administrator database administrator family private care llc lawrenceville ga selfmotivated production sql server database administrator possess strong analytical problem solve skill experience include sql server 2005 2008 2012 2014 ssis well cluster mirror high availability solution oltp environment proficient database backup recovery performance tune maintenance task security consolidation confident would make beneficial addition company course career thus far design database fit variety need successfully ensure security databases problemsolved order meet backend frontend need instal test new version database management system customize installed application meticulously monitor performance smooth frontend experience possible 5 6 year work database work experience database administrator family private care llc roswell ga april 2017 present confirm backup make successfully saved secure location plan backup recovery database information 

In [26]:
# compress all partitions into 1 partition so that only 1 parquet file is output
preprocessed_df.coalesce(1).write.parquet('parquet/preprocessed.parquet', mode='overwrite')

### Check Output File

In [27]:
test_df_parquet = pd.read_parquet('parquet/preprocessed.parquet/part-00000-7482d96e-f64c-4ec8-b132-9965fa89fca4-c000.snappy.parquet')

In [31]:
test_df_parquet['resume_txt'][2]

'sr oracle database administrator sr oracle database administrator sr oracle database administrator east windsor ct 7 year experience oracle database administrator enterprise level database administration management operation extensive experience instal design manage oracle 12c exadata 11g rac10g rac9i8i databases windows unix hpux ibm aix sun solaris installedconfigured oracle 11g10g real application cluster rac upgraded database oracle 8i 9i 10g oracle 10g 11g experience 247 production database support role extensive experience backup recovery hot cold logical rman incremental backup expert performance monitoring tune oracle database unixlinux platform use database tune sql trace tkprof explain plan statspack awr addm experience table index partition increase performance well storage management partition strategy large table partition large object improve query performance manageability identify bottleneck wait base analysis use oracle wait interface excellent experience database cro

In [33]:
test_df_parquet.shape # should have 29783 rows

(29783, 11)

In [None]:
import pandas as pd 

def read_file(identifier):
    with open(f'data\\{identifier}.txt') as corpus_reader:
        resume = corpus_reader.read()
        
    with open(f'data\\{identifier}.lab') as title_reader:
        job_title = title_reader.read()
        
    return resume, job_title

resume_corpus = []
job_titles = []

for i in range(1, 29783 + 1):
    resume, job_title = read_file(f'{i:0>5}')
    
    resume_corpus.append(resume)
    job_titles.append(job_title)
    
df = pd.DataFrame({ 'resume_txt': resume_corpus, 'job_title': job_titles })

In [None]:
df.to_parquet('read_file.parquet', index=False)