In [1]:
# Spark init
!wget -q https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz 
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/default-java"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
!pip install -q findspark
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [2]:
import pandas as pd
import nltk
nltk.download('vader_lexicon')
from nltk.sentiment.vader import SentimentIntensityAnalyzer
sia=SentimentIntensityAnalyzer()

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!




In [3]:
import pyspark
from pyspark.sql.types import StructField
from pyspark.sql.types import *

custom_schema = StructType([
        StructField("Headline", StringType(), True),
        StructField("Date", StringType(), True),
        StructField("Doc_type", StringType(), True),
        StructField("URL", StringType(), True), 
        StructField("snippet", StringType(), True),
        StructField("keywords", StringType(), True)
    ])


In [4]:
df_news = df = spark.read.csv("/content/drive/MyDrive/data/2020-22 NYT Headlines.csv", sep=',', schema = custom_schema, header=True)


In [5]:
df_news.show()

+--------------------+----------+--------+--------------------+--------------------+--------------------+
|            Headline|      Date|Doc_type|                 URL|             snippet|            keywords|
+--------------------+----------+--------+--------------------+--------------------+--------------------+
|Already Had Plent...|2020-01-02| article|https://www.nytim...|He’s a bad show, ...|['Presidential El...|
|Why Did One-Quart...|2020-01-02| article|https://www.nytim...|Swine fever devas...|['Pigs', 'Agricul...|
|Coast Guard Suspe...|2020-01-02| article|https://www.nytim...|Two of seven crew...|['Rescues', 'Mari...|
|N.B.A. Superstars...|2020-01-02| article|https://www.nytim...|The longtime comm...|      ['Basketball']|
|In Rose Bowl Vict...|2020-01-02| article|https://www.nytim...|The Ducks managed...|['Football (Colle...|
|Where Darth Vader...|2020-01-02| article|https://www.nytim...|Ed Sessa turns th...|['Crossword Puzzl...|
|Don Larsen, Yanke...|2020-01-02| article|http

In [6]:
df_news.describe()

DataFrame[summary: string, Headline: string, Date: string, Doc_type: string, URL: string, snippet: string, keywords: string]

In [7]:
from pyspark.sql.functions import to_date
 
df = df_news.withColumn('Date',to_date(df_news.Date, 'yyyy-MM-dd'))

df.printSchema()


root
 |-- Headline: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Doc_type: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- snippet: string (nullable = true)
 |-- keywords: string (nullable = true)



In [8]:
df.show()

+--------------------+----------+--------+--------------------+--------------------+--------------------+
|            Headline|      Date|Doc_type|                 URL|             snippet|            keywords|
+--------------------+----------+--------+--------------------+--------------------+--------------------+
|Already Had Plent...|2020-01-02| article|https://www.nytim...|He’s a bad show, ...|['Presidential El...|
|Why Did One-Quart...|2020-01-02| article|https://www.nytim...|Swine fever devas...|['Pigs', 'Agricul...|
|Coast Guard Suspe...|2020-01-02| article|https://www.nytim...|Two of seven crew...|['Rescues', 'Mari...|
|N.B.A. Superstars...|2020-01-02| article|https://www.nytim...|The longtime comm...|      ['Basketball']|
|In Rose Bowl Vict...|2020-01-02| article|https://www.nytim...|The Ducks managed...|['Football (Colle...|
|Where Darth Vader...|2020-01-02| article|https://www.nytim...|Ed Sessa turns th...|['Crossword Puzzl...|
|Don Larsen, Yanke...|2020-01-02| article|http

In [9]:
custom_schema = StructType([
        StructField("Date", StringType(), True),
        StructField("Time", StringType(), True),
        StructField("Open", FloatType(), True),
        StructField("High", FloatType(), True), 
        StructField("Low", FloatType(), True),
        StructField("Close", FloatType(), True)
    ])

