# NEWS SENTIMENT

In [1]:
import csv
import datetime
import os
import re
import warnings

# Silence library warnings for the rest of the notebook.
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt 
import nltk
import pandas as pd
import pyspark
from nltk.corpus import stopwords
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.stem.porter import PorterStemmer
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, regexp_replace, lower
from pyspark.sql.functions import col, unix_timestamp, to_date
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [2]:
# Create (or reuse) the Spark session used throughout the notebook.
# The executor heartbeat interval is raised to 110 s (no stray whitespace in the value).
# NOTE(review): Spark expects the heartbeat interval to stay below
# spark.network.timeout (120s by default) — confirm if either is tuned.

spark = (
    SparkSession.builder
    .appName('DM')
    .config("spark.executor.heartbeatInterval", "110000ms")
    .getOrCreate()
)


In [3]:
# Path variables for all input data — fill these in before running.
# os.path.join inserts the separator of the current OS, so the notebook runs on
# both Windows and POSIX (a hard-coded "\\" only works on Windows).

data_path = "" # Path to Data folder
lexicon_path = "" # Path to lexicon data for sentiment analysis
news_path = "" # Path to news data
main_data_path = os.path.join(data_path, "final_data.csv") # Path to DJUSCG index data

SENTIMENT

In [4]:
# Load the news articles and the main (DJUSCG index) dataset as Spark DataFrames,
# treating the first CSV row as the header and letting Spark infer column types.

news_dataFrame = spark.read.option("header", True).option("inferSchema", True).csv(news_path)
df_main = spark.read.option("header", True).option("inferSchema", True).csv(main_data_path)

In [5]:
# Remove the news metadata columns that are not needed for sentiment scoring.

dropCols = (
    'author',
    'title',
    'url',
    'section',
    'publication',
)
news_dataFrame = news_dataFrame.drop(*dropCols)

In [6]:
# Discard every news row that has a null in any remaining column.

news_dataFrame = news_dataFrame.na.drop()

In [7]:
# A function to extract a 'YYYY-M-D' style date (1-2 digit month/day) from a value.

def extractDate(sentence):
    """Return the first ``YYYY-M(M)-D(D)`` substring found in ``sentence``.

    Args:
        sentence: The raw date value, e.g. ``"2016-12-09 18:31:00"`` ->
            ``"2016-12-09"``. Non-string values (for example a date/timestamp
            object produced by schema inference) are converted with ``str()``
            first. ``None`` (a Spark null) is passed through.

    Returns:
        The matched substring exactly as written (no zero-padding is added),
        or ``None`` if the input is ``None`` or contains no such pattern.
    """
    if sentence is None:
        return None
    if not isinstance(sentence, str):
        sentence = str(sentence)
    # Raw string so "\d" reaches the regex engine without an invalid-escape warning.
    match = re.search(r"\d{4}-\d{1,2}-\d{1,2}", sentence)
    return match.group() if match else None
    
# Wrap extractDate as a string-returning UDF and store the extracted date text in 'newDate'.
udf_newDate = F.udf(lambda raw_value: extractDate(raw_value), returnType=StringType())
news_dataFrame = news_dataFrame.withColumn('newDate', udf_newDate(col('date')))

In [8]:
# Parse the extracted 'yyyy-MM-dd' text into a DateType column, drop every row that
# still contains a null (including dates that could not be parsed), then replace the
# original 'date' column with the parsed one.

news_dataFrame = (
    news_dataFrame
    .withColumn('dateFormat', to_date(col('newDate'), 'yyyy-MM-dd'))
    .dropna()
    .drop('date', 'newDate')
    .withColumnRenamed('dateFormat', 'date')
)

Lexicon Files Analysis

In [9]:
sia = SentimentIntensityAnalyzer()

# --- Stock-market lexicon ---------------------------------------------------------
stock_lex_path = os.path.join(lexicon_path, "stock_lex.csv")
stock_lex = pd.read_csv(stock_lex_path)

# One sentiment value per item: the mean of its affirmative and negative scores.
stock_lex['sentiment'] = (stock_lex['Aff_Score'] + stock_lex['Neg_Score']) / 2
stock_lex = dict(zip(stock_lex.Item, stock_lex.sentiment))
# Keep single-word items only.
stock_lex = {k: v for k, v in stock_lex.items() if len(k.split(' ')) == 1}

# Rescale: positives are divided by the largest score and negatives by the smallest,
# so the scaled values lie in [-4, 4]. The extremes are computed once, not per entry.
max_score = max(stock_lex.values(), default=0.0)
min_score = min(stock_lex.values(), default=0.0)
stock_lex_scaled = {}
for k, v in stock_lex.items():
    if v > 0:
        stock_lex_scaled[k] = v / max_score * 4
    elif v == 0:
        # Neutral stays 0.0; also avoids 0/0 when the lexicon has no negative entries.
        stock_lex_scaled[k] = 0.0
    else:
        stock_lex_scaled[k] = v / min_score * -4

