# NLP in Pyspark's MLlib Project

## Fake Job Posting Predictions

Indeed.com has just hired you to create a system that automatically flags suspicious job postings on it's website. It has recently seen an influx of fake job postings that is negativley impacting it's customer experience. Becuase of the high volume of job postings it receives everyday, their employees do have the capacity to check every posting so they would like prioritize which postings to review before deleting it. 

#### Your task
Use the attached dataset with NLP to create an alogorthim which automatically flags suspicious posts for review. 

#### The data
This dataset contains 18K job descriptions out of which about 800 are fake. The data consists of both textual information and meta-information about the jobs.

**Data Source:** https://www.kaggle.com/shivamb/real-or-fake-fake-jobposting-prediction

#### Have fun!

In [1]:
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

Mounted at /content/drive/


In [2]:
# Install pyspark
!pip install pyspark
# import findspark
# findspark.init()
import os
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("NLP").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m20.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=455ea31f19328e2de42116e1bc56258018a4f001b064a2af40b51671d301ca32
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
from pyspark.ml.feature import * #CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.sql.functions import * #col, udf,regexp_replace,isnull
from pyspark.sql.types import * #StringType,IntegerType
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# For pipeline development
from pyspark.ml import Pipeline 

## Read The Data

In [4]:
path = '/content/drive/MyDrive/Data Science Intake43/5. Spark/spark-scripts/section3/Datasets/'
os.listdir(path)

['beatsdataset.csv',
 'kickstarter.csv',
 'housing.csv',
 'fake_job_postings.csv',
 'Toddler Autism dataset July 2018.csv',
 'Concrete_Data.csv']

In [5]:
# CSV
df = spark.read.csv(path+'fake_job_postings.csv',inferSchema=True,header=True)

In [6]:
df.limit(10).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent
0,1,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...","Food52, a fast-growing, James Beard Award-winn...",Experience with content management systems a m...,,0,1,0,Other,Internship,,,Marketing,0
1,2,Customer Service - Cloud Video Production,"NZ, , Auckland",Success,,"90 Seconds, the worlds Cloud Video Production ...",Organised - Focused - Vibrant - Awesome!Do you...,What we expect from you:Your key responsibilit...,What you will get from usThrough being part of...,0,1,0,Full-time,Not Applicable,,Marketing and Advertising,Customer Service,0
2,3,Commissioning Machinery Assistant (CMA),"US, IA, Wever",,,Valor Services provides Workforce Solutions th...,"Our client, located in Houston, is actively se...",Implement pre-commissioning and commissioning ...,,0,1,0,,,,,,0
3,4,Account Executive - Washington DC,"US, DC, Washington",Sales,,Our passion for improving quality of life thro...,THE COMPANY: ESRI – Environmental Systems Rese...,"EDUCATION: Bachelor’s or Master’s in GIS, busi...",Our culture is anything but corporate—we have ...,0,1,0,Full-time,Mid-Senior level,Bachelor's Degree,Computer Software,Sales,0
4,5,Bill Review Manager,"US, FL, Fort Worth",,,SpotSource Solutions LLC is a Global Human Cap...,JOB TITLE: Itemization Review ManagerLOCATION:...,QUALIFICATIONS:RN license in the State of Texa...,Full Benefits Offered,0,1,1,Full-time,Mid-Senior level,Bachelor's Degree,Hospital & Health Care,Health Care Provider,0
5,6,Accounting Clerk,"US, MD,",,,,Job OverviewApex is an environmental consultin...,,,0,0,0,,,,,,0
6,7,Head of Content (m/f),"DE, BE, Berlin",ANDROIDPIT,20000-28000,"Founded in 2009, the Fonpit AG rose with its i...",Your Responsibilities: Manage the English-spea...,Your Know-How: ...,Your Benefits: Being part of a fast-growing co...,0,1,1,Full-time,Mid-Senior level,Master's Degree,Online Media,Management,0
7,8,Lead Guest Service Specialist,"US, CA, San Francisco",,,Airenvy’s mission is to provide lucrative yet ...,Who is Airenvy?Hey there! We are seasoned entr...,"Experience with CRM software, live chat, and p...",Competitive Pay. You'll be able to eat steak e...,0,1,1,,,,,,0
8,9,HP BSM SME,"US, FL, Pensacola",,,Solutions3 is a woman-owned small business who...,Implementation/Configuration/Testing/Training ...,MUST BE A US CITIZEN.An active TS/SCI clearanc...,,0,1,1,Full-time,Associate,,Information Technology and Services,,0
9,10,Customer Service Associate - Part Time,"US, AZ, Phoenix",,,"Novitex Enterprise Solutions, formerly Pitney ...",The Customer Service Associate will be based i...,Minimum Requirements:Minimum of 6 months custo...,,0,1,0,Part-time,Entry level,High School or equivalent,Financial Services,Customer Service,0


