# NLP in Pyspark's MLlib Project Solution

## Fake Job Posting Predictions
Creating a system that automatically flags suspicious job postings on Indeed.com's website. It has recently seen an influx of fake job postings that is negativley impacting it's customer experience. Becuase of the high volume of job postings it receives everyday, their employees don't have the capacity to check every posting so they would like an automated system that prioritizes which postings to review before deleting it. 

#### Task
Use the attached dataset to create an NLP alogorthim which automatically flags suspicious posts for review. 

#### The data
This dataset contains 18K job descriptions out of which about 800 are fake. The data consists of both textual information and meta-information about the jobs.

**Data Source:** https://www.kaggle.com/shivamb/real-or-fake-fake-jobposting-prediction

#### My basic approach
I think I will use just the job description variable (there are a few lengthier text variables we could have chosen from) for my analysis for now and think about the company profile for another analysis later on. Something that would be cool here would be to create multiple models that all work together to provide a recommendation or a score of how likley it is that a job posting is fake. 

In [2]:
# First let's create our PySpark instance
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("NLP").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


**Read in dependencies**

In [3]:
# from pyspark.ml import Pipeline 
from pyspark.ml.feature import * #CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.sql.functions import * #col, udf,regexp_replace,isnull
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

**And the dataset**

In [4]:
path ="Datasets/"

# CSV
postings = spark.read.csv(path+'fake_job_postings.csv',inferSchema=True,header=True)

**View the data for QA**

In [5]:
postings.limit(4).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent
0,1,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...","Food52, a fast-growing, James Beard Award-winn...",Experience with content management systems a m...,,0,1,0,Other,Internship,,,Marketing,0
1,2,Customer Service - Cloud Video Production,"NZ, , Auckland",Success,,"90 Seconds, the worlds Cloud Video Production ...",Organised - Focused - Vibrant - Awesome!Do you...,What we expect from you:Your key responsibilit...,What you will get from usThrough being part of...,0,1,0,Full-time,Not Applicable,,Marketing and Advertising,Customer Service,0
2,3,Commissioning Machinery Assistant (CMA),"US, IA, Wever",,,Valor Services provides Workforce Solutions th...,"Our client, located in Houston, is actively se...",Implement pre-commissioning and commissioning ...,,0,1,0,,,,,,0
3,4,Account Executive - Washington DC,"US, DC, Washington",Sales,,Our passion for improving quality of life thro...,THE COMPANY: ESRI – Environmental Systems Rese...,"EDUCATION: Bachelor’s or Master’s in GIS, busi...",Our culture is anything but corporate—we have ...,0,1,0,Full-time,Mid-Senior level,Bachelor's Degree,Computer Software,Sales,0


In [6]:
# Let's read a full line of data of fradulent postings
postings.filter("fraudulent=1").show(1,False)
# These look good!

+------+-----------------+---------------+----------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
postings.printSchema()

root
 |-- job_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- company_profile: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- required_experience: string (nullable = true)
 |-- required_education: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- function: string (nullable = true)
 |-- fraudulent: string (nullable = true)



**See how many rows are in the df**

In [8]:
postings.count()

17880

## Null Values?

In [9]:
from pyspark.sql.functions import *

def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows,(nullRows/numRows)*100
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(postings)
spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()

+-------------------+-----------------+--------------------+
|        Column_Name|Null_Values_Count|  Null_Value_Percent|
+-------------------+-----------------+--------------------+
|           location|              346|  1.9351230425055927|
|         department|            11547|   64.58053691275167|
|       salary_range|            15011|   83.95413870246085|
|    company_profile|             3308|  18.501118568232663|
|        description|                1|0.005592841163310962|
|       requirements|             2573|  14.390380313199106|
|           benefits|             6966|   38.95973154362416|
|      telecommuting|               89| 0.49776286353467564|
|   has_company_logo|               29|  0.1621923937360179|
|      has_questions|               30| 0.16778523489932887|
|    employment_type|             3292|   18.41163310961969|
|required_experience|             6723|  37.600671140939596|
| required_education|             7748|  43.333333333333336|
|           industry|   

Quite a bit of missing data here. We better be careful dropping

In [10]:
# Let's see how much total
og_len = postings.count()
drop_len = postings.na.drop().count()
print("Total Null Rows:",og_len-drop_len)
print("Percentage Null Rows", (og_len-drop_len)/og_len)

Total Null Rows: 17094
Percentage Null Rows 0.9560402684563758


Wawwww 95% is wayyyy too much. Better find a better approach.

