# NLP in PySpark's MLlib Project

## Fake Job Posting Predictions

Indeed.com has just hired you to create a system that automatically flags suspicious job postings on its website. It has recently seen an influx of fake job postings that is negatively impacting its customer experience. Because of the high volume of job postings it receives every day, its employees do not have the capacity to check every posting, so they would like to prioritize which postings to review before deleting them.

#### Your task
Use the attached dataset with NLP to create an algorithm that automatically flags suspicious posts for review.

#### The data
This dataset contains 18K job descriptions out of which about 800 are fake. The data consists of both textual information and meta-information about the jobs.

**Data Source:** https://www.kaggle.com/shivamb/real-or-fake-fake-jobposting-prediction

#### Have fun!

In [None]:

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take a while locally
spark = SparkSession.builder.appName("NLP").getOrCreate()

# Note: this counts block managers (the driver plus any executors), not CPU cores
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


In [None]:
from pyspark.ml.feature import * 
from pyspark.sql.functions import * 
from pyspark.sql.types import * 
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import concat_ws

# For pipeline development
from pyspark.ml import Pipeline 
import pandas as pd


## read data 

In [None]:
# I read the file with pandas first because some long text fields contain commas,
# which caused values to shift into the following columns.
path = "Datasets/"
panda = pd.read_csv(path + 'fake_job_postings.csv')
schema = StructType([
    StructField("job_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("location", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary_range", StringType(), True),
    StructField("company_profile", StringType(), True),
    StructField("description", StringType(), True),
    StructField("requirements", StringType(), True),
    StructField("benefits", StringType(), True),
    StructField("telecommuting", StringType(), True),
    StructField("has_company_logo", StringType(), True),
    StructField("has_questions", StringType(), True),
    StructField("employment_type", StringType(), True),
    StructField("required_experience", StringType(), True),
    StructField("required_education", StringType(), True),
    StructField("industry", StringType(), True),
    StructField("function", StringType(), True),
    StructField("fraudulent", IntegerType(), True),
])



df=spark.createDataFrame(panda,schema=schema)
df.toPandas().iloc[15:,:]




Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent
15,16,VP of Sales - Vault Dragon,"SG, 01, Singapore",Sales,120000-150000,Jungle Ventures is the leading Singapore based...,About Vault Dragon Vault Dragon is Dropbox for...,Key Superpowers3-5 years of high-pressure sale...,"Basic: SGD 120,000Equity negotiable for a rock...",0,1,1,Full-time,Executive,Bachelor's Degree,Facilities Services,Sales,0
16,17,Hands-On QA Leader,"IL, , Tel Aviv, Israel",R&D,,At HoneyBook we’re re-imagining the events ind...,We are looking for a Hands-On QA Leader for ou...,Previous experience in client &amp; server tes...,,0,1,0,Full-time,Mid-Senior level,,Internet,Engineering,0
17,18,Southend-on-Sea Traineeships Under NAS 16-18 Y...,"GB, SOS, Southend-on-Sea",,,Established on the principles that full time e...,Government funding is only available for 16-18...,16-18 year olds only due to government funding...,Career prospects.,0,1,1,,,,,,0
18,19,Visual Designer,"US, NY, New York",,,Kettle is an independent digital agency based ...,Kettle is hiring a Visual Designer!Job Locatio...,,,0,1,0,,,,,,0
19,20,Process Controls Engineer - DCS PLC MS Office ...,"US, PA, USA Northeast",,,We Provide Full Time Permanent Positions for m...,Experienced Process Controls Engineer is requi...,Must have 5 or more years of experience with D...,,0,0,0,Full-time,,,,,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
17875,17876,Account Director - Distribution,"CA, ON, Toronto",Sales,,Vend is looking for some awesome new talent to...,Just in case this is the first time you’ve vis...,To ace this role you:Will eat comprehensive St...,What can you expect from us?We have an open cu...,0,1,1,Full-time,Mid-Senior level,,Computer Software,Sales,0
17876,17877,Payroll Accountant,"US, PA, Philadelphia",Accounting,,WebLinc is the e-commerce platform and service...,The Payroll Accountant will focus primarily on...,- B.A. or B.S. in Accounting- Desire to have f...,Health &amp; WellnessMedical planPrescription ...,0,1,1,Full-time,Mid-Senior level,Bachelor's Degree,Internet,Accounting/Auditing,0
17877,17878,Project Cost Control Staff Engineer - Cost Con...,"US, TX, Houston",,,We Provide Full Time Permanent Positions for m...,Experienced Project Cost Control Staff Enginee...,At least 12 years professional experience.Abil...,,0,0,0,Full-time,,,,,0
17878,17879,Graphic Designer,"NG, LA, Lagos",,,,Nemsia Studios is looking for an experienced v...,1. Must be fluent in the latest versions of Co...,Competitive salary (compensation will be based...,0,0,1,Contract,Not Applicable,Professional,Graphic Design,Design,0
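
The column shifting mentioned in the comment of the read cell is what typically happens when a parser splits on every comma, including commas inside quoted fields. As a minimal sketch (the sample row below is made up for illustration and only mimics the `location` column), Python's standard `csv` module shows the difference between naive splitting and quote-aware parsing:

```python
import csv
import io

# Hypothetical sample: the location field contains commas and is quoted.
raw = 'job_id,title,location\n16,VP of Sales,"SG, 01, Singapore"\n'

# Naive split on commas breaks the quoted field into extra columns.
naive = raw.splitlines()[1].split(",")

# A quote-aware parser keeps the field intact.
parsed = list(csv.reader(io.StringIO(raw)))[1]

print(len(naive), naive)
print(len(parsed), parsed)
```

The naive split yields more values than there are header columns, which is the kind of shift the comment describes; `pd.read_csv` handles the quoting, which is presumably why reading with pandas first avoids the problem.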


## check null 

In [None]:
from pyspark.sql.functions import *

def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull() | isnan(col(k))).count() 
        if(nullRows > 0):
            temp = k,nullRows,(nullRows/numRows)*100
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(df)



spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()

+-------------------+-----------------+--------------------+
|        Column_Name|Null_Values_Count|  Null_Value_Percent|
+-------------------+-----------------+--------------------+
|           location|              346|  1.9351230425055927|
|         department|            11547|   64.58053691275167|
|       salary_range|            15012|   83.95973154362416|
|    company_profile|             3308|  18.501118568232663|
|        description|                1|0.005592841163310962|
|       requirements|             2695|  15.072706935123042|
|           benefits|             7210|  40.324384787472034|
|    employment_type|             3471|   19.41275167785235|
|required_experience|             7050|   39.42953020134228|
| required_education|             8105|   45.32997762863535|
|           industry|             4903|  27.421700223713646|
|           function|             6455|   36.10178970917226|
+-------------------+-----------------+--------------------+
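
To make the logic of `null_value_calc` easier to follow without a Spark session, here is a plain-Python sketch of the same idea: count values that are either `None` or NaN per column and report the share. The helper names and the toy rows are assumptions for illustration, not part of the notebook.

```python
import math

def is_missing(value):
    """Mirror of `isNull() | isnan()`: True for None and for float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def missing_report(rows, columns):
    """Return (column, missing_count, missing_percent) for columns with gaps."""
    total = len(rows)
    report = []
    for i, name in enumerate(columns):
        missing = sum(1 for row in rows if is_missing(row[i]))
        if missing > 0:
            report.append((name, missing, missing / total * 100))
    return report

# Hypothetical toy rows, not taken from the dataset.
columns = ["title", "department", "salary_range"]
rows = [
    ("Visual Designer", None, float("nan")),
    ("Payroll Accountant", "Accounting", None),
    ("Graphic Designer", None, None),
    ("Account Director", "Sales", "120000-150000"),
]
report = missing_report(rows, columns)
print(report)
```

As in the Spark version, columns without missing values (here `title`) are left out of the report.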



## replace any null and NaN with a single space

In [None]:


for c in df.columns:
    df = df.withColumn(c, when(col(c).isNull() | isnan(col(c)), " ").otherwise(col(c)))
df.toPandas().iloc[15:,:]


Unnamed: 0,job_id,title,location,department,salary_range,company_profile,description,requirements,benefits,telecommuting,has_company_logo,has_questions,employment_type,required_experience,required_education,industry,function,fraudulent
15,16,VP of Sales - Vault Dragon,"SG, 01, Singapore",Sales,120000-150000,Jungle Ventures is the leading Singapore based...,About Vault Dragon Vault Dragon is Dropbox for...,Key Superpowers3-5 years of high-pressure sale...,"Basic: SGD 120,000Equity negotiable for a rock...",0,1,1,Full-time,Executive,Bachelor's Degree,Facilities Services,Sales,0
16,17,Hands-On QA Leader,"IL, , Tel Aviv, Israel",R&D,,At HoneyBook we’re re-imagining the events ind...,We are looking for a Hands-On QA Leader for ou...,Previous experience in client &amp; server tes...,,0,1,0,Full-time,Mid-Senior level,,Internet,Engineering,0
17,18,Southend-on-Sea Traineeships Under NAS 16-18 Y...,"GB, SOS, Southend-on-Sea",,,Established on the principles that full time e...,Government funding is only available for 16-18...,16-18 year olds only due to government funding...,Career prospects.,0,1,1,,,,,,0
18,19,Visual Designer,"US, NY, New York",,,Kettle is an independent digital agency based ...,Kettle is hiring a Visual Designer!Job Locatio...,,,0,1,0,,,,,,0
19,20,Process Controls Engineer - DCS PLC MS Office ...,"US, PA, USA Northeast",,,We Provide Full Time Permanent Positions for m...,Experienced Process Controls Engineer is requi...,Must have 5 or more years of experience with D...,,0,0,0,Full-time,,,,,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
17875,17876,Account Director - Distribution,"CA, ON, Toronto",Sales,,Vend is looking for some awesome new talent to...,Just in case this is the first time you’ve vis...,To ace this role you:Will eat comprehensive St...,What can you expect from us?We have an open cu...,0,1,1,Full-time,Mid-Senior level,,Computer Software,Sales,0
17876,17877,Payroll Accountant,"US, PA, Philadelphia",Accounting,,WebLinc is the e-commerce platform and service...,The Payroll Accountant will focus primarily on...,- B.A. or B.S. in Accounting- Desire to have f...,Health &amp; WellnessMedical planPrescription ...,0,1,1,Full-time,Mid-Senior level,Bachelor's Degree,Internet,Accounting/Auditing,0
17877,17878,Project Cost Control Staff Engineer - Cost Con...,"US, TX, Houston",,,We Provide Full Time Permanent Positions for m...,Experienced Project Cost Control Staff Enginee...,At least 12 years professional experience.Abil...,,0,0,0,Full-time,,,,,0
17878,17879,Graphic Designer,"NG, LA, Lagos",,,,Nemsia Studios is looking for an experienced v...,1. Must be fluent in the latest versions of Co...,Competitive salary (compensation will be based...,0,0,1,Contract,Not Applicable,Professional,Graphic Design,Design,0


### concatenate all text columns and drop the columns that are no longer needed


In [None]:

df = df.withColumn(
    'text',
    concat_ws(
        ' ',
        df['title'],
        df['location'],
        df['department'],
        df['company_profile'],
        df['description'],
        df['requirements'],
        df['benefits'],
        df['employment_type'],
        df['required_education'],
        df['industry'],
        df['function']
    )
)


df = df.drop("salary_range", "job_id" ,'title','location', 'department','company_profile','description','requirements','benefits',
          'employment_type','required_education','industry', 'function' )

df.limit(5).toPandas()


Unnamed: 0,telecommuting,has_company_logo,has_questions,required_experience,fraudulent,text
0,0,1,0,Internship,0,"Marketing Intern US, NY, New York Marketing We..."
1,0,1,0,Not Applicable,0,"Customer Service - Cloud Video Production NZ, ..."
2,0,1,0,,0,"Commissioning Machinery Assistant (CMA) US, IA..."
3,0,1,0,Mid-Senior level,0,"Account Executive - Washington DC US, DC, Wash..."
4,0,1,1,Mid-Senior level,0,"Bill Review Manager US, FL, Fort Worth SpotS..."


## prepare the data 

In [None]:
#lower case
df = df.withColumn("text",lower(col('text')))

# Removing anything that is not a letter
df = df.withColumn("text",regexp_replace(col('text'), '[^A-Za-z ]+', ''))
# Remove multiple spaces
df = df.withColumn("text",regexp_replace(col('text'), ' +', ' '))
df.limit(5).toPandas()

Unnamed: 0,telecommuting,has_company_logo,has_questions,required_experience,fraudulent,text
0,0,1,0,Internship,0,marketing intern us ny new york marketing were...
1,0,1,0,Not Applicable,0,customer service cloud video production nz auc...
2,0,1,0,,0,commissioning machinery assistant cma us ia we...
3,0,1,0,Mid-Senior level,0,account executive washington dc us dc washingt...
4,0,1,1,Mid-Senior level,0,bill review manager us fl fort worth spotsourc...
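
The three cleaning steps can be tried out on a single string with Python's `re` module. This is only a sketch: Spark's `regexp_replace` uses Java regular expressions, but these particular patterns should behave the same way in both engines. The sample sentence is made up.

```python
import re

def clean_text(text):
    """Lowercase, drop non-letters (keeping spaces), collapse repeated spaces."""
    text = text.lower()
    text = re.sub(r"[^A-Za-z ]+", "", text)
    text = re.sub(r" +", " ", text)
    return text

# Hypothetical sample posting text.
sample = "Marketing Intern US, NY, New York  We're hiring 3 people!"
cleaned = clean_text(sample)
print(cleaned)
```

Note that removing punctuation without inserting a space merges contractions ("We're" becomes "were", as also visible in the output above) and glues together words that were only separated by punctuation or digits.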


In [None]:
# Raw string avoids Python's invalid-escape warning for "\W"
regex_tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern=r"\W")
raw_words = regex_tokenizer.transform(df)
raw_words.limit(5).toPandas()

Unnamed: 0,telecommuting,has_company_logo,has_questions,required_experience,fraudulent,text,words
0,0,1,0,Internship,0,marketing intern us ny new york marketing were...,"[marketing, intern, us, ny, new, york, marketi..."
1,0,1,0,Not Applicable,0,customer service cloud video production nz auc...,"[customer, service, cloud, video, production, ..."
2,0,1,0,,0,commissioning machinery assistant cma us ia we...,"[commissioning, machinery, assistant, cma, us,..."
3,0,1,0,Mid-Senior level,0,account executive washington dc us dc washingt...,"[account, executive, washington, dc, us, dc, w..."
4,0,1,1,Mid-Senior level,0,bill review manager us fl fort worth spotsourc...,"[bill, review, manager, us, fl, fort, worth, s..."


In [None]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
words_df = remover.transform(raw_words)
words_df=words_df.drop("words", "text")
words_df.limit(5).toPandas()

Unnamed: 0,telecommuting,has_company_logo,has_questions,required_experience,fraudulent,filtered
0,0,1,0,Internship,0,"[marketing, intern, us, ny, new, york, marketi..."
1,0,1,0,Not Applicable,0,"[customer, service, cloud, video, production, ..."
2,0,1,0,,0,"[commissioning, machinery, assistant, cma, us,..."
3,0,1,0,Mid-Senior level,0,"[account, executive, washington, dc, us, dc, w..."
4,0,1,1,Mid-Senior level,0,"[bill, review, manager, us, fl, fort, worth, s..."
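
Tokenizing on non-word characters and then filtering stop words can be sketched in plain Python as follows. The stop-word set here is a tiny hypothetical list for illustration; `StopWordsRemover` ships its own, much longer default English list.

```python
import re

# Hypothetical, tiny stop-word list for illustration only.
STOP_WORDS = {"a", "an", "and", "are", "for", "in", "is", "of", "on", "the", "to", "we"}

def tokenize(text):
    """Split on non-word characters and drop empty tokens."""
    return [tok for tok in re.split(r"\W", text.lower()) if tok]

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep only the tokens that are not in the stop-word set."""
    return [tok for tok in tokens if tok not in stop_words]

tokens = tokenize("we are looking for a hands on qa leader in tel aviv")
filtered = remove_stop_words(tokens)
print(filtered)
```

Dropping these very frequent words shrinks the token lists and keeps the later term counts focused on words that are more likely to distinguish postings.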


In [None]:
words_df.groupBy("fraudulent").count().show()

+----------+-----+
|fraudulent|count|
+----------+-----+
|         0|17014|
|         1|  866|
+----------+-----+
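
The counts above show a strongly imbalanced label. A quick back-of-the-envelope calculation, using only those two counts, gives the accuracy of a trivial model that marks every posting as real. It is a useful reference point when reading the accuracy scores later on.

```python
# Class counts taken from the groupBy output above.
real_count = 17014
fake_count = 866
total = real_count + fake_count

# Accuracy of a trivial model that labels every posting as real (0).
baseline_accuracy = real_count / total

# Recall on the fraudulent class for that same trivial model.
baseline_fraud_recall = 0 / fake_count

print(f"fraud share:       {fake_count / total:.2%}")
print(f"baseline accuracy: {baseline_accuracy:.2%}")
print(f"fraud recall:      {baseline_fraud_recall:.2%}")
```

Any classifier whose accuracy is close to this baseline may simply be predicting the majority class, so accuracy alone says little about how many fake postings are actually caught.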



In [None]:
words_df = words_df.withColumnRenamed('fraudulent', 'label')
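# Cast the label to integer (the null replacement above mixes in a string literal,
# so the column may no longer be numeric)
words_df = words_df.withColumn('label', words_df['label'].cast('integer'))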

words_df.limit(5).toPandas()

Unnamed: 0,telecommuting,has_company_logo,has_questions,required_experience,label,filtered
0,0,1,0,Internship,0,"[marketing, intern, us, ny, new, york, marketi..."
1,0,1,0,Not Applicable,0,"[customer, service, cloud, video, production, ..."
2,0,1,0,,0,"[commissioning, machinery, assistant, cma, us,..."
3,0,1,0,Mid-Senior level,0,"[account, executive, washington, dc, us, dc, w..."
4,0,1,1,Mid-Senior level,0,"[bill, review, manager, us, fl, fort, worth, s..."


## Converting text into vectors


In [None]:
# Hashing TF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawfeatures", numFeatures=20)
HTFfeaturizedData = hashingTF.transform(words_df)
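
What `HashingTF` does can be illustrated with a small plain-Python sketch of the hashing trick: each token is hashed to one of `numFeatures` buckets and the bucket count is incremented. The sketch uses `zlib.crc32` as a stand-in hash (an assumption for illustration; Spark uses MurmurHash3), so the bucket indices will not match Spark's.

```python
import zlib

def hashing_tf(tokens, num_features=20):
    """Map tokens to a fixed-length count vector via the hashing trick."""
    vector = [0] * num_features
    for tok in tokens:
        # crc32 is a stand-in hash for this sketch; Spark uses MurmurHash3.
        index = zlib.crc32(tok.encode("utf-8")) % num_features
        vector[index] += 1
    return vector

tokens = ["marketing", "intern", "us", "ny", "new", "york", "marketing"]
vector = hashing_tf(tokens)
print(vector)
```

With only 20 buckets, a vocabulary of more than 20 distinct words must produce collisions, so many unrelated words end up sharing a feature. A larger `numFeatures` reduces collisions at the cost of wider vectors.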

In [None]:
# TF-IDF
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(HTFfeaturizedData)
TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)
TFIDFfeaturizedData.name = 'TFIDFfeaturizedData'
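
The IDF step rescales each term-frequency bucket by how rare it is across documents. Spark's documentation gives the weight as log((m + 1) / (df + 1)), where m is the number of documents and df the number of documents containing the term. A plain-Python sketch on made-up count vectors:

```python
import math

def idf_weights(doc_term_counts):
    """Compute log((m + 1) / (df + 1)) per feature index, m = number of docs."""
    m = len(doc_term_counts)
    n = len(doc_term_counts[0])
    weights = []
    for j in range(n):
        df = sum(1 for doc in doc_term_counts if doc[j] > 0)
        weights.append(math.log((m + 1) / (df + 1)))
    return weights

def tf_idf(doc_term_counts):
    """Scale each term frequency by the IDF weight of its feature index."""
    weights = idf_weights(doc_term_counts)
    return [[tf * w for tf, w in zip(doc, weights)] for doc in doc_term_counts]

# Hypothetical term-frequency vectors for three documents and three buckets.
counts = [
    [2, 1, 0],
    [1, 0, 0],
    [3, 0, 1],
]
weights = idf_weights(counts)
print(weights)
print(tf_idf(counts))
```

A bucket that occurs in every document gets weight 0, so it contributes nothing after the transformation, while buckets seen in few documents are weighted up.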

In [None]:
#rename the HTF features to features to be consistent
HTFfeaturizedData = HTFfeaturizedData.withColumnRenamed("rawfeatures","features")
HTFfeaturizedData.name = 'HTFfeaturizedData' #We will use later for printing

In [None]:
# Word2Vec
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="filtered", outputCol="features")
model = word2Vec.fit(words_df)

W2VfeaturizedData = model.transform(words_df)
# W2VfeaturizedData.show(1,False)


In [None]:
# Word2Vec features typically contain negative values, so we rescale them here in order to use the Naive Bayes classifier
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(W2VfeaturizedData)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(W2VfeaturizedData)
W2VfeaturizedData = scaled_data.select('telecommuting','has_company_logo','has_questions','required_experience','label','scaledFeatures')
W2VfeaturizedData = W2VfeaturizedData.withColumnRenamed('scaledFeatures','features')

W2VfeaturizedData.name = 'W2VfeaturizedData' # We will need this to print later
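
The rescaling performed by `MinMaxScaler` with its default range [0, 1] can be sketched in plain Python. Each column is mapped linearly so that its minimum becomes 0 and its maximum becomes 1; a constant column is mapped to the midpoint of the range. The toy vectors are made up for illustration.

```python
def min_max_scale(rows, lower=0.0, upper=1.0):
    """Rescale each column of `rows` linearly into [lower, upper]."""
    columns = list(zip(*rows))
    mins = [min(col) for col in columns]
    maxs = [max(col) for col in columns]
    scaled = []
    for row in rows:
        new_row = []
        for value, lo, hi in zip(row, mins, maxs):
            if hi == lo:
                # Constant column: map to the midpoint of the target range.
                new_row.append(0.5 * (lower + upper))
            else:
                new_row.append((value - lo) / (hi - lo) * (upper - lower) + lower)
        scaled.append(new_row)
    return scaled

# Hypothetical 3-dimensional embeddings with negative entries.
vectors = [
    [-0.5, 0.2, 1.0],
    [0.0, -0.2, 1.0],
    [0.5, 0.0, 1.0],
]
scaled = min_max_scale(vectors)
print(scaled)
```

After scaling, no entry is negative, which is the property Naive Bayes needs from its input features.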

## training and evaluation

In [None]:
def ClassTrainEval(classifier,features,classes,train,test):

    def FindMtype(classifier):
        # Instantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  
            # Add parameters of your choice here:
            if Mtype == "LogisticRegression":
                paramGrid = (ParamGridBuilder()
                             # .addGrid(classifier.regParam, [0.1, 0.01])
                             .addGrid(classifier.maxIter, [10, 15, 20])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "NaiveBayes":
                paramGrid = (ParamGridBuilder()
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "RandomForestClassifier":
                paramGrid = (ParamGridBuilder()
                             .addGrid(classifier.maxDepth, [2, 5, 10])
                             # .addGrid(classifier.maxBins, [5, 10, 20])
                             # .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "GBTClassifier":
                paramGrid = (ParamGridBuilder()
                             # .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30])
                             # .addGrid(classifier.maxBins, [10, 20, 40, 80, 100])
                             .addGrid(classifier.maxIter, [10, 15, 50, 100])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "LinearSVC":
                paramGrid = (ParamGridBuilder()
                             .addGrid(classifier.maxIter, [10, 15])
                             .addGrid(classifier.regParam, [0.1, 0.01])
                             .build())

            # Add parameters of your choice here:
            if Mtype == "DecisionTreeClassifier":
                paramGrid = (ParamGridBuilder()
                             # .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30])
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100])
                             .build())
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        
        if Mtype == "OneVsRest":
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            # Extract list of binary models
            models = BestModel.models
            for model in models:
                print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)

        if Mtype == "MultilayerPerceptronClassifier":
            print("")
            print('\033[1m' + Mtype," Weights"+ '\033[0m')
            print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)
            print("")

        if Mtype in("DecisionTreeClassifier", "GBTClassifier","RandomForestClassifier"):
            # FEATURE IMPORTANCES
            # Estimate of the importance of each feature.
            # Each feature’s importance is the average of its importance across all trees
            # in the ensemble. The importance vector is normalized to sum to 1.
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            print(BestModel.featureImportances)
            
            if Mtype == "DecisionTreeClassifier":
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel
            if Mtype == "GBTClassifier":
                global GBT_featureimportances
                GBT_featureimportances = BestModel.featureImportances.toArray()
                global GBT_BestModel
                GBT_BestModel = BestModel
            if Mtype == "RandomForestClassifier":
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

        if Mtype == "LogisticRegression":
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficient Matrix"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficientMatrix))
            print("Intercept: " + str(BestModel.interceptVector))
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        if Mtype == "LinearSVC":
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficients))
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # predictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a list
        score = [str(accuracy)] # make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
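
The layer rule used for the `MultilayerPerceptronClassifier` branch (input of size n, hidden layers of n + 1 and n, output of size equal to the class count) can be worked through for the feature sizes in this notebook. The weight count below assumes fully connected layers with one bias per output unit; if that matches Spark's layout, it should correspond to the `fitModel.weights.size` value the function prints. The helper names are hypothetical.

```python
def mlp_layers(feature_count, class_count):
    """Layer sizes following the rule used in ClassTrainEval."""
    return [feature_count, feature_count + 1, feature_count, class_count]

def mlp_weight_count(layers):
    """Weights plus biases for fully connected layers of the given sizes."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))

# 20 hashed features (numFeatures=20) and 2 classes (real vs. fake).
layers = mlp_layers(20, 2)
print(layers, mlp_weight_count(layers))

# 3 Word2Vec dimensions (vectorSize=3) and 2 classes.
w2v_layers = mlp_layers(3, 2)
print(w2v_layers, mlp_weight_count(w2v_layers))
```

The network for the Word2Vec features is much smaller than the one for the hashed features, simply because the input vectors have 3 entries instead of 20.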

In [None]:
classifiers = [
                LogisticRegression()
                ,OneVsRest()
               ,LinearSVC()
               ,NaiveBayes()
               ,RandomForestClassifier()
               ,GBTClassifier()
               ,DecisionTreeClassifier()
               ,MultilayerPerceptronClassifier()
              ] 

featureDF_list = [HTFfeaturizedData,TFIDFfeaturizedData,W2VfeaturizedData]
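
Before running the full loop, it can help to estimate how many model fits it triggers. The sketch below reads the grid sizes off the `ParamGridBuilder` calls in `ClassTrainEval` and assumes the usual `CrossValidator` behavior: one fit per parameter map per fold, plus one refit of the best parameter map on the full training set. It counts estimator-level fits only (a `OneVsRest` fit, for example, trains several binary models internally).

```python
# Grid sizes read off the ParamGridBuilder calls in ClassTrainEval.
grid_sizes = {
    "LogisticRegression": 3,      # maxIter: [10, 15, 20]
    "OneVsRest": 2,               # regParam: [0.1, 0.01]
    "LinearSVC": 2 * 2,           # maxIter x regParam
    "NaiveBayes": 4,              # smoothing: [0.0, 0.2, 0.4, 0.6]
    "RandomForestClassifier": 3,  # maxDepth: [2, 5, 10]
    "GBTClassifier": 4,           # maxIter: [10, 15, 50, 100]
    "DecisionTreeClassifier": 5,  # maxBins: [10, 20, 40, 80, 100]
}
num_folds = 2
num_feature_sets = 3  # HashingTF, TF-IDF, Word2Vec

# Every parameter map is fitted on every fold, then the best one is refitted.
cv_fits = sum(size * num_folds + 1 for size in grid_sizes.values())
mlp_fits = 1  # MultilayerPerceptronClassifier is fitted once, without cross-validation
fits_per_feature_set = cv_fits + mlp_fits
total_fits = fits_per_feature_set * num_feature_sets
print(fits_per_feature_set, total_fits)
```

Raising `numFolds` to 3 or enabling the commented-out grids multiplies this count, which is worth keeping in mind on a single local core.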

In [None]:
for featureDF in featureDF_list:
    print('\033[1m' + featureDF.name," Results:"+ '\033[0m')
    train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
    # One row is enough: the features are only used to read the vector length
    features = featureDF.select(['features']).limit(1).collect()
    # Learn how many classes there are in order to specify evaluation type based on binary or multi and turn the df into an object
    class_count = featureDF.select(countDistinct("label")).collect()
    classes = class_count[0][0]

    #set up your results table
    columns = ['Classifier', 'Result']
    vals = [("Place Holder","N/A")]
    results = spark.createDataFrame(vals, columns)

    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes,train,test)
        results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    results.show(truncate=False)  # show() prints the table itself and returns None
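
A seeded 70/30 split like the `randomSplit` call in the loop can be sketched in plain Python. The sketch assumes the split is done by an independent random draw per row, which is how `randomSplit` is commonly described; under that assumption the split sizes are only approximately 70/30, and the seed makes the assignment reproducible. The row ids are hypothetical.

```python
import random

def random_split(rows, train_weight=0.7, seed=11):
    """Assign each row to train or test by an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(1000))  # hypothetical row ids
train, test = random_split(rows)
print(len(train), len(test))
```

Because the draw ignores the label, the share of fake postings in each part is not guaranteed to match the overall share exactly.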

HTFfeaturizedData  Results:
 
LogisticRegression  Coefficient Matrix
You should compares these relative to eachother
Coefficients: 
DenseMatrix([[-0.00211731, -0.03940544,  0.01474668,  0.0062809 ,  0.00962945,
              -0.055625  ,  0.02782167,  0.00298832, -0.01321965, -0.02357409,
              -0.04254043, -0.01633202, -0.0221769 ,  0.02568843, -0.06410539,
               0.05612792,  0.03031358, -0.05398289,  0.02910447,  0.03787072]])
Intercept: [-2.0642774561232766]
 
OneVsRest
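Intercept: 2.273223729188219
Coefficients: [0.0032115743446328097,0.008124084989141822,0.00041053261702897145,0.0016936504899131351,0.0024939468180888965,0.009363083920229828,0.0003757168091739006,0.0028577561247476214,0.003777389407365978,0.00489734269856488,0.007386027617766986,0.005308455086500712,0.005037357624835332,-0.0004429478950003195,0.00871445834769299,-0.002987542017473788,0.00013955115588197618,0.007948201569686569,-0.0015653378536501737,-0.00255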
Coefficients:[0m [0.0032115743446328097,0.008124084989141822,0.00041053261702897145,0.0016936504899131351,0.0024939468180888965,0.009363083920229828,0.0003757168091739006,0.0028577561247476214,0.003777389407365978,0.00489734269856488,0.007386027617766986,0.005308455086500712,0.005037357624835332,-0.0004429478950003195,0.00871445834769299,-0.002987542017473788,0.00013955115588197618,0.007948201569686569,-0.0015653378536501737,-0.00255