In [10]:
df_hc = spark.read.csv("/content/drive/MyDrive/data/HC_AZN.csv", sep=',', schema = custom_schema, header=True)

In [11]:
df_hc.show()

+----------+--------+-----+-----+-----+-----+
|      Date|    Time| Open| High|  Low|Close|
+----------+--------+-----+-----+-----+-----+
|2022-03-21|16:11:00|63.48|63.48|63.48|63.48|
|2022-03-21|16:06:00|63.48|63.48|63.48|63.48|
|2022-03-21|16:03:00|63.48|63.48|63.48|63.48|
|2022-03-21|16:02:00|63.48|63.48|63.48|63.48|
|2022-03-21|16:01:00|63.48|63.48|63.48|63.48|
|2022-03-21|16:00:00|63.48| 63.5|63.46|63.49|
|2022-03-21|15:59:00|63.48| 63.5|63.47|63.48|
|2022-03-21|15:58:00|63.45|63.49|63.45|63.48|
|2022-03-21|15:57:00|63.46|63.47|63.44|63.44|
|2022-03-21|15:56:00|63.46|63.47|63.42|63.46|
|2022-03-21|15:55:00|63.48|63.49|63.46|63.46|
|2022-03-21|15:54:00|63.48|63.49|63.46|63.48|
|2022-03-21|15:53:00|63.48| 63.5|63.46|63.48|
|2022-03-21|15:52:00|63.44|63.48|63.43|63.48|
|2022-03-21|15:51:00|63.38|63.46|63.38|63.43|
|2022-03-21|15:50:00|63.39|63.39|63.36|63.38|
|2022-03-21|15:49:00|63.42|63.43|63.38|63.38|
|2022-03-21|15:48:00|63.42|63.43| 63.4|63.42|
|2022-03-21|15:47:00| 63.4|63.43|6

In [12]:
df_merged = df_hc.join(df_news,on='Date')
df_merged.head()

Row(Date='2020-04-13', Time='16:23:00', Open=44.93000030517578, High=44.93000030517578, Low=44.93000030517578, Close=44.93000030517578, Headline='Bob Iger Thought He Was Leaving on Top. Now, He’s Fighting for Disney’s Life.', Doc_type='article', URL='https://www.nytimes.com/2020/04/12/business/media/disney-ceo-coronavirus.html', snippet='The former C.E.O. thought he was riding into the sunset. Now he’s reasserting control and reimagining Disney as a company with fewer employees and more thermometers.', keywords="['Coronavirus (2019-nCoV)', 'Media']")

In [13]:
df_merged.distinct().show()

+----------+--------+-----+-----+-----+-----+--------------------+----------+--------------------+--------------------+--------------------+
|      Date|    Time| Open| High|  Low|Close|            Headline|  Doc_type|                 URL|             snippet|            keywords|
+----------+--------+-----+-----+-----+-----+--------------------+----------+--------------------+--------------------+--------------------+
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|Bob Iger Thought ...|   article|https://www.nytim...|The former C.E.O....|['Coronavirus (20...|
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|A Corporate Merge...|   article|https://www.nytim...|The United States...|['Mergers, Acquis...|
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|Trump Lashes Out ...|   article|https://www.nytim...|The president ret...|['Coronavirus (20...|
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|‘Westworld’ Seaso...|   article|https://www.nytim...|This season is be...|      ['Television']|
|2020-04-13|1

In [14]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import lower
from pyspark.sql.functions import regexp_replace
from nltk.tokenize import word_tokenize
from pyspark.sql import Row
from pyspark.ml.feature import StopWordsRemover
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.feature import CountVectorizer
import pyspark.sql.functions as F
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import to_timestamp
from pyspark import StorageLevel

df = df_merged.select('*', (lower(regexp_replace('snippet', "[^a-zA-Z\\s]", "")).alias('Regex Removed & Lower Cased Articles')))

