# NLP in Pyspark's MLlib Project

## Fake Job Posting Predictions

Indeed.com has just hired you to create a system that automatically flags suspicious job postings on it's website. It has recently seen an influx of fake job postings that is negativley impacting it's customer experience. Becuase of the high volume of job postings it receives everyday, their employees do have the capacity to check every posting so they would like prioritize which postings to review before deleting it. 

#### The data
This dataset contains 18K job descriptions out of which about 800 are fake. The data consists of both textual information and meta-information about the jobs.

**Data Source:** https://www.kaggle.com/shivamb/real-or-fake-fake-jobposting-prediction

#### Have fun!

In [1]:
# First let's create our PySpark instance

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("NLP_project").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


In [2]:
from pyspark.ml.feature import * 
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import *

# For pipeline development
from pyspark.ml import Pipeline

In [3]:
!ls Datasets/

 beatsdataset.csv	 housing.csv
 Concrete_Data_Yeh.csv	 kickstarter.csv
 fake_job_postings.csv	'Toddler Autism dataset July 2018.csv'


In [4]:
path = "Datasets/"
df = spark.read.csv(path+'fake_job_postings.csv',inferSchema=True,header=True)

In [5]:
df.limit(4).toPandas()

Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent
0,1,Marketing Intern,"US, NY, New York",Marketing,,"We're Food52, and we've created a groundbreaki...","Food52, a fast-growing, James Beard Award-winn...",Experience with content management systems a m...,,0,1,0,Other,Internship,,,Marketing,0
1,2,Customer Service - Cloud Video Production,"NZ, , Auckland",Success,,"90 Seconds, the worlds Cloud Video Production ...",Organised - Focused - Vibrant - Awesome!Do you...,What we expect from you:Your key responsibilit...,What you will get from usThrough being part of...,0,1,0,Full-time,Not Applicable,,Marketing and Advertising,Customer Service,0
2,3,Commissioning Machinery Assistant (CMA),"US, IA, Wever",,,Valor Services provides Workforce Solutions th...,"Our client, located in Houston, is actively se...",Implement pre-commissioning and commissioning ...,,0,1,0,,,,,,0
3,4,Account Executive - Washington DC,"US, DC, Washington",Sales,,Our passion for improving quality of life thro...,THE COMPANY: ESRI – Environmental Systems Rese...,"EDUCATION: Bachelor’s or Master’s in GIS, busi...",Our culture is anything but corporate—we have ...,0,1,0,Full-time,Mid-Senior level,Bachelor's Degree,Computer Software,Sales,0


In [6]:
df.printSchema()

root
 |-- job_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- company_profile: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- required_experience: string (nullable = true)
 |-- required_education: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- function: string (nullable = true)
 |-- fraudulent: string (nullable = true)



In [7]:
# df.groupBy('fraudulent').count().orderBy(col('count').desc()).show(truncate=False)

# Cleaning Data

In [8]:
#drop job_id 3alshn malosh lazma
df = df.drop('job_id')

In [9]:
def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows,(nullRows/numRows)*100
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(df)
spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()

+-------------------+-----------------+--------------------+
|        Column_Name|Null_Values_Count|  Null_Value_Percent|
+-------------------+-----------------+--------------------+
|           location|              346|  1.9351230425055927|
|         department|            11547|   64.58053691275167|
|       salary_range|            15011|   83.95413870246085|
|    company_profile|             3308|  18.501118568232663|
|        description|                1|0.005592841163310962|
|       requirements|             2573|  14.390380313199106|
|           benefits|             6966|   38.95973154362416|
|      telecommuting|               89| 0.49776286353467564|
|   has_company_logo|               29|  0.1621923937360179|
|      has_questions|               30| 0.16778523489932887|
|    employment_type|             3292|   18.41163310961969|
|required_experience|             6723|  37.600671140939596|
| required_education|             7748|  43.333333333333336|
|           industry|   

In [10]:
# Of course you will want to know how many rows that affected before you actually execute it..
og_len = df.count()
drop_len = df.na.drop().count()
print("Total Rows that contain at least one null value:",og_len-drop_len)
print("Percentage of Rows that contain at least one null value:", (og_len-drop_len)/og_len)