In [11]:
# How about by subset by just the vars we need for now. 
df = postings.na.drop(subset=["fraudulent","description"])

**Much better**

In [12]:
df.count()

17704

In [13]:
# Quick data quality check on the dependent var....
# This should be a binary outcome (0 or 1)
df.groupBy("fraudulent").count().orderBy(col("count").desc()).show(8)

+--------------------+-----+
|          fraudulent|count|
+--------------------+-----+
|                   0|16080|
|                   1|  886|
|           Full-time|   73|
|Hospital & Health...|   55|
|   Bachelor's Degree|   53|
|         Engineering|   26|
| perform quality ...|   17|
|         Unspecified|   15|
+--------------------+-----+
only showing top 8 rows



We can see from the query above that we have some invalid data in the label (fraudulent) column. Let's delete those.

In [14]:
df = df.filter("fraudulent IN('0','1')")
# QA again 
df.groupBy("fraudulent").count().show(truncate=False)

+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |16080|
|1         |886  |
+----------+-----+



### Balance the signal

The other thing I want to do is resample the dataframe so I get a better signal from the data since there is not many fraudulent cases. I mentioned class imbalance earlier on, but we haven't come accross a good example yet. This is a good one where we see that the ratio between the fraudent cases and the real cases is extremley unbalanced. So undersampling the non-fradulent cases will help with that. 

Luckily, Spark has a cool built in function for this called sampleby to accomplish this. 

In [15]:
df = df.sampleBy("fraudulent", fractions={'0': 0.4, '1': 1.0}, seed=10)
# QA again 
df.groupBy("fraudulent").count().show(truncate=False)

+----------+-----+
|fraudulent|count|
+----------+-----+
|0         |6323 |
|1         |886  |
+----------+-----+



That's better!

### Encode the label column

In [16]:
# Let's go ahead and encode it too 
indexer = StringIndexer(inputCol="fraudulent", outputCol="label")
df = indexer.fit(df).transform(df)
df.limit(6).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent,label
0,1,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...","Food52, a fast-growing, James Beard Award-winn...",Experience with content management systems a m...,,0,1,0,Other,Internship,,,Marketing,0,0.0
1,5,Bill Review Manager,"US, FL, Fort Worth",,,SpotSource Solutions LLC is a Global Human Cap...,JOB TITLE: Itemization Review ManagerLOCATION:...,QUALIFICATIONS:RN license in the State of Texa...,Full Benefits Offered,0,1,1,Full-time,Mid-Senior level,Bachelor's Degree,Hospital & Health Care,Health Care Provider,0,0.0
2,6,Accounting Clerk,"US, MD,",,,,Job OverviewApex is an environmental consultin...,,,0,0,0,,,,,,0,0.0
3,10,Customer Service Associate - Part Time,"US, AZ, Phoenix",,,"Novitex Enterprise Solutions, formerly Pitney ...",The Customer Service Associate will be based i...,Minimum Requirements:Minimum of 6 months custo...,,0,1,0,Part-time,Entry level,High School or equivalent,Financial Services,Customer Service,0,0.0
4,12,Talent Sourcer (6 months fixed-term contract),"GB, LND, London",HR,,Want to build a 21st century financial service...,TransferWise is the clever new way to move mon...,We’re looking for someone who:Proven track rec...,You will join one of Europe’s most hotly tippe...,0,1,0,,,,,,0,0.0
5,13,"Applications Developer, Digital","US, CT, Stamford",,,"Novitex Enterprise Solutions, formerly Pitney ...","The Applications Developer, Digital will devel...",Requirements:4 – 5 years’ experience in develo...,,0,1,0,Full-time,Associate,Bachelor's Degree,Management Consulting,Information Technology,0,0.0


