## **Mount drive**

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

# **1. Description of the project**

The goal of our project is a *Binary Classification* that is able to predict the polarity of an Amazon Review. <br>We found a dataset available on Kaggle that is composed of three columns:

*   ***Classification***: polarity of the review (2 for Positive, 1 for Negative)
*   ***Title*** of the review
*   ***Description*** of the review

The dataset contains about 3.6M records and it is balanced (the number of Positive reviews is roughly the same as the number of Negative ones).
We try to do our best within the limits of the Colab platform, which is resource-limited.

# **2. Setting up the Environment**

## **Install Libraries**

In [None]:
!pip install pyspark



## **Import Libraries**

In [None]:
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC

## **Define utility functions**
In this section, we introduce two functions that we will use during this Colab.
One is for Text Cleaning and the other is for OneHotEncoding during Data Preparation.


In [None]:
def clean_text(df, column_name):
    """
    Apply a standard NLP preprocessing pipeline to one text column of `df`:
      - Text cleaning (lower-casing, trimming, removal of every character that is not a letter
        or a whitespace, collapsing of whitespace runs)
      - Tokenization
      - Stopwords removal
      - Stemming (Snowball stemmer)

    parameters:
      - df: the input Spark dataframe
      - column_name: name of the text column of `df` to preprocess
    returns: a new, cached dataframe containing ONLY the following columns
             (every other column of `df` is dropped by the first `select`):
      - `column_name`: the cleaned text
      - `tokens`: the output of the Tokenizer
      - `terms`: the tokens left after stopwords removal
      - `terms_stemmed`: the stemmed terms

    """
    from pyspark.sql.functions import udf, col, lower, trim, regexp_replace
    from pyspark.sql.types import ArrayType, StringType  # imported here so the function does not rely on a star import
    from pyspark.ml.feature import Tokenizer, StopWordsRemover
    from nltk.stem.snowball import SnowballStemmer # BE SURE NLTK IS INSTALLED ON THE CLUSTER USING THE "LIBRARIES" TAB IN THE MENU

    # Text preprocessing pipeline
    print("***** Text Preprocessing Pipeline *****\n")

    # 1. Text cleaning
    print("# 1. Text Cleaning\n")
    # 1.a Case normalization
    print("1.a Case normalization:")
    lower_case_df = df.select(lower(col(column_name)).alias(column_name))
    # 1.b Trimming
    print("1.b Trimming:")
    trimmed_df = lower_case_df.select(trim(col(column_name)).alias(column_name))
    # 1.c Filter out punctuation symbols (everything that is neither a letter nor a whitespace)
    print("1.c Filter out punctuation:")
    no_punct_df = trimmed_df.select((regexp_replace(col(column_name), "[^a-zA-Z\\s]", "")).alias(column_name))
    # 1.d Filter out any internal extra whitespace.
    # Step 1.c keeps every whitespace character (tabs and newlines included), so the pattern
    # must be "\\s+": a plain " +" would leave tabs/newlines and runs of them untouched.
    print("1.d Filter out extra whitespaces:")
    cleaned_df = no_punct_df.select(trim(regexp_replace(col(column_name), "\\s+", " ")).alias(column_name))

    # 2. Tokenization (split text into tokens)
    print("# 2. Tokenization:")
    tokenizer = Tokenizer(inputCol=column_name, outputCol="tokens")
    tokens_df = tokenizer.transform(cleaned_df)

    # 3. Stopwords removal
    print("# 3. Stopwords removal:")
    stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="terms")
    terms_df = stopwords_remover.transform(tokens_df)

    # 4. Stemming (Snowball stemmer)
    print("# 4. Stemming:")
    stemmer = SnowballStemmer(language="english")
    # A null array is propagated as null instead of raising inside the UDF
    stemmer_udf = udf(
        lambda tokens: [stemmer.stem(token) for token in tokens] if tokens is not None else None,
        ArrayType(StringType())
    )
    # Only the returned dataframe is cached: caching the intermediate dataframes as well
    # would keep several copies of the same data in memory.
    terms_stemmed_df = terms_df.withColumn("terms_stemmed", stemmer_udf("terms")).cache()

    return terms_stemmed_df


In [None]:
# Builds and fits the pipeline that turns categorical features into a single numerical feature vector
def to_numerical(df, numerical_features, categorical_features, target_variable):

    """
    Fit a StringIndexer -> OneHotEncoder -> VectorAssembler pipeline on `df` and apply it.

    Args:
        - df: the input dataframe
        - numerical_features: the list of column names in `df` corresponding to numerical features
        - categorical_features: the list of column names in `df` corresponding to categorical features
        - target_variable: the column name in `df` corresponding to the target variable

    Return:
        - transformer: the pipeline of transformation fit to `df` (for future usage)
        - df_transformed: the dataframe transformed according to the pipeline
          (it gains, among others, the `features` and `label` columns)
    """

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

    # Column names produced by each stage: <name>_indexed, then <name>_indexed_encoded
    name_pairs = [(name, f"{name}_indexed") for name in categorical_features]
    indexed_cols = [indexed for _, indexed in name_pairs]
    encoded_cols = [f"{indexed}_encoded" for indexed in indexed_cols]

    # One indexer per categorical feature; labels unseen at fit time are kept in an extra bucket
    category_indexers = [
        StringIndexer(inputCol=name, outputCol=indexed, handleInvalid="keep")
        for name, indexed in name_pairs
    ]

    # A single one-hot encoder for all the indexed columns (it also keeps invalid values)
    one_hot_encoder = OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols, handleInvalid="keep")

    # The target column is indexed into the "label" column.
    # StringIndexer uses its default ordering here, so which class gets index 0 depends on the label frequencies.
    target_indexer = StringIndexer(inputCol=target_variable, outputCol="label")

    # All the features (one-hot-encoded categorical first, then numerical) end up in one vector column
    feature_assembler = VectorAssembler(inputCols=encoded_cols + numerical_features, outputCol="features")

    # Fit the whole pipeline on the input dataframe, then transform that same dataframe
    fitted_pipeline = Pipeline(
        stages=category_indexers + [one_hot_encoder, target_indexer, feature_assembler]
    ).fit(df)

    return fitted_pipeline, fitted_pipeline.transform(df)

### **Check null rows**
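For our purpose, there must not be null values in any column, so we check that our dataset does not contain any. Note that the cells of this subsection operate on `df` and on its `title`/`description` columns, so they must be run after the dataset has been loaded and its columns renamed (Section 3).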

In [None]:
# Count the null values of every column inside Spark, instead of collecting the whole dataframe
# on the driver with toPandas() just to count them
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


Unfortunately, there are some rows with null values. We drop them in order to not have errors during the execution of some functions: 


In [None]:
# Discard every row that has a null value in at least one column
df = df.dropna()

At one point of this Colab, we will use OneHotEncoding to transform String values into Numeric values to use them in our models. <br>OneHotEncoding needs values different from the Blank String (i.e., ""), so we keep only the rows that have no Blank String in any column.

In [None]:
# Keep only the rows where both text columns are non-empty strings
has_title = df['title'] != ""
has_description = df['description'] != ""
df = df.filter(has_title & has_description)

# **3. Dataset Analysis**
The Dataset is available at this link:

https://www.kaggle.com/kritanjalijain/amazon-reviews

The first thing that we have to do is to load the dataset.
We create a variable called *nrows* to indicate how many rows of the dataset we want to process.
Due to the limited environment, we can load roughly 50,000 records at a time (with more than 50,000 records, some models fail with a "Java heap space" error).
If you want to process more than 50,000 records, we suggest running only one model at a time (and, in any case, no more than 100,000 records at a time).


In [None]:
# Create (or reuse) the local Spark session used throughout the notebook.
# The builder chain is wrapped in parentheses, so no trailing line-continuation backslash is left dangling.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.kryoserializer.buffer", "64m")
    .config("spark.kryoserializer.buffer.max", "1024m")
    .appName('Amazon')
    .getOrCreate()
)

KeyboardInterrupt: ignored

## **Read dataset file into a Spark Dataframe**

In [None]:
# Load the header-less, comma-separated training file, letting Spark infer the column types
df = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="false")
    .load("/content/drive/MyDrive/MAGISTRALE/BigData/Project/Amazon/train.csv")
)

In [None]:
# Keep a random ~1.4% sample of the rows.
# The seed is fixed so that re-running the notebook yields the same sample (and comparable results).
df = df.sample(withReplacement=False, fraction=0.014, seed=3)

In [None]:
# Persist the sampled rows on Drive as a CSV file (the rows are collected on the driver through pandas)
sample_pdf = df.toPandas()
sample_pdf.to_csv('/content/drive/MyDrive/MAGISTRALE/BigData/Project/Amazon/random50k.csv')

## **Check the shape of the loaded dataset (Number of rows and column)**

In [None]:
# Report the number of rows and columns of the loaded dataframe
print(f"The shape of the dataset is {df.count():d} rows by {len(df.columns):d} columns")

## **Rename of the columns**
The dataset has no Column Name, so the very first thing that we have to do, after loading the dataset, is to give to our column a name.
By default, if there are no Column Name available in the imported Dataset, PySpark creates a dataframe with Column Names called <br><br>**_c{index}**<br><br>
To make it more Human Readable, we rename the columns as follow:<br>

*   _c0 -> **classification**
*   _c1 -> **title**
*   _c2 -> **description**



In [None]:
# Replace the default Spark column names with human-readable ones
for default_name, readable_name in (("_c0", "classification"), ("_c1", "title"), ("_c2", "description")):
    df = df.withColumnRenamed(default_name, readable_name)

## **Check if the dataset is balanced**
One important characteristic is that we want our Dataset to be balanced, i.e., we want more or less half of the examples to be classified as **Positive** and half as **Negative**.

In [None]:
# Count the rows of each class to check whether the dataset is balanced
df.groupBy("classification").count().show()

So, the Dataset is balanced and we can proceed with our analysis.

## **Print out the schema of the loaded dataset**
Just to give more information, our dataset has this schema

In [None]:
# Show the column names and types of the loaded dataframe
df.printSchema()

We recall that our models work with numerical values, so we have to identify the columns that are not numeric and process them in order to make them numeric.

In [None]:
# Let's define some constants which we will use throughout this notebook
# No numerical feature is used: both input columns are text
NUMERICAL_FEATURES = []
# Text columns, handled as categorical features by the `to_numerical` pipeline
CATEGORICAL_FEATURES = ['title', 'description']
# Column holding the polarity of the review, i.e., the class to predict
TARGET_VARIABLE = "classification"

## **Text Cleaning**
We now use the function defined before to clean the text and remove all the information that is not useful.

In [None]:
# Run the NLP preprocessing pipeline on the `title` column (the result contains only the title-derived columns)
print("Cleaning column title")
title_cleaned_df = clean_text(df, "title")

In [None]:
# Run the NLP preprocessing pipeline on the `description` column (the result contains only the description-derived columns)
print("Cleaning column description")
description_cleaned_df = clean_text(df, "description")

## **Correlating cleaned values with input rows**
After cleaning rows, we need to join the original rows with the cleaned ones.<br>
In order to do so, we added a column "id" to identify rows (on each Dataframe) and then join the Dataframes using this column. 

In [None]:
# Add a row id to each dataframe, so that they can be joined back together
# NOTE(review): the join below assumes the same row gets the same id in all three dataframes;
# monotonically_increasing_id() depends on the partition layout — confirm the ids stay aligned
initial_indexed = df.withColumn("id", monotonically_increasing_id())
clean_title_indexed = title_cleaned_df.withColumn("id", monotonically_increasing_id())
clean_description_indexed = description_cleaned_df.withColumn("id", monotonically_increasing_id())

In [None]:
# Rename the columns of the cleaned title dataframe, so that after the join it is clear which columns are useful
title_renames = {
    "title": "_title_clean",
    "terms": "_title_terms",
    "tokens": "_title_tokens",
    "terms_stemmed": "_title_terms_stemmed",
}
for current_name, prefixed_name in title_renames.items():
    clean_title_indexed = clean_title_indexed.withColumnRenamed(current_name, prefixed_name)


In [None]:
# Rename the columns of the cleaned description dataframe, so that after the join it is clear which columns are useful
description_renames = {
    "description": "_description_clean",
    "terms": "_description_terms",
    "tokens": "_description_tokens",
    "terms_stemmed": "_description_terms_stemmed",
}
for current_name, prefixed_name in description_renames.items():
    clean_description_indexed = clean_description_indexed.withColumnRenamed(current_name, prefixed_name)

In [None]:
# Join of all 3 dataframes on the row id
# From the initial dataframe only the id and the target column are needed
initial_indexed_only_target = initial_indexed.select("id", "classification")
# First the two cleaned dataframes ...
mid_join_df = clean_title_indexed.join(clean_description_indexed, "id")
# ... then the last join with the initial table, which brings in the target column
final_df = mid_join_df.join(initial_indexed_only_target, "id")



## **Feature Selection**
At this point, we have a Dataframe composed in this way:

In [None]:
# Preview one row of the joined dataframe
final_df.show(1)

We define two variables to use during the training phase.<br>Changing them will change the result of the training

In [None]:
# Columns of the joined dataframe used as the title / description features
# (here: the terms left after stopwords removal, not the stemmed ones)
feature_1 = "_title_terms"
feature_2 = "_description_terms"

In [None]:
# Keep the target and the two chosen feature columns, exposing the latter as `title` and `description`
final_df = final_df.select(
    final_df["classification"],
    final_df[feature_1].alias("title"),
    final_df[feature_2].alias("description"),
)

In [None]:
# Turn each array of terms back into a single space-separated string
for text_column in ("title", "description"):
    final_df = final_df.withColumn(text_column, concat_ws(" ", text_column))

In [None]:
# Drop the rows whose title or description became empty after the cleaning
non_empty_title = final_df['title'] != ""
non_empty_description = final_df['description'] != ""
final_df = final_df.filter(non_empty_title & non_empty_description)

## **Check that everything is OK**

In [None]:
# Report the number of rows and columns of the final dataframe
print(f"The shape of the dataset is {final_df.count():d} rows by {len(final_df.columns):d} columns")

In [None]:
# Preview the first rows of the dataframe that will be used for training
print("Showing first 5 rows of final dataframe")
final_df.show(5)

# **4. Data Exploration**

In [None]:
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator

In [None]:
# Concatenate all the titles into one space-separated string (input of the word cloud)
most_used_words_title = " ".join(final_df.toPandas()['title'])

In [None]:
print("Showing most frequent words in title")
# Build the word cloud image from the concatenated titles
wordcloud = WordCloud(width=1000, height=500).generate(most_used_words_title)
# Render the image on a wide figure, hiding the axes
fig, ax = plt.subplots(figsize=(15, 8))
ax.imshow(wordcloud, interpolation='bilinear')
ax.axis("off")
plt.show()

In [None]:
# Concatenate all the descriptions into one space-separated string (input of the word cloud)
most_used_words_description = " ".join(review for review in final_df.toPandas()['description'])
# The label names the column actually plotted: this cloud is built from the descriptions
print("Showing most frequent words in description")
# Create and generate a word cloud image:
wordcloud = WordCloud(width = 1000, height = 500).generate(most_used_words_description)
plt.figure(figsize=(15,8))
# Display the generated image:
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.show()

In [None]:
# Titles of the positive reviews only (classification == 2), concatenated into one string
positive_rows = final_df.filter(final_df['classification'] == 2)
most_used_words_title_positive = " ".join(positive_rows.toPandas()['title'])
print("Showing most frequent words in POSITIVE title")
# Build the word cloud image and render it on a wide figure, hiding the axes
wordcloud = WordCloud(width=1000, height=500).generate(most_used_words_title_positive)
fig, ax = plt.subplots(figsize=(15, 8))
ax.imshow(wordcloud, interpolation='bilinear')
ax.axis("off")
plt.show()

In [None]:
# Titles of the negative reviews only (classification == 1), concatenated into one string
negative_rows = final_df.filter(final_df['classification'] == 1)
most_used_words_title_negative = " ".join(negative_rows.toPandas()['title'])
print("Showing most frequent words in NEGATIVE title")
# Build the word cloud image and render it on a wide figure, hiding the axes
wordcloud = WordCloud(width=1000, height=500).generate(most_used_words_title_negative)
fig, ax = plt.subplots(figsize=(15, 8))
ax.imshow(wordcloud, interpolation='bilinear')
ax.axis("off")
plt.show()

In [None]:
# Descriptions of the positive reviews only (classification == 2), concatenated into one string
most_used_words_description_positive = " ".join(review for review in final_df.filter(final_df['classification'] == 2).toPandas()['description'])
# The label names the column actually plotted: this cloud is built from the descriptions
print("Showing most frequent words in POSITIVE description")
# Create and generate a word cloud image:
wordcloud = WordCloud(width = 1000, height = 500).generate(most_used_words_description_positive)
plt.figure(figsize=(15,8))
# Display the generated image:
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.show()

In [None]:
# Descriptions of the negative reviews only (classification == 1), concatenated into one string
most_used_words_description_negative = " ".join(review for review in final_df.filter(final_df['classification'] == 1).toPandas()['description'])
# The label names the column actually plotted: this cloud is built from the descriptions
print("Showing most frequent words in NEGATIVE description")
# Create and generate a word cloud image:
wordcloud = WordCloud(width = 1000, height = 500).generate(most_used_words_description_negative)
plt.figure(figsize=(15,8))
# Display the generated image:
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.show()

## **The Learning Pipeline**

### **Balanced vs. Unbalanced Dataset**

Before splitting the data, we look again at how the binary target variable `classification` is distributed across the instances of our final dataset. In this "lucky" example, we know that _positive_ examples (i.e., instances where `classification = 2`) and _negative_ examples (i.e., instances where `classification = 1`) are somehow balanced (i.e., around 50% of the instances are positives and the other 50% are negatives). That is due to the way this sample dataset has been extracted from the original one.

Most often, though, we have to deal with (very) unbalanced datasets where the minority class (which is usually the one we are interested in!) is accounting only for a small fraction of the total number of training instances. For example, consider the click-through rate (CTR) prediction problem, where we want to foresee whether an advertisement (or, in general, a web page) will be clicked by a user. There, most of the advertisements will not be clicked (negatives), whilst only a tiny fraction (even smaller than 1%) of them will be.

The fact that a dataset is balanced (respectively, unbalanced) affects the process which we should use to correctly splitting it into _training_ and _test_ set. In particular:

- If the dataset is (almost) balanced, we can safely use a **simple random sampling** strategy, which assigns to every instance the same probability of being selected (i.e., if there are _m_ instances, each one will be picked with the same uniform probability _p = 1/m_);
- If the dataset is (very) unbalanced, simple random sampling might lead to a poor splitting strategy, where - for instance - the test set ends up containing only examples that are labeled with the most representative class. To overcome such an issue, **stratified random sampling** is the right choice to take as it guarantees that both the training and the test split follow the same class distribution observed in the original dataset (e.g., if the dataset contains 99% of negative instances and 1% of positive ones, so will the training and the test set). This works by first "stratifying" the data according to the two groups (i.e., positives vs. negatives), and then applying simple random sampling within each group. For example, if our original dataset contains $m$ instances so that $m = m^+ + m^-$ and $m^+ \ll m^-$ (e.g., $m^+/m = 0.01$) and we want to sample $k < m$ instances out of the dataset, we will first stratify the original dataset and will select $k^+ = k\,m^+/m$ positive instances and $k^- = k\,m^-/m$ negative instances, respectively.

### Let's first verify our dataset is actually _balanced_

In [None]:
# Count the rows of each class in the final (cleaned) dataframe
final_df.groupBy(TARGET_VARIABLE).count().show()

# **5. Data Preparation**

### **Dataset Splitting: Training vs. Test Set**

Before moving along with any preprocessing involving data transformations, we will split our dataset into **2** portions:
- _training set_ (e.g., accounting for **80%** of the total number of instances);
- _test set_ (e.g., accounting for the remaining **20%** of instances)

In [None]:
# Seed passed to randomSplit, so that the train/test split is reproducible
RANDOM_SEED = 3

In [None]:
# Randomly split our dataset `final_df` into 80÷20 for training and test, respectively
train_df, test_df = final_df.randomSplit([0.8, 0.2], seed=RANDOM_SEED)

In [None]:
# Report how many instances ended up in each split
print(f"Training set size: {train_df.count():d} instances")
print(f"Test set size: {test_df.count():d} instances")

### **Transform Categorical features into Numerical using One-Hot Encoding**

Note that this step is not always mandatory (e.g., decision trees are able to work nicely with categorical features without the need of transforming them to numerical). Still, other methods (like logistic regression) are designed to operate with numerical inputs only.

To transform _categorical_ features into _numerical_ ones we proceed as follows.
We setup a pipeline which is composed of the following steps:
- [`StringIndexer`](https://spark.apache.org/docs/latest/ml-features#stringindexer): encodes a string column of labels to a column of label indices. The indices are in `[0, numLabels)`, and 4 ordering options are supported (default `frequencyDesc`, which assigns the most frequent label the index `0`, and so on and so forth).
- [`OneHotEncoder`](https://spark.apache.org/docs/latest/ml-features#onehotencoder) (called `OneHotEncoderEstimator` in older Spark versions): maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. An important parameter is `handleInvalid`, which indicates how to deal with previously unseen labels. By default this raises an error but it can be set to `keep` to assign previously unseen labels a fallback value.
- [`VectorAssembler`](https://spark.apache.org/docs/latest/ml-features#vectorassembler): is a transformer that combines a given list of columns into a single vector column.

In [None]:
 # Fit the one-hot encoding pipeline on the training set only and get back both the fitted transformer and the transformed training set
oh_transformer, oh_train_df = to_numerical(train_df, NUMERICAL_FEATURES, CATEGORICAL_FEATURES, TARGET_VARIABLE)

In [None]:
# Show the result of numerical transformation
#oh_train_df.show(5)

In [None]:
# Select `features` and `label` (i.e., the indexed `classification` target variable) only
train = oh_train_df.select(["features", "label"])

## **Use the One-Hot encoding pipeline to transform the Test Set**

In [None]:
#train.show(5, truncate=False)

In [None]:
# Here, we use the same transformer as the one returned by the `to_numerical` function above yet applied to the test set
# (the pipeline is NOT re-fit on the test data)
oh_test_df = oh_transformer.transform(test_df)

In [None]:
# Select `features` and `label` only, mirroring what is done for the training set
test = oh_test_df.select(["features", "label"])
#test.show(5)

# **6. Training and Evaluation of Models**


*   **Logistic Regression**
*   **Naive Bayes Classifier**
*   **Support Vector Machine (SVM)**
*   **Gradient Boosted Tree Classifier (GBTC)**
*   **Random Forest Classifier**




## **Logistic Regression**

In particular, we can specify the following parameters:

- `regParam` is the regularization parameter (or $\lambda$);
- `elasticNetParam` is the tradeoff parameter for regularization penalties (or $\alpha$);
  - `regParam = 0` and `elasticNetParam = 0` means there is no regularization;
  - `regParam > 0` and `elasticNetParam = 0` means there is only L2-regularization; 
  - `regParam > 0` and `elasticNetParam = 1` means there is only L1-regularization;
  - `regParam > 0` and `0 < elasticNetParam < 1` means there is both L1- and L2-regularization (Elastic Net);



### **Build and Train Logistic Regression Model** 

In [None]:
from pyspark.ml.classification import LogisticRegression # This corresponds to LogisticRegressionWithLBFGS

# This setting applies Elastic Net regularization: regParam=1 with elasticNetParam=0.5, i.e., a mix of L1 and L2 penalties
# NOTE(review): this comment used to describe a "no regularization" setting (regParam=0, elasticNetParam=0),
# which does not match the values below — confirm which configuration is intended
log_reg = LogisticRegression(featuresCol = "features", labelCol = "label", maxIter=100, regParam=1, elasticNetParam=0.5)
log_reg_model = log_reg.fit(train)

### **Evaluate Performance on Training Data**

In [None]:
# Summary of the fitted logistic regression model, read below for the training-set metrics
lr_summary=log_reg_model.summary

In [None]:
# Accuracy reported by the model summary
lr_summary.accuracy

In [None]:
# Area under the ROC curve reported by the model summary
lr_summary.areaUnderROC

In [None]:
#lr_summary.weightedRecall

In [None]:
#r_summary.weightedPrecision

### **Compute predictions on the Test Set according to the model learned on the Training Set**

In [None]:
# `log_reg_model` is a Transformer which can be used to "transform" our test set:
# the result is stored in `predictions` and evaluated in the next cells
predictions = log_reg_model.transform(test)

### **Evaluate model performance on the Test Set**

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Area under the ROC curve of the logistic regression predictions on the test set
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
lr_auroc = evaluator.evaluate(predictions)
print(f'Test Set AUC: {lr_auroc:.3f}')

In [None]:
# Accuracy of the logistic regression predictions on the test set
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f'Model Accuracy:{accuracy:.3f}')

## **Naive Bayes Classifier**

### **Build and Train Naive Bayes Classifier Model**

In [None]:
# Fit the Naive Bayes classifier on the TRAINING set only, so that the evaluation on the test set
# in the next cells measures the performance on data the model has never seen
nb = NaiveBayes(featuresCol = "features", labelCol = "label", predictionCol= 'prediction')
nb_model = nb.fit(train)

In [None]:
# Compute the Naive Bayes predictions on the test set
model_predictions = nb_model.transform(test)
#model_predictions.select(['label','probability', 'prediction']).show(10,False)

### **Evaluate Performance on Test Data**

In [None]:
# Area under the ROC curve of the Naive Bayes predictions on the test set
nb_evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
nb_auroc = nb_evaluator.evaluate(model_predictions)
print(f'The auc value of NB Classifier is {nb_auroc:.3f}')

In [None]:
# Re-create the evaluator with its default settings
# NOTE(review): `nb_evaluator` is not used again in the visible part of the notebook — confirm this cell is still needed
nb_evaluator = BinaryClassificationEvaluator()

In [None]:
# Accuracy of the Naive Bayes predictions on the test set
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(model_predictions)
print(f'Model Accuracy:{accuracy:.3f}')

## **Support Vector Machine(SVM)**

### **Build and Train SVM Model**

In [None]:
# Fit a linear Support Vector Classifier on the training set (at most 100 iterations)
lsvc = LinearSVC(featuresCol = "features", labelCol = "label", maxIter=100, predictionCol= 'prediction')
lsvc_model = lsvc.fit(train)

### **Evaluate Performance on Test Data**

In [None]:
# Compute the SVM predictions on the test set (this overwrites the predictions of the previous model)
model_predictions = lsvc_model.transform(test)

In [None]:
# Area under the ROC curve of the SVM predictions on the test set
svc_evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
svc_auroc = svc_evaluator.evaluate(model_predictions)
print(f'The auc value of SupportVectorClassifier is {svc_auroc:.3f}')

In [None]:
# Accuracy of the SVM predictions on the test set
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(model_predictions)
print(f'Model Accuracy:{accuracy:.3f}')

## **Gradient Boosted Tree Classifier (GBTC)**

In [None]:
# Fit a Gradient Boosted Tree classifier (default parameters) on the training set
gbt = GBTClassifier()
gbt_model = gbt.fit(train)
# Compute its predictions on the test set (this overwrites the predictions of the previous model)
model_predictions = gbt_model.transform(test)

In [None]:
# Area under the ROC curve of the GBT predictions on the test set
gbt_evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
gbt_auroc = gbt_evaluator.evaluate(model_predictions)
print(f'The auc value of GradientBoostedTreesClassifier is {gbt_auroc:.3f}')

In [None]:
# Accuracy of the GBT predictions on the test set
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(model_predictions)
print(f'Model Accuracy:{accuracy:.3f}')

### Random Forest libro

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Fit a Random Forest classifier (50 trees, maximum depth 30) on the training set
rf = RandomForestClassifier(numTrees=50,maxDepth=30)
rf_model = rf.fit(train)

In [None]:
 model_predictions=rf_model.transform(test)

In [None]:
# Area under the ROC curve of the Random Forest predictions on the test set
rf_evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
rf_auroc = rf_evaluator.evaluate(model_predictions)
print('The auc value of RandomForestClassifier Model is {}'.format(rf_auroc))

In [None]:
# Accuracy of the Random Forest predictions on the test set.
# The metric must be one supported by MulticlassClassificationEvaluator: "accuracy" matches
# both the value printed below and the evaluation of the other models.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(model_predictions)
print ('Model Accuracy:{:.3f}'.format(accuracy))