Total Rows that contain at least one null value: 17094
Percentage of Rows that contain at least one null value: 0.9560402684563758


### Couldn't drop the all the rows with null values
- So, I will drop the columns that contain more than 15% nulls

In [11]:
drop_columns = ('department','salary_range','company_profile','benefits','employment_type','required_experience','required_education','industry','function')

df = df.drop(*drop_columns)

In [12]:
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- fraudulent: string (nullable = true)



In [13]:
# drop dubplicated rows
df = df.dropDuplicates()

In [14]:
# Drop the null values
# It's only about 16% so that's okay
df = df.dropna()

In [15]:
# Cleaning the misleading data
df = df.filter("fraudulent IN('0','1')")
df = df.filter("has_questions IN('0','1')")
df = df.filter("has_company_logo IN('0','1')")
df = df.filter("telecommuting IN('0','1')")

In [16]:
# now change the data type to be integer after cleaning the misleading data in those columns 
df = df.withColumn("fraudulent", df["fraudulent"].cast(IntegerType())) \
        .withColumn("has_questions", df["has_questions"].cast(IntegerType())) \
        .withColumn("has_company_logo",df.has_company_logo.cast(IntegerType())) \
        .withColumn("telecommuting",df.telecommuting.cast(IntegerType()))
#QA
print(df.printSchema())

root
 |-- title: string (nullable = true)
 |-- location: string (nullable = true)
 |-- description: string (nullable = true)
 |-- requirements: string (nullable = true)
 |-- telecommuting: integer (nullable = true)
 |-- has_company_logo: integer (nullable = true)
 |-- has_questions: integer (nullable = true)
 |-- fraudulent: integer (nullable = true)

None


In [17]:
df.count()

13790

In [18]:
summary = df.summary("count", "min", "25%", "75%", "max")
summary.toPandas()

Unnamed: 0,summary,title,location,description,requirements,telecommuting,has_company_logo,has_questions,fraudulent
0,count,13790,13790,13790,13790,13790,13790,13790,13790
1,min,Environmental Technician I,"AE, ,",Normal 0 false false false E...,Normal 0 Confident yet easy-going; firm ...,0,0,0,0
2,25%,,,,,0,1,0,0
3,75%,,,,,0,1,1,0
4,max,~ LM Structures - Expression of Interest for F...,"ZM, ,","￼￼Create, maintain and adjust portfolio of ass...", Strong working knowledge of quantitative res...,1,1,1,1


In [19]:
df.limit(5).toPandas()

Unnamed: 0,title,location,description,requirements,telecommuting,has_company_logo,has_questions,fraudulent
0,Customer Service Associate - Part Time,"US, OR, Portland",The Customer Service Associate will be based i...,Minimum Requirements:Minimum of 6 months custo...,0,1,0,0
1,English Teacher Abroad,"US, CA, Riverside","Play with kids, get paid for it Love travel? J...",University degree required. TEFL / TESOL / CEL...,0,1,1,0
2,URGENT HIRING For Web Designer...!!!,"IN, UP, Noida",We are looking for a talented Web Designer to ...,1+ years of Web Design experienceDemonstrable ...,1,0,0,0
3,Web Developer,"US, MA, Braintree","Requirements:Strong understanding of C#, #URL_...",Basic C# (Sharp),0,0,0,0
4,Process Engineer,"US, NC, Lexington",We have an immediate opening for a Process Eng...,Trade School Education (Degree) in Engineering...,0,1,0,0


#### I have now 4 texts columns with 3 integers, and the targer

I think, I will split the location to get the city, and have no idea what to do with the rest of the columns rather than clean them as a text and chose one way to transform them into numbers, then into the ml model. let's do it tomorrow isA.

In [20]:
list_cols = df.columns[:-1]

In [21]:
df = df.withColumn('concat_cols',concat_ws(' ',*list_cols))
df = df.drop(*list_cols)

In [22]:
df.groupBy('fraudulent').count().show(truncate=False)

