# Initialize the Python Notebook

Install the necessary Python packages found in requirements.txt and import the necessary Python libraries.

In [41]:
%%capture
!pip3 install -r requirements.txt

import nltk
import os
import shutil
import uuid
from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama import ChatOllama
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.types import (
    DateType,
    DecimalType,
    IntegerType,
    LongType,
    StructType,
    StructField,
    StringType,
    TimestampType,
)
from sumy.nlp.stemmers import Stemmer
from sumy.nlp.tokenizers import Tokenizer
from sumy.parsers.plaintext import PlaintextParser
from sumy.summarizers.lsa import LsaSummarizer
from sumy.utils import get_stop_words

Initialize the Python notebook environment.

In [42]:
# Download NLTK's Punkt sentence-tokenizer data (presumably what the sumy
# Tokenizer relies on to split text into sentences).
nltk.download("punkt_tab")

# Spark settings for this notebook: executor/driver memory and default parallelism.
SPARK_CONF = {
    "spark.executor.memory": "4g",
    "spark.driver.memory": "2g",
    "spark.default.parallelism": "8",
}

# Build (or reuse) the Spark session with the settings above.
spark_builder = SparkSession.builder.appName("Stock Forecasting")
for conf_key, conf_value in SPARK_CONF.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()

# Stock symbol to process (options noted by the author: AA, AAPL, AMZN, MSFT, TSLA)
STOCK_SYMBOL_UPPER = "MSFT"
# Lower-case form used in file names. Derived from STOCK_SYMBOL_UPPER so that
# switching symbols only requires editing one line and the two cannot disagree.
STOCK_SYMBOL_LOWER = STOCK_SYMBOL_UPPER.lower()

# File names
FILE_NAME_NEWS = f"news_{STOCK_SYMBOL_LOWER}.csv"
FILE_NAME_PRICE = f"price_{STOCK_SYMBOL_LOWER}.csv"
FILE_NAME_COMBINED = f"combined_{STOCK_SYMBOL_LOWER}.csv"

# File paths: raw inputs first, then one output file per pipeline stage
FILE_PATH_EXTERNAL_NEWS = f"stock_news/external/external_{STOCK_SYMBOL_LOWER}.csv"
FILE_PATH_NASDAQ_NEWS = f"stock_news/nasdaq/nasdaq_{STOCK_SYMBOL_LOWER}.csv"
FILE_PATH_MERGED_NEWS = f"stock_news/merged/{FILE_NAME_NEWS}"
FILE_PATH_SUMMARIZED_NEWS = f"stock_news/summarized/{FILE_NAME_NEWS}"
FILE_PATH_SCORED_NEWS = f"stock_news/scored/{FILE_NAME_NEWS}"
FILE_PATH_DECAYED_NEWS = f"stock_news/decayed/{FILE_NAME_NEWS}"
FILE_PATH_SENTIMENT_SCORES = f"stock_news/scored/sentiments_{STOCK_SYMBOL_LOWER}.csv"
FILE_PATH_FULL_HISTORY_PRICE = f"stock_price/full_history/{STOCK_SYMBOL_UPPER}.csv"
FILE_PATH_PREPROCESSED_PRICE = f"stock_price/preprocessed/{FILE_NAME_PRICE}"

# Language used by the sumy Tokenizer, Stemmer, and stop-word list
LANGUAGE = "english"

# Used by the LSA Summarizer: maximum number of sentences kept in a summary
SENTENCES_COUNT = 3

# Used by the LLM
# Smaller alternative model: "llama3.2:1b-instruct-fp16"
MODEL = "llama3.2:3b-instruct-fp16"  # Ollama model name
BATCH_SIZE = 5  # Number of news summaries sent to the LLM per request
TEMPERATURE = 0.0  # 0.0 for deterministic output
# Cap on generated tokens; presumably sized to fit BATCH_SIZE comma-separated
# scores such as "5, 1, 3, 4, 4" — revisit if BATCH_SIZE changes.
MAX_OUTPUT_TOKENS = 14

# Used by LLM for sentiment scoring (scores are clamped to [MIN_VALUE, MAX_VALUE])
MIN_VALUE = 1  # The minimum value of the sentiment score
BASE_VALUE = 3  # The neutral midpoint between 1 and 5; also the default for missing days
MAX_VALUE = 5  # The maximum value of the sentiment score

# Used by exponential decay algorithm: decay factor is exp(-DECAY_RATE * days)
DECAY_RATE = 0.5  # Determines how quickly the sentiment decays toward BASE_VALUE

def _fields(*specs):
    """Build a list of nullable StructFields from (name, data_type) pairs."""
    return [StructField(name, data_type, True) for name, data_type in specs]


# Raw daily price history (lower-case headers, including the space in "adj close").
FULL_HISTORY_PRICE_SCHEMA = StructType(
    _fields(
        ("date", DateType()),
        ("open", DecimalType(24, 16)),
        ("high", DecimalType(24, 16)),
        ("low", DecimalType(24, 16)),
        ("close", DecimalType(24, 16)),
        ("adj close", DecimalType(24, 16)),
        ("volume", LongType()),
    )
)

# Columns shared by every news file, up to and including the LSA summary.
_NEWS_CORE_FIELDS = [
    ("Date", TimestampType()),
    ("Article_title", StringType()),
    ("Stock_symbol", StringType()),
    ("Url", StringType()),
    ("Publisher", StringType()),
    ("Author", StringType()),
    ("Article", StringType()),
    ("Lsa_summary", StringType()),
]
# Alternative summaries present in the raw news sources (dropped later).
_ALT_SUMMARY_FIELDS = [
    ("Luhn_summary", StringType()),
    ("Textrank_summary", StringType()),
    ("Lexrank_summary", StringType()),
]
# Columns added by this notebook's preprocessing step.
_PIPELINE_FIELDS = [
    ("Summarized", IntegerType()),
    ("Sentiment_score", IntegerType()),
    ("UUID", StringType()),
]

# Raw news sources; the Nasdaq file additionally has a leading "Unnamed: 0" column.
EXTERNAL_NEWS_SCHEMA = StructType(_fields(*_NEWS_CORE_FIELDS, *_ALT_SUMMARY_FIELDS))
NASDAQ_NEWS_SCHEMA = StructType(
    _fields(("Unnamed: 0", StringType()), *_NEWS_CORE_FIELDS, *_ALT_SUMMARY_FIELDS)
)

# Merged news, and the summarized news (merged plus the "Text" input column).
MERGED_NEWS_SCHEMA = StructType(_fields(*_NEWS_CORE_FIELDS, *_PIPELINE_FIELDS))
SUMMARIZED_NEWS_SCHEMA = StructType(
    _fields(*_NEWS_CORE_FIELDS, *_PIPELINE_FIELDS, ("Text", StringType()))
)

# One LLM sentiment score per article.
SCORED_NEWS_SCHEMA = StructType(
    _fields(
        ("UUID", StringType()),
        ("Date", TimestampType()),
        ("Sentiment_score", IntegerType()),
        ("Lsa_summary", StringType()),
    )
)

# Daily averaged sentiment with the exponential-decay bookkeeping columns.
DECAYED_SENTIMENT_SCHEMA = StructType(
    _fields(
        ("Date", DateType()),
        ("Sentiment_avg", DecimalType(18, 16)),
        ("Last_valid_sentiment", DecimalType(18, 16)),
        ("Last_valid_date", DateType()),
        ("Days_since_last_valid", IntegerType()),
        ("Decayed_sentiment", DecimalType(18, 16)),
    )
)

# Price history after renaming to capitalized headers.
PREPROCESSED_PRICE_SCHEMA = StructType(
    _fields(
        ("Date", DateType()),
        ("Open", DecimalType(24, 16)),
        ("High", DecimalType(24, 16)),
        ("Low", DecimalType(24, 16)),
        ("Close", DecimalType(24, 16)),
        ("Adj_close", DecimalType(24, 16)),
        ("Volume", LongType()),
    )
)

# Preprocess the Price Dataset

Ingest the price dataset.

In [None]:
# Load the full price history for the selected symbol, typed by FULL_HISTORY_PRICE_SCHEMA.
df_price = spark.read.csv(
    FILE_PATH_FULL_HISTORY_PRICE,
    header=True,
    schema=FULL_HISTORY_PRICE_SCHEMA,
)

# Cache the raw price data for the following cells.
df_price.persist()

# Sanity check: row count plus a peek at the first rows.
price_row_count = df_price.count()
print(f"Row count for df_price: {price_row_count}")
df_price.show(5, truncate=True)

Preprocess the price dataset.

In [None]:
# Rename the raw lower-case headers to the capitalized names used by
# PREPROCESSED_PRICE_SCHEMA, and sort chronologically.
# Keep a handle on the cached raw frame: rebinding `df_price` would otherwise
# orphan its cache, because later cells only unpersist the renamed frame.
df_price_raw = df_price
df_price = (
    df_price_raw.withColumnRenamed("date", "Date")
    .withColumnRenamed("open", "Open")
    .withColumnRenamed("high", "High")
    .withColumnRenamed("low", "Low")
    .withColumnRenamed("close", "Close")
    .withColumnRenamed("adj close", "Adj_close")
    .withColumnRenamed("volume", "Volume")
    .orderBy("Date")
)

# Cache the renamed frame; this is the one the save cell later unpersists.
df_price.persist()

# Verify (count() also fills the new cache, reading from the raw cache)
print(f"Row count for df_price: {df_price.count()}")
df_price.show(5, truncate=True)

# The renamed frame is cached now, so the raw frame's cache can be released.
df_price_raw.unpersist(blocking=False)
del df_price_raw

Save a copy of the dataset.

In [5]:
# Save the preprocessed prices as one CSV. Spark writes a directory of part
# files, so write a single partition to a scratch directory, promote its part
# file to the final name, then delete the scratch directory.
FOLDER_PRICE = "stock_price/preprocessed"
TEMP_FOLDER_PRICE = f"stock_price/preprocessed/price_{STOCK_SYMBOL_LOWER}"

df_price.coalesce(1).write.csv(
    TEMP_FOLDER_PRICE, sep=",", header=True, mode="overwrite"
)

part_files = [
    name for name in os.listdir(TEMP_FOLDER_PRICE) if name.startswith("part-")
]
price_target = os.path.join(FOLDER_PRICE, FILE_NAME_PRICE)
for part_file in part_files:
    shutil.move(os.path.join(TEMP_FOLDER_PRICE, part_file), price_target)

# Drop the scratch directory together with anything left inside it.
shutil.rmtree(TEMP_FOLDER_PRICE)

# Release the cached frame without waiting for the blocks to be removed.
df_price.unpersist(blocking=False)

# Remove this section's intermediates from the notebook namespace.
local_variables = [
    "df_price",
]
for var_name in local_variables:
    globals().pop(var_name, None)

# Preprocess the News Datasets

Read in the external news and Nasdaq news datasets from CSV files to Spark dataframes.

In [None]:
# Load the two raw news sources with explicit schemas (no schema inference).
# NOTE(review): if any article contains embedded newlines, Spark's CSV reader
# splits the record unless multiLine=True is set — confirm these inputs are
# one record per physical line.
df_external_news = spark.read.csv(
    FILE_PATH_EXTERNAL_NEWS,
    header=True,
    schema=EXTERNAL_NEWS_SCHEMA,
)
df_nasdaq_news = spark.read.csv(
    FILE_PATH_NASDAQ_NEWS,
    header=True,
    schema=NASDAQ_NEWS_SCHEMA,
)

# Both frames feed the merge step, so cache them.
for raw_news in (df_external_news, df_nasdaq_news):
    raw_news.persist()
del raw_news

# Verify: row count and a short preview of each source.
print(f"Row count for df_external_news: {df_external_news.count()}")
df_external_news.show(5, truncate=True)

print(f"Row count for df_nasdaq_news: {df_nasdaq_news.count()}")
df_nasdaq_news.show(5, truncate=True)

Drop unused fields and merge the external and Nasdaq news datasets into one dataframe.

In [None]:
# Keep handles on the cached raw frames. drop() returns new, uncached frames,
# so unpersisting those later would never free the raw caches.
df_external_news_raw = df_external_news
df_nasdaq_news_raw = df_nasdaq_news

# Drop unused fields: only the LSA summary is kept. The Nasdaq file also has an
# "Unnamed: 0" column (presumably a leftover pandas index).
alternative_summaries = ("Luhn_summary", "Textrank_summary", "Lexrank_summary")
df_external_news = df_external_news_raw.drop(*alternative_summaries)
df_nasdaq_news = df_nasdaq_news_raw.drop("Unnamed: 0", *alternative_summaries)

# Merge by column name (both sources now have the same columns)
df_news = df_nasdaq_news.unionByName(df_external_news)

# Store in memory
df_news.persist()

# Verify. The author recorded an expected count of 7419 (2945 + 4474),
# presumably for MSFT; other symbols will differ.
print(f"Row count for df_news: {df_news.count()}")
df_news.show(5)

# count() above filled df_news's cache, so the raw caches can be released.
df_external_news_raw.unpersist(blocking=False)
df_nasdaq_news_raw.unpersist(blocking=False)
del df_external_news_raw, df_nasdaq_news_raw

Preprocess the merged news dataset.

In [None]:
# Keep a handle on the cached union so its cache can be released once the
# preprocessed frame below has been cached.
df_news_union = df_news

# Standardize the timestamps to UTC and drop rows without a usable date.
# NOTE(review): "Date" is already read as TimestampType via the input schemas,
# so this to_timestamp / to_utc_timestamp(..., "UTC") step is likely a no-op and
# the null filter is what removes rows whose date failed to parse — confirm.
df_news = df_news_union.withColumn(
    "Date", F.to_utc_timestamp(F.to_timestamp("Date", "yyyy-MM-dd HH:mm:ss zzz"), "UTC")
).filter(F.col("Date").isNotNull())

# Add "Summarized" and "Sentiment_score" fields, both initialized to 0,
# and sort newest-first.
df_news = (
    df_news.withColumn("Summarized", F.lit(0))
    .withColumn("Sentiment_score", F.lit(0))
    .orderBy("Date", ascending=False)
)

# Add a unique identifier per article. Spark treats UDFs as deterministic by
# default and may re-invoke or collapse calls; random UUIDs are not deterministic.
uuid_udf = F.udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()
df_news = df_news.withColumn("UUID", uuid_udf())

# Cache so the UUIDs displayed below are the ones the next cell writes to CSV.
# Without the cache, every action would re-run the UDF and draw fresh UUIDs.
df_news.persist()

# Verify (count() also materializes the cache)
print(f"Row count for df_news: {df_news.count()}")
df_news.show(5, truncate=True)

# The preprocessed frame is cached now; release the union's cache.
df_news_union.unpersist(blocking=False)
del df_news_union

Save a copy of the preprocessed merged news dataset.

In [9]:
# Save the preprocessed merged news as one CSV. Spark writes a directory of part
# files, so write a single partition to a scratch directory, promote its part
# file to the final name, then delete the scratch directory.
FOLDER_MERGED_NEWS = "stock_news/merged"
TEMP_FOLDER_MERGED_NEWS = f"stock_news/merged/news_{STOCK_SYMBOL_LOWER}"

df_news.coalesce(1).write.csv(
    TEMP_FOLDER_MERGED_NEWS, sep=",", header=True, mode="overwrite"
)

part_files = [
    name for name in os.listdir(TEMP_FOLDER_MERGED_NEWS) if name.startswith("part-")
]
merged_news_target = os.path.join(FOLDER_MERGED_NEWS, FILE_NAME_NEWS)
for part_file in part_files:
    shutil.move(os.path.join(TEMP_FOLDER_MERGED_NEWS, part_file), merged_news_target)

# Drop the scratch directory together with anything left inside it.
shutil.rmtree(TEMP_FOLDER_MERGED_NEWS)

# Release this section's cached frames without blocking.
for cached_frame in (df_external_news, df_nasdaq_news, df_news):
    cached_frame.unpersist(blocking=False)
del cached_frame

# Remove this section's intermediates from the notebook namespace.
local_variables = [
    "df_external_news",
    "df_nasdaq_news",
    "df_news",
    "uuid_udf",
]
for var_name in local_variables:
    globals().pop(var_name, None)

# Summarize the News Dataset

Retrieve the merged news dataset.

In [None]:
# Reload the merged news saved by the previous section, typed by MERGED_NEWS_SCHEMA.
df_merged_news = spark.read.csv(
    FILE_PATH_MERGED_NEWS,
    schema=MERGED_NEWS_SCHEMA,
    header=True,
)

# Cache it; the summarization cell builds on this frame.
df_merged_news.persist()

# Sanity check: row count and the first few rows.
merged_row_count = df_merged_news.count()
print(f"Row count for df_merged_news: {merged_row_count}")
df_merged_news.show(5, truncate=True)

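Summarize each news article (title plus body) with an LSA (latent semantic analysis) summarizer, keeping at most `SENTENCES_COUNT` sentences per summary.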

In [11]:
"""
Description:
Takes the article text as input, parses it using PlaintextParser, and summarizes it using LsaSummarizer.

Parameters:
text (string): The news article.

Returns:
string: The summarized text.
"""


def summarize_article(text):
    parser = PlaintextParser.from_string(text, Tokenizer(LANGUAGE))

    # Stemming reduces words to their root form for the summarizer to identify similar concepts expressed with
    # different word forms.
    stemmer = Stemmer(LANGUAGE)

    # Initializes the summarizer with the stemmer
    summarizer = LsaSummarizer(stemmer)

    # Removes stop word to eliminate common non-keyword words.
    summarizer.stop_words = get_stop_words(LANGUAGE)

    # Generates the summarized text
    summary = summarizer(parser.document, SENTENCES_COUNT)
    return " ".join([str(sentence) for sentence in summary])


# A user-designed function to wrap the summarize_article function to be used in Spark.
summarize_udf = F.udf(summarize_article, StringType())

# Concats both fields with a period and space as the separator.
# Execute the summarize_udf function
df_merged_news = df_merged_news.withColumn(
    "Text", F.concat_ws(". ", "Article_title", "Article")
).withColumn("Lsa_summary", summarize_udf("Text"))

# Updated summarized indicator
df_merged_news = df_merged_news.withColumn(
    "Summarized",
    F.when(
        F.col("Lsa_summary").isNotNull() & (F.col("Lsa_summary") != ""), F.lit(1)
    ).otherwise(0),
)

# Verify number of news with no summary
# print(
#     f"Number of news with no summary: {df_merged_news.filter(col("Summarized") == 0).count()}"
# )

# Remove rows without a summary
df_merged_news = df_merged_news.filter((F.col("Summarized") == 1))

Verify the results of the summarization.

In [None]:
# Inspect the summarization output: the overall count, a compact preview of all
# columns, then full text next to its summary for a handful of rows.
summarized_row_count = df_merged_news.count()
print(f"Row count for df_merged_news: {summarized_row_count}")

df_merged_news.show(5, truncate=True)

summary_preview_columns = ["Date", "Summarized", "Text", "Lsa_summary"]
df_merged_news.select(*summary_preview_columns).show(10, truncate=False)

Save a copy of the summarized news dataset.

In [None]:
# Save the summarized news as one CSV. Spark writes a directory of part files,
# so write a single partition to a temporary directory, move the part file to
# the final name, then delete the temporary directory.
FOLDER_SUMMARIZED_NEWS = "stock_news/summarized"
# Must be an f-string so the symbol is actually substituted into the path.
TEMP_FOLDER_SUMMARIZED_NEWS = f"stock_news/summarized/news_{STOCK_SYMBOL_LOWER}"

# Write to a single CSV file
df_merged_news.coalesce(1).write.csv(
    TEMP_FOLDER_SUMMARIZED_NEWS, sep=",", header=True, mode="overwrite"
)

# Move the part file to the desired filename
for filename in os.listdir(TEMP_FOLDER_SUMMARIZED_NEWS):
    if filename.startswith("part-"):
        shutil.move(
            os.path.join(TEMP_FOLDER_SUMMARIZED_NEWS, filename),
            os.path.join(FOLDER_SUMMARIZED_NEWS, FILE_NAME_NEWS),
        )

# Remove the temporary directory
shutil.rmtree(TEMP_FOLDER_SUMMARIZED_NEWS)

# Uncache the dataframe in a non-blocking operation to free up memory.
df_merged_news.unpersist(blocking=False)

# Remove this section's intermediates from the notebook namespace
# (at notebook top level, locals() and globals() are the same dict).
local_variables = [
    "df_merged_news",
    "summarize_udf",
]
for var in local_variables:
    globals().pop(var, None)

# Calculate the Sentiment Score of the News

Retrieve the summarized news dataset.

In [None]:
# Reload the summarized news saved by the previous section, typed by SUMMARIZED_NEWS_SCHEMA.
df_summarized_news = spark.read.csv(
    FILE_PATH_SUMMARIZED_NEWS,
    schema=SUMMARIZED_NEWS_SCHEMA,
    header=True,
)

# Cache it; the scoring cells read it more than once.
df_summarized_news.persist()

# Sanity check: row count and the first few rows.
summarized_news_rows = df_summarized_news.count()
print(f"Row count for df_summarized_news: {summarized_news_rows}")
df_summarized_news.show(5, truncate=True)

Retrieve the summaries and initialize the large language model (LLM).

In [None]:
# Pull (UUID, Date, Lsa_summary) for every summarized article to the driver in a
# single pass. Collecting whole rows keeps the three lists index-aligned.
# Separate F.collect_list aggregates silently skip NULL values, so they could
# drift apart if any one column held a NULL.
collection = df_summarized_news.select("UUID", "Date", "Lsa_summary").collect()
uuids = [row["UUID"] for row in collection]
dates = [row["Date"] for row in collection]
lsa_summaries = [row["Lsa_summary"] for row in collection]

# Verify
print(f"Number of uuids: {len(uuids)}")
print(f"Number of dates: {len(dates)}")
print(f"Number of lsa_summaries: {len(lsa_summaries)}")

# Initialize the Ollama chat model
try:
    llm = ChatOllama(
        model=MODEL, temperature=TEMPERATURE, num_predict=MAX_OUTPUT_TOKENS
    )
    print("ChatOllama instance created successfully!")
except Exception as e:
    print("Error creating ChatOllama instance:", e)
    # Re-raise: the scoring cell needs `llm`, so continuing would only fail later
    # with a confusing NameError.
    raise

Feed the summaries to the model in batches of `BATCH_SIZE` and capture the resulting sentiment scores (one integer from 1 to 5 per summary).

In [None]:
# Few-shot prompt, built once. {num_text} and {batch_text} are template variables
# filled in per batch by format_messages(). The news text is passed as a variable
# rather than baked into the template string, so braces inside a summary cannot
# be mistaken for template placeholders.
chat_template = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Forget all previous instructions. You are a financial expert with stock recommendation experience. Based on news for a specific stock, provide a sentiment score in the range of 1 to 5 inclusive, where 1 is negative, 2 is somewhat negative, 3 is neutral, 4 is somewhat positive, and 5 is positive. {num_text} summerized news will be passed in each time. You will provide {num_text} scores, one score for each of the summerized news in the format as shown below in the response from the assistant.",
        ),
        (
            "user",
            "### AAPL Stock News: Apple (AAPL) increased 22%. ### AAPL Stock News: Apple (AAPL) price decreased 30%. ### MSFT Stock News: Microsoft (MSTF) price has not changed. ### AAPL Stock News: Apple (AAPL) announced the new iPhone 15. ### AAPL Stock News: Apple (AAPL) will release the Vison Pro on Feb 2, 2024.",
        ),
        ("ai", "5, 1, 3, 4, 4"),
        ("user", "{batch_text}"),
    ]
)

# Holds the resulting output: one sentiment score per summary, in order
sentiments = []

# Iterate in batches
for i in range(0, len(lsa_summaries), BATCH_SIZE):
    batch_summaries = lsa_summaries[i : i + BATCH_SIZE]
    num_text = len(batch_summaries)

    batch_text = " ".join(
        [
            f"### {STOCK_SYMBOL_UPPER} Stock News: {summary} "
            for summary in batch_summaries
        ]
    )

    messages = chat_template.format_messages(num_text=num_text, batch_text=batch_text)

    response = llm.invoke(messages)
    print(f"#{i}: {response.content}")

    # Parse the comma-separated reply. Each score is rounded and clamped to
    # [MIN_VALUE, MAX_VALUE]; tokens that are not numbers are reported and skipped.
    batch_scores = []
    for sentiment in response.content.split(","):
        stripped_sentiment = sentiment.strip()
        if not stripped_sentiment:
            continue
        try:
            rounded_sentiment = max(
                MIN_VALUE, min(MAX_VALUE, round(float(stripped_sentiment)))
            )
        except (ValueError, OverflowError):  # e.g. "abc", "nan", "inf"
            print(f"Invalid sentiment value: {stripped_sentiment}")
            continue
        batch_scores.append(rounded_sentiment)

    # A wrong count shifts every later score against its summary; flag the batch
    # so it can be repaired with the recovery cell below.
    if len(batch_scores) != num_text:
        print(
            f"#{i}: expected {num_text} scores but parsed {len(batch_scores)}; "
            "re-run this batch before saving."
        )
    sentiments.extend(batch_scores)

In [20]:
# Write the scores to disk, one integer per line, so a partial run can be
# repaired by hand and reloaded (see the recovery cell below).
with open(FILE_PATH_SENTIMENT_SCORES, "w") as score_file:
    for score in sentiments:
        score_file.write(f"{score}\n")

In [None]:
# Cross-check before building the scored dataframe: the number of sentiments
# must equal the number of summaries (enforced by the save cell below).
num_summaries = len(lsa_summaries)
num_sentiments = len(sentiments)
print(f"Number of lsa_summaries: {num_summaries}")
print(f"Number of sentiments: {num_sentiments}")
print(f"Row count for df_summarized_news: {df_summarized_news.count()}")

In [None]:
# RECOVERY CELL: needed only when the number of sentiments does not match the
# number of summaries (e.g. the LLM returned too few or too many scores for a batch).
# Step 1: Re-run the sentiment score for the affected batch
# Step 2: Update the missing scores in the sentiments file (FILE_PATH_SENTIMENT_SCORES)
# Step 3: Verify the number of sentiments in that file matches the number of summaries
# Step 4: Execute the code below to reload the sentiments list from that file
# When the counts already match, running this cell reloads the same scores.

with open(FILE_PATH_SENTIMENT_SCORES, "r") as f:
    # One integer score per non-blank line
    sentiments = [int(line.strip()) for line in f if line.strip()]
print(f"Number of sentiments: {len(sentiments)}")

Save a copy of the sentiment scored news dataset.

In [None]:
# Every summary needs exactly one score, and the metadata lists must line up,
# before the lists are zipped. zip() would silently truncate on a mismatch.
if len(sentiments) != len(lsa_summaries):
    raise ValueError("The number of sentiments does not match the number of summaries.")
if not len(uuids) == len(dates) == len(lsa_summaries):
    raise ValueError("The uuids, dates, and lsa_summaries lists differ in length.")

# Combine lists into (UUID, Date, Sentiment_score, Lsa_summary) tuples
scored_news = list(zip(uuids, dates, sentiments, lsa_summaries))

# Create a dataframe with the specified schema
df_scored_news = spark.createDataFrame(scored_news, schema=SCORED_NEWS_SCHEMA)

# Specify the directories.
FOLDER_SCORED_NEWS = "stock_news/scored"
# Must be an f-string so the symbol is actually substituted into the path.
TEMP_FOLDER_SCORED_NEWS = f"stock_news/scored/news_{STOCK_SYMBOL_LOWER}"

# Write to a single CSV file
df_scored_news.coalesce(1).write.csv(
    TEMP_FOLDER_SCORED_NEWS, sep=",", header=True, mode="overwrite"
)

# Move the part file to the desired filename
for filename in os.listdir(TEMP_FOLDER_SCORED_NEWS):
    if filename.startswith("part-"):
        shutil.move(
            os.path.join(TEMP_FOLDER_SCORED_NEWS, filename),
            os.path.join(FOLDER_SCORED_NEWS, FILE_NAME_NEWS),
        )

# Remove the temporary directory
shutil.rmtree(TEMP_FOLDER_SCORED_NEWS)

# Uncache the dataframe in a non-blocking operation to free up memory.
df_summarized_news.unpersist(blocking=False)

# Remove this section's intermediates from the notebook namespace
# (at notebook top level, locals() and globals() are the same dict).
local_variables = [
    "collection",
    "uuids",
    "dates",
    "lsa_summaries",
    "sentiments",
    "llm",
    "chat_template",
    "messages",
    "response",
    "scored_news",
    "df_summarized_news",
    "df_scored_news",
]
for var in local_variables:
    globals().pop(var, None)

# Exponentially Decay the Sentiment Score of the News

Read in the sentiment scored stock news.

In [None]:
# Reload the scored news written by the previous section, typed by SCORED_NEWS_SCHEMA.
df_scored_news = spark.read.csv(
    FILE_PATH_SCORED_NEWS,
    schema=SCORED_NEWS_SCHEMA,
    header=True,
)

# Sanity check: row count and the first few rows.
scored_row_count = df_scored_news.count()
print(f"Row count for df_scored_news: {scored_row_count}")
df_scored_news.show(5, truncate=True)

Calculate the average sentiment scores for each date. Then, populate the missing dates.

In [None]:
# Extracting only the date component and sort by date
df_scored_news_copy = df_scored_news.select(
    "UUID", F.to_date("Date").alias("Date"), "Sentiment_score", "Lsa_summary"
).orderBy("Date")

# Group by "Date" and calculate the average "Sentiment_score"
df_avg_sentiment = df_scored_news_copy.groupBy("Date").agg(
    F.avg("Sentiment_score").alias("Sentiment_avg")
)

# Retrieve the start and end date
start_date = df_avg_sentiment.agg(F.min("Date")).collect()[0][0]
end_date = df_avg_sentiment.agg(F.max("Date")).collect()[0][0]

# Initialize a dataframe with the start and end dates
df_date_range = spark.createDataFrame(
    [(start_date, end_date)], ["start_date", "end_date"]
)

# Generate a date range using sequence
df_dates = df_date_range.select(
    F.expr("sequence(to_date(start_date), to_date(end_date), interval 1 day)").alias(
        "Date"
    )
)

# Explode the array to get individual dates
df_dates = df_dates.select(F.explode("Date").alias("Date"))

# Join the complete date range with the original dataframe
df_avg_sentiment_filled = df_dates.join(df_avg_sentiment, on="Date", how="left")

# Store in memory
df_avg_sentiment_filled.persist()

# Verify
print(f"Row count for df_avg_sentiment_filled: {df_avg_sentiment_filled.count()}")
df_avg_sentiment_filled.show(df_avg_sentiment_filled.count(), truncate=False)

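For dates without news, fill the missing average sentiment by exponentially decaying the last observed average toward the neutral score:

$$\hat{s}_t = B + (s_{t_0} - B)\,e^{-\lambda (t - t_0)}$$

Here $t_0$ is the most recent date with news and $s_{t_0}$ is its average score. $B$ is `BASE_VALUE` (3), and $\lambda$ is `DECAY_RATE` (0.5 per day). Dates with news keep their average unchanged.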

In [None]:
# Create a window specification to get the last valid sentiment and last valid date
window_spec = Window.orderBy("Date").rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)

# Get last valid sentiment
df_avg_sentiment_filled = df_avg_sentiment_filled.withColumn(
    "Last_valid_sentiment", F.last("Sentiment_avg", ignorenulls=True).over(window_spec)
)

# Get last valid date only when sentiment is not null
df_avg_sentiment_filled = df_avg_sentiment_filled.withColumn(
    "Last_valid_date",
    F.last(
        F.when(F.col("Sentiment_avg").isNotNull(), F.col("Date")), ignorenulls=True
    ).over(window_spec),
)

# Calculate the number of days since the last valid sentiment
df_avg_sentiment_filled = df_avg_sentiment_filled.withColumn(
    "Days_since_last_valid", (F.datediff(F.col("Date"), F.col("Last_valid_date")))
)

# Calculate decayed sentiment for rows where the average sentiment is null
df_avg_sentiment_filled = df_avg_sentiment_filled.withColumn(
    "Decayed_sentiment",
    F.when(
        F.col("Sentiment_avg").isNull(),
        BASE_VALUE
        + (F.col("Last_valid_sentiment") - BASE_VALUE)
        * F.exp(-DECAY_RATE * F.col("Days_since_last_valid")),
    ).otherwise(F.col("Sentiment_avg")),
)

# Verify
print(f"Row count for df_avg_sentiment_filled: {df_avg_sentiment_filled.count()}")
df_avg_sentiment_filled.show(df_avg_sentiment_filled.count(), truncate=False)

Save a copy of the dataset.

In [None]:
# Save the decayed daily sentiment as one CSV. Spark writes a directory of part
# files, so write a single partition to a temporary directory, move the part
# file to the final name, then delete the temporary directory.
FOLDER_DECAYED_NEWS = "stock_news/decayed"
# Must be an f-string so the symbol is actually substituted into the path.
TEMP_FOLDER_DECAYED_NEWS = f"stock_news/decayed/news_{STOCK_SYMBOL_LOWER}"

# Write to a single CSV file
df_avg_sentiment_filled.coalesce(1).write.csv(
    TEMP_FOLDER_DECAYED_NEWS, sep=",", header=True, mode="overwrite"
)

# Move the part file to the desired filename
for filename in os.listdir(TEMP_FOLDER_DECAYED_NEWS):
    if filename.startswith("part-"):
        shutil.move(
            os.path.join(TEMP_FOLDER_DECAYED_NEWS, filename),
            os.path.join(FOLDER_DECAYED_NEWS, FILE_NAME_NEWS),
        )

# Remove the temporary directory
shutil.rmtree(TEMP_FOLDER_DECAYED_NEWS)

# Uncache the dataframe in a non-blocking operation to free up memory.
df_avg_sentiment_filled.unpersist(blocking=False)

# Remove this section's intermediates from the notebook namespace
# (at notebook top level, locals() and globals() are the same dict).
local_variables = [
    "window_spec",
    "start_date",
    "end_date",
    "df_scored_news",
    "df_scored_news_copy",
    "df_avg_sentiment",
    "df_date_range",
    "df_dates",
    "df_avg_sentiment_filled",
]
for var in local_variables:
    globals().pop(var, None)

# Integrate the News and Price Datasets

Ingest the decayed sentiment dataset and the price dataset.

In [None]:
# Load the decayed daily sentiment and the preprocessed price history saved
# earlier in this notebook.
df_decayed_sentiment = spark.read.csv(
    FILE_PATH_DECAYED_NEWS,
    schema=DECAYED_SENTIMENT_SCHEMA,
    header=True,
)
df_price = spark.read.csv(
    FILE_PATH_PREPROCESSED_PRICE,
    schema=PREPROCESSED_PRICE_SCHEMA,
    header=True,
)

# Verify each input: row count followed by a short preview.
for frame_label, frame in (
    ("df_decayed_sentiment", df_decayed_sentiment),
    ("df_price", df_price),
):
    print(f"Row count for {frame_label}: {frame.count()}")
    frame.show(5, truncate=True)
del frame_label, frame

Merge the price dataset with the decayed sentiment score derived from the news dataset. Trading days with no sentiment value default to the neutral score of 3 (`BASE_VALUE`).

In [None]:
# Drop the decay bookkeeping columns; only Decayed_sentiment is merged.
df_decayed_sentiment = df_decayed_sentiment.drop(
    "Sentiment_avg", "Last_valid_sentiment", "Last_valid_date", "Days_since_last_valid"
)

# Left join keeps every trading day in the price data. Days with no matching
# sentiment row get NULL, which is replaced with the neutral BASE_VALUE.
# coalesce replaces the former `== ""` check. That check compared a decimal
# column to a string and may fail under ANSI mode.
df_combined = df_price.join(df_decayed_sentiment, on="Date", how="left").withColumn(
    "Decayed_sentiment",
    F.coalesce(F.col("Decayed_sentiment"), F.lit(BASE_VALUE)),
)

# Cache the final joined frame. Persisting before the column fill would cache a
# frame that is immediately replaced, and that cache would never be released.
df_combined.persist()

# Verify
df_combined.show(20, truncate=False)

Normalize the sentiment scores from the 1–5 range to [0, 1] with min-max scaling: $s_{norm} = (s - 1) / (5 - 1)$.

In [None]:
# Keep a handle on the cached joined frame so its cache can be released below.
df_combined_unnormalized = df_combined

# Min-max normalize Decayed_sentiment from [MIN_VALUE, MAX_VALUE] to [0, 1]
df_combined = df_combined_unnormalized.withColumn(
    "Normalized_sentiment",
    (F.col("Decayed_sentiment") - MIN_VALUE) / (MAX_VALUE - MIN_VALUE),
)

# Cache the normalized frame; this is the one the save cell later unpersists.
df_combined.persist()

# Verify
df_combined.show(20, truncate=False)

# Release the previous frame's cache. Any rows not yet cached are recomputed
# from the source files when needed.
df_combined_unnormalized.unpersist(blocking=False)
del df_combined_unnormalized

Save a copy of the dataset.

In [52]:
# Save the combined price + sentiment dataset as one CSV. Spark writes a
# directory of part files, so write a single partition to a temporary directory,
# move the part file to the final name, then delete the temporary directory.
FOLDER_COMBINED = "stock_combined"
# Must be an f-string so the symbol is actually substituted into the path.
TEMP_FOLDER_COMBINED = f"stock_combined/{STOCK_SYMBOL_LOWER}"

# Write to a single CSV file
df_combined.coalesce(1).write.csv(
    TEMP_FOLDER_COMBINED, sep=",", header=True, mode="overwrite"
)

# Move the part file to the desired filename
for filename in os.listdir(TEMP_FOLDER_COMBINED):
    if filename.startswith("part-"):
        shutil.move(
            os.path.join(TEMP_FOLDER_COMBINED, filename),
            os.path.join(FOLDER_COMBINED, FILE_NAME_COMBINED),
        )

# Remove the temporary directory
shutil.rmtree(TEMP_FOLDER_COMBINED)

# Uncache the dataframe in a non-blocking operation to free up memory.
df_combined.unpersist(blocking=False)

# Remove this section's intermediates from the notebook namespace
# (at notebook top level, locals() and globals() are the same dict).
local_variables = [
    "df_decayed_sentiment",
    "df_price",
    "df_combined",
]
for var in local_variables:
    globals().pop(var, None)