# Daily News & Stock Market Correlation-Prediction (4/4)

**NOTE:** This is the _4th (final) part_ of the notebook. To see the _3rd part_, click **[here.](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1993205155917960/4235175522480070/6079964132923530/latest.html)**

## Structured Streaming

So far, we've shown a number of data visualization examples and tested a few hypotheses. To round off, let's see how we could use Spark's Structured Streaming to analyze market news as it arrives and make decisions in real time.

First, we'll split our news dataset from one CSV file (`RedditNews.csv`) into many single-record files in a separate directory. Each file stands for one news "event" arriving in the stream.

Next, we'll set up our streaming `DataFrame`. We'll listen for new headlines, extract their keywords (stemmed words), score their sentiment ("neutrality"), and write the results to an in-memory table.

We'll then query the table a couple of times to show the flow of data through our system.
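The stream below stores each headline's stemmed words but does not aggregate them into keyword frequencies. As a sketch of how the most frequent keywords could be counted, assuming the per-headline word lists are already extracted (the `top_keywords` helper and the sample batch are hypothetical, not part of the notebook), `collections.Counter` is enough:

```python
from collections import Counter
from typing import Iterable, List, Tuple


def top_keywords(word_lists: Iterable[List[str]], k: int = 3) -> List[Tuple[str, int]]:
    """Count words across all headlines and return the k most frequent.

    Hypothetical helper: each element of word_lists is assumed to be the
    stemmed, stopword-free word list of one headline (like the Words column).
    """
    counts = Counter()
    for words in word_lists:
        counts.update(words)  # add this headline's words to the running totals
    return counts.most_common(k)


# A tiny made-up batch of already-stemmed headlines
batch = [
    ["oil", "price", "fall"],
    ["oil", "export", "rise"],
    ["price", "oil", "cap"],
]
print(top_keywords(batch, k=2))  # → [('oil', 3), ('price', 2)]
```

In Spark, the same count could be expressed with `F.explode` on the `Words` column followed by `groupBy().count()`; a streaming aggregation like that needs the `complete` (or `update`) output mode rather than `append`.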

In [0]:
# Install dependencies; %pip (unlike !pip) installs them on every node
# of the cluster, which the Python UDFs below rely on
%pip install nltk wordcloud vaderSentiment



In [0]:
%fs mkdirs /tmp/data/in/stream

In [0]:
!mkdir -p ./data/stream

In [0]:
!ls ./data

Combined_News_DJIA.csv	stocknews.zip  upload_DJIA_table.csv
RedditNews.csv		stream


In [0]:
# DBFS paths (no trailing slash, so joined paths don't get a double "//")
DBFS_DATA_DIR = "/tmp/data"
in_path = f"dbfs:{DBFS_DATA_DIR}/out/"

In [0]:
# We'll read this file line by line until EOF, skip the header, and
# write every remaining line to its own event file. As this is simply
# a demo, we stop after m + 1 events (event-0 through event-10000).

import os
from itertools import islice

m = 10_000  # index of the last event file to write
pathNews = os.path.abspath("./data/RedditNews.csv")
with open(pathNews, "r") as f:
    next(f, None)  # skip the header row
    for n, record in enumerate(islice(f, m + 1)):
        with open(f"./data/stream/event-{n}.csv", "w") as ef:
            ef.write(record)
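Reading with `readline()` assumes every record fits on one physical line; if a quoted headline ever contained an embedded newline, it would be cut into two events. A `csv`-module variant avoids that. This is a sketch under that assumption (the `split_csv_to_events` name and its parameters are hypothetical) that keeps the same `event-<n>.csv` naming scheme:

```python
import csv
import os


def split_csv_to_events(src_path, out_dir, max_events):
    """Write each data row of src_path to out_dir/event-<n>.csv.

    Skips the header row and stops after max_events rows. Rows are parsed
    with the csv module, so quoted fields containing commas or newlines stay
    intact. Returns the number of event files written.
    """
    os.makedirs(out_dir, exist_ok=True)
    written = 0
    with open(src_path, newline="") as src:
        reader = csv.reader(src)
        next(reader, None)  # skip the header
        for n, row in enumerate(reader):
            if n >= max_events:
                break
            path = os.path.join(out_dir, f"event-{n}.csv")
            with open(path, "w", newline="") as dst:
                # Re-quote fields as needed and end the record with "\n"
                csv.writer(dst, lineterminator="\n").writerow(row)
            written += 1
    return written
```

Calling `split_csv_to_events("./data/RedditNews.csv", "./data/stream", 10_001)` would produce the same 10,001 files as the cell above. Note that Spark's CSV reader would also need `.option("multiLine", True)` to read such a multi-line field back as one record.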

In [0]:
!ls -1 ./data/stream

event-0.csv
event-1.csv
event-10.csv
event-100.csv
event-1000.csv
event-10000.csv
event-1001.csv
event-1002.csv
event-1003.csv
event-1004.csv
event-1005.csv
event-1006.csv
event-1007.csv
event-1008.csv
event-1009.csv
event-101.csv
event-1010.csv
event-1011.csv
event-1012.csv
event-1013.csv
event-1014.csv
event-1015.csv
event-1016.csv
event-1017.csv
event-1018.csv
event-1019.csv
event-102.csv
event-1020.csv
event-1021.csv
event-1022.csv
event-1023.csv
event-1024.csv
event-1025.csv
event-1026.csv
event-1027.csv
event-1028.csv
event-1029.csv
event-103.csv
event-1030.csv
event-1031.csv
event-1032.csv
event-1033.csv
event-1034.csv
event-1035.csv
event-1036.csv
event-1037.csv
event-1038.csv
event-1039.csv
event-104.csv
event-1040.csv
event-1041.csv
event-1042.csv
event-1043.csv
event-1044.csv
event-1045.csv
event-1046.csv
event-1047.csv
event-1048.csv
event-1049.csv
event-105.csv
event-1050.csv
event-1051.csv
event-1052.csv
event

In [0]:
# Let's see an example event (news)
!cat ./data/stream/event-0.csv
!cat ./data/stream/event-25.csv

2016-07-01,"A 117-year-old woman in Mexico City finally received her birth certificate, and died a few hours later. Trinidad Alvarez Lira had waited years for proof that she had been born in 1898."
2016-06-30,Jamaica proposes marijuana dispensers for tourists at airports following legalisation: The kiosks and desks would give people a license to purchase up to 2 ounces of the drug to use during their stay


In [0]:
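# Check the driver's working directory: the local event files live under
# it, which gives the file:// source path for the copy to DBFS
!pwd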

/databricks/driver


In [0]:
# Commented out because our DBFS already contains the events from previous runs!
# %fs cp -r file:///databricks/driver/data/stream /tmp/data/in/stream

In [0]:
%fs ls /tmp/data/in/stream

path,name,size,modificationTime
dbfs:/tmp/data/in/stream/event-0.csv,event-0.csv,198,1675903575000
dbfs:/tmp/data/in/stream/event-1.csv,event-1.csv,60,1674846101000
dbfs:/tmp/data/in/stream/event-10.csv,event-10.csv,279,1675950644000
dbfs:/tmp/data/in/stream/event-10000.csv,event-10000.csv,124,1675878651000
dbfs:/tmp/data/in/stream/event-10006.csv,event-10006.csv,207,1675880617000
dbfs:/tmp/data/in/stream/event-10007.csv,event-10007.csv,82,1675880313000
dbfs:/tmp/data/in/stream/event-10010.csv,event-10010.csv,238,1675878273000
dbfs:/tmp/data/in/stream/event-10012.csv,event-10012.csv,227,1675878324000
dbfs:/tmp/data/in/stream/event-10014.csv,event-10014.csv,102,1675879523000
dbfs:/tmp/data/in/stream/event-10015.csv,event-10015.csv,70,1675879718000


In [0]:
%fs head /tmp/data/in/stream/event-0.csv

In [0]:
# Let's define the schema for our streamed table
import pyspark.sql.types as T

schemaNews = T.StructType([
    T.StructField("Date", T.DateType(), True),
    T.StructField("News", T.StringType(), True),
])
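Each event file holds one CSV record: an ISO date, then the headline, quoted when it contains commas. As a plain-Python sketch of how a record maps onto this two-column schema (`parse_event` is a hypothetical helper; Spark uses its own CSV parser, configurable through options such as `dateFormat`), here is a shortened version of the `event-0.csv` record shown earlier:

```python
import csv
import datetime
import io


def parse_event(line):
    """Split one CSV record into (Date, News), mirroring schemaNews.

    Raises ValueError if the record does not have exactly two fields.
    """
    date_str, news = next(csv.reader(io.StringIO(line)))
    return datetime.date.fromisoformat(date_str), news


row = parse_event(
    '2016-07-01,"A 117-year-old woman in Mexico City finally received '
    'her birth certificate, and died a few hours later."\n'
)
print(row[0], "|", row[1])
```

The quotes keep the comma inside the headline from being treated as a third column, which is why the `Date`/`News` schema lines up with each record.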

In [0]:
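import os
import pyspark.sql.functions as F

# Normalize the input directory into a clean absolute path; on Databricks,
# a path without a scheme is resolved against DBFS
pathNews = os.path.abspath(f"{DBFS_DATA_DIR}/in/stream")
print(pathNews)

# Create the reading stream: each new CSV file in pathNews becomes input rows
streamInputDf = (
    spark.readStream
    .schema(schemaNews)
    .csv(pathNews)
)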

/tmp/data/in/stream


In [0]:
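# Tokenize-and-stem UDF for the stream (PorterStemmer stems words
# rather than lemmatizing them)
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer

nltk.download("stopwords", quiet=True)  # fetch the corpus if it is missing
stopwords_eng = set(stopwords.words("english"))
ps = PorterStemmer()  # created once instead of on every call

def udf_lemmatize(txt):
    txt = str(txt)
    txt = re.sub(r"^b['\"]", "", txt)      # drop a bytes-literal prefix like b"..."
    txt = re.sub(r"[^a-zA-Z\s]", "", txt)  # keep letters and whitespace only
    words = txt.lower().split()
    stems = (ps.stem(w) for w in words if w not in stopwords_eng)
    return [s for s in stems if s]         # discard empty stems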
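The `^b` substitution appears to be meant for the `b"..."` bytes-literal prefix found in some headline dumps (an assumption about its intent). It can only do that safely before punctuation is stripped: once the quote is gone, `^b(.*)` also removes the first letter of any headline that starts with a lowercase "b". A standard-library sketch comparing the two orders (both function names are hypothetical):

```python
import re


def strip_then_prefix(txt):
    # Remove punctuation first, then any leading "b"
    txt = re.sub(r"[^a-zA-Z\s]", "", txt)
    return re.sub(r"^b(.*)", r"\g<1>", txt)


def prefix_then_strip(txt):
    # Remove a leading b' or b" first, then punctuation
    txt = re.sub(r"^b['\"]", "", txt)
    return re.sub(r"[^a-zA-Z\s]", "", txt)


print(strip_then_prefix('b"Georgia downs two Russian warplanes"'))  # → Georgia downs two Russian warplanes
print(prefix_then_strip('b"Georgia downs two Russian warplanes"'))  # → Georgia downs two Russian warplanes
print(strip_then_prefix("bitcoin hits new high"))  # → itcoin hits new high
print(prefix_then_strip("bitcoin hits new high"))  # → bitcoin hits new high
```

Both orders handle a real bytes prefix; only matching the quote character as well keeps ordinary words intact.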

In [0]:
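# TODO: needs more optimization, otherwise too slow!
# Set up VADER sentiment scoring for the streaming DF
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

analyzer = SentimentIntensityAnalyzer()

def get_nlp(text):
    """Return VADER's compound score in [-1, 1] (None for empty text), memoized."""
    if not text:
        return None  # becomes null, which get_neutrality labels "Unknown"
    if text not in get_nlp.cache:
        get_nlp.cache[text] = analyzer.polarity_scores(text)["compound"]
    # Always return a float: a DoubleType UDF turns a Python int into null
    return float(get_nlp.cache[text])
get_nlp.cache = dict()


def get_neutrality(x):
    """Label a compound score with VADER's suggested +/-0.05 thresholds."""
    if not isinstance(x, float):  # null (None) or a non-float value
        return "Unknown"
    if x >= 0.05:
        return "Positive"
    if x <= -0.05:
        return "Negative"
    return "Neutral"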
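`get_neutrality` applies the thresholds the vaderSentiment README suggests for the compound score: at least 0.05 is positive, at most -0.05 is negative, anything in between is neutral. One subtlety: `not x` is also true for a score of exactly `0.0`, so a truthiness check would label perfectly neutral text "Unknown". Likewise, a row-at-a-time PySpark UDF declared as `T.DoubleType()` yields null if the function returns the int `0` instead of `0.0`. A self-contained sketch that tests for `None` explicitly (`label_compound` is a hypothetical name):

```python
from typing import Optional


def label_compound(score: Optional[float]) -> str:
    """Map a VADER compound score to a label; None (a SQL null) means 'Unknown'."""
    if score is None:
        return "Unknown"
    if score >= 0.05:
        return "Positive"
    if score <= -0.05:
        return "Negative"
    return "Neutral"  # -0.05 < score < 0.05, including exactly 0.0


for s in (None, 0.0, 0.05, -0.3, 0.04):
    print(s, label_compound(s))
```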

In [0]:
# Build the streaming DF: add the stemmed words, the "cleaned" text,
# the sentiment score (Neutrality) and its label (Emotion)
lemmatize_udf = F.udf(udf_lemmatize, T.ArrayType(T.StringType()))
nlp_udf = F.udf(get_nlp, T.DoubleType())
neutrality_udf = F.udf(get_neutrality, T.StringType())

streamingNewsDf = (
    streamInputDf
    .withColumn("Words", lemmatize_udf(F.col("News")))
    .withColumn("Clean", F.concat_ws(" ", F.col("Words")))
    .withColumn("Neutrality", nlp_udf(F.col("Clean")))
    .withColumn("Emotion", neutrality_udf(F.col("Neutrality")))
)

In [0]:
# Streaming options: use only a few shuffle partitions, then start
# the stream
spark.conf.set("spark.sql.shuffle.partitions", "2")

query = (
    streamingNewsDf
    .writeStream 
    .format("memory") # Write only to memory
    .queryName("news") # Name of the table = "news"
    .outputMode("append") # Append new rows -- no need to overwrite
    .start() # Start the stream :)
)

Our stream is now up and running! Let's query the data a couple of times, pausing in between, so we can see new events being streamed:

In [0]:
%sql select * from news

Date,News,Words,Clean,Neutrality,Emotion


In [0]:
%sql select count(*) from news

count(1)
0


In [0]:
# Sleep and wait for new data...
import time
time.sleep(5)

In [0]:
%sql select count(*) from news

count(1)
0


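The row count should increase between queries as new micro-batches are committed. It can stay flat if the cluster has already processed every file, or stay at 0, as in the run above, while the first micro-batch is still running; since all the event files were already in place when the query started, that first batch may contain every one of them.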
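In open-source Spark, the file source's `maxFilesPerTrigger` option has no limit by default, so a backlog of existing files can be consumed in one trigger. Adding `.option("maxFilesPerTrigger", 100)` to `spark.readStream` would cap each micro-batch and make the count grow visibly between queries. The toy model below is not Spark's implementation, only an illustration of that assumption (`plan_batches` is a hypothetical name):

```python
from typing import List, Optional


def plan_batches(files: List[str], max_files_per_trigger: Optional[int] = None) -> List[List[str]]:
    """Toy model: split a backlog of files into micro-batches.

    With no cap, the whole backlog lands in a single batch; with a cap of k,
    each batch holds at most k files, in the given order.
    """
    if not files:
        return []
    if max_files_per_trigger is None:
        return [list(files)]
    k = max_files_per_trigger
    if k <= 0:
        raise ValueError("max_files_per_trigger must be positive")
    return [files[i:i + k] for i in range(0, len(files), k)]


backlog = [f"event-{n}.csv" for n in range(10_001)]
print(len(plan_batches(backlog)))       # → 1
print(len(plan_batches(backlog, 100)))  # → 101
```

With the cap, each query against the `news` table would see the count grow by roughly one batch at a time instead of jumping from 0 to the total.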

Finally, we have to stop our stream:

In [0]:
query.stop()

## References

1. Sun, J. (2016, Aug). Daily News for Stock Market Prediction, Version 1. Retrieved 24 Jan 2023 from https://www.kaggle.com/aaron7sun/stocknews

2. Barve, S. (2022, Jul). Easy way to use Kaggle datasets in Google Colab. Retrieved 24 Jan 2023 from https://www.kaggle.com/general/51898#1884104

3. Levi, J. (2020, Sep). Bar chart in matplotlib using a colormap. Retrieved 25 Jan 2023 from https://stackoverflow.com/a/64068828

4. Wikipedia. Gaza War (2008–2009). Retrieved 26 Jan 2023 from https://en.wikipedia.org/wiki/Gaza_War_(2008%E2%80%932009)

5. Greenwald, G. et al. (2013, Jun). Edward Snowden: the whistleblower behind the NSA surveillance revelations. Retrieved 26 Jan 2023 from https://www.theguardian.com/world/2013/jun/09/edward-snowden-nsa-whistleblower-surveillance

[![Author - Andreja Nesic](https://andrejanesic.com/git-signature-sm.png)](https://github.com/andrejanesic)