+----------+-----+
|fraudulent|count|
+----------+-----+
|1         |645  |
|0         |13145|
+----------+-----+



In [23]:
df.show(1,truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
df = df.withColumn("concat_cols", regexp_replace(col("concat_cols"), "([a-z])([A-Z])", "$1 $2"))

### its time to clean the concat_col column

In [25]:
df = df.withColumn("concat_cols",translate(col("concat_cols"), "/()", "   ")) \
        .withColumn("concat_cols",regexp_replace(col('concat_cols'), '[^A-Za-z0-9 ]+', '')) \
        .withColumn("concat_cols",regexp_replace(col('concat_cols'), ' +', ' ')) \
        .withColumn("concat_cols",lower(col('concat_cols'))) \
        .withColumn("concat_cols", regexp_replace(col("concat_cols"), "url\\S*", ""))


df.select('concat_cols').show(2,False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Prep Data for NLP

In [26]:
regex_tokenizer = RegexTokenizer(inputCol="concat_cols", outputCol="words", pattern="\W") # /W means any word, so this will split by the word.
raw_words = regex_tokenizer.transform(df)
raw_words.show(2,False)
raw_words.printSchema() 

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Removing Stopwords

In [27]:
# Define a list of stop words or use default list
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
stopwords = remover.getStopWords() 
words_df = remover.transform(raw_words)
words_df.limit(1).show(truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Now we need to encode state column to a column of indices
- Remember that MLlib requres our dependent variable to not only be a numeric data type, but also zero indexed. We can use Sparks handy built in StringIndexer function to accomplish this, just like we did in the classification lectures.

In [28]:
indexer = StringIndexer(inputCol="fraudulent", outputCol="label")
feature_data = indexer.fit(words_df).transform(words_df)
feature_data.show(5)
feature_data.printSchema()

+----------+--------------------+--------------------+--------------------+-----+
|fraudulent|         concat_cols|               words|            filtered|label|
+----------+--------------------+--------------------+--------------------+-----+
|         0|customer service ...|[customer, servic...|[customer, servic...|  0.0|
|         0|english teacher a...|[english, teacher...|[english, teacher...|  0.0|
|         0|urgent hiring for...|[urgent, hiring, ...|[urgent, hiring, ...|  0.0|
|         0|web developer us ...|[web, developer, ...|[web, developer, ...|  0.0|
|         0|process engineer ...|[process, enginee...|[process, enginee...|  0.0|
+----------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows

root
 |-- fraudulent: integer (nullable = true)
 |-- concat_cols: string (nullable = false)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: str

## Converting text into vectors

In [29]:
# Word2Vec
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="filtered", outputCol="features")
model = word2Vec.fit(feature_data)

W2VfeaturizedData = model.transform(feature_data)
W2VfeaturizedData.show(1,False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Feature scaling to the df

In [38]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [30]:
# W2Vec Dataframes typically has negative values so we will correct for that here so that we can use the Naive Bayes classifier
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(W2VfeaturizedData)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(W2VfeaturizedData)
W2VfeaturizedData = scaled_data.select('fraudulent','concat_cols','label','scaledFeatures')
W2VfeaturizedData = W2VfeaturizedData.withColumnRenamed('scaledFeatures','features')

W2VfeaturizedData.name = 'W2VfeaturizedData' # We will need this to print later

## Train and Evaluate your model

In [40]:
def ClassTrainEval(classifier,features,classes,train,test):

    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  
            # Add parameters of your choice here:
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("NaiveBayes"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder() \
                               .addGrid(classifier.maxDepth, [2, 5, 10])
#                                .addGrid(classifier.maxBins, [5, 10, 20])
#                                .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("GBTClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .addGrid(classifier.maxIter, [10, 15,50,100])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxIter, [10, 15]) \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .build())
            
            # Add parameters of your choice here:
            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .build())
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        
        if Mtype in("OneVsRest"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            # Extract list of binary models
            models = BestModel.models
            for model in models:
                print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)

        if Mtype == "MultilayerPerceptronClassifier":
            print("")
            print('\033[1m' + Mtype," Weights"+ '\033[0m')
            print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)
            print("")

        if Mtype in("DecisionTreeClassifier", "GBTClassifier","RandomForestClassifier"):
            # FEATURE IMPORTANCES
            # Estimate of the importance of each feature.
            # Each feature’s importance is the average of its importance across all trees 
            # in the ensemble The importance vector is normalized to sum to 1. 
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            print(BestModel.featureImportances)
            
            if Mtype in("DecisionTreeClassifier"):
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel
            if Mtype in("GBTClassifier"):
                global GBT_featureimportances
                GBT_featureimportances = BestModel.featureImportances.toArray()
                global GBT_BestModel
                GBT_BestModel = BestModel
            if Mtype in("RandomForestClassifier"):
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

        if Mtype in("LogisticRegression"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficient Matrix"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficientMatrix))
            print("Intercept: " + str(BestModel.interceptVector))
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        if Mtype in("LinearSVC"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficients))
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        binary_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC") # redictionCol="prediction",
        auc =  (binary_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(auc)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

In [36]:
# Comment out Naive Bayes if your data still contains negative values
classifiers = [
                LogisticRegression()
                ,OneVsRest()
                ,LinearSVC()
                ,NaiveBayes()
                ,RandomForestClassifier()
                ,GBTClassifier()
                ,DecisionTreeClassifier()
                ,MultilayerPerceptronClassifier()
              ] 

featureDF_list = [W2VfeaturizedData]

In [41]:
for featureDF in featureDF_list:
    print('\033[1m' + featureDF.name," Results:"+ '\033[0m')
    train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
    features = featureDF.select(['features']).collect()
    # Learn how many classes there are in order to specify evaluation type based on binary or multi and turn the df into an object
    class_count = featureDF.select(countDistinct("label")).collect()
    classes = class_count[0][0]

    #set up your results table
    columns = ['Classifier', 'Result']
    vals = [("Place Holder","N/A")]
    results = spark.createDataFrame(vals, columns)

    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes,train,test)
        results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    print(results.show(truncate=False))

[1mW2VfeaturizedData  Results:[0m
 
[1mLogisticRegression  Coefficient Matrix[0m
You should compares these relative to eachother
Coefficients: 
DenseMatrix([[ 1.44915379,  3.91946469, -0.3920777 ]])

Intercept: [-6.362818163161014]
 
[1mOneVsRest[0m
[1mIntercept: [0m 3.8568141179186304 [1m
Coefficients:[0m [-0.12543884344599449,-1.077648634682996,-0.05592115439431127]
[1mIntercept: [0m -3.8568141179186286 [1m
Coefficients:[0m [0.12543884344599468,1.077648634682993,0.05592115439431121]
 
[1mLinearSVC  Coefficients[0m
You should compares these relative to eachother
Coefficients: 
[-0.009401995175700818,0.00396192522997506,0.004415934519580263]
 
[1mRandomForestClassifier  Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
(3,[0,1,2],[0.31578453891125424,0.35788378521377556,0.32633167587497025])
 
[1mGBTClassifier  Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
(3,[0,1,2],[0.3412704322337057,0.3235086567

Looks like the RandomForestClassifier with the W2VfeaturizedData was our best performing feature list/classifier combo. Let's go with that and create our final model and play around with the test dataframe.

In [47]:
RF_BestModel

RandomForestClassificationModel: uid=RandomForestClassifier_8cfbb8ad7276, numTrees=20, numClasses=2, numFeatures=3

## Let's see some results!

In [48]:
predictions = RF_BestModel.transform(test)
print("Predicted Failures:")
predictions.select("fraudulent","concat_cols").filter("prediction=0").orderBy(predictions["prediction"].desc()).show(3,False)
print(" ")
print("Predicted Success:")
predictions.select("fraudulent","concat_cols").filter("prediction=1").orderBy(predictions["prediction"].desc()).show(3,False)

Predicted Failures:
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [49]:
# this path will create the folder if it does not exist
# I also like to create a unique identifier using a timestamp so i know when the model was created
from datetime import datetime
timestamp = str(datetime.now()) #return local time
path = 'Models/RFModel_'+timestamp
RF_BestModel.save(path)