<h1 align="center" style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">MSc in Data Analytics: Big Data Storage and Processing</h1>

### Table of Contents

- [Introduction](#Introduction)
    - [Assessment Overview](#Assessment-Overview)
    - [Project Summary](#Project-Summary)

<h2 style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">Introduction</h2>

### **Assessment Overview**

### Project Summary

<h2 style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">Install and Import Required Libraries</h2>

In [1]:
# !pip install -q pyspark pymongo

In [2]:
import glob
import os
from datetime import datetime
import logging

import numpy as np
import pandas as pd

import pymongo
from pymongo import MongoClient


from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # provides col, lit, lower, regexp_replace, to_date, when
from pyspark.sql.types import *

from kaggle_secrets import UserSecretsClient

import warnings

In [3]:
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s — %(levelname)s %(message)s', force=True)
logger = logging.getLogger(__name__)

# Disable warnings
warnings.filterwarnings(action='ignore')

<h2 style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">Define Data Paths</h2>

In [4]:
STOCKPRICE_FOLDER = "/kaggle/input/stock-tweet-and-price/stock-tweet-and-price/stockprice"
STOCKTWEET_CSV = "/kaggle/input/stock-tweet-and-price/stock-tweet-and-price/stocktweet/stocktweet.csv"

<h2 style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">Set MongoDB Connection</h2>

In [5]:
user_secrets = UserSecretsClient()
mongodb_uri = user_secrets.get_secret("mongodb-atlas-uri")

In [6]:
def create_mongodb_connection(uri, db_name='stock_analytics'):
    """
    Connect to MongoDB and return database instance
    
    Args:
        uri (str): Database URI
        db_name (str): Database name
    
    Returns:
        pymongo.database.Database: MongoDB database instance
    """
    try:
        client = MongoClient(uri)
        db = client[db_name]
        print("MongoDB version:", client.server_info()["version"])
        logger.info(f"Connected to MongoDB database: {db_name}")
        return db
    except Exception as e:
        logger.error(f"Failed to connect to MongoDB: {e}")
        raise

db = create_mongodb_connection(mongodb_uri, 'stock_analytics')

2025-07-14 18:39:34,449 — INFO Connected to MongoDB database: stock_analytics


MongoDB version: 8.0.11


### MongoDB Selection Rationale

**Why MongoDB was chosen over other NoSQL options:**


* **Document-oriented structure**

  * MongoDB’s JSON-like document model naturally accommodates the semi-structured `stocktweet.csv` data (e.g., tweet text, ticker, and timestamp).
  * No need for complex schema definitions or rigid table structures.

* **Ease of use and developer-friendliness**
  * Getting started is straightforward: both installing MongoDB and connecting to the database take little setup.
  * MongoDB’s query language (MQL) uses a JSON-like syntax, which makes queries easy to write and read.
  * Developer tools such as MongoDB Compass helped me inspect the database visually, for example to confirm that a collection had been created or that documents had been inserted.
  * Extensive support for multiple programming languages (e.g., Python, Java, Scala).

* **Seamless integration with Apache Spark**

  * The `mongo-spark-connector` enables direct read/write access between MongoDB and Spark DataFrames.
  * Simplifies data ingestion, distributed transformation, and analytics within the Spark environment.

* **Efficient for read-heavy analytical workloads**

  * MongoDB supports secondary indexing and text search—ideal for querying tweets by ticker symbol or date.
  * Aggregation pipelines allow for efficient summarization and filtering of large datasets.

* **Cloud accessibility and scalability**

  * MongoDB Atlas provides a fully managed cloud database service that allows me to connect securely from anywhere.
  * I can easily deploy and scale MongoDB clusters in the cloud and integrate them with my local Spark environment.
  * Atlas ensures high availability, backup, and monitoring, which is ideal for handling large-scale, real-time tweet and stock data.
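
As a rough illustration of the indexing and aggregation points above, the sketch below builds an aggregation pipeline that counts tweets per ticker. The helper name `tweets_per_ticker_pipeline` is hypothetical; the `ticker` field and the `stock_tweets` collection follow the names used in this notebook. A pipeline is just a list of dictionaries, so it can be built and inspected without a live connection.

```python
def tweets_per_ticker_pipeline(top_n=5):
    """Build a pipeline that counts tweets per ticker (hypothetical helper)."""
    return [
        # Group documents by their ticker field and count each group
        {"$group": {"_id": "$ticker", "tweet_count": {"$sum": 1}}},
        # Largest groups first
        {"$sort": {"tweet_count": -1}},
        # Keep only the top_n tickers
        {"$limit": top_n},
    ]


pipeline = tweets_per_ticker_pipeline(top_n=3)
print(pipeline)

# With a live connection and a populated collection, this could be run as:
#   db["stock_tweets"].create_index("ticker")   # secondary index on ticker
#   for doc in db["stock_tweets"].aggregate(pipeline):
#       print(doc)
```

The commented lines assume the `db` object returned by `create_mongodb_connection()` and only make sense once `stock_tweets` has been loaded.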

<h2 style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">Create Spark Session</h2>

**Note:** The original MongoDB URI had the format `mongodb+srv://<user>:<password>@cluster.mongodb.net/?...`, which does not name a target database. Because the Spark session below passes only this URI to the MongoDB connector, the database has to be part of the URI, so the string was updated programmatically to include the `/stock_analytics` path before the query string.

In [7]:
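# Insert 'stock_analytics' between the host's trailing slash and the query string.
# Matching ".net/?" avoids producing a doubled "?" in the rewritten URI.
spark_mongodb_uri = mongodb_uri.replace(".net/?", ".net/stock_analytics?", 1)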
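
The URI rewrite can also be done by parsing the URI instead of matching on `.net/`, which may be less brittle if the host name or query string changes. The following is a minimal sketch using only the standard library; `with_database` is a hypothetical helper and the example URI is a placeholder, not a real cluster.

```python
from urllib.parse import urlsplit, urlunsplit


def with_database(uri, db_name):
    """Return uri with db_name as its path if no database is set (hypothetical helper)."""
    parts = urlsplit(uri)
    # An empty path or a bare "/" means the URI names no database
    if parts.path in ("", "/"):
        parts = parts._replace(path="/" + db_name)
    return urlunsplit(parts)


# Placeholder URI for illustration only
example_uri = "mongodb+srv://user:secret@cluster.example.net/?retryWrites=true"
print(with_database(example_uri, "stock_analytics"))
```

A URI that already names a database is returned unchanged, so the helper is safe to apply more than once.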

In [8]:
def create_spark_session(app_name="StockAnalytics"):
    """
    Create Spark session with MongoDB connector
    
    Args:
        app_name (str): Spark application name
    
    Returns:
        pyspark.sql.SparkSession: Spark session instance
    """
    try:
        spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
            .config("spark.mongodb.input.uri", spark_mongodb_uri) \
            .config("spark.mongodb.output.uri", spark_mongodb_uri) \
            .getOrCreate()
        
        logger.info(f"Spark session created: {app_name}")
        return spark
    except Exception as e:
        logger.error(f"Failed to create Spark session: {e}")
        raise

In [9]:
spark = create_spark_session()

# Suppress Warnings in PySpark
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/usr/local/lib/python3.11/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e3ef89fe-374c-4de1-a492-75380e04545c;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 317ms :: artifacts dl 18ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]

### Spark Selection Rationale

**Why Spark instead of Hadoop MapReduce:**

* Spark keeps intermediate results in memory where possible, whereas Hadoop MapReduce writes them to disk between stages. This typically makes Spark much faster for repeated operations such as filtering, joining, and aggregation.
* It provides simpler, higher-level APIs (e.g., DataFrames and SQL) that are easier to use and more readable than the verbose MapReduce code.
* Spark includes powerful built-in libraries for SQL, machine learning, and streaming, which are not natively available in MapReduce.

**Why PySpark:**

* PySpark allows me to use Python—making it easier to integrate with pandas, matplotlib, and other familiar libraries.
* Its syntax is concise and readable, improving development speed and code clarity.
* Strong community support and documentation simplify implementation and troubleshooting.

**MongoDB Integration:**

* The Spark session is configured with the MongoDB connector, allowing direct read/write access between Spark and MongoDB.
* This setup creates a seamless pipeline from storage to distributed processing without intermediate conversions.

<h2 style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">Store Source Datasets into NoSQL Database using Spark</h2>

In [10]:
# Clear existing data
for collection_name in db.list_collection_names():
    db[collection_name].drop()

In [11]:
def store_stock_tweets_to_mongodb_spark(spark, csv_path):
    """
    Load stock tweet data from CSV to MongoDB using Spark
    
    Args:
        spark (SparkSession): Spark session instance
        csv_path (str): Path to stocktweet.csv file
    """
    try:
        df = spark.read.option("header", True).csv(csv_path)
        df.write.format("mongo") \
            .mode("overwrite") \
            .option("collection", "stock_tweets") \
            .save()
        
        logger.info("Loaded stock tweets into MongoDB using Spark")
    except Exception as e:
        logger.error(f"Failed to load stock tweets via Spark: {e}")
        raise

store_stock_tweets_to_mongodb_spark(spark, STOCKTWEET_CSV)

2025-07-14 18:40:33,264 — INFO Loaded stock tweets into MongoDB using Spark     


In [12]:
def store_stock_prices_to_mongodb_spark(spark, stockprice_folder):
    """
    Load stock price data from multiple CSVs to MongoDB using Spark
    
    Args:
        spark (SparkSession): Spark session instance
        stockprice_folder (str): Path to folder containing stock price CSV files
    """
    try:
        # Load all CSVs from the folder
        df = spark.read.option("header", True).csv(f"{stockprice_folder}/*.csv")
        df.write.format("mongo") \
            .mode("overwrite") \
            .option("collection", "stock_prices") \
            .save()

        logger.info("Loaded stock prices into MongoDB using Spark")
    except Exception as e:
        logger.error(f"Failed to load stock prices via Spark: {e}")
        raise

store_stock_prices_to_mongodb_spark(spark, STOCKPRICE_FOLDER)

2025-07-14 18:40:46,366 — INFO Loaded stock prices into MongoDB using Spark     


### Rationale for Using Spark to Populate MongoDB

The source datasets (`stocktweet.csv` and stock price files) were loaded using **PySpark**, and written directly into a MongoDB NoSQL database using the **MongoDB Spark Connector**. This approach leverages Spark’s distributed capabilities to handle large data volumes efficiently and meets the requirement to populate the NoSQL database using a big data processing tool. Each dataset was written to a separate collection (`stock_tweets`, `stock_prices`) for streamlined querying and integration with further Spark-based analytics.

<h2 style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">Read Source Datasets from NoSQL Database using Spark</h2>

In [13]:
def read_tweets_from_mongodb(spark):
    """
    Read tweet data from MongoDB using Spark
    
    Args:
        spark: Spark session instance
    
    Returns:
        pyspark.sql.DataFrame: Spark DataFrame containing tweet data
    """
    try:
        df = spark.read \
            .format("mongo") \
            .option("collection", "stock_tweets") \
            .load()
        
        logger.info(f"Read {df.count()} tweets from MongoDB")
        return df
        
    except Exception as e:
        logger.error(f"Failed to read tweets from MongoDB: {e}")
        raise

tweets_df = read_tweets_from_mongodb(spark)

2025-07-14 18:40:49,819 — INFO Read 11149 tweets from MongoDB                   


In [14]:
def read_prices_from_mongodb(spark):
    """
    Read stock price data from MongoDB using Spark
    
    Args:
        spark: Spark session instance
    
    Returns:
        pyspark.sql.DataFrame: Spark DataFrame containing price data
    """
    try:
        df = spark.read \
            .format("mongo") \
            .option("collection", "stock_prices") \
            .load()
        
        logger.info(f"Read {df.count()} price records from MongoDB")
        return df
        
    except Exception as e:
        logger.error(f"Failed to read prices from MongoDB: {e}")
        raise

prices_df = read_prices_from_mongodb(spark)

2025-07-14 18:40:51,809 — INFO Read 10175 price records from MongoDB


### Rationale for Using Spark to Read from MongoDB

**Data Retrieval (Read):**
The `read_tweets_from_mongodb()` and `read_prices_from_mongodb()` functions use Spark’s MongoDB connector to directly query collections into Spark DataFrames. This enables seamless integration for downstream analytics, leveraging Spark’s speed and MongoDB’s flexible document storage.

**Why this matters:**
Using Spark for both reading and writing ensures a **unified, high-performance pipeline** between storage (MongoDB) and processing (Spark), ideal for handling large-scale tweet and stock data.

<h2 style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">Process tweet sentiment</h2>

In [15]:
positive_keywords = [
    "up", "bull", "bullish", "gain", "gains", "rise", "rising", "high", "higher", "buy", "strong", 
    "positive", "green", "profit", "profits", "surge", "surging", "beat", "beats", "boom", "breakout",
    "support", "soar", "soaring", "rocket", "growth", "rebound", "optimistic", "win", "wins", "recovery"
]

negative_keywords = [
    "down", "bear", "bearish", "loss", "losses", "fall", "falling", "low", "lower", "sell", "selling", 
    "weak", "negative", "red", "drop", "dropped", "dip", "dipping", "miss", "missed", "crash", "plunge",
    "resistance", "collapse", "tank", "tanking", "uncertain", "volatile", "recession", "fear", "panic"
]

In [16]:
def process_tweet_sentiment_analysis(tweets_df, positive_keywords, negative_keywords):
    """
    Process tweets for basic sentiment analysis and aggregation
    
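    Args:
        tweets_df (pyspark.sql.DataFrame): Spark DataFrame containing tweet data
        positive_keywords (list[str]): Lowercase keywords that signal positive sentiment
        negative_keywords (list[str]): Lowercase keywords that signal negative sentiment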
    
    Returns:
        pyspark.sql.DataFrame: Processed tweet data with sentiment indicators
    """
    try:
        # Ensure date column is in proper format
        tweets_df = tweets_df.withColumn("date", to_date(col("date"), "MM/dd/yyyy"))
    
        # Normalize tweet text: strip non-letter characters, then lowercase
        tweets_df = tweets_df.withColumn("tweet_clean", lower(regexp_replace(col("tweet"), r"[^a-zA-Z\s]", "")))
    
        # Build whole-word regexes; tweet_clean is already lowercased, so matching ignores the original case
        pos_regex = r"\b(" + "|".join(positive_keywords) + r")\b"
        neg_regex = r"\b(" + "|".join(negative_keywords) + r")\b"
    
        # Compute sentiment scores
        tweets_df = tweets_df.withColumn("positive_sentiment", when(col("tweet_clean").rlike(pos_regex), 1).otherwise(0))
        tweets_df = tweets_df.withColumn("negative_sentiment", when(col("tweet_clean").rlike(neg_regex), 1).otherwise(0))
        tweets_df = tweets_df.withColumn("net_sentiment", col("positive_sentiment") - col("negative_sentiment"))
    
        # Sentiment label for readability
        tweets_df = tweets_df.withColumn(
            "sentiment_label",
            when(col("net_sentiment") > 0, "positive")
            .when(col("net_sentiment") < 0, "negative")
            .otherwise("neutral")
        )
    
        # Drop temporary columns
        tweets_df = tweets_df.drop("tweet_clean")
    
        logger.info("Enhanced tweet sentiment analysis completed")
        return tweets_df

    except Exception as e:
        logger.error(f"Failed to process tweet sentiment: {e}")
        raise

In [17]:
logger.info("Processing tweet sentiment analysis...")
processed_tweets = process_tweet_sentiment_analysis(tweets_df, positive_keywords, negative_keywords)

2025-07-14 18:40:51,891 — INFO Processing tweet sentiment analysis...
2025-07-14 18:40:52,128 — INFO Enhanced tweet sentiment analysis completed


In [18]:
processed_tweets.show(3)

+--------------------+----------+------+------+--------------------+------------------+------------------+-------------+---------------+
|                 _id|      date|    id|ticker|               tweet|positive_sentiment|negative_sentiment|net_sentiment|sentiment_label|
+--------------------+----------+------+------+--------------------+------------------+------------------+-------------+---------------+
|{68754ef506c5f159...|2020-01-01|100001|  AMZN|$AMZN Dow futures...|                 1|                 0|            1|       positive|
|{68754ef506c5f159...|2020-01-01|100002|  TSLA|$TSLA Daddy's dri...|                 0|                 0|            0|        neutral|
|{68754ef506c5f159...|2020-01-01|100003|  AAPL|$AAPL We’ll been ...|                 0|                 0|            0|        neutral|
+--------------------+----------+------+------+--------------------+------------------+------------------+-------------+---------------+
only showing top 3 rows



### Spark-Based Tweet Sentiment Processing: Rationale and Justification

To extract insights from unstructured social media content, I implemented a **basic rule-based sentiment analysis pipeline** using **PySpark**, designed to classify tweets related to stock tickers as positive, negative, or neutral. This approach uses the **distributed computing model of Apache Spark** so that the same text processing scales to larger volumes, and it reads from and writes to MongoDB through the Spark connector.

#### 1. **Why Spark for Sentiment Processing?**

* **Scalability and Distributed Execution**: The tweet dataset here is modest (11,149 records), but expressing string cleaning, keyword matching, and column derivation as Spark transformations means the same code parallelizes across partitions if the volume grows.
* **SQL-like DataFrame API**: Spark DataFrames offer a high-level abstraction that allows expressive, readable, and efficient transformations, well-suited for text pre-processing and sentiment scoring logic.
* **Seamless MongoDB Integration**: Since data was stored in MongoDB, Spark's native Mongo connector allows efficient data ingestion and storage without requiring intermediate formats.

#### 2. **Justification for Rule-Based Sentiment Analysis**

While more complex models like deep learning classifiers could be used, the project prioritizes simplicity and runtime efficiency within a Spark pipeline. A **dictionary-based sentiment approach** using predefined `positive_keywords` and `negative_keywords` allows for:

* **Fast pattern matching** using Spark’s `rlike()` method and regular expressions.
* **Lightweight processing** suited to the project’s scale and distributed context.
* **Transparent results** where each tweet’s label (positive/negative/neutral) can be traced back to keyword presence.

#### 3. **Key Design Choices**

* **Text Normalization**: Special characters were removed and tweet content was lowercased using `regexp_replace()` and `lower()` to ensure consistent pattern matching.
* **Regex-based Classification**: Word-boundary anchors (`\b`) in the regex pattern restrict matches to whole words, which avoids false positives from partial matches (for example, `up` inside `upset`).
* **Sentiment Scoring Logic**:

  * `positive_sentiment` and `negative_sentiment` are binary indicators.
  * `net_sentiment` is their difference, so it takes the value -1, 0, or 1.
  * `sentiment_label` provides a human-readable category for downstream use (e.g., grouping, visualization).
* **Date Conversion**: The `date` column is parsed using `to_date()` to enable accurate time-series grouping or trend analysis.
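
The design choices above can be checked on individual strings without Spark. The sketch below mirrors the same steps (clean, lowercase, whole-word match, subtract, label) using Python's `re` module. It is an illustration, not the notebook's implementation: the keyword lists are small subsets, the helper names are hypothetical, and Python's regex engine stands in for the Java engine that Spark's `rlike()` uses.

```python
import re

# Small illustrative subsets of the notebook's keyword lists
POSITIVE = ["up", "buy", "strong", "gain"]
NEGATIVE = ["down", "sell", "miss", "fear"]


def build_pattern(keywords):
    """Compile a whole-word alternation over the given keywords."""
    return re.compile(r"\b(" + "|".join(re.escape(k) for k in keywords) + r")\b")


POS_RE = build_pattern(POSITIVE)
NEG_RE = build_pattern(NEGATIVE)


def label_tweet(text):
    """Clean and lowercase the text, then label it from keyword presence."""
    clean = re.sub(r"[^a-zA-Z\s]", "", text).lower()
    positive = 1 if POS_RE.search(clean) else 0
    negative = 1 if NEG_RE.search(clean) else 0
    net = positive - negative
    if net > 0:
        return "positive"
    if net < 0:
        return "negative"
    return "neutral"


examples = [
    "$AMZN is up 3% today",
    "Upset about the earnings miss",
    "Strong buy, but fear is high",
    "Sell-off in $TSLA",
]
for tweet in examples:
    print(tweet, "->", label_tweet(tweet))
```

Two behaviours are worth noting. A tweet containing both a positive and a negative keyword nets to zero and is labelled neutral. Because punctuation is deleted rather than replaced by a space, `Sell-off` becomes `selloff`, which no longer matches the whole word `sell`.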

#### 4. **Alignment with Big Data Objectives**

Because the implementation uses only column-wise DataFrame transformations, it could be adapted to **streaming scenarios** with Spark Structured Streaming. It also supports **follow-up analytical steps** like joining with stock price data on the `date` and `ticker` fields for correlation analysis or event-driven stock movement prediction.
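
A join on `ticker` needs that column on both sides. The stock price data written to MongoDB in this notebook carries only `Date`, `Open`, `High`, `Low`, `Close`, `Adj Close`, and `Volume`, because every CSV in the folder is read in a single call. One possible fix is to derive the ticker from each row's source file path. This assumes each file is named after its ticker (for example `AAPL.csv`), which the source does not confirm. The helper `ticker_from_path` is hypothetical and written in plain Python so the pattern can be checked on its own.

```python
import re

# Captures the file name without its .csv extension at the end of a path
TICKER_PATTERN = r"([^/]+)\.csv$"


def ticker_from_path(path):
    """Return the ticker implied by a CSV file path, or None if it does not match."""
    match = re.search(TICKER_PATTERN, path)
    return match.group(1) if match else None


# Hypothetical path for illustration
print(ticker_from_path("file:///data/stockprice/AAPL.csv"))

# Possible Spark equivalent, applied when the CSVs are first read (untested sketch):
#   df.withColumn("ticker", regexp_extract(input_file_name(), TICKER_PATTERN, 1))
```

In Spark, `input_file_name()` returns the path of the file each row was read from, so the column would have to be added before the data is written to MongoDB.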

<h2 style="background-color:#2c3e54;color:#ecf0f1;border-radius: 8px; padding:15px">Process stock price metrics</h2>

In [19]:
def process_stock_price_metrics(prices_df):
    """
    Process stock price data to calculate key metrics
    
    Args:
        prices_df: Spark DataFrame containing price data
    
    Returns:
        pyspark.sql.DataFrame: Processed price data with calculated metrics
    """
    try:
        # Convert date string to proper date format
        prices_df = prices_df.withColumn("date", to_date(col("Date"), "yyyy-MM-dd"))
        
        # Calculate daily price change and percentage change
        prices_df = prices_df.withColumn("daily_change", col("Close") - col("Open"))
        prices_df = prices_df.withColumn("daily_change_pct", 
                                       (col("daily_change") / col("Open")) * 100)
        
        # Calculate daily volatility as the intraday range (High - Low)
        prices_df = prices_df.withColumn("daily_volatility", col("High") - col("Low"))
        prices_df = prices_df.withColumn("volatility_pct", 
                                       (col("daily_volatility") / col("Open")) * 100)
        
        # Calculate the typical price (High + Low + Close) / 3 as a simple VWAP proxy;
        # it is not weighted by Volume
        prices_df = prices_df.withColumn("vwap", 
                                       (col("High") + col("Low") + col("Close")) / 3)
        
        logger.info("Stock price metrics calculation completed")
        return prices_df
        
    except Exception as e:
        logger.error(f"Failed to process stock price metrics: {e}")
        raise

In [20]:
logger.info("Processing stock price metrics...")
processed_prices = process_stock_price_metrics(prices_df)

2025-07-14 18:40:53,621 — INFO Processing stock price metrics...
2025-07-14 18:40:53,785 — INFO Stock price metrics calculation completed


In [21]:
processed_prices.show(3)

+-----------------+-----------------+----------+-----------------+-----------------+----------------+--------+--------------------+-------------------+-------------------+------------------+------------------+-----------------+
|        Adj Close|            Close|      date|             High|              Low|            Open|  Volume|                 _id|       daily_change|   daily_change_pct|  daily_volatility|    volatility_pct|             vwap|
+-----------------+-----------------+----------+-----------------+-----------------+----------------+--------+--------------------+-------------------+-------------------+------------------+------------------+-----------------+
|92.39199829101562|92.39199829101562|2019-12-31|92.66300201416016|91.61150360107422|92.0999984741211|50130000|{68754f2306c5f159...|0.29199981689453125|0.31704649482331904|1.0514984130859375|1.1416921069563264|   92.22216796875|
|94.90049743652344|94.90049743652344|2020-01-02|94.90049743652344| 93.2074966430664|    

### Stock Price Metrics Processing: Rationale and Justification

To extract meaningful insights from historical stock data, this function computes key financial metrics using **PySpark**, enabling distributed and scalable processing of multiple CSV files across 38 tickers.

#### Why PySpark?

* **Distributed Efficiency**: PySpark efficiently handles large, multi-file datasets, enabling parallel computation of metrics like volatility and price movement without memory bottlenecks.
* **SQL-like API**: PySpark’s DataFrame API simplifies column-wise operations and supports complex transformations in an expressive, readable manner.

#### Calculated Metrics

* **Daily Change** (`Close - Open`) and **% Change** provide insight into daily stock performance.
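* **Volatility** (`High - Low`, the intraday range) and **% Volatility** (the range relative to `Open`) give a simple measure of daily price fluctuation.
* **VWAP** (stored in the `vwap` column) is approximated by the typical price `(High + Low + Close)/3`. This gives a rough benchmark for intraday price assessment, but it does not use `Volume`, so it is not a true Volume Weighted Average Price.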
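
To make the formulas concrete, the sketch below recomputes the metrics in plain Python for the first row of the processed price table (2019-12-31). The function `daily_metrics` is a hypothetical helper that mirrors the column expressions. It is not part of the Spark pipeline.

```python
def daily_metrics(open_, high, low, close):
    """Compute the notebook's per-row metrics for one trading day."""
    daily_change = close - open_
    daily_range = high - low
    return {
        "daily_change": daily_change,
        "daily_change_pct": daily_change / open_ * 100,
        "daily_volatility": daily_range,
        "volatility_pct": daily_range / open_ * 100,
        # Typical price: the unweighted mean of high, low, and close
        "vwap": (high + low + close) / 3,
    }


# Input values taken from the first row of the processed price table
row = daily_metrics(
    open_=92.0999984741211,
    high=92.66300201416016,
    low=91.61150360107422,
    close=92.39199829101562,
)
for name, value in row.items():
    print(f"{name}: {value:.4f}")
```

The results agree with the `daily_change`, `daily_change_pct`, `daily_volatility`, `volatility_pct`, and `vwap` values shown for that row.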

#### Practical Use

These metrics can support further analysis, such as correlating price movement with tweet sentiment or identifying high-volatility stocks. Each metric is a column-wise DataFrame expression, so Spark can evaluate it in parallel across partitions instead of looping over rows on the driver.