# --- Word lists (presumably the Loughran-McDonald lists, per the "lm_" prefix) ----
positive = []
with open(os.path.join(lexicon_path, 'lm_positive.csv'), 'r', newline='') as f:
    for row in csv.reader(f):
        if row:  # skip blank lines instead of failing on row[0]
            positive.append(row[0].strip())

negative = []
with open(os.path.join(lexicon_path, 'lm_negative.csv'), 'r', newline='') as f:
    for row in csv.reader(f):
        if row:
            # Multi-word entries are split so every word gets its own score.
            negative.extend(row[0].strip().split(" "))

# Later updates win: VADER's own lexicon overrides the stock lexicon, which overrides
# the negative list, which overrides the positive list.
final_lex = {}
final_lex.update({word: 2.0 for word in positive})
final_lex.update({word: -2.0 for word in negative})
final_lex.update(stock_lex_scaled)
final_lex.update(sia.lexicon)
sia.lexicon = final_lex

In [10]:
# The data set holds about 2.7 million articles on many different topics, so keep
# only the articles whose text contains the "Dow J" keyword (case-sensitive match).

news_dataFrame = news_dataFrame.where(col("article").contains("Dow J"))

In [11]:
# A function that returns the VADER compound sentiment score for a given article / sentence.

def sentiment_scores(sentence, verbose=False):
    """Return the compound polarity score of ``sentence`` from the global ``sia`` analyzer.

    The compound score is labelled Positive (>= 0.05), Negative (<= -0.05) or
    Neutral otherwise; the label and the neg/neu/pos breakdown are printed only
    when ``verbose`` is true.

    Args:
        sentence: Text to score. ``None`` (a Spark null) yields ``None``.
        verbose: Print the detailed breakdown. Off by default because this
            function runs inside a Spark UDF, where a print per article would
            flood the executor logs.

    Returns:
        ``sia.polarity_scores(sentence)['compound']``, or ``None`` for ``None`` input.
    """
    if sentence is None:
        return None

    sentiment_dict = sia.polarity_scores(sentence)
    compound = sentiment_dict['compound']

    if verbose:
        print("Overall sentiment dictionary is : ", sentiment_dict)
        print("sentence was rated as ", sentiment_dict['neg']*100, "% Negative")
        print("sentence was rated as ", sentiment_dict['neu']*100, "% Neutral")
        print("sentence was rated as ", sentiment_dict['pos']*100, "% Positive")
        print("Sentence Overall Rated As", end = " ")
        # Decide the overall label from the compound score.
        if compound >= 0.05:
            print("Positive")
        elif compound <= -0.05:
            print("Negative")
        else:
            print("Neutral")

    return compound

udf_sentiment_scores = F.udf(lambda text: sentiment_scores(text), returnType=DoubleType())  # compound score per article

In [12]:
# Strip punctuation and symbols, keeping only letters, digits, '_', '-', '&' and spaces.
# Runs of whitespace (newlines, tabs, repeated spaces) are first collapsed to a single
# space; deleting a line break outright would glue the words on either side into one token.

news_dataFrame = news_dataFrame.select(
    "date",
    regexp_replace(
        regexp_replace("article", r"\s+", " "),
        r"[^0-9a-zA-Z_\- &]+",
        "",
    ).alias('replaced_str'),
)

In [13]:
# Lower-case the cleaned article text.

news_dataFrame = news_dataFrame.select(
    col('date'),
    lower('replaced_str').alias('replaced_lower_str'),
)

In [14]:
# This function removes English stopwords (e.g. and, the, is, an, a) from a text column.

def myStopwordRemover(df:pyspark.sql.DataFrame, inputColName:str, outputColName:str, keep_words=None) -> pyspark.sql.DataFrame:
    """Add ``outputColName``: the text of ``inputColName`` without NLTK English stopwords.

    Text is split on whitespace and the surviving words are re-joined with single
    spaces. A null input value yields a null output.

    Args:
        df: Input DataFrame.
        inputColName: Name of the string column to clean.
        outputColName: Name of the column to add (replaced if it already exists).
        keep_words: Optional iterable of words to keep even if they are stopwords.
            NOTE(review): NLTK's English list presumably contains negations such as
            "not"/"no", which VADER uses for negation handling — consider keeping them.

    Returns:
        A new DataFrame with ``outputColName`` added.
    """
    from nltk.corpus import stopwords

    # Build the stopword set before the UDF that closes over it.
    stop = set(stopwords.words('english'))
    if keep_words:
        stop -= set(keep_words)

    def _remove_stop_words(text):
        if text is None:
            return None
        return ' '.join(word for word in text.split() if word not in stop)

    udf_remove_stop_words = F.udf(_remove_stop_words, StringType())
    return df.withColumn(outputColName, udf_remove_stop_words(inputColName))

