# Stock Marcket Prediction using Daily News Headlines

## Description of the project

The goal of this project is to predict whether the stockmarket value will fall or not based on the daily news from top 25 news outlets. The data used  for this project is obtained from Kaggle datasets which can be found at https://www.kaggle.com/aaron7sun/stocknews

The file stockMarketAndNewsData.csv contains all the required data. The columns in the data include **Date**, **Label** (0 if stockmarket value droped, 1 otherwise), and **Top1** - **Top25** store the text of the daily top 25 news headlines.


## Import libraries 

In [23]:
import numpy
import pandas as pd
import matplotlib.pyplot as plt
from pylab import *
from pyspark.sql.functions import udf, concat, col, lit
from pyspark.sql.types import IntegerType, ArrayType, StringType, DoubleType, StructType, StructField
import string
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorIndexer, CountVectorizer, Tokenizer, StopWordsRemover, NGram
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

### The schema of the data 

In [2]:
schemaData = StructType([StructField('Date', StringType(), True), StructField('Label', IntegerType(), True), 
                         StructField('Top1', StringType(), True), StructField('Top2', StringType(), True),
                         StructField('Top3', StringType(), True), StructField('Top4', StringType(), True),
                         StructField('Top5', StringType(), True), StructField('Top6', StringType(), True),
                         StructField('Top7', StringType(), True), StructField('Top8', StringType(), True),
                         StructField('Top9', StringType(), True), StructField('Top10', StringType(), True),
                         StructField('Top11', StringType(), True), StructField('Top12', StringType(), True),
                         StructField('Top13', StringType(), True), StructField('Top14', StringType(), True),
                         StructField('Top15', StringType(), True), StructField('Top16', StringType(), True),
                         StructField('Top17', StringType(), True), StructField('Top18', StringType(), True),
                         StructField('Top19', StringType(), True), StructField('Top20', StringType(), True),
                         StructField('Top21', StringType(), True), StructField('Top22', StringType(), True),
                         StructField('Top23', StringType(), True), StructField('Top24', StringType(), True),
                         StructField('Top25', StringType(), True)])

### Load the data

In [3]:
data = sqlContext.read.load('/user/ebisa/stockMarketAndNewsData.csv', 
                          delimiter=',',
                          format='com.databricks.spark.csv', 
                          header='true', 
                          schema=schemaData,
                          inferSchema='false')

### The number and range of dates represented in the data

We have 1989 days represent that range from August 2008 to July 2016.

In [4]:
data.describe('Date').show()

+-------+----------+
|summary|      Date|
+-------+----------+
|  count|      1989|
|   mean|      null|
| stddev|      null|
|    min|2008-08-08|
|    max|2016-07-01|
+-------+----------+



### Percentage of days the stockmarket value dropped

It can be observed that the stock value decreased on 46.5% of the days.

In [5]:
numbRecords = data.count()
stockRiseOrFall = data.groupBy('Label').count().toPandas()
stockRiseOrFall['percentage'] = 100*stockRiseOrFall['count']/numbRecords
stockRiseOrFall

Unnamed: 0,Label,count,percentage
0,0,924,46.455505
1,1,1065,53.544495


# Perform data preprocessing

### Merge the text of the news from different sources per day

Replace null values with empty string.

In [6]:
data = data.na.fill('')

Get the list of the columns that represent the news.

In [7]:
newsColumns = [x for x in data.columns if x not in ['Date', 'Label']]

Concatenate the text of the top news happening on the same day.

In [8]:
data = data.withColumn("allNews", data.Top1)
for i in range(2, len(newsColumns)+1):
    colName = 'Top' + str(i)
    data = data.withColumn('allNews', concat(col("allNews"), lit(" "), col(colName)))

### Remove puntuation marks from the news

In [9]:
removePunctuation = udf(lambda x: ''.join([' ' if ch in string.punctuation else ch for ch in x]))
data = data.withColumn('allNews', removePunctuation(data.allNews))

### Split the news into words (tokenization)

We also remove empty strings, as well as words with with single character.

In [10]:
splitNews = udf(lambda s: [x for x in s.split(' ') if (x != u'' and len(x) >= 2)], ArrayType(StringType(), True))
data = data.withColumn('words', splitNews(data.allNews)).select('Date', 'label', 'words')

### Remove the stop words

In [11]:
myStopwordRemover = StopWordsRemover(inputCol="words", outputCol="stopRemoved")
data = myStopwordRemover.transform(data)

### Create ngrams of words

In this project we use n-grams with n=2. But it is possible to change the value of n to a desired value.

In [12]:
myngram = NGram(inputCol="stopRemoved", outputCol="ngrams", n=2)
data = myngram.transform(data)
data = data.withColumn('ngrams', data.ngrams.cast(ArrayType(StringType(), True)))

### Apply Countvectorizer to obtain the frequency of each of the ngrams

In [13]:
myCountVectorizer = CountVectorizer(inputCol="ngrams", outputCol="countVect", minDF=1.0)
data = myCountVectorizer.fit(data).transform(data)

# Build the prediction model 

### Transform the label into labeled indices using StringIndexer

In [14]:
data = data.withColumnRenamed('label', 'label-orig')
si_label = StringIndexer(inputCol="label-orig", outputCol="label", handleInvalid="skip")
data = si_label.fit(data).transform(data)

### Divide into training and test data

In [15]:
trainData = data[data['Date'] < '20150101']
testData = data[data['Date'] >= '20141231']

### Build the random forest classifier model

In [16]:
rf = RandomForestClassifier(labelCol="label", featuresCol="countVect", numTrees=3, maxDepth=4, maxBins=200)

### Build a grid search for model selection

In [17]:
grid = ParamGridBuilder().addGrid(rf.numTrees, [2, 5, 10])\
                         .addGrid(rf.maxDepth, [2, 5, 10])\
                         .build()

### Apply cross validator to minimize overfitting

In [18]:
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=rf, estimatorParamMaps=grid, evaluator=evaluator)

### Train the model on the training data

In [19]:
cvModel = cv.fit(trainData)

### Predict on the test data

In [20]:
testResult = cvModel.transform(testData)

### Evaluate the performance of the prediction on the test data

In [21]:
evaluator.evaluate(testResult)

0.5235495071684588