In [7]:
# Let's read a few full blurbs
df.show(10,False)

+------+-----------------------------------------+---------------------+----------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
df.printSchema()

root
 |-- job_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- company_profile: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- required_experience: string (nullable = true)
 |-- required_education: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- function: string (nullable = true)
 |-- fraudulent: string (nullable = true)



In [9]:
df.count()

17880

In [10]:
print((df.count(), len(df.columns)))

(17880, 18)


## Check The Classes Of The Data

In [11]:
tot = df.count()
df.groupBy("fraudulent").count().withColumnRenamed('count', 'cnt_per_group').withColumn('perc_of_count_total', (col('cnt_per_group') / tot) * 100 ).show(100)

+--------------------+-------------+--------------------+
|          fraudulent|cnt_per_group| perc_of_count_total|
+--------------------+-------------+--------------------+
|Required Qualific...|            1|0.005592841163310962|
| unique opportuni...|            1|0.005592841163310962|
|High School or eq...|            3|0.016778523489932886|
| depending on the...|            1|0.005592841163310962|
|  Product Management|            2|0.011185682326621925|
| analyzing the ev...|            1|0.005592841163310962|
| email us with 3 ...|            1|0.005592841163310962|
|        make stuff."|            1|0.005592841163310962|
|     Master's Degree|            5| 0.02796420581655481|
|""sans-serif"";ms...|            1|0.005592841163310962|
| además con el fi...|           10| 0.05592841163310962|
|               Sales|           14| 0.07829977628635347|
| but not necessar...|            1|0.005592841163310962|
| process setup an...|            1|0.005592841163310962|
| elasticsearc

## Drop any row that's not classified fraud or not (0,1)

In [12]:
df = df.filter("fraudulent IN('0','1')")
# Make sure it worked
df.groupBy("fraudulent").count().orderBy(col("count").desc()).show(truncate=False)

+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |16080|
|1         |886  |
+----------+-----+



In [13]:
df.filter("fraudulent=1").show(10, False)

+------+---------------------------------------+----------------------------------+--------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [14]:
print((df.count(), len(df.columns)))

(16966, 18)


In [15]:
# df = df.sampleBy("fraudulent", fractions={"0": 0.4, "1": 1.0}, seed=10)
# # QA again
# df.groupBy("fraudulent").count().show(truncate=False)

+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |6485 |
|1         |886  |
+----------+-----+



## Check Count And Percentage Of Each Column

In [16]:
from pyspark.sql.functions import *

def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows,(nullRows/numRows)*100
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(df)
spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()

+-------------------+-----------------+------------------+
|        Column_Name|Null_Values_Count|Null_Value_Percent|
+-------------------+-----------------+------------------+
|           location|              151| 2.048568715235382|
|         department|             4868| 66.04259937593271|
|       salary_range|             6160|  83.5707502374169|
|    company_profile|             1620|21.978021978021978|
|       requirements|             1134|15.384615384615385|
|           benefits|             3032|  41.1341744675078|
|    employment_type|             1449| 19.65811965811966|
|required_experience|             2961| 40.17094017094017|
| required_education|             3328| 45.14991181657848|
|           industry|             2030| 27.54036087369421|
|           function|             2661|36.100936100936096|
+-------------------+-----------------+------------------+



### Since the percentage of nulls might seem alot more to be dropped, we may ignore the unnecessary columns and only consider the important ones only
- Include only: 
  - Location: Frauds may be associated with some locations over others
  - Description: Contains the actual text of the job. 
  - Fraudulent: The Target

In [17]:
# remove unwanted columns 
# How about by subset by just the vars we need for now.
filtered = df.na.drop(subset=["Location", "description", "fraudulent"])
print((filtered.count(), len(filtered.columns)))

(7220, 18)


In [18]:
filtered.show()

