# Multilayer perceptron classifier for news sentiment

The financial news sentiment dataset contains 4,846 records of financial news, each with a corresponding sentiment label: 'positive', 'neutral' or 'negative'.
The code below loads and pre-processes the data, then trains a multilayer perceptron classifier to predict the sentiment.

### Create a Spark session and load the financial news dataset

In [0]:
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.appName('FinancialNewsDL').getOrCreate()

In [0]:
#spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

file_location = "/FileStore/finalproject/FinancialNewsSentiment.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "false"
delimiter = ","

df_news = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

#end loading

In [0]:
df_news = df_news.withColumnRenamed("_c0","label")
df_news = df_news.withColumnRenamed("_c1","news")

### Data processing

In [0]:
df_news = df_news.dropna()

In [0]:
# Create a 70-30 train test split

train_data,test_data=df_news.randomSplit([0.7,0.3])
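`randomSplit` assigns each row to a split independently at random, so the 70/30 proportions are only approximate, and the split changes on every run unless a seed is passed, e.g. `df_news.randomSplit([0.7, 0.3], seed=42)` (the seed value is arbitrary). The pure-Python sketch below imitates that per-row assignment to show both effects; it illustrates the idea and is not Spark's actual implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign every row independently to one split, with probabilities
    proportional to `weights` (a sketch of the idea behind randomSplit)."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)  # seed=None gives a different split on each run
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose upper bound exceeds u; fall back to the last split
        # in case floating-point rounding leaves bounds[-1] slightly below 1.0
        index = next((i for i, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[index].append(row)
    return splits


rows = list(range(4846))
train_ids, test_ids = random_split(rows, [0.7, 0.3], seed=42)
print(len(train_ids), len(test_ids))  # about 70% / 30% of the rows; exact counts depend on the seed
```

Calling the function twice with the same seed returns the same split, which is what passing `seed` to `randomSplit` buys in the notebook.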

### Building the multilayer perceptron model

In [0]:
import pyspark.sql.functions as f

In [0]:
# Index the label column, since it is a categorical variable (positive, negative, neutral)
from pyspark.ml.feature import StringIndexer
labelIndexer = StringIndexer(inputCol="label",outputCol="indexedLabel")
#labelIndexer = labelIndexer.fit(df_news)
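By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most common sentiment becomes index 0.0. The plain-Python sketch below reproduces that ordering on a made-up list of labels; breaking ties alphabetically follows the behaviour documented for Spark 3.x and is an assumption for older releases.

```python
from collections import Counter


def frequency_desc_index(labels):
    """Map each distinct label to a float index, most frequent first
    (ties broken alphabetically), mirroring StringIndexer's default order."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}


# Made-up sample, not the real dataset
sample = ["neutral", "positive", "neutral", "negative", "neutral", "positive"]
mapping = frequency_desc_index(sample)
print(mapping)  # {'neutral': 0.0, 'positive': 1.0, 'negative': 2.0}
indexed = [mapping[lab] for lab in sample]
```

Once the pipeline below is fitted, the actual mapping can be read from the fitted indexer with `fit_model.stages[0].labels` (`labelsArray` in newer releases), and `pyspark.ml.feature.IndexToString` can turn the numeric predictions back into sentiment strings.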

In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer = Tokenizer(inputCol="news", outputCol="words")

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures= 4000)

idf = IDF(inputCol="rawFeatures", outputCol="features")
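These three stages turn each news text into a TF-IDF vector: `Tokenizer` lower-cases the text and splits it on whitespace, `HashingTF` hashes every token into one of `numFeatures` buckets and counts occurrences, and `IDF` rescales each bucket by `log((m + 1) / (df + 1))`, where `m` is the number of training documents and `df` the number of documents in which that bucket occurs (the formula given in the Spark MLlib documentation). The plain-Python sketch below mirrors those steps on toy sentences. It uses CRC32 as a stand-in hash, whereas Spark's `HashingTF` uses MurmurHash3, so the bucket numbers will not match Spark's.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 16  # the notebook uses numFeatures=4000


def tokenize(text):
    # Roughly like Spark's Tokenizer: lower-case, then split on whitespace
    return text.lower().split()


def hashing_tf(tokens, num_features=NUM_FEATURES):
    # Stand-in hash (CRC32); Spark's HashingTF uses MurmurHash3
    return Counter(zlib.crc32(tok.encode("utf-8")) % num_features for tok in tokens)


def fit_idf(tf_vectors, num_features=NUM_FEATURES):
    m = len(tf_vectors)
    doc_freq = Counter()
    for tf in tf_vectors:
        doc_freq.update(tf.keys())  # each bucket counted at most once per document
    return [math.log((m + 1) / (doc_freq[j] + 1)) for j in range(num_features)]


def tf_idf(tf, idf):
    # Multiply each bucket's term count by that bucket's IDF weight
    return {j: count * idf[j] for j, count in tf.items()}


docs = ["The profit rose", "The sales fell", "The outlook is stable"]
tfs = [hashing_tf(tokenize(d)) for d in docs]
idf = fit_idf(tfs)
vectors = [tf_idf(tf, idf) for tf in tfs]
```

A token that appears in every training document, such as "the" here, gets an IDF weight of `log(4 / 4) = 0`, which is how IDF suppresses very common words. With a fixed `numFeatures`, unrelated words can still collide in one bucket; a larger value reduces collisions at the cost of a wider MLP input layer.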

In [0]:
# Import the required libraries
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [0]:
# Create a MultilayerPerceptronClassifier object

layers = [4000,128,64,3]
mlpModel = MultilayerPerceptronClassifier(maxIter=50, layers= layers, seed=1984, blockSize=128, featuresCol="features", labelCol= "indexedLabel")
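The `layers` list fixes the network shape: 4000 inputs (which must equal `numFeatures` in `HashingTF`), hidden layers of 128 and 64 nodes, and 3 outputs, one per sentiment class. According to the Spark ML guide, hidden nodes use the sigmoid function and the output layer uses softmax. The plain-Python sketch below counts the trainable weights and biases for this shape and runs one forward pass through a tiny network with randomly initialised weights; the random weights are an assumption for illustration, and the sketch does no training.

```python
import math
import random


def param_count(layers):
    # Each pair of adjacent layers has an n_in x n_out weight matrix plus n_out biases
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def softmax(zs):
    top = max(zs)  # subtract the max for numerical stability
    exps = [math.exp(z - top) for z in zs]
    total = sum(exps)
    return [e / total for e in exps]


def init_weights(layers, seed=0):
    rng = random.Random(seed)
    # weights[k][j] = [bias, w_1, ..., w_n_in] for node j of layer k + 1
    return [[[rng.uniform(-0.1, 0.1) for _ in range(n_in + 1)] for _ in range(n_out)]
            for n_in, n_out in zip(layers, layers[1:])]


def forward(x, weights):
    activations = x
    for k, layer in enumerate(weights):
        z = [w[0] + sum(wi * ai for wi, ai in zip(w[1:], activations)) for w in layer]
        last = k == len(weights) - 1
        # sigmoid in hidden layers, softmax in the output layer
        activations = softmax(z) if last else [sigmoid(v) for v in z]
    return activations


print(param_count([4000, 128, 64, 3]))  # 520579

small = [6, 4, 3]  # a tiny network so the forward pass is quick to inspect
probs = forward([0.0, 1.2, 0.0, 0.5, 0.0, 0.3], init_weights(small))
prediction = max(range(len(probs)), key=probs.__getitem__)  # index of the largest probability
```

Almost all of the 520,579 parameters sit between the 4000-wide input layer and the first hidden layer, so `numFeatures` is the main lever on model size.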

In [0]:
# The Pipeline chains the label indexer, tokenizer, TF-IDF stages and the MLP so they run in sequence.
# The fitted pipeline also pre-processes the test data in exactly the same way as the training data.
from pyspark.ml import Pipeline

pipe = Pipeline(stages=[labelIndexer,tokenizer,hashingTF,idf,mlpModel])
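`Pipeline.fit` walks through the stages in order: an estimator stage (`StringIndexer`, `IDF`, the MLP) is fitted on the output of the previous stages and replaced by its fitted model, while a transformer stage (`Tokenizer`, `HashingTF`) is used as is. The resulting `PipelineModel` applies the same fitted stages to any DataFrame, so statistics such as the IDF weights are learned from the training data only and reused unchanged on the test data. The toy classes below are hypothetical stand-ins written to illustrate that control flow; they are not Spark's API.

```python
class Transformer:
    """Hypothetical stateless stage: fitting returns the stage unchanged."""
    def fit(self, rows):
        return self


class Lowercase(Transformer):
    def transform(self, rows):
        return [r.lower() for r in rows]


class VocabIndexer:
    """Hypothetical estimator: learns a vocabulary from the data it is fitted on."""
    def fit(self, rows):
        vocab = sorted({w for r in rows for w in r.split()})
        return VocabIndexerModel({w: i for i, w in enumerate(vocab)})


class VocabIndexerModel(Transformer):
    def __init__(self, vocab):
        self.vocab = vocab

    def transform(self, rows):
        # Words never seen during fitting map to -1
        return [[self.vocab.get(w, -1) for w in r.split()] for r in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            model = stage.fit(rows)
            fitted.append(model)
            if i < len(self.stages) - 1:  # no need to transform after the last stage
                rows = model.transform(rows)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train_rows = ["Profit rose", "Sales fell"]
test_rows = ["profit FELL", "Costs rose"]
model = ToyPipeline([Lowercase(), VocabIndexer()]).fit(train_rows)
print(model.transform(test_rows))  # [[1, 0], [-1, 2]]
```

Because the indexer is fitted on the training rows only, the unseen test word "costs" gets no index of its own; the same fit-on-train, apply-to-test pattern is what keeps the notebook's test data free of training-time leakage.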


In [0]:
fit_model=pipe.fit(train_data)

In [0]:
# Store the results in a dataframe

result = fit_model.transform(test_data)

In [0]:
result.select("indexedLabel","prediction").show()

+------------+----------+
|indexedLabel|prediction|
+------------+----------+
|         2.0|       2.0|
|         2.0|       2.0|
|         2.0|       0.0|
|         2.0|       2.0|
|         2.0|       2.0|
|         2.0|       1.0|
|         2.0|       0.0|
|         2.0|       2.0|
|         2.0|       2.0|
|         2.0|       1.0|
|         2.0|       0.0|
|         2.0|       1.0|
|         2.0|       0.0|
|         2.0|       2.0|
|         2.0|       2.0|
|         2.0|       0.0|
|         2.0|       1.0|
|         2.0|       2.0|
|         2.0|       2.0|
|         2.0|       0.0|
+------------+----------+
only showing top 20 rows



### Evaluating the model

##### Accuracy

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
pred_and_actual = result.select("prediction","indexedLabel").withColumnRenamed("indexedLabel","label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("The accuracy of the model is {}".format(evaluator.evaluate(pred_and_actual)))

The accuracy of the model is 0.713487071977638


##### Confusion Matrix

In [0]:
from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics
# MulticlassMetrics expects an RDD of (prediction, label) pairs of floats:
# cast the label explicitly and keep only those two columns
pred_and_actual = result.select('prediction', f.col('indexedLabel').cast(FloatType()).alias('label'))

metrics = MulticlassMetrics(pred_and_actual.rdd.map(tuple))

# Rows are actual labels, columns are predicted labels, both in ascending index order
print("Below is the confusion matrix \n {}".format(metrics.confusionMatrix().toArray()))

Below is the confusion matrix 
 [[688. 134.  33.]
 [143. 228.  27.]
 [ 43.  30. 105.]]
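Accuracy and per-class precision and recall can be read straight off the confusion matrix, whose rows are the actual labels and whose columns are the predicted labels, both in ascending index order (per the `MulticlassMetrics` documentation). The plain-Python sketch below recomputes them from the matrix printed above: the trace (688 + 228 + 105 = 1021) divided by the total (1431) reproduces the reported accuracy. Spark can report similar figures directly, e.g. `metrics.precision(0.0)`, `metrics.recall(0.0)` or `MulticlassClassificationEvaluator(metricName="f1")`.

```python
# Confusion matrix copied from the output above (rows = actual, columns = predicted)
confusion = [
    [688, 134, 33],
    [143, 228, 27],
    [43, 30, 105],
]


def summarize(matrix):
    k = len(matrix)
    total = sum(sum(row) for row in matrix)
    correct = sum(matrix[i][i] for i in range(k))  # the diagonal holds correct predictions
    stats = {}
    for c in range(k):
        tp = matrix[c][c]
        predicted = sum(matrix[r][c] for r in range(k))  # column sum
        actual = sum(matrix[c])                          # row sum
        precision = tp / predicted if predicted else 0.0
        recall = tp / actual if actual else 0.0
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        stats[c] = {"precision": precision, "recall": recall, "f1": f1}
    return correct / total, stats


accuracy, per_class = summarize(confusion)
print(round(accuracy, 4))  # 0.7135
for label, s in per_class.items():
    print(label, {name: round(v, 3) for name, v in s.items()})
```

Recall for class 0 (688 / 855 ≈ 0.80) is well above that for classes 1 (228 / 398 ≈ 0.57) and 2 (105 / 178 ≈ 0.59), so the overall accuracy is driven largely by the most frequent class.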


##### Area under the ROC curve

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='indexedLabel',metricName='areaUnderROC')
AUC = AUC_evaluator.evaluate(result)
print("The area under the curve is {}".format(AUC))

The area under the curve is 0.7501949317738792


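The area under the ROC curve comes out at roughly 0.75, but this number should be read with caution. `BinaryClassificationEvaluator` is meant for binary labels, whereas `indexedLabel` takes three values, and the hard `prediction` column (not a continuous score) is passed as `rawPredictionCol`. Spark's binary metrics count every label above 0.5 as positive, so the figure effectively scores class 0 against classes 1 and 2 combined, using a score with only three distinct values. The same caveat applies to the area under the PR curve below; accuracy and the confusion matrix above are the more reliable guides to how well the model predicts the sentiment of financial news.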
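For a three-class problem, ROC analysis is usually done one-vs-rest: for each class, every test row is scored by the model's probability for that class, and the binary AUC of "this class vs. the rest" is computed. The plain-Python sketch below does that with the rank-sum (Mann-Whitney) formulation of AUC, counting ties as one half. Feeding it real data assumes the fitted model exposes a per-class `probability` vector column, which recent Spark releases provide for `MultilayerPerceptronClassifier`; check this for your version, for example with `result.printSchema()`. The sample probabilities below are made up.

```python
def binary_auc(scores, positives):
    """AUC = probability that a random positive outscores a random negative
    (ties count as 1/2). O(n_pos * n_neg), which is fine for a sketch."""
    pos = [s for s, p in zip(scores, positives) if p]
    neg = [s for s, p in zip(scores, positives) if not p]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for sp in pos:
        for sn in neg:
            if sp > sn:
                wins += 1.0
            elif sp == sn:
                wins += 0.5
    return wins / (len(pos) * len(neg))


def one_vs_rest_auc(prob_vectors, labels, num_classes):
    """Per-class AUC, using column c of each probability vector as the score for class c."""
    return [binary_auc([p[c] for p in prob_vectors], [lab == c for lab in labels])
            for c in range(num_classes)]


# Made-up probabilities for six test rows; labels are class indices
probs = [[0.7, 0.2, 0.1], [0.6, 0.3, 0.1], [0.2, 0.7, 0.1],
         [0.3, 0.4, 0.3], [0.1, 0.2, 0.7], [0.5, 0.2, 0.3]]
labels = [0, 0, 1, 1, 2, 2]
aucs = one_vs_rest_auc(probs, labels, 3)
macro_auc = sum(aucs) / len(aucs)  # unweighted mean over the three classes
```

In Spark, the same per-class numbers could be obtained by extracting one element of the probability vector into a column (for example with `pyspark.ml.functions.vector_to_array`, available from Spark 3.0) and passing it, together with a 0/1 label for that class, to `BinaryClassificationEvaluator`.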

##### Area under the PR curve

In [0]:
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='indexedLabel',metricName='areaUnderPR')
PR = PR_evaluator.evaluate(result)
print("The area under the PR curve is {}".format(PR))

The area under the PR curve is 0.697350379579132


### Save Model

In [0]:
#set / load basePath
basePath = "/FileStore/finalproject"

##### Save the pipeline

In [0]:
pipe.write().overwrite().save(basePath + "/pipeline")

##### Save trained model (pipeline)

In [0]:
fit_model.write().overwrite().save(basePath + "/model")

### Load Model for new prediction

In [0]:
#Load / set basePath
basePath = "/FileStore/finalproject"

##### Load the pipeline

In [0]:
from pyspark.ml import Pipeline
pipe_new = Pipeline.load(basePath + "/pipeline")

##### Load the trained model (pipeline)

In [0]:
from pyspark.ml import PipelineModel
load_fit_model = PipelineModel.load(basePath + "/model")

##### Test loading the trained model (to be removed in production)

In [0]:
new_result = load_fit_model.transform(test_data)

In [0]:
pred_and_actual = new_result.select("prediction","indexedLabel").withColumnRenamed("indexedLabel","label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("The accuracy of the model is {}".format(evaluator.evaluate(pred_and_actual)))

The accuracy of the model is 0.713487071977638


In [0]:
# Same (prediction, label) float pairs as above, now from the reloaded model
pred_and_actual = new_result.select('prediction', f.col('indexedLabel').cast(FloatType()).alias('label'))

metrics = MulticlassMetrics(pred_and_actual.rdd.map(tuple))

print("Below is the confusion matrix \n {}".format(metrics.confusionMatrix().toArray()))



Below is the confusion matrix 
 [[688. 134.  33.]
 [143. 228.  27.]
 [ 43.  30. 105.]]