In [15]:
# Remove stopwords into 'Cleaned_Article' and discard the intermediate lower-cased column.

news_dataFrame = (
    myStopwordRemover(news_dataFrame, 'replaced_lower_str', 'Cleaned_Article')
    .drop('replaced_lower_str')
)

In [16]:
# Score every article, then average the scores per day.
# groupBy().avg() names the result 'avg(Sentiment_Score)', which later cells rely on.

news_dataFrame = (
    news_dataFrame
    .withColumn("Sentiment_Score", udf_sentiment_scores(col('Cleaned_Article')))
    .drop('Cleaned_Article')
    .groupBy('date')
    .avg()
)

In [17]:
# A pyspark DataFrame with a single 'date' column holding every calendar day
# from 02-01-2015 to 31-03-2020 inclusive (1916 days).

def get_date_df(start=datetime.date(2015, 1, 2), end=datetime.date(2020, 3, 31)):
    """Return a Spark DataFrame with one 'date' row per calendar day in [start, end].

    Args:
        start: First day (inclusive). Defaults to 2015-01-02.
        end: Last day (inclusive). Defaults to 2020-03-31, which reproduces the
            previously hard-coded 1916-day range.

    Returns:
        A single-column DataFrame named 'date', one row per day.

    Raises:
        ValueError: If ``end`` is before ``start``.
    """
    if end < start:
        raise ValueError("end ({}) is before start ({})".format(end, start))
    n_days = (end - start).days + 1
    rows = [(start + datetime.timedelta(days=offset),) for offset in range(n_days)]
    return spark.createDataFrame(rows, ['date'])

date_df = get_date_df()

In [18]:
# Left-join the daily sentiment onto the full calendar so every day in the range is
# present; days without any matching article get a null score.

news_dataFrame = date_df.join(news_dataFrame, on='date', how='left')

In [19]:
# After the join, days without news have a null 'avg(Sentiment_Score)'. Each null is
# replaced by the most recent earlier non-null daily score, across gaps of any length
# (a one-row lag would only fill the first day of a multi-day gap). Days before the
# first scored day have no earlier score and are set to 0.
# This assumes the sentiment stays unchanged until the next news is released.

Windowspec = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
news_dataFrame = news_dataFrame.withColumn(
    'Sentiment Score',
    F.last(F.col('avg(Sentiment_Score)'), ignorenulls=True).over(Windowspec),
)
news_dataFrame = news_dataFrame.withColumn(
    'Final Sentiment Score',
    F.when(F.isnull(F.col('Sentiment Score')), 0).otherwise(F.col('Sentiment Score')),
)

In [20]:
# Print the column names and types of the daily sentiment DataFrame.
news_dataFrame.printSchema()

root
 |-- date: date (nullable = true)
 |-- avg(Sentiment_Score): double (nullable = true)
 |-- Sentiment Score: double (nullable = true)
 |-- Final Sentiment Score: double (nullable = true)



In [21]:
# Drop the intermediate score columns; only 'date' and 'Final Sentiment Score' remain.

news_dataFrame = news_dataFrame.drop('avg(Sentiment_Score)', 'Sentiment Score')

In [22]:
# Convert the main dataset's 'date' column from a 'dd-MM-yyyy' string (StringType())
# to DateType(). to_date parses the string directly, as in the news date cell, without
# a detour through unix_timestamp and a timestamp cast. The column is referenced as
# 'date' throughout, so the drop also works under case-sensitive column resolution.

df_main = df_main.withColumn('dateFormat', to_date(col('date'), 'dd-MM-yyyy')) \
                .drop('date') \
                .withColumnRenamed('dateFormat', 'date')

In [23]:
# Inner-join the daily sentiment onto the main dataset by 'date': only days present
# in both DataFrames are kept, and the shared 'date' column appears once.

df_main = df_main.join(news_dataFrame, on=['date'], how='inner')

In [24]:
# Keep the sentiment score rounded to 4 decimal places as 'Sentiment Score',
# restore the capitalised 'Date' column name and drop the unrounded column.

df_main = (
    df_main
    .withColumn('Sentiment Score', F.round(F.col('Final Sentiment Score'), 4))
    .withColumnRenamed('date', 'Date')
    .drop(F.col('Final Sentiment Score'))
)

In [26]:
(df_main.coalesce(1)  # one partition -> one CSV part file in the FINAL_DATA directory
    .write
    .option('header', True)
    .mode('overwrite')
    .csv("FINAL_DATA"))