+------+--------------------+-------------------+----------+------------+--------------------+--------------------+--------------------+--------------------+-------------+----------------+-------------+---------------+-------------------+--------------------+--------------------+--------------------+----------+
|job_id|               title|           location|department|salary_range|     company_profile|         description|        requirements|            benefits|telecommuting|has_company_logo|has_questions|employment_type|required_experience|  required_education|            industry|            function|fraudulent|
+------+--------------------+-------------------+----------+------------+--------------------+--------------------+--------------------+--------------------+-------------+----------------+-------------+---------------+-------------------+--------------------+--------------------+--------------------+----------+
|     1|    Marketing Intern|   US, NY, New York| Marketing| 

### Check nulls again

In [19]:
null_columns_calc_list = null_value_calc(filtered)
spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()

+-------------------+-----------------+------------------+
|        Column_Name|Null_Values_Count|Null_Value_Percent|
+-------------------+-----------------+------------------+
|         department|             4742|  65.6786703601108|
|       salary_range|             6018| 83.35180055401662|
|    company_profile|             1585|21.952908587257618|
|       requirements|             1056|14.626038781163436|
|           benefits|             2925| 40.51246537396122|
|    employment_type|             1358|18.808864265927976|
|required_experience|             2854|39.529085872576175|
| required_education|             3204| 44.37673130193906|
|           industry|             1924| 26.64819944598338|
|           function|             2547| 35.27700831024931|
+-------------------+-----------------+------------------+



## Check class balance
- The data is obviously imbalanced and that may be treated in different ways like:
  1. Change the accuracy metric to be able to monitor both false and true positives and negatives
  2. K-Fold Cross Validation
  3. Sampling the data (delete some of the data labeled 0 to maintain balance)

In [20]:
filtered.groupBy("fraudulent").count().orderBy(col("count").desc()).show(truncate=False)

+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |6354 |
|1         |866  |
+----------+-----+



In [21]:
tot = filtered.count()
filtered.groupBy("fraudulent").count().withColumnRenamed('count', 'cnt_per_group').withColumn('perc_of_count_total', (col('cnt_per_group') / tot) * 100 ).show(100)

+----------+-------------+-------------------+
|fraudulent|cnt_per_group|perc_of_count_total|
+----------+-------------+-------------------+
|         0|         6354|  88.00554016620498|
|         1|          866| 11.994459833795014|
+----------+-------------+-------------------+



## Preprocessing the data, both labels and text

In [22]:
# Let's go ahead and encode it too
indexer = StringIndexer(inputCol="fraudulent", outputCol="label")
df = indexer.fit(filtered).transform(filtered)
df.limit(6).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent,label
0,1,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...","Food52, a fast-growing, James Beard Award-winn...",Experience with content management systems a m...,,0,1,0,Other,Internship,,,Marketing,0,0.0
1,5,Bill Review Manager,"US, FL, Fort Worth",,,SpotSource Solutions LLC is a Global Human Cap...,JOB TITLE: Itemization Review ManagerLOCATION:...,QUALIFICATIONS:RN license in the State of Texa...,Full Benefits Offered,0,1,1,Full-time,Mid-Senior level,Bachelor's Degree,Hospital & Health Care,Health Care Provider,0,0.0
2,6,Accounting Clerk,"US, MD,",,,,Job OverviewApex is an environmental consultin...,,,0,0,0,,,,,,0,0.0
3,10,Customer Service Associate - Part Time,"US, AZ, Phoenix",,,"Novitex Enterprise Solutions, formerly Pitney ...",The Customer Service Associate will be based i...,Minimum Requirements:Minimum of 6 months custo...,,0,1,0,Part-time,Entry level,High School or equivalent,Financial Services,Customer Service,0,0.0
4,12,Talent Sourcer (6 months fixed-term contract),"GB, LND, London",HR,,Want to build a 21st century financial service...,TransferWise is the clever new way to move mon...,We’re looking for someone who:Proven track rec...,You will join one of Europe’s most hotly tippe...,0,1,0,,,,,,0,0.0
5,13,"Applications Developer, Digital","US, CT, Stamford",,,"Novitex Enterprise Solutions, formerly Pitney ...","The Applications Developer, Digital will devel...",Requirements:4 – 5 years’ experience in develo...,,0,1,0,Full-time,Associate,Bachelor's Degree,Management Consulting,Information Technology,0,0.0


In [23]:
# Removing anything that is not a letter
filtered = filtered.withColumn("desc",regexp_replace(col('description'), '[^A-Za-z ]+', ''))
# df.select("blurb").show(10,False)

# only letters
filtered = filtered.withColumn("desc",regexp_replace(col('desc'), '\d+', ''))