In [15]:
def tokenize(df):
    tokenizer = Tokenizer(inputCol="Regex Removed & Lower Cased Articles", outputCol="Tokenized Articles")
    return tokenizer.transform(df)

In [16]:
news_df = tokenize(df)

In [17]:
news_df.show()

+----------+--------+-----+-----+-----+-----+--------------------+----------+--------------------+--------------------+--------------------+------------------------------------+--------------------+
|      Date|    Time| Open| High|  Low|Close|            Headline|  Doc_type|                 URL|             snippet|            keywords|Regex Removed & Lower Cased Articles|  Tokenized Articles|
+----------+--------+-----+-----+-----+-----+--------------------+----------+--------------------+--------------------+--------------------+------------------------------------+--------------------+
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|Bob Iger Thought ...|   article|https://www.nytim...|The former C.E.O....|['Coronavirus (20...|                the former ceo th...|[the, former, ceo...|
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|A Corporate Merge...|   article|https://www.nytim...|The United States...|['Mergers, Acquis...|                the united states...|[the, united, sta...|
|2020

In [18]:
def stop_word_remove(df):
    remover = StopWordsRemover(inputCol="Tokenized Articles", outputCol="Articles without stop words")
    return remover.transform(df)

In [19]:
news_df = stop_word_remove(news_df)

In [20]:
news_df.show()

+----------+--------+-----+-----+-----+-----+--------------------+----------+--------------------+--------------------+--------------------+------------------------------------+--------------------+---------------------------+
|      Date|    Time| Open| High|  Low|Close|            Headline|  Doc_type|                 URL|             snippet|            keywords|Regex Removed & Lower Cased Articles|  Tokenized Articles|Articles without stop words|
+----------+--------+-----+-----+-----+-----+--------------------+----------+--------------------+--------------------+--------------------+------------------------------------+--------------------+---------------------------+
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|Bob Iger Thought ...|   article|https://www.nytim...|The former C.E.O....|['Coronavirus (20...|                the former ceo th...|[the, former, ceo...|       [former, ceo, tho...|
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|A Corporate Merge...|   article|https://www.nyt

In [21]:
from pyspark.ml.feature import HashingTF, Tokenizer
hashingTF = HashingTF(inputCol="Articles without stop words", outputCol="rawFeatures")

In [22]:
featurizedData = hashingTF.transform(news_df)

In [23]:
news_df.show()

+----------+--------+-----+-----+-----+-----+--------------------+----------+--------------------+--------------------+--------------------+------------------------------------+--------------------+---------------------------+
|      Date|    Time| Open| High|  Low|Close|            Headline|  Doc_type|                 URL|             snippet|            keywords|Regex Removed & Lower Cased Articles|  Tokenized Articles|Articles without stop words|
+----------+--------+-----+-----+-----+-----+--------------------+----------+--------------------+--------------------+--------------------+------------------------------------+--------------------+---------------------------+
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|Bob Iger Thought ...|   article|https://www.nytim...|The former C.E.O....|['Coronavirus (20...|                the former ceo th...|[the, former, ceo...|       [former, ceo, tho...|
|2020-04-13|16:23:00|44.93|44.93|44.93|44.93|A Corporate Merge...|   article|https://www.nyt

In [24]:
featurizedData.show(truncate = False)

+----------+--------+-----+-----+-----+-----+-----------------------------------------------------------------------------+----------+--------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [25]:
df = featurizedData['Open', 'High', 'Low', 'Close', 'snippet']

In [26]:
from pyspark.ml.linalg import Vector  
from pyspark.ml.feature import VectorAssembler

In [27]:
vector_assmebler=VectorAssembler(inputCols=['Open','High','Low','Close'] 
                                ,outputCol='features')
df_transformed=vector_assmebler.transform(df)

df_transformed.show()

