# NLP with MLlib and PySpark

## Basic theory

- audio
- text editing & translation 
- text classification

Teaching a computer to read using count vectors means transforming words into numbers. The basic data processing steps are:

- remove stop words, punctuation, white space and special characters (in some cases we want to treat special characters differently)
- simplify the text using stemming (stripping endings such as -ing)
- extract features, for example count vectors
- split the data into train and test sets
- apply the algorithm
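
The stemming step can be illustrated with a deliberately naive suffix stripper. This is only a sketch: `naive_stem` and its suffix list are hypothetical, and real stemmers (the Porter algorithm, for example) apply many more rules.

```python
def naive_stem(word, suffixes=("ing", "ed", "s")):
    """Strip the first matching suffix, keeping at least three characters."""
    for suffix in suffixes:
        if word.endswith(suffix) and len(word) - len(suffix) >= 3:
            return word[: -len(suffix)]
    return word


# Toy input words, chosen only for illustration
stems = [naive_stem(w) for w in ["leveling", "packed", "quests", "sun"]]
print(stems)
```

The length check keeps very short words such as "is" untouched.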

<div class="alert alert-block alert-warning"><b>Feature Transformers</b></div>

![Loss](nlp1.png)

***Noise cleaning***

What counts as noise depends on the task. Hashtags are one example: in some cases you will want to leave such special characters alone. Extra spaces and similar characters, however, are just redundant.

***RegexTokenizer***

Splits the text into tokens: each row gets an array in which every word is a separate element.
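
A rough pure-Python equivalent of this step, assuming the pattern marks the gaps between tokens and that tokens are lower-cased. `regex_tokenize` is a hypothetical helper used only for illustration, not a Spark function.

```python
import re


def regex_tokenize(text, pattern=r"\W+"):
    """Split text on the pattern, lower-case it, and drop empty tokens."""
    return [tok for tok in re.split(pattern, text.lower()) if tok]


tokens = regex_tokenize("A small indie press, run as a collective!")
print(tokens)
```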

***Stop Words***

A list of words I want to exclude from the analysis. PySpark offers default stop-word lists for some languages (not Czech :().
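
Stop-word removal is just a filter over the tokens. In the sketch below the stop-word set is a tiny, hypothetical sample; real lists are much longer.

```python
# A tiny, hypothetical stop-word list used only for this example
STOP_WORDS = {"a", "is", "the", "with", "and", "of", "in", "your"}


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep only the tokens that are not in the stop-word set."""
    return [tok for tok in tokens if tok not in stop_words]


filtered = remove_stop_words(
    ["microfly", "is", "a", "quadcopter", "packed", "with", "wifi"]
)
print(filtered)
```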

***n-gram***

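An optional step that splits a sentence into sequences of n consecutive words (n-grams). Because n-grams keep some of the word order, combining several n-gram sizes can give the algorithm more to work with.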
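
A minimal sketch of how n-grams are built from a token list. The `ngrams` helper is hypothetical; it joins each window of n consecutive tokens with a space.

```python
def ngrams(tokens, n=2):
    """Return all runs of n consecutive tokens, joined by a space."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


bigrams = ngrams(["small", "indie", "press", "run"], n=2)
print(bigrams)
```

A token list shorter than n simply yields an empty list.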

<div class="alert alert-block alert-warning"><b>Feature Extractors</b></div>

*We have more methods for extracting features from our text.*

***Count Vectorizer***

Simple, easy and often very effective. It builds a list of words (a vocabulary) and, for each sentence, records how many times each word occurs; in the simplest binary form this is just 1 or 0, depending on whether the sentence contains the word or not. The main disadvantage of Count Vectorizer is that it cannot capture the order of words in a sentence. The algorithm only knows which words a sentence contains, nothing else.
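
The idea can be sketched in plain Python. The helper names and the toy documents are assumptions made for illustration; this is not Spark's implementation.

```python
from collections import Counter


def fit_vocabulary(documents):
    """Assign an index to every distinct word, most frequent first."""
    counts = Counter(word for doc in documents for word in doc)
    ordered = sorted(counts, key=lambda w: (-counts[w], w))
    return {word: idx for idx, word in enumerate(ordered)}


def count_vector(doc, vocabulary):
    """Count how often each vocabulary word occurs in one document."""
    vec = [0] * len(vocabulary)
    for word in doc:
        if word in vocabulary:
            vec[vocabulary[word]] += 1
    return vec


docs = [["sun", "and", "sea"], ["sun", "sun", "and", "sand"]]
vocab = fit_vocabulary(docs)
vectors = [count_vector(d, vocab) for d in docs]
print(vocab)
print(vectors)
```

Shuffling the words inside a document gives exactly the same vector, which is the loss of word order described above.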

***TF-IDF***

![Loss](nlp2.png)

The main idea is simple: it measures the importance of a word by comparing its frequency in one document with its frequency across the whole dataset. For example, AND appears almost everywhere, so it is probably less important for the meaning of a sentence than SUN.
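
A small worked sketch of the weighting, using the smoothed inverse document frequency log((m + 1) / (df + 1)) described in the Spark MLlib documentation, where m is the number of documents and df the number of documents containing the word. The helper names and toy documents are hypothetical.

```python
import math


def idf_weights(documents):
    """Inverse document frequency with +1 smoothing for every word."""
    m = len(documents)
    doc_freq = {}
    for doc in documents:
        for word in set(doc):
            doc_freq[word] = doc_freq.get(word, 0) + 1
    return {w: math.log((m + 1) / (df + 1)) for w, df in doc_freq.items()}


def tf_idf(doc, idf):
    """Multiply each word's count in the document by its IDF weight."""
    tf = {}
    for word in doc:
        tf[word] = tf.get(word, 0) + 1
    return {w: count * idf[w] for w, count in tf.items()}


docs = [["sun", "and", "sea"], ["rain", "and", "wind"], ["sun", "and", "sand"]]
idf = idf_weights(docs)
scores = tf_idf(docs[0], idf)
print(scores)
```

Here "and" occurs in every document, so its weight is zero, while the rarer "sea" gets the highest score.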

***Word2Vec***

It tries to capture the context in which words appear, so that words used in similar contexts end up with similar vectors.
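
Once every word has a vector, a whole text can be represented by averaging the vectors of its words, which is how the Spark documentation describes the transform step of its Word2Vec model. The sketch below uses hypothetical 3-dimensional vectors instead of trained ones, and skipping unknown words is an assumption of this sketch.

```python
# Hypothetical word vectors; a trained model would learn these from data
word_vectors = {
    "sun": [0.9, 0.1, 0.0],
    "beach": [0.7, 0.3, 0.0],
    "tax": [0.0, 0.2, 0.8],
}


def document_vector(tokens, vectors, size=3):
    """Average the vectors of the known tokens (assumption: skip unknown ones)."""
    known = [vectors[t] for t in tokens if t in vectors]
    if not known:
        return [0.0] * size
    return [sum(dim) / len(known) for dim in zip(*known)]


doc_vec = document_vector(["sun", "beach"], word_vectors)
print(doc_vec)
```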

***Feature Hashing***

Works by hashing each word to an index in a fixed-size vector instead of storing a vocabulary; the user specifies how many indices (features) the vector has.
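
A minimal sketch of the hashing trick. `zlib.crc32` is used here only as a stand-in hash; Spark's documentation names MurmurHash3 for its own implementation, so the bucket indices below will not match Spark's.

```python
import zlib


def hashing_tf(tokens, num_features=20):
    """Map each token to a bucket with a hash and count occurrences per bucket."""
    vec = [0] * num_features
    for tok in tokens:
        index = zlib.crc32(tok.encode("utf-8")) % num_features
        vec[index] += 1
    return vec


vec = hashing_tf(["sun", "sea", "sun", "sand"], num_features=20)
print(vec)
```

With a small number of features, different words can land in the same bucket (a collision), which is the price paid for not keeping a vocabulary.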

## Data preparation

### Agenda
1. Quality check
2. Clean up the data - special characters, punctuation and so on
3. Tokenize
4. Stopwords
5. Zero index for label column
6. Create ML Pipeline
7. Vectorize
8. Train, evaluate and predict

<div class="alert alert-block alert-warning"><b>Session, dependencies and data info</b></div>

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NLP").getOrCreate()
spark

In [30]:
from pyspark.ml.feature import * #CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.sql.functions import * #col, udf,regexp_replace,isnull
from pyspark.sql.types import * #StringType,IntegerType
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# For pipeline development
from pyspark.ml import Pipeline 

#### Kickstarter Dataset

##### What is Kickstarter?
"Kickstarter is an American public-benefit corporation based in Brooklyn, New York, that maintains a global crowdfunding platform, focused on creativity and merchandising. The company's stated mission is to "help bring creative projects to life". Kickstarter, has reportedly received more than $1.9 billion in pledges from 9.4 million backers to fund 257,000 creative projects, such as films, music, stage shows, comics, journalism, video games, technology and food-related projects.

People who back Kickstarter projects are offered tangible rewards or experiences in exchange for their pledges. This model traces its roots to subscription model of arts patronage, where artists would go directly to their audiences to fund their work" ~ Wikipedia

So, what if you could predict whether a project will get the money from its backers or not?

#### Content

The dataset contains the blurbs, or short descriptions, of 215,513 projects run during 2017, all written in English and all labeled "successful" or "failed", depending on whether they got the money or not. From those texts you can train linguistic models for description, and even embeddings specific to this case.

**Source:** https://www.kaggle.com/oscarvilla/kickstarter-nlp

In [6]:
#Loading our csv
path ="Datasets/"
df = spark.read.csv(path+'kickstarter.csv',inferSchema=True,header=True) 

In [7]:
df.limit(5).toPandas()

Unnamed: 0,_c0,blurb,state
0,1,"Using their own character, users go on educati...",failed
1,2,"MicroFly is a quadcopter packed with WiFi, 6 s...",successful
2,3,"A small indie press, run as a collective for a...",failed
3,4,Zylor is a new baby cosplayer! Back this kicks...,failed
4,5,Hatoful Boyfriend meet Skeletons! A comedy Dat...,failed


In [8]:
df.show(4,False)

+---+-----------------------------------------------------------------------------------------------------------------------------------+----------+
|_c0|blurb                                                                                                                              |state     |
+---+-----------------------------------------------------------------------------------------------------------------------------------+----------+
|1  |Using their own character, users go on educational quests around a virtual world leveling up subject-oriented skills (ie Physics). |failed    |
|2  |MicroFly is a quadcopter packed with WiFi, 6 sensors, and 3 processors for ultimate stability -- and fits in the palm of your hand.|successful|
|3  |A small indie press, run as a collective for authors who want to self-publish, and a sexy, smart , hilarious novel!                |failed    |
|4  |Zylor is a new baby cosplayer! Back this kickstarter to help fund new cosplay photoshoots to share hi

*I can spot some special characters, white spaces and so on.*

In [9]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- blurb: string (nullable = true)
 |-- state: string (nullable = true)



*No problem with datatypes.*

<div class="alert alert-block alert-warning"><b>Data cleaning</b></div>

In [10]:
df.count() #calculating number of rows

223627

*Now let's count the null values. I use a function from previous parts.*

In [14]:
def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows,(nullRows/numRows)*100
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(df)
spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()

+-----------+-----------------+------------------+
|Column_Name|Null_Values_Count|Null_Value_Percent|
+-----------+-----------------+------------------+
|      blurb|             1488|0.6653937136392296|
|      state|            13157| 5.883457722010312|
+-----------+-----------------+------------------+



Our state column has 5.88% null values and blurb has 0.66% null values.

In [15]:
df.na.drop().count()

210470

*If I drop all rows with null values, I will go from 223K rows to 210K. I will accept that.*

In [16]:
df = df.dropna() #Dropping
df.count() #Checking

210470

*Now I will check the state column, our label column. It has to be cleaned as well.*

In [17]:
df.groupBy("state").count().orderBy(col("count").desc()).show()

+--------------------+------+
|               state| count|
+--------------------+------+
|          successful|103582|
|              failed|102000|
| and get some col...|     8|
|          ","failed"|     6|
|     their childhood|     6|
|                love|     6|
| about a lonely f...|     5|
|             romance|     4|
|              poetry|     4|
|            mastered|     4|
| She Wrote"" but ...|     3|
|              2015."|     3|
|                loss|     3|
|                NY."|     3|
|               2014"|     3|
|            betrayal|     3|
|               faith|     3|
| solid surface on...|     3|
|               music|     3|
|                  CD|     3|
+--------------------+------+
only showing top 20 rows



*I have about 205K valid label values and about 5K invalid ones. I will drop all rows whose state is not successful or failed.*

In [18]:
df = df.filter("state IN('successful','failed')") #filtering and assigning

In [19]:
df.groupBy("state").count().orderBy(col("count").desc()).show() #just checking

+----------+------+
|     state| count|
+----------+------+
|successful|103582|
|    failed|102000|
+----------+------+



*Now I will proceed with removing special characters.*

In [26]:
df.select("blurb").show(4,False)

+-----------------------------------------------------------------------------------------------------------------------------------+
|blurb                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------+
|Using their own character, users go on educational quests around a virtual world leveling up subject-oriented skills  ie Physics . |
|MicroFly is a quadcopter packed with WiFi, 6 sensors, and 3 processors for ultimate stability -- and fits in the palm of your hand.|
|A small indie press, run as a collective for authors who want to self-publish, and a sexy, smart , hilarious novel!                |
|Zylor is a new baby cosplayer! Back this kickstarter to help fund new cosplay photoshoots to share his cuteness with the world!    |
+-------------------------------------------------------------

*Replacing / ) ( with spaces.*

In [22]:
df = df.withColumn("blurb", translate(col("blurb"), "/", " "))\
.withColumn("blurb", translate(col("blurb"), ")", " "))\
.withColumn("blurb", translate(col("blurb"), "(", " "))

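*Deleting the other special characters using a regexp. The pattern keeps only letters and spaces, so digits and hyphens are removed too; the second call collapses repeated spaces.*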

In [27]:
df = df.withColumn("blurb",regexp_replace(col('blurb'), '[^A-Za-z ]+', ''))
df = df.withColumn("blurb",regexp_replace(col('blurb'), ' +', ' '))
df.select("blurb").show(4,False)

+------------------------------------------------------------------------------------------------------------------------------+
|blurb                                                                                                                         |
+------------------------------------------------------------------------------------------------------------------------------+
|Using their own character users go on educational quests around a virtual world leveling up subjectoriented skills ie Physics |
|MicroFly is a quadcopter packed with WiFi sensors and processors for ultimate stability and fits in the palm of your hand     |
|A small indie press run as a collective for authors who want to selfpublish and a sexy smart hilarious novel                  |
|Zylor is a new baby cosplayer Back this kickstarter to help fund new cosplay photoshoots to share his cuteness with the world |
+------------------------------------------------------------------------------------------------

*Now case sensitivity: we will change everything to lower case.*

In [28]:
df = df.withColumn("blurb",lower(col('blurb')))
df.select("blurb").show(4,False)

+------------------------------------------------------------------------------------------------------------------------------+
|blurb                                                                                                                         |
+------------------------------------------------------------------------------------------------------------------------------+
|using their own character users go on educational quests around a virtual world leveling up subjectoriented skills ie physics |
|microfly is a quadcopter packed with wifi sensors and processors for ultimate stability and fits in the palm of your hand     |
|a small indie press run as a collective for authors who want to selfpublish and a sexy smart hilarious novel                  |
|zylor is a new baby cosplayer back this kickstarter to help fund new cosplay photoshoots to share his cuteness with the world |
+------------------------------------------------------------------------------------------------
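
The clean-up steps above (replacing / ( ) with spaces, keeping only letters and spaces, collapsing spaces, lower-casing) can be reproduced on a single string with Python's `re` module. This is a minimal sketch for checking the patterns locally; `clean_blurb` is a hypothetical helper, not part of the notebook.

```python
import re


def clean_blurb(text):
    """Apply the same clean-up steps to one string."""
    text = re.sub(r"[/()]", " ", text)        # same effect as the translate calls
    text = re.sub(r"[^A-Za-z ]+", "", text)   # keep letters and spaces only
    text = re.sub(r" +", " ", text)           # collapse runs of spaces
    return text.lower()


raw = ("MicroFly is a quadcopter packed with WiFi, 6 sensors, and 3 processors "
       "for ultimate stability -- and fits in the palm of your hand.")
cleaned = clean_blurb(raw)
print(cleaned)
```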

<div class="alert alert-block alert-warning"><b>Tokenizing, StopWords and Zero Index</b></div> 

**Tokenizer**

In [31]:
regex_tokenizer = RegexTokenizer(inputCol="blurb", outputCol="words", pattern="\\W")
#the tokenizer reads the blurb column and writes a new words column; the pattern \W matches non-word characters, which act as separators between tokens
raw_words = regex_tokenizer.transform(df) #transforming our data
raw_words.show(4,False)
raw_words.printSchema()

+---+------------------------------------------------------------------------------------------------------------------------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0|blurb                                                                                                                         |state     |words                                                                                                                                               |
+---+------------------------------------------------------------------------------------------------------------------------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|1  |using their own character users go on educational quests around a virtual world leveling up subjectoriented skills i

*Every row is now an array of tokens.*

In [None]:
#|-- words: array (nullable = true)
#  |    |-- element: string (containsNull = true)

**StopWords**

I want to delete the most common words via StopWordsRemover.

In [32]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered") #setting up my remover

In [36]:
stopwords = remover.getStopWords() #reading the remover's default stop-word list

In [37]:
stopwords[:5] #first 5 words that will be removed

['i', 'me', 'my', 'myself', 'we']

In [38]:
len(stopwords) #purely out of curiosity

181

In [39]:
words_df = remover.transform(raw_words)

In [40]:
words_df.limit(4).toPandas() #look at our changes - blurb - words - filtered

Unnamed: 0,_c0,blurb,state,words,filtered
0,1,using their own character users go on educatio...,failed,"[using, their, own, character, users, go, on, ...","[using, character, users, go, educational, que..."
1,2,microfly is a quadcopter packed with wifi sens...,successful,"[microfly, is, a, quadcopter, packed, with, wi...","[microfly, quadcopter, packed, wifi, sensors, ..."
2,3,a small indie press run as a collective for au...,failed,"[a, small, indie, press, run, as, a, collectiv...","[small, indie, press, run, collective, authors..."
3,4,zylor is a new baby cosplayer back this kickst...,failed,"[zylor, is, a, new, baby, cosplayer, back, thi...","[zylor, new, baby, cosplayer, back, kickstarte..."


**Label indexing**

In [43]:
indexer = StringIndexer(inputCol="state", outputCol="label") #creating a numeric label column from state
feature_data = indexer.fit(words_df).transform(words_df) #the indexer replaces failed and successful with numbers

feature_data.limit(4).toPandas() 

Unnamed: 0,_c0,blurb,state,words,filtered,label
0,1,using their own character users go on educatio...,failed,"[using, their, own, character, users, go, on, ...","[using, character, users, go, educational, que...",1.0
1,2,microfly is a quadcopter packed with wifi sens...,successful,"[microfly, is, a, quadcopter, packed, with, wi...","[microfly, quadcopter, packed, wifi, sensors, ...",0.0
2,3,a small indie press run as a collective for au...,failed,"[a, small, indie, press, run, as, a, collectiv...","[small, indie, press, run, collective, authors...",1.0
3,4,zylor is a new baby cosplayer back this kickst...,failed,"[zylor, is, a, new, baby, cosplayer, back, thi...","[zylor, new, baby, cosplayer, back, kickstarte...",1.0


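*Now I can see that failed is encoded as 1 and successful as 0: by default StringIndexer gives index 0 to the most frequent label, and successful is slightly more frequent here. Next, all three steps go into a Pipeline.*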
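
The frequency-based indexing can be sketched in plain Python. `fit_string_index` is a hypothetical helper that mimics ordering by descending frequency, and the toy label list is made up for the example.

```python
from collections import Counter


def fit_string_index(values):
    """Give index 0.0 to the most frequent value, 1.0 to the next, and so on."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


states = ["successful"] * 3 + ["failed"] * 2
index = fit_string_index(states)
labels = [index[s] for s in states]
print(index)
```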

In [44]:
regex_tokenizer = RegexTokenizer(inputCol="blurb", outputCol="words", pattern="\\W")
remover = StopWordsRemover(inputCol=regex_tokenizer.getOutputCol(), outputCol="filtered")
#the remover takes the RegexTokenizer output column as its input
indexer = StringIndexer(inputCol="state", outputCol="label")

pipeline = Pipeline(stages=[regex_tokenizer,remover,indexer]) #creating a pipeline
data_prep_pl = pipeline.fit(df) #fitting the pipeline
feature_data = data_prep_pl.transform(df) #transforming the data
feature_data.limit(4).toPandas()

Unnamed: 0,_c0,blurb,state,words,filtered,label
0,1,using their own character users go on educatio...,failed,"[using, their, own, character, users, go, on, ...","[using, character, users, go, educational, que...",1.0
1,2,microfly is a quadcopter packed with wifi sens...,successful,"[microfly, is, a, quadcopter, packed, with, wi...","[microfly, quadcopter, packed, wifi, sensors, ...",0.0
2,3,a small indie press run as a collective for au...,failed,"[a, small, indie, press, run, as, a, collectiv...","[small, indie, press, run, collective, authors...",1.0
3,4,zylor is a new baby cosplayer back this kickst...,failed,"[zylor, is, a, new, baby, cosplayer, back, thi...","[zylor, new, baby, cosplayer, back, kickstarte...",1.0


<div class="alert alert-block alert-danger"><b>Be aware:</b> the main purpose of using pipelines is speed and readability.</div>

## Vectors, tests and evaluation

<div class="alert alert-block alert-warning"><b>Vectors</b></div>

**Hashing**

In [45]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawfeatures", numFeatures=20)
HTFfeaturizedData = hashingTF.transform(feature_data)

In [47]:
HTFfeaturizedData.limit(1).toPandas() 

Unnamed: 0,_c0,blurb,state,words,filtered,label,rawfeatures
0,1,using their own character users go on educatio...,failed,"[using, their, own, character, users, go, on, ...","[using, character, users, go, educational, que...",1.0,"(3.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 2.0, 0.0, ..."


*The hashing output holds plain term counts: each word is hashed to one of the 20 buckets set by numFeatures, so with so few buckets many different words share a bucket.*

**TF-IDF**

In [48]:
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(HTFfeaturizedData)
TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)
TFIDFfeaturizedData.name = 'TFIDFfeaturizedData'

In [50]:
TFIDFfeaturizedData.limit(1).toPandas()

Unnamed: 0,_c0,blurb,state,words,filtered,label,rawfeatures,features
0,1,using their own character users go on educatio...,failed,"[using, their, own, character, users, go, on, ...","[using, character, users, go, educational, que...",1.0,"(3.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 2.0, 0.0, ...","(2.252148827177929, 0.0, 0.8915391572399594, 0..."


In [55]:
HTFfeaturizedData = HTFfeaturizedData.withColumnRenamed("rawfeatures","features")
HTFfeaturizedData.name = 'HTFfeaturizedData' #We will use later for printing

**Word2Vec**

In [56]:
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="filtered", outputCol="features")
model = word2Vec.fit(feature_data)
W2VfeaturizedData = model.transform(feature_data)

In [58]:
W2VfeaturizedData.limit(1).toPandas()

Unnamed: 0,_c0,blurb,state,words,filtered,label,features
0,1,using their own character users go on educatio...,failed,"[using, their, own, character, users, go, on, ...","[using, character, users, go, educational, que...",1.0,"[-0.21108862035907805, -0.06200759852072224, 0..."


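*The vectors contain negative values, which some classifiers (NaiveBayes, for example) do not accept, so let's rescale them to the 0 to 1 range using MinMaxScaler.*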

In [59]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(W2VfeaturizedData)
scaled_data = scalerModel.transform(W2VfeaturizedData)
scaled_data.limit(1).toPandas()

Unnamed: 0,_c0,blurb,state,words,filtered,label,features,scaledFeatures
0,1,using their own character users go on educatio...,failed,"[using, their, own, character, users, go, on, ...","[using, character, users, go, educational, que...",1.0,"[-0.21108862035907805, -0.06200759852072224, 0...","[0.4320722381613357, 0.5392954626524695, 0.608..."
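
Min-max scaling works column by column: each value x becomes (x - min) / (max - min). A minimal sketch for one column, with made-up numbers; mapping a constant column to 0.5 follows what the Spark documentation describes for the default 0 to 1 range.

```python
def min_max_scale(column):
    """Rescale a list of numbers to the range 0 to 1."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # A constant column has no spread; use the middle of the range
        return [0.5 for _ in column]
    return [(x - lo) / (hi - lo) for x in column]


scaled = min_max_scale([-0.2, 0.0, 0.6])
print(scaled)
```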


In [65]:
W2VfeaturizedData = scaled_data.select('state','blurb','label','scaledFeatures')
W2VfeaturizedData = W2VfeaturizedData.withColumnRenamed('scaledFeatures','features')

W2VfeaturizedData.name = 'W2VfeaturizedData'

**Count Vector**

In [61]:
# Count Vector (CountVectorizer and HashingTF both produce term-count vectors; CountVectorizer builds an explicit vocabulary instead of hashing)
# cv = CountVectorizer(inputCol="filtered", outputCol="features")
# model = cv.fit(feature_data)
# countVectorizer_features = model.transform(feature_data)

That's it: we have three different feature sets (hashed term counts, TF-IDF and Word2Vec), so we can compare the results.

<div class="alert alert-block alert-warning"><b>Testing</b></div>

Again, this is just a copy and paste of the testing function from previous parts.

In [62]:
def ClassTrainEval(classifier,features,classes,train,test):

    def FindMtype(classifier):
        # The classifier instance that was passed in
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  
            # Add parameters of your choice here:
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("NaiveBayes"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder() \
                               .addGrid(classifier.maxDepth, [2, 5, 10])
#                                .addGrid(classifier.maxBins, [5, 10, 20])
#                                .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("GBTClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .addGrid(classifier.maxIter, [10, 15,50,100])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxIter, [10, 15]) \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .build())
            
            # Add parameters of your choice here:
            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .build())
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        
        if Mtype in("OneVsRest"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            # Extract list of binary models
            models = BestModel.models
            for model in models:
                print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)

        if Mtype == "MultilayerPerceptronClassifier":
            print("")
            print('\033[1m' + Mtype," Weights"+ '\033[0m')
            print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)
            print("")

        if Mtype in("DecisionTreeClassifier", "GBTClassifier","RandomForestClassifier"):
            # FEATURE IMPORTANCES
            # Estimate of the importance of each feature.
            # Each feature’s importance is the average of its importance across all trees 
            # in the ensemble The importance vector is normalized to sum to 1. 
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            print(BestModel.featureImportances)
            
            if Mtype in("DecisionTreeClassifier"):
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel
            if Mtype in("GBTClassifier"):
                global GBT_featureimportances
                GBT_featureimportances = BestModel.featureImportances.toArray()
                global GBT_BestModel
                GBT_BestModel = BestModel
            if Mtype in("RandomForestClassifier"):
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

        if Mtype in("LogisticRegression"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficient Matrix"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficientMatrix))
            print("Intercept: " + str(BestModel.interceptVector))
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        if Mtype in("LinearSVC"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
            print("Coefficients: \n" + str(BestModel.coefficients))
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # uses the default predictionCol="prediction"
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # wrap the name in a list
        score = [str(accuracy)] # convert the score to a string and wrap it in a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values
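
The parameter grids in the function above are Cartesian products of the listed values, and cross-validation fits one model per combination and fold. The sketch below counts those fits in plain Python; `build_param_grid` is a hypothetical stand-in for what ParamGridBuilder produces, not Spark code.

```python
from itertools import product


def build_param_grid(grid):
    """Return every combination of the listed parameter values."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


# The same values as the LinearSVC grid in the function above
grid = build_param_grid({"maxIter": [10, 15], "regParam": [0.1, 0.01]})
num_folds = 2
fits = len(grid) * num_folds
print(grid)
print(fits)
```

Four combinations with two folds already mean eight model fits (plus a final fit of the best combination on the whole training set), so larger grids or more folds quickly increase the run time.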

***Classifiers***

In [67]:
classifiers = [
                LogisticRegression()
                ,OneVsRest()
               ,LinearSVC()
               ,NaiveBayes()
               ,RandomForestClassifier()
               ,GBTClassifier()
               ,DecisionTreeClassifier()
               ,MultilayerPerceptronClassifier()
              ] 

featureDF_list = [HTFfeaturizedData,TFIDFfeaturizedData,W2VfeaturizedData]

***Testing***

In [68]:
for featureDF in featureDF_list:
    print('\033[1m' + featureDF.name," Results:"+ '\033[0m')
    train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
    features = featureDF.select(['features']).limit(1).collect() # one row is enough, only the vector length is used later
    # Learn how many classes there are in order to specify evaluation type based on binary or multi and turn the df into an object
    class_count = featureDF.select(countDistinct("label")).collect()
    classes = class_count[0][0]

    #set up your results table
    columns = ['Classifier', 'Result']
    vals = [("Place Holder","N/A")]
    results = spark.createDataFrame(vals, columns)

    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes,train,test)
        results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    results.show(truncate=False) # show() prints the table itself and returns None

HTFfeaturizedData  Results:
 
LogisticRegression  Coefficient Matrix
You should compares these relative to eachother
Coefficients: 
DenseMatrix([[-0.0306558 ,  0.00836233, -0.08115937,  0.00139624, -0.02171133,
              -0.11053216, -0.06898924, -0.03201325, -0.02120355, -0.04344437,
               0.05290475, -0.02900662,  0.00445361, -0.0025032 , -0.02611517,
              -0.0330324 ,  0.01511948, -0.02616886, -0.0829429 , -0.03019649]])
Intercept: [0.3028264529031553]
 
OneVsRest
Intercept:  -0.2853059211210915 
Coefficients: [0.029062550035724388,-0.008573954952669945,0.07756454991579075,-0.001858825564054125,0.02049059396015637,0.10594430189080854,0.06590565945980054,0.03048662301298438,0.020037390180429354,0.04122018280382074,-0.05125338132578102,0.02735803301400113,-0.004876058918745418,0.001901812512298275,0.024610619660272757,0.031235391956357775,-0.014927304790655724,0.024754307765211286,0.07938215827712267,0.028559168616551125]


 
LogisticRegression  Coefficient Matrix
You should compares these relative to eachother
Coefficients: 
DenseMatrix([[-1.46814012, -2.08760991,  1.67904552]])

Intercept: [0.8591362555621951]
 
OneVsRest
Intercept:  -0.8242872605947403 
Coefficients: [1.4663704020370643,1.7656907897071104,-1.4457105513452693]
Intercept:  0.8242872605947139 
Coefficients: [-1.466370402036972,-1.7656907897076197,1.4457105513457007]
 
LinearSVC  Coefficients
You should compares these relative to eachother
Coefficients: 
[-3.7676302207664802,-0.5523437888570654,3.9370610791680343]
 
RandomForestClassifier  Feature Importances
(Scores add up to 1)
Lowest score is the least important
 
(3,[0,1,2],[0.5071489613734215,0.1456359944669451,0.3472150441596334])
 
GBTClassifier  Feature Importances
(Scores add up to 1)
Lowest score is the least important
 
(3,[0,1,2],[0.3505880539980437,0.24898888850872486,0.4004230574932315])
 
DecisionTre

**Choosing best model and prediction**

In [69]:
classifier = DecisionTreeClassifier()
featureDF = W2VfeaturizedData

train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
features = featureDF.select(['features']).limit(1).collect() # one row is enough, only the vector length is used later

# Learn how many classes there are in order to specify evaluation type based on binary or multi and turn the df into an object
class_count = featureDF.select(countDistinct("label")).collect()
classes = class_count[0][0]

ClassTrainEval(classifier,features,classes,train,test)

 
DecisionTreeClassifier  Feature Importances
(Scores add up to 1)
Lowest score is the least important
 
(3,[0,1,2],[0.470484011748621,0.058488604057017966,0.4710273841943611])


DataFrame[Classifier: string, Result: string]

And finally checking the results.

In [70]:
predictions = DT_BestModel.transform(test)
# StringIndexer encoded successful as 0.0 and failed as 1.0,
# so prediction=0 means predicted success and prediction=1 means predicted failure
print("Predicted successful:")
predictions.select("state","blurb").filter("prediction=0").orderBy(predictions["prediction"].desc()).show(3,False)
print(" ")
print("Predicted failed:")
predictions.select("state","blurb").filter("prediction=1").orderBy(predictions["prediction"].desc()).show(3,False)

Predicted successful:
+------+-----------------------------------------------+
|state |blurb                                          |
+------+-----------------------------------------------+
|failed| a silent baby monitor that improves your sleep|
|failed| ac                                            |
|failed| big peace of mind                             |
+------+-----------------------------------------------+
only showing top 3 rows

 
Predicted failed:
+------+-----------------------------------------------------------------------------------------------------------------------------+
|state |blurb                                                                                                                        |
+------+-----------------------------------------------------------------------------------------------------------------------------+
|failed| a pixel recreating this story through kickstarter for an even bigger and more memorable viral storyand you can be part of it|
|failed| accura

The final part is mostly copy and paste; there is not much to change, because the original content is of really high quality.

***The End***