# Remove multiple spaces
filtered = filtered.withColumn("desc",regexp_replace(col('desc'), ' +', ' '))
filtered.select("desc").show(4,False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
################# AFTER ##################


# Tokenize
regex_tokenizer = RegexTokenizer(inputCol="desc", outputCol="tokened", pattern="\\W")
# raw_words = regex_tokenizer.transform(df)

# Remove Stop words
remover = StopWordsRemover(inputCol=regex_tokenizer.getOutputCol(), outputCol="filtered")
# words_df = remover.transform(raw_words)

# Zero Index Label Column
indexer = StringIndexer(inputCol="fraudulent", outputCol="label")
# feature_data = indexer.fit(words_df).transform(words_df)

# Create the Pipeline
pipeline = Pipeline(stages=[regex_tokenizer,remover,indexer])
data_prep_pl = pipeline.fit(filtered)
# print(type(data_prep_pl))
# print(" ")
# Now call on the Pipeline to get our final df
feature_data = data_prep_pl.transform(filtered)
feature_data.show(10,False)

+------+---------------------------------------------+------------------+----------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [25]:
feature_data.printSchema()

root
 |-- job_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- company_profile: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- required_experience: string (nullable = true)
 |-- required_education: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- function: string (nullable = true)
 |-- fraudulent: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- tokened: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |

In [26]:
feature_data.select('filtered', 'label').show(10, False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Apply the text hashing techniques:
- HashingTF
- TF-IDF
- Word2Vec

In [26]:
# Hashing TF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawfeatures", numFeatures=20)
HTFfeaturizedData = hashingTF.transform(feature_data)

# TF-IDF
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(HTFfeaturizedData)
TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)
TFIDFfeaturizedData.name = 'TFIDFfeaturizedData'

#rename the HTF features to features to be consistent
HTFfeaturizedData = HTFfeaturizedData.withColumnRenamed("rawfeatures","features")
HTFfeaturizedData.name = 'HTFfeaturizedData' #We will use later for printing

In [27]:
# Word2Vec
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="filtered", outputCol="features")
model = word2Vec.fit(feature_data)

W2VfeaturizedData = model.transform(feature_data)
# W2VfeaturizedData.show(1,False)

# W2Vec Dataframes typically has negative values so we will correct for that here so that we can use the Naive Bayes classifier
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(W2VfeaturizedData)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(W2VfeaturizedData)
W2VfeaturizedData = scaled_data.select('fraudulent','description','label','scaledFeatures')
W2VfeaturizedData = W2VfeaturizedData.withColumnRenamed('scaledFeatures','features')

W2VfeaturizedData.name = 'W2VfeaturizedData' # We will need this to print later

In [29]:
import numpy as np
import pandas as pd

## Bulid the ML models and feed the extracted features from each function to compare them.

In [57]:
def ClassTrainEval(classifier,features,classes,train,test):

    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  
            # Add parameters of your choice here:
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("NaiveBayes"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder() \
                               .addGrid(classifier.maxDepth, [2, 5, 10])
#                                .addGrid(classifier.maxBins, [5, 10, 20])
#                                .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("GBTClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .addGrid(classifier.maxIter, [10, 15,50,100])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxIter, [10, 15]) \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .build())
            
            # Add parameters of your choice here:
            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .build())
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        
        if Mtype in("OneVsRest"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            # Extract list of binary models
            models = BestModel.models
            for model in models:
                print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)

        if Mtype == "MultilayerPerceptronClassifier":
            print("")
            print('\033[1m' + Mtype," Weights"+ '\033[0m')
            print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)
            print("")

        if Mtype in("DecisionTreeClassifier", "GBTClassifier","RandomForestClassifier"):
            # FEATURE IMPORTANCES
            # Estimate of the importance of each feature.
            # Each feature’s importance is the average of its importance across all trees 
            # in the ensemble The importance vector is normalized to sum to 1. 
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            print(BestModel.featureImportances)
            
            if Mtype in("DecisionTreeClassifier"):
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel
            if Mtype in("GBTClassifier"):
                global GBT_featureimportances
                GBT_featureimportances = BestModel.featureImportances.toArray()
                global GBT_BestModel
                GBT_BestModel = BestModel
            if Mtype in("RandomForestClassifier"):
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

        if Mtype in("LogisticRegression"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficient Matrix"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficientMatrix))
            print("Intercept: " + str(BestModel.interceptVector))
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        if Mtype in("LinearSVC"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficients))
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="weightedPrecision") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        # MC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='probability', metricName="areaUnderROC") # redictionCol="prediction",
        auc = MC_evaluator.evaluate(predictions)
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        auc = [str(auc)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

In [59]:
# from pyspark.ml.classification import *
# from pyspark.ml.evaluation import *
# from pyspark.sql import functions
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Comment out Naive Bayes if your data still contains negative values
classifiers = [
                LogisticRegression()
                ,OneVsRest()
               ,LinearSVC()
               ,NaiveBayes()
               ,RandomForestClassifier()
               ,GBTClassifier()
               ,DecisionTreeClassifier()
               ,MultilayerPerceptronClassifier()
              ] 

featureDF_list = [HTFfeaturizedData,TFIDFfeaturizedData,W2VfeaturizedData]

### Train All the models on all the extracted features

In [60]:
for featureDF in featureDF_list:
    print('\033[1m' + featureDF.name," Results:"+ '\033[0m')
    train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
    features = featureDF.select(['features']).collect()
    # Learn how many classes there are in order to specify evaluation type based on binary or multi and turn the df into an object
    class_count = featureDF.select(countDistinct("label")).collect()
    classes = class_count[0][0]

    #set up your results table
    columns = ['Classifier', 'precision']
    vals = [("Place Holder", "N/A")]
    results = spark.createDataFrame(vals, columns)

    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes,train,test)
        print(new_result)
        results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    print(results.show(truncate=False))