+-----+-----+-----+-----+--------------------+--------------------+
| Open| High|  Low|Close|             snippet|            features|
+-----+-----+-----+-----+--------------------+--------------------+
|41.04|41.04|41.04|41.04|A report by Amnes...|[41.0400009155273...|
|40.96|40.96|40.96|40.96|A report by Amnes...|[40.9599990844726...|
|40.81|40.81|40.81|40.81|A report by Amnes...|[40.8100013732910...|
| 40.8| 40.8| 40.8| 40.8|A report by Amnes...|[40.7999992370605...|
|40.89|40.89|40.89|40.89|A report by Amnes...|[40.8899993896484...|
|41.13|41.13|41.13|41.13|A report by Amnes...|[41.1300010681152...|
|41.02|41.02|41.02|41.02|A report by Amnes...|[41.0200004577636...|
|41.36|41.36|41.36|41.36|A report by Amnes...|[41.3600006103515...|
|41.45|41.45|41.45|41.45|A report by Amnes...|[41.4500007629394...|
|41.37|41.37|41.37|41.37|A report by Amnes...|[41.3699989318847...|
|41.17|41.17|41.17|41.17|A report by Amnes...|[41.1699981689453...|
| 41.2|41.29| 41.2|41.29|A report by Amnes...|[4

In [28]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol = 'snippet', outputCol = 'word_index')
df = indexer.fit(df_transformed).transform(df_transformed)
df.show()

+-----+-----+-----+-----+--------------------+--------------------+----------+
| Open| High|  Low|Close|             snippet|            features|word_index|
+-----+-----+-----+-----+--------------------+--------------------+----------+
|41.04|41.04|41.04|41.04|A report by Amnes...|[41.0400009155273...|   65936.0|
|40.96|40.96|40.96|40.96|A report by Amnes...|[40.9599990844726...|   65936.0|
|40.81|40.81|40.81|40.81|A report by Amnes...|[40.8100013732910...|   65936.0|
| 40.8| 40.8| 40.8| 40.8|A report by Amnes...|[40.7999992370605...|   65936.0|
|40.89|40.89|40.89|40.89|A report by Amnes...|[40.8899993896484...|   65936.0|
|41.13|41.13|41.13|41.13|A report by Amnes...|[41.1300010681152...|   65936.0|
|41.02|41.02|41.02|41.02|A report by Amnes...|[41.0200004577636...|   65936.0|
|41.36|41.36|41.36|41.36|A report by Amnes...|[41.3600006103515...|   65936.0|
|41.45|41.45|41.45|41.45|A report by Amnes...|[41.4500007629394...|   65936.0|
|41.37|41.37|41.37|41.37|A report by Amnes...|[41.36

In [29]:
df_model=df.select('features','word_index')
df_model.show( truncate= False)

+-----------------------------------------------------------------------------+----------+
|features                                                                     |word_index|
+-----------------------------------------------------------------------------+----------+
|[41.040000915527344,41.040000915527344,41.040000915527344,41.040000915527344]|65936.0   |
|[40.959999084472656,40.959999084472656,40.959999084472656,40.959999084472656]|65936.0   |
|[40.810001373291016,40.810001373291016,40.810001373291016,40.810001373291016]|65936.0   |
|[40.79999923706055,40.79999923706055,40.79999923706055,40.79999923706055]    |65936.0   |
|[40.88999938964844,40.88999938964844,40.88999938964844,40.88999938964844]    |65936.0   |
|[41.130001068115234,41.130001068115234,41.130001068115234,41.130001068115234]|65936.0   |
|[41.02000045776367,41.02000045776367,41.02000045776367,41.02000045776367]    |65936.0   |
|[41.36000061035156,41.36000061035156,41.36000061035156,41.36000061035156]    |65936.0   |

In [31]:
(training, test) = df_model.randomSplit([0.8, 0.2], seed = 1234)
print (training.first())

Py4JJavaError: ignored

In [None]:
#Linear regression model
from pyspark.ml.regression import LinearRegression
Linear_Regression=LinearRegression(labelCol='word_index')
regression = Linear_Regression.fit(training)