<img src="https://upload.wikimedia.org/wikipedia/commons/4/4f/Twitter-logo.svg" alt="twitter_logo" width="120"/>
<span style="float:left">
  <span style="font-family:Helvetica; font-size:4em;">
    <b>Twitter sentiment analysis DEV &nbsp;&nbsp;&nbsp;</b><br>
  </span>
  <span style="font-family:Helvetica; font-size:2em;">
    XGBoost
  </span>
</span>
<br clear="left"/>
<br><br><br>
<i>The goal of this project is to develop an ML pipeline for Twitter sentiment analysis, with the end goal of hosting it in an MLOps pipeline that consumes streaming Twitter data through the AWS architecture.</i>
<br><br>
  In this notebook I set up a pipeline to featurize my tweet data, then train and assess multiple XGBoost models.

## Table of contents
#### 1. Set-up
* 1.1 Environment
  * 1.1.1 Install packages
  * 1.1.2 Import packages
* 1.2 User defined functions
* 1.3 Global variables
* 1.4 Import data

#### 2. Pre-processing
* 2.1 Transform label column
* 2.2 Train-test split
* 2.3 Pre-processing pipeline
  * 2.3.1 SparkNLP pipeline
  * 2.3.2 Vectorizer pipeline
  * 2.3.3 Combine pipelines, fit, transform
* 2.4 Transform train and test features

#### 3. XGBoost
* 3.1 Baseline model
  * 3.1.1 Build baseline model
  * 3.1.2 Fit baseline model
  * 3.1.3 Predict
  * 3.1.4 Evaluate
* 3.2 Hyperparam tuning
  * 3.2.1 Set up search space params and cross validation method
* 3.3 Run the training with MLflow
<br><br>

## 1. Set up

#### 1.1 Environment

##### 1.1.1 Install packages

<b>In Databricks</b><br>
Packages installed via the compute interface<br>
Copy/paste this in the Configuration>Spark tab when first setting up the cluster:<br>
<br>
spark.serializer org.apache.spark.serializer.KryoSerializer<br>
spark.databricks.delta.preview.enabled true<br>
spark.kryoserializer.buffer.max 2000M<br>
<br>
Then, navigate to the libraries tab: <br>
Install new > Pypi: copy paste this under package: spark-nlp==3.4.4<br>
Install new > Maven: copy paste: com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.4<br>

##### 1.1.2 Import packages

In [None]:
# Data wrangling
from pyspark.sql.types import StringType
import pyspark.sql.functions as F


# SparkNLP pre-processing
import sparknlp
from sparknlp.base import DocumentAssembler
from sparknlp.base import Finisher
from sparknlp.annotator import Tokenizer
from sparknlp.annotator import Normalizer
from sparknlp.annotator import LemmatizerModel
from sparknlp.annotator import StopWordsCleaner
from sparknlp.annotator import NGramGenerator
from sparknlp.annotator import PerceptronModel


# nltk for stopwords
import nltk
from nltk.corpus import stopwords


# Pre-processing and validation
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, HashingTF, IDF, StringIndexer, CountVectorizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


# mlflow for tuning and logging
import mlflow
import mlflow.spark

# XGBoost
from sparkdl.xgboost import XgboostClassifier

import os



#### 1.2 UDFs

In [None]:
# No UDFs

#### 1.3 Global variables

In [None]:
# CREATE directory for data import
MOUNTED_DATA_PATH = '/tmp/delta/twitter_stream'

# INITIALIZE spark nlp instance
spark = sparknlp.start(spark32=True)

# TROUBLESHOOTING environment
# os.environ['PYSPARK_PIN_THREAD'] = 'true'
# spark.conf.set('spark.databricks.clusterUsageTags.clusterPinned', 'false')

#### 1.4 Import data

In [None]:
# CREATE list: columns to drop
droppers = ['ts', 'tweet_id', 'author_id', 'text', 'country_code', 'location', 'score']

# INITIALIZE spark dataframe & DROP columns
sdf = ((spark.read
             .format('delta')
             .load(MOUNTED_DATA_PATH))
                 .drop(*droppers)
      )

## 2. Pre-processing

#### 2.1 Transform label column

In [None]:
# REPLACE sentiment labels: Negative->0, Neutral->1, Positive->2
sdf = sdf.withColumn('label', F.when(F.col('sentiment') == 'Positive', 2)
                                .otherwise(F.when(F.col('sentiment') == 'Neutral', 1)
                                            .otherwise(0))) \
                                .drop('sentiment')
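
The nested `when`/`otherwise` chain sends every value other than `'Positive'` or `'Neutral'` to 0, which includes nulls and unexpected spellings as well as `'Negative'`. A plain-Python sketch of the same branching makes that fall-through explicit. The function name `encode_sentiment` is hypothetical and not part of this notebook.

```python
def encode_sentiment(sentiment):
    """Mirror the when/otherwise chain: Positive->2, Neutral->1, anything else->0."""
    if sentiment == 'Positive':
        return 2
    if sentiment == 'Neutral':
        return 1
    # Fall-through branch: 'Negative', but also None, 'positive', '' and so on
    return 0


# Hypothetical sample values, including a null and a lowercase variant
labels = [encode_sentiment(s) for s in ['Positive', 'Neutral', 'Negative', None, 'positive']]
print(labels)
```

If the `sentiment` column can contain nulls or other spellings, it may be worth checking its distinct values before relabelling, since they would silently be counted as negative.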

#### 2.2 Train-test split

In [None]:
# CREATE train & test datasets with 80/20 split
sdf_train, sdf_test = sdf.randomSplit([0.8, 0.2], seed=42069)

#### 2.3 Pre-processing pipeline

##### 2.3.1 SparkNLP pipeline

In [None]:
# CONVERT tweet text to spark NLP format
documentAssembler = DocumentAssembler() \
     .setInputCol('translated_text') \
     .setOutputCol('document')


# TOKENIZE tweet text
tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')

# CLEAN the data with normalizer
# CREATE patterns to remove
# Raw strings keep the regex backslashes intact (avoids invalid-escape warnings)
patterns = ['http', r'@\S+', '#', r'[^a-zA-Z]', r'\s+']

# CLEAN
normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True) \
     .setCleanupPatterns(patterns)

# LEMMATIZE cleaned tokens
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(['normalized']) \
    .setOutputCol('lemmatized')

# REMOVE stopwords
# IMPORT stopwords
nltk.download('stopwords')

# CREATE list of stopwords
eng_stopwords = stopwords.words('english')

# REMOVE stopwords
stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('unigrams') \
     .setStopWords(eng_stopwords)

# CREATE ngrams (1-, 2-, & 3- grams) from the stopword-filtered lemmas
ngrammer = NGramGenerator() \
    .setInputCols(['unigrams']) \
    .setOutputCol('ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

# CONVERT back to string
finisher = Finisher() \
     .setInputCols(['ngrams'])



# CREATE pipeline
sparknlp_pipeline = Pipeline().setStages([documentAssembler, tokenizer, normalizer, lemmatizer, stopwords_cleaner, ngrammer, finisher])

# Fit to train data
# fitted_sparknlp_pipeline = sparknlp_pipeline.fit(sdf_train)
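
To get a feel for what the cleanup patterns and the cumulative n-gram step produce, here is a rough approximation in plain Python using `re`. It is not Spark NLP's implementation: the helpers `clean_token` and `cumulative_ngrams` and the sample tokens are hypothetical, and I am assuming that each pattern match is deleted in order and that tokens left empty are dropped.

```python
import re

# Same patterns as the Normalizer above, written as raw strings
patterns = ['http', r'@\S+', '#', r'[^a-zA-Z]', r'\s+']


def clean_token(token):
    """Delete every match of each pattern in order, then lowercase (approximation)."""
    for pattern in patterns:
        token = re.sub(pattern, '', token)
    return token.lower()


def cumulative_ngrams(tokens, n=3, delimiter='_'):
    """Return all 1- to n-grams of the token list, joined with the delimiter."""
    grams = []
    for size in range(1, n + 1):
        for start in range(len(tokens) - size + 1):
            grams.append(delimiter.join(tokens[start:start + size]))
    return grams


# Hypothetical tokens from one tweet
raw_tokens = ['@user', 'Loving', '#python', '3!']

# Assumption: tokens that end up empty after cleaning are discarded
cleaned = [t for t in (clean_token(tok) for tok in raw_tokens) if t]
ngrams = cumulative_ngrams(cleaned)

print(cleaned)
print(ngrams)
```

Under this approximation the `'http'` pattern only deletes the literal substring, so a token like `https://t.co/abc` would be reduced to `stcoabc` rather than removed. A pattern such as `r'http\S+'` may be worth testing if URL residue shows up in the n-grams.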

##### 2.3.2 Vectorizer pipeline

In [None]:
# CREATE column names
string_cols = ['lang', 'tag']
string_cols_map = [(col, col+'_ix') for col in string_cols]
predictors = [col[1] for col in string_cols_map] + ['ngram_idf']


# VECTORIZE
cv = CountVectorizer(inputCol='finished_ngrams', outputCol='ngram_cv')

# INVERSE DOCUMENT FREQUENCY
idf = IDF(inputCol='ngram_cv', outputCol='ngram_idf', minDocFreq=8) # minDocFreq: zero out terms that appear in fewer than 8 documents

# INDEX non-text string cols
si = [StringIndexer(inputCol = col[0], outputCol = col[1]) for col in string_cols_map]

# COMBINE language index, tag index, and idf-transformed word vectors
va = VectorAssembler(inputCols = [*predictors] , outputCol='features')

# CREATE pipeline
featurizer_pipeline = Pipeline(stages = [cv, idf, *si, va])
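
As a reminder of what the `IDF` stage computes, the sketch below reproduces the weighting documented for Spark ML, `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` the number of documents containing the term, with terms below `minDocFreq` receiving a weight of 0. The function name `spark_style_idf` and the corpus size are hypothetical, chosen only for illustration.

```python
import math


def spark_style_idf(doc_freq, num_docs, min_doc_freq=0):
    """IDF weight following the Spark ML formula; 0 for terms below min_doc_freq."""
    if doc_freq < min_doc_freq:
        return 0.0
    return math.log((num_docs + 1) / (doc_freq + 1))


num_docs = 999  # hypothetical corpus size

# A term seen in only 3 documents falls under minDocFreq=8 and is zeroed out
rare = spark_style_idf(doc_freq=3, num_docs=num_docs, min_doc_freq=8)

# A term seen in about half of the documents gets a moderate weight
common = spark_style_idf(doc_freq=499, num_docs=num_docs, min_doc_freq=8)

# A term seen in every document carries no information
everywhere = spark_style_idf(doc_freq=999, num_docs=num_docs, min_doc_freq=8)

print(rare, common, everywhere)
```

The final TF-IDF feature is the term count from `CountVectorizer` multiplied by this weight, so both very rare and ubiquitous n-grams end up contributing nothing to `ngram_idf`.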


##### 2.3.3 Combine pipelines, fit, transform

In [None]:
# CREATE pipeline of pipelines: sparknlp pipeline and featurizer pipeline
pipeline = Pipeline(stages = [sparknlp_pipeline, featurizer_pipeline])

# FIT pipeline to train data
fitted_pipeline = pipeline.fit(sdf_train)

#### 2.4 Transform train and test features

In [None]:
# TRANSFORM train data with pipeline
sdf_train_prepared = fitted_pipeline.transform(sdf_train)

# CACHE
sdf_train_prepared.cache()
sdf_train_prepared.count()


In [None]:
# TRANSFORM test data with pipeline
sdf_test_prepared = fitted_pipeline.transform(sdf_test)

# CACHE 
sdf_test_prepared.cache()
sdf_test_prepared.count()

## 3. XGBoost

#### 3.1 Baseline model

##### 3.1.1 Build baseline model

In [None]:
# # SETUP baseline parameters
# xgbParams = dict(
#     learning_rate = 0.1,
#     max_depth = 2,
#     missing=0.0,
#     n_estimators = 5,
#     objective="multi:softmax",
#     num_workers=8,               # Set this to less than or equal to number of workers in cluster for distributed training
#     featuresCol='features',
#     labelCol='label',
#     nthread = 36
#         )

# # # INITIALIZE XGBoost class object with baseline parameters
# xgb = XgboostClassifier(**xgbParams)

##### 3.1.2 Fit baseline model

In [None]:
# FIT baseline XGBoost to train
# xgb_baseline = xgb.fit(sdf_train_prepared)

##### 3.1.3 Predict

In [None]:
# TRANSFORM: predict test labels
# predictions = xgb_baseline.transform(sdf_test_prepared)

##### 3.1.4 Evaluate

In [None]:
# EVALUATE predictions: f1 score
# evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', metricName='f1')
# evaluator.evaluate(predictions)

# Output: 0.6761656533718952 
# Output2: 0.33767296156355153 still better than 0.01 with default params

In [None]:
# display(predictions)
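
The `f1` metric of `MulticlassClassificationEvaluator` is a weighted F-measure: the F1 of each class, averaged with weights proportional to how often that class occurs among the true labels. The plain-Python sketch below shows that calculation on a tiny hypothetical set of labels. The function `weighted_f1` and the sample data are illustrative and are not taken from this notebook's results.

```python
from collections import Counter


def weighted_f1(y_true, y_pred):
    """Per-class F1, averaged with weights equal to each class's share of true labels."""
    total = len(y_true)
    support = Counter(y_true)
    score = 0.0
    for label, n_true in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == label and p == label)
        n_pred = sum(1 for p in y_pred if p == label)
        precision = tp / n_pred if n_pred else 0.0
        recall = tp / n_true
        if precision + recall:
            f1 = 2 * precision * recall / (precision + recall)
        else:
            f1 = 0.0
        score += f1 * n_true / total
    return score


# Hypothetical labels: 0 = Negative, 1 = Neutral, 2 = Positive
y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 1, 2, 2, 2]

result = weighted_f1(y_true, y_pred)
print(round(result, 4))
```

Because of the weighting, a model that does well on the dominant class can still score reasonably even if a minority class is poorly predicted, which is worth keeping in mind if the sentiment classes are imbalanced.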

#### 3.2 Hyperparam tuning

##### 3.2.1 Set up search space params and cross validation method

In [None]:
# INITIALIZE parameters dictionary
xgbParams = dict(
    missing=0.0,
    objective="multi:softmax",
    num_workers=1,               # Set this to 1. Parallelization is done in grid search
    featuresCol='features',
    labelCol='label'
)

# INITIALIZE XGBoost
xgb = XgboostClassifier(**xgbParams)


# CREATE XGBoost parameters to gridsearch
learning_rate = [1.0, 0.5, 0.1]
max_depth = [9, 12]
min_child_weight = [1, 0.5, 0]
subsample = [0.2, 0.4, 0.6,]
n_estimators = [50, 75, 100]




# INITIALIZE grid object
param_grid = (
  ParamGridBuilder()
    .addGrid(xgb.max_depth, max_depth)
    .addGrid(xgb.n_estimators, n_estimators)
    .addGrid(xgb.subsample, subsample)
    .addGrid(xgb.min_child_weight, min_child_weight)
    .addGrid(xgb.learning_rate, learning_rate)
    .build()
)


# CREATE evaluators
evaluator_f1 = MulticlassClassificationEvaluator(predictionCol='prediction', metricName='f1')
evaluator_logloss = MulticlassClassificationEvaluator(predictionCol='prediction', metricName='logLoss')
evaluator_recall = MulticlassClassificationEvaluator(predictionCol='prediction', metricName='weightedRecall')



# CREATE cross validation object
cv = CrossValidator(
    parallelism=36,    # NOTE. Set this parameter to the number of workers in your Databricks environment
    estimator=xgb,
    estimatorParamMaps=param_grid,
    evaluator=evaluator_f1,
    numFolds=5,
    seed=42069,
    
)
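
Before launching the search it helps to know how many models the grid implies. The sketch below counts the combinations with `itertools.product`, using the same value lists as the cell above. The variable names `n_combinations` and `cv_fits` are my own.

```python
from itertools import product

# Search space copied from the cell above
learning_rate = [1.0, 0.5, 0.1]
max_depth = [9, 12]
min_child_weight = [1, 0.5, 0]
subsample = [0.2, 0.4, 0.6]
n_estimators = [50, 75, 100]
num_folds = 5

# Every combination of the five hyperparameters
combinations = list(product(max_depth, n_estimators, subsample, min_child_weight, learning_rate))
n_combinations = len(combinations)

# Each combination is trained once per fold during cross validation
cv_fits = n_combinations * num_folds

print(n_combinations, cv_fits)
```

So this grid means 162 parameter combinations and 810 model fits across the 5 folds, plus the final refit of the best combination on the full training set that `CrossValidator` performs. Trimming one of the lists is the quickest way to shorten the run.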

In [None]:
# sdf_train_prepared.cache()
# sdf_train_prepared.count()

#### 3.3 Run the training with mlflow

In [None]:
# RUN grid search with MLFlow
with mlflow.start_run(run_name = 'XGBoost_sentiment_3'):
    model = cv.fit(sdf_train_prepared) # This generates a bunch of models and scores them
    
    # GET BEST MODEL PARAMS
    messy_param_dict = model.bestModel.extractParamMap()
    best_params = {}
    
    # CREATE clean dictionary with model params
    for param, value in messy_param_dict.items():
        best_params[param.name] = value
    
    # SAVE params
    mlflow.log_params(best_params)  # save the parameters to logs
    
    # SAVE best model
    mlflow.spark.log_model(model.bestModel, 'XGBoost_best_model')
    
    # SAVE best model scores (transform the test set once and reuse the predictions)
    test_predictions = model.bestModel.transform(sdf_test_prepared)
    metrics = dict(f1 = evaluator_f1.evaluate(test_predictions),
                   logloss = evaluator_logloss.evaluate(test_predictions),
                   recall = evaluator_recall.evaluate(test_predictions)
                    )
    # LOG metrics
    mlflow.log_metrics(metrics)
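
The parameter-cleaning loop in the run above works because the keys of the param map are `Param` objects that expose a `.name` attribute, while MLflow expects plain string keys. The sketch below illustrates the same flattening with a hypothetical stand-in class, `FakeParam`, so it runs without Spark. The parameter values are made up for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.ml.param.Param, which exposes a .name attribute
FakeParam = namedtuple('FakeParam', ['parent', 'name'])

# Shaped like the output of extractParamMap(): Param objects as keys
messy_param_dict = {
    FakeParam('XgboostClassifier_abc', 'max_depth'): 9,
    FakeParam('XgboostClassifier_abc', 'learning_rate'): 0.1,
}

# Keep only the parameter name as a string key
best_params = {param.name: value for param, value in messy_param_dict.items()}

print(best_params)
```

A dict comprehension like this is equivalent to the explicit loop and yields a dictionary that can be passed directly to `mlflow.log_params`.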