In [17]:
# Let's check the quality of the description var
df.select("description").show(1,False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|description                                                                                

Looks pretty standard.

## Clean the datasets

In [18]:
#Removing anything that is not a letter
df = df.withColumn("description",regexp_replace(df["description"], '[^A-Za-z ]+', ''))
# Remove multiple spaces
df = df.withColumn("description",regexp_replace(df["description"], ' +', ' '))
# Lower case everything
df = df.withColumn("description",lower(df["description"]))

In [19]:
df.limit(5).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent,label
0,1,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...",food a fastgrowing james beard awardwinning on...,Experience with content management systems a m...,,0,1,0,Other,Internship,,,Marketing,0,0.0
1,5,Bill Review Manager,"US, FL, Fort Worth",,,SpotSource Solutions LLC is a Global Human Cap...,job title itemization review managerlocation f...,QUALIFICATIONS:RN license in the State of Texa...,Full Benefits Offered,0,1,1,Full-time,Mid-Senior level,Bachelor's Degree,Hospital & Health Care,Health Care Provider,0,0.0
2,6,Accounting Clerk,"US, MD,",,,,job overviewapex is an environmental consultin...,,,0,0,0,,,,,,0,0.0
3,10,Customer Service Associate - Part Time,"US, AZ, Phoenix",,,"Novitex Enterprise Solutions, formerly Pitney ...",the customer service associate will be based i...,Minimum Requirements:Minimum of 6 months custo...,,0,1,0,Part-time,Entry level,High School or equivalent,Financial Services,Customer Service,0,0.0
4,12,Talent Sourcer (6 months fixed-term contract),"GB, LND, London",HR,,Want to build a 21st century financial service...,transferwise is the clever new way to move mon...,We’re looking for someone who:Proven track rec...,You will join one of Europe’s most hotly tippe...,0,1,0,,,,,,0,0.0


## Split text into words (Tokenizing)

In [20]:
regex_tokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W")
df = regex_tokenizer.transform(df)

df.limit(5).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent,label,words
0,1,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...",food a fastgrowing james beard awardwinning on...,Experience with content management systems a m...,,0,1,0,Other,Internship,,,Marketing,0,0.0,"[food, a, fastgrowing, james, beard, awardwinn..."
1,5,Bill Review Manager,"US, FL, Fort Worth",,,SpotSource Solutions LLC is a Global Human Cap...,job title itemization review managerlocation f...,QUALIFICATIONS:RN license in the State of Texa...,Full Benefits Offered,0,1,1,Full-time,Mid-Senior level,Bachelor's Degree,Hospital & Health Care,Health Care Provider,0,0.0,"[job, title, itemization, review, managerlocat..."
2,6,Accounting Clerk,"US, MD,",,,,job overviewapex is an environmental consultin...,,,0,0,0,,,,,,0,0.0,"[job, overviewapex, is, an, environmental, con..."
3,10,Customer Service Associate - Part Time,"US, AZ, Phoenix",,,"Novitex Enterprise Solutions, formerly Pitney ...",the customer service associate will be based i...,Minimum Requirements:Minimum of 6 months custo...,,0,1,0,Part-time,Entry level,High School or equivalent,Financial Services,Customer Service,0,0.0,"[the, customer, service, associate, will, be, ..."
4,12,Talent Sourcer (6 months fixed-term contract),"GB, LND, London",HR,,Want to build a 21st century financial service...,transferwise is the clever new way to move mon...,We’re looking for someone who:Proven track rec...,You will join one of Europe’s most hotly tippe...,0,1,0,,,,,,0,0.0,"[transferwise, is, the, clever, new, way, to, ..."


## Removing Stopwords

In [21]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
feature_data = remover.transform(df)
    
feature_data.limit(5).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,...,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent,label,words,filtered
0,1,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...",food a fastgrowing james beard awardwinning on...,Experience with content management systems a m...,,0,...,0,Other,Internship,,,Marketing,0,0.0,"[food, a, fastgrowing, james, beard, awardwinn...","[food, fastgrowing, james, beard, awardwinning..."
1,5,Bill Review Manager,"US, FL, Fort Worth",,,SpotSource Solutions LLC is a Global Human Cap...,job title itemization review managerlocation f...,QUALIFICATIONS:RN license in the State of Texa...,Full Benefits Offered,0,...,1,Full-time,Mid-Senior level,Bachelor's Degree,Hospital & Health Care,Health Care Provider,0,0.0,"[job, title, itemization, review, managerlocat...","[job, title, itemization, review, managerlocat..."
2,6,Accounting Clerk,"US, MD,",,,,job overviewapex is an environmental consultin...,,,0,...,0,,,,,,0,0.0,"[job, overviewapex, is, an, environmental, con...","[job, overviewapex, environmental, consulting,..."
3,10,Customer Service Associate - Part Time,"US, AZ, Phoenix",,,"Novitex Enterprise Solutions, formerly Pitney ...",the customer service associate will be based i...,Minimum Requirements:Minimum of 6 months custo...,,0,...,0,Part-time,Entry level,High School or equivalent,Financial Services,Customer Service,0,0.0,"[the, customer, service, associate, will, be, ...","[customer, service, associate, based, phoenix,..."
4,12,Talent Sourcer (6 months fixed-term contract),"GB, LND, London",HR,,Want to build a 21st century financial service...,transferwise is the clever new way to move mon...,We’re looking for someone who:Proven track rec...,You will join one of Europe’s most hotly tippe...,0,...,0,,,,,,0,0.0,"[transferwise, is, the, clever, new, way, to, ...","[transferwise, clever, new, way, move, money, ..."


## Converting text into vectors

We test out the following three vectors

1. Count Vectors
2. TF-IDF
3. Word2Vec

In [22]:
# Count Vector (count vectorizer and hashingTF are basically the same thing)
# cv = CountVectorizer(inputCol="filtered", outputCol="features")
# model = cv.fit(feature_data)
# countVectorizer_features = model.transform(feature_data)

# Hashing TF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawfeatures", numFeatures=20)
HTFfeaturizedData = hashingTF.transform(feature_data)

# TF-IDF
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(HTFfeaturizedData)
TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)
TFIDFfeaturizedData.name = 'TFIDFfeaturizedData'

#rename the HTF features to features to be consistent
HTFfeaturizedData = HTFfeaturizedData.withColumnRenamed("rawfeatures","features")
HTFfeaturizedData.name = 'HTFfeaturizedData' #We will use later for printing

In [23]:
# Word2Vec
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="filtered", outputCol="features")
model = word2Vec.fit(feature_data)

W2VfeaturizedData = model.transform(feature_data)
# W2VfeaturizedData.show(1,False)

# W2Vec Dataframes typically has negative values so we will correct for that here so that we can use the Naive Bayes classifier
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(W2VfeaturizedData)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(W2VfeaturizedData)
W2VfeaturizedData = scaled_data.select('fraudulent','description','label','scaledFeatures')
W2VfeaturizedData = W2VfeaturizedData.withColumnRenamed('scaledFeatures','features')

W2VfeaturizedData.name = 'W2VfeaturizedData' # We will need this to print later

## Train and Evaluate your model

From here on out, is straight up classification. So we can go and use our trusty function!

In [24]:
def ClassTrainEval(classifier,features,classes,train,test):

    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  
            # Add parameters of your choice here:
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("NaiveBayes"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder() \
                               .addGrid(classifier.maxDepth, [2, 5, 10])
#                                .addGrid(classifier.maxBins, [5, 10, 20])
#                                .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("GBTClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .addGrid(classifier.maxIter, [10, 15,50,100])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxIter, [10, 15]) \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .build())
            
            # Add parameters of your choice here:
            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .build())
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        
        if Mtype in("OneVsRest"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            # Extract list of binary models
            models = BestModel.models
            for model in models:
                print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)

        if Mtype == "MultilayerPerceptronClassifier":
            print("")
            print('\033[1m' + Mtype," Weights"+ '\033[0m')
            print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)
            print("")

        if Mtype in("DecisionTreeClassifier", "GBTClassifier","RandomForestClassifier"):
            # FEATURE IMPORTANCES
            # Estimate of the importance of each feature.
            # Each feature’s importance is the average of its importance across all trees 
            # in the ensemble The importance vector is normalized to sum to 1. 
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            print(BestModel.featureImportances)
            
            if Mtype in("DecisionTreeClassifier"):
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel
            if Mtype in("GBTClassifier"):
                global GBT_featureimportances
                GBT_featureimportances = BestModel.featureImportances.toArray()
                global GBT_BestModel
                GBT_BestModel = BestModel
            if Mtype in("RandomForestClassifier"):
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

        if Mtype in("LogisticRegression"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficient Matrix"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficientMatrix))
            print("Intercept: " + str(BestModel.interceptVector))
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        if Mtype in("LinearSVC"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficients))
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

Read in all dependencies and declare the algorithims you want to test

In [24]:
# from pyspark.ml.classification import *
# from pyspark.ml.evaluation import *
# from pyspark.sql import functions
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Comment out Naive Bayes if your data still contains negative values
classifiers = [
                LogisticRegression()
                ,OneVsRest()
               ,LinearSVC()
               ,NaiveBayes()
               ,RandomForestClassifier()
               ,GBTClassifier()
               ,DecisionTreeClassifier()
               ,MultilayerPerceptronClassifier()
              ] 

featureDF_list = [HTFfeaturizedData,TFIDFfeaturizedData,W2VfeaturizedData]

Loop through all feature types (hashingTF, TFIDF and Word2Vec)

In [25]:
for featureDF in featureDF_list:
    print('\033[1m' + featureDF.name," Results:"+ '\033[0m')
    train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
    features = featureDF.select(['features']).collect()
    # Learn how many classes there are in order to specify evaluation type based on binary or multi and turn the df into an object
    class_count = featureDF.select(countDistinct("label")).collect()
    classes = class_count[0][0]

    #set up your results table
    columns = ['Classifier', 'Result']
    vals = [("Place Holder","N/A")]
    results = spark.createDataFrame(vals, columns)

    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes,train,test)
        results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    print(results.show(truncate=False))