[1mHTFfeaturizedData  Results:[0m
 
[1mLogisticRegression  Coefficient Matrix[0m
You should compares these relative to eachother
Coefficients: 
DenseMatrix([[-0.01509136, -0.06001257,  0.00330187,  0.03427314, -0.01172932,
               0.00264982,  0.06722654, -0.02211595, -0.01135455,  0.00627504,
               0.03766343, -0.06991995, -0.03014226, -0.02319122, -0.05916711,
               0.07543862,  0.00783984, -0.02213059,  0.05004122,  0.01426449]])
Intercept: [-2.7428968907714633]
DataFrame[Classifier: string, Result: string]
 
[1mOneVsRest[0m
[1mIntercept: [0m 2.7789274550778873 [1m
Coefficients:[0m [0.0019207855072316854,0.010639264671297805,0.0001741328122380362,-0.004599622810734527,0.001798200845577429,0.002021781599715167,-0.01078351029074588,0.004510691204903312,0.0025751769215736227,0.00032711032284393104,-0.0033444014374244472,0.011299794983498068,0.004732403596074051,0.0025679288827071024,0.009622061334971578,-0.01110516631271376,0.0006066045257997958,0.003

### Conclusion: Random Forest Classifier with both HTFfeaturizedData and TFIDFfeaturizedData introduced the best performance in terms of precision (around 96%) 

## Now test the model again but after balancing the data:
1. Uncomment this cell from above
2. run the notebook again but on the sampled data

In [None]:
df = df.sampleBy("fraudulent", fractions={"0": 0.4, "1": 1.0}, seed=10)
# QA again
df.groupBy("fraudulent").count().show(truncate=False)

In [27]:
# Hashing TF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawfeatures", numFeatures=20)
HTFfeaturizedData = hashingTF.transform(feature_data)

# TF-IDF
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(HTFfeaturizedData)
TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)
TFIDFfeaturizedData.name = 'TFIDFfeaturizedData'

#rename the HTF features to features to be consistent
HTFfeaturizedData = HTFfeaturizedData.withColumnRenamed("rawfeatures","features")
HTFfeaturizedData.name = 'HTFfeaturizedData' #We will use later for printing

In [28]:
# Word2Vec
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="filtered", outputCol="features")
model = word2Vec.fit(feature_data)

W2VfeaturizedData = model.transform(feature_data)
# W2VfeaturizedData.show(1,False)

# W2Vec Dataframes typically has negative values so we will correct for that here so that we can use the Naive Bayes classifier
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(W2VfeaturizedData)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(W2VfeaturizedData)
W2VfeaturizedData = scaled_data.select('fraudulent','description','label','scaledFeatures')
W2VfeaturizedData = W2VfeaturizedData.withColumnRenamed('scaledFeatures','features')

W2VfeaturizedData.name = 'W2VfeaturizedData' # We will need this to print later

In [30]:
def ClassTrainEval(classifier,features,classes,train,test):

    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  
            # Add parameters of your choice here:
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("NaiveBayes"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder() \
                               .addGrid(classifier.maxDepth, [2, 5, 10])
#                                .addGrid(classifier.maxBins, [5, 10, 20])
#                                .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("GBTClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .addGrid(classifier.maxIter, [10, 15,50,100])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxIter, [10, 15]) \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .build())
            
            # Add parameters of your choice here:
            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .build())
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        
        if Mtype in("OneVsRest"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            # Extract list of binary models
            models = BestModel.models
            for model in models:
                print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)

        if Mtype == "MultilayerPerceptronClassifier":
            print("")
            print('\033[1m' + Mtype," Weights"+ '\033[0m')
            print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)
            print("")

        if Mtype in("DecisionTreeClassifier", "GBTClassifier","RandomForestClassifier"):
            # FEATURE IMPORTANCES
            # Estimate of the importance of each feature.
            # Each feature’s importance is the average of its importance across all trees 
            # in the ensemble The importance vector is normalized to sum to 1. 
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            print(BestModel.featureImportances)
            
            if Mtype in("DecisionTreeClassifier"):
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel
            if Mtype in("GBTClassifier"):
                global GBT_featureimportances
                GBT_featureimportances = BestModel.featureImportances.toArray()
                global GBT_BestModel
                GBT_BestModel = BestModel
            if Mtype in("RandomForestClassifier"):
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

        if Mtype in("LogisticRegression"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficient Matrix"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficientMatrix))
            print("Intercept: " + str(BestModel.interceptVector))
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        if Mtype in("LinearSVC"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficients))
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="weightedPrecision") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        # MC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='probability', metricName="areaUnderROC") # redictionCol="prediction",
        auc = MC_evaluator.evaluate(predictions)
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        auc = [str(auc)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

In [31]:
# from pyspark.ml.classification import *
# from pyspark.ml.evaluation import *
# from pyspark.sql import functions
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Comment out Naive Bayes if your data still contains negative values
classifiers = [
                LogisticRegression()
                ,OneVsRest()
               ,LinearSVC()
               ,NaiveBayes()
               ,RandomForestClassifier()
               ,GBTClassifier()
               ,DecisionTreeClassifier()
               ,MultilayerPerceptronClassifier()
              ] 

featureDF_list = [HTFfeaturizedData,TFIDFfeaturizedData,W2VfeaturizedData]

In [32]:
for featureDF in featureDF_list:
    print('\033[1m' + featureDF.name," Results:"+ '\033[0m')
    train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
    features = featureDF.select(['features']).collect()
    # Learn how many classes there are in order to specify evaluation type based on binary or multi and turn the df into an object
    class_count = featureDF.select(countDistinct("label")).collect()
    classes = class_count[0][0]

    #set up your results table
    columns = ['Classifier', 'precision']
    vals = [("Place Holder", "N/A")]
    results = spark.createDataFrame(vals, columns)

    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes,train,test)
        print(new_result)
        results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    print(results.show(truncate=False))

[1mHTFfeaturizedData  Results:[0m
 
[1mLogisticRegression  Coefficient Matrix[0m
You should compares these relative to eachother
Coefficients: 
DenseMatrix([[-0.01713839, -0.02440519,  0.00388787,  0.0292528 , -0.01541605,
               0.00485583,  0.07014939, -0.01890592, -0.02032243, -0.00816631,
               0.0429817 , -0.07306288, -0.05090061, -0.04602583, -0.05523018,
               0.06806642,  0.03269635, -0.01384769,  0.05258747,  0.01014923]])
Intercept: [-1.7483946951979954]
DataFrame[Classifier: string, Result: string]
 
[1mOneVsRest[0m
[1mIntercept: [0m 1.7912617893302594 [1m
Coefficients:[0m [0.0035749139237599378,0.009584295809463182,0.0004211038663536735,-0.0069337080201684,0.003937041846247103,0.0019517096784647866,-0.018289244668256578,0.006460489220935337,0.006590465268090441,0.003565628948743168,-0.007443638707484885,0.021236854153699573,0.012606685719433049,0.010159969168237831,0.01583020379705562,-0.01760222504475782,-0.005889883906772035,0.004013137

# Conclusion: 
1. Before Sampling: Random Forest Classifier with both HTFfeaturizedData and TFIDFfeaturizedData introduced the best performance in terms of precision (around 96%) 
2. After Sampling: The same classifier with the same Features but with a little less precision ()