[1mHTFfeaturizedData  Results:[0m
 
[1mLogisticRegression  Coefficient Matrix[0m
You should compares these relative to eachother
Coefficients: 
DenseMatrix([[-0.07861328,  0.09521266,  0.05930814,  0.02544565, -0.01404843,
               0.00727573,  0.02432943,  0.02157102, -0.1130908 ,  0.02302418,
              -0.00787546, -0.01643257, -0.07322276,  0.08530298, -0.00558598,
               0.00530257, -0.01220314, -0.04317979, -0.00432638,  0.00116288]])
Intercept: [-1.8854828778549837]
 
[1mOneVsRest[0m
[1mIntercept: [0m 1.8923439355518614 [1m
Coefficients:[0m [0.059267892197021964,-0.07330383692724035,-0.04766136880457956,-0.017589526300773217,0.008863971250801354,-0.004483204655200015,-0.019426165171819738,-0.016741976166706134,0.08282538892401284,-0.016839131973471977,0.008695500111854038,0.012993795162704878,0.05483138703151148,-0.0679903513224388,0.005987007705701415,-0.0027776483351765312,0.011517402950165961,0.03444199319430457,0.0012391242656616215,-5.601750493361

 
[1mLogisticRegression  Coefficient Matrix[0m
You should compares these relative to eachother
Coefficients: 
DenseMatrix([[ 0.15795884, -1.14993974,  1.44012328]])

Intercept: [-2.260628462255558]
 
[1mOneVsRest[0m
[1mIntercept: [0m 2.1572294932076366 [1m
Coefficients:[0m [-0.009968734000774315,0.3018144025248052,-0.5485710235810011]
[1mIntercept: [0m -2.157229493208149 [1m
Coefficients:[0m [0.009968734000948802,-0.30181440252277497,0.5485710235805575]
 
[1mLinearSVC  Coefficients[0m
You should compares these relative to eachother
Coefficients: 
[-0.9778900608153539,-0.664354309334859,-0.38266192771772833]
 
[1mRandomForestClassifier  Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
(3,[0,1,2],[0.3400168668894659,0.3572873655294014,0.30269576758113265])
 
[1mGBTClassifier  Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
(3,[0,1,2],[0.4018287803598715,0.29300063434469087,0.3051705852954376])
 
[1mDeci

Looks like the Random Forest classifier with either the HTFfeaturizedData or the TFIDFfeaturizedData are our best performing feature list/classifier combos. Let's go with the Hashing TF vector for the sake of simiplicity and create our final model and play around with the test dataframe. 

In [26]:
# Train final model
classifier = RandomForestClassifier()
featureDF = HTFfeaturizedData

train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
features = featureDF.select(['features']).collect()

# Learn how many classes there are in order to specify evaluation type based on binary or multi and turn the df into an object
class_count = featureDF.select(countDistinct("label")).collect()
classes = class_count[0][0]

#running this afain with generate all the objects need to play around with test data
ClassTrainEval(classifier,features,classes,train,test)

 
[1mRandomForestClassifier  Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
(20,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19],[0.05904010530735948,0.06578533000050482,0.05171311901984166,0.0491519519610675,0.02931341674810501,0.047584673788201776,0.04826740334710147,0.04679172082786815,0.07107577335431015,0.042965741269073134,0.05784651640626589,0.03800432441126964,0.05943861553771799,0.0725388454087228,0.042892316884654634,0.04441071397167837,0.046763281074590435,0.04767411146246487,0.04099556933769976,0.037746469881502646])


DataFrame[Classifier: string, Result: string]

Let's see some results!

In [27]:
predictions = RF_BestModel.transform(test)
print("Predicted Fraudulent:")
predictions.select("fraudulent","description").filter("prediction=1").orderBy(predictions["prediction"].desc()).show(3,False)
print(" ")
print("Predicted Not Fraudulent:")
predictions.select("fraudulent","description").filter("prediction=0").orderBy(predictions["prediction"].desc()).show(3,False)

Predicted Fraudulent:
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## What could be next?

This analysis was really just the tip of the ice berg here. We could also consider the following analysis:

1. Autotag suspicious descriptions
2. Conduct a similar analysis on the company profile field and the requirements field (NLP)
3. Frequent pattern mining on null and non null values in other fields
4. Consider doing anlaysis on amount of typos in the description