# Importing environment variables

In [1]:
import os
os.environ["PATH"] += os.pathsep + "D:/hadoop/hadoop-3.4.1/bin"
os.environ["JAVA_HOME"] = "D:/jdk-hotspot"

In [2]:
import getpass
from dotenv import load_dotenv

load_dotenv()

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("MONGO_URI")
_set_env("BEARER_TOKEN")
_set_env("REDDIT_CLIENT_ID")
_set_env("REDDIT_CLIENT_SECRET")

# Importing stocks data

In [3]:
import yfinance as yf
import pandas as pd
from pymongo import MongoClient
from datetime import datetime

# Stock symbols to fetch
symbols = ["AAPL", "GOOGL", "MSFT", "TSLA"]

# MongoDB setup (NoSQL)
client = MongoClient(os.environ["MONGO_URI"])
db = client["stock_market"]
collection = db["stock_prices"]

all_data = []  # Accumulator

# Fetch and store
for symbol in symbols:
    ticker = yf.Ticker(symbol)
    data = ticker.history(period="1y")  # past 1 year data
    data.reset_index(inplace=True)
    data["symbol"] = symbol

    # Convert datetime to ISO format for Mongo
    data["Date"] = data["Date"].apply(lambda x: x.to_pydatetime().isoformat())

    # Store in Mongo
    records = data.to_dict(orient="records")
    collection.insert_many(records)

    # Accumulate for CSV
    all_data.append(data)

# Concatenate all dataframes and export
combined_df = pd.concat(all_data)
combined_df.to_csv("stock_data.csv", index=False)

# HDFS Upload
!hdfs dfs -mkdir -p /stock_market_data/
!hdfs dfs -put -f stock_data.csv /stock_market_data/

print("✅ All stock data fetched, combined, stored in MongoDB, and saved to CSV.")


✅ All stock data fetched, combined, stored in MongoDB, and saved to CSV.


# Importing tweets for sentiment analysis

In [4]:
import tweepy
from textblob import TextBlob
from pymongo import MongoClient
from datetime import datetime

# Initialize Tweepy v2 Client
client = tweepy.Client(bearer_token=os.environ["BEARER_TOKEN"])

# MongoDB setup
mongo_client = MongoClient(os.environ["MONGO_URI"])
db = mongo_client["stock_market"]
sentiment_collection = db["twitter_sentiment"]

# Define query and fetch tweets
query = "AAPL OR TSLA OR stock market -is:retweet lang:en"
tweets = client.search_recent_tweets(query=query, max_results=50, tweet_fields=["created_at", "text"])

# Process and store tweets
for tweet in tweets.data:
    sentiment = TextBlob(tweet.text).sentiment.polarity
    doc = {
        "text": tweet.text,
        "timestamp": tweet.created_at.isoformat(),
        "polarity": sentiment
    }
    sentiment_collection.insert_one(doc)

print("✅ Tweets fetched and stored with sentiment.")



✅ Tweets fetched and stored with sentiment.


# Importing Reddit posts for sentiment analysis

In [5]:
import praw
from pymongo import MongoClient
from textblob import TextBlob
from datetime import datetime

mongo_client = MongoClient(os.environ["MONGO_URI"])
db = mongo_client["stock_market"]

reddit = praw.Reddit(client_id=os.environ["REDDIT_CLIENT_ID"],
                     client_secret=os.environ["REDDIT_CLIENT_SECRET"],
                     user_agent='Stocks')

posts = reddit.subreddit('stocks').hot(limit=50)

for post in posts:
    sentiment = TextBlob(post.title + " " + post.selftext).sentiment.polarity
    doc = {
        "title": post.title,
        "content": post.selftext,
        "timestamp": datetime.utcfromtimestamp(post.created_utc).isoformat(),
        "sentiment": sentiment
    }
    db["reddit_sentiment"].insert_one(doc)

print("✅ Reddit sentiment stored.")


  "timestamp": datetime.utcfromtimestamp(post.created_utc).isoformat(),


✅ Reddit sentiment stored.


# Extracting meaningful features using PCA

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, avg
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml import Pipeline

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Stock Feature Engineering") \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()

# Load stock data
df = spark.read.csv("stock_data.csv", header=True, inferSchema=True)

# Convert Date to timestamp and sort
df = df.withColumn("Date", col("Date").cast("timestamp")) \
       .orderBy("symbol", "Date")

# Create rolling average and daily returns
windowSpec = Window.partitionBy("symbol").orderBy("Date")

df = df.withColumn("Prev_Close", lag("Close").over(windowSpec))
df = df.withColumn("Daily_Return", (col("Close") - col("Prev_Close")) / col("Prev_Close"))
df = df.withColumn("MA_5", avg("Close").over(windowSpec.rowsBetween(-4, 0)))
df = df.dropna()

# Feature Vector
feature_cols = ["Open", "High", "Low", "Close", "Volume", "Daily_Return", "MA_5"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features_scaled")

# PCA to reduce to 3 principal components
pca = PCA(k=3, inputCol="features_scaled", outputCol="features")

pipeline = Pipeline(stages=[assembler, scaler, pca])
model = pipeline.fit(df)
transformed = model.transform(df)

transformed.select("Date", "symbol", "features").show(5, truncate=False)


+-------------------+------+------------------------------------------------------------+
|Date               |symbol|features                                                    |
+-------------------+------+------------------------------------------------------------+
|2024-04-15 09:30:00|AAPL  |[-3.6034592590327206,-0.8590518681222201,2.1856737306147562]|
|2024-04-16 09:30:00|AAPL  |[-3.5336865656799894,-0.9298950707215315,2.1237477016764656]|
|2024-04-17 09:30:00|AAPL  |[-3.553163962284043,-0.8588893233473567,1.4317278472627286] |
|2024-04-18 09:30:00|AAPL  |[-3.5421520010728385,-0.7975403328313077,1.2244521861873687]|
|2024-04-19 09:30:00|AAPL  |[-3.434311064622725,-1.0143600966961897,1.8434163503131928] |
+-------------------+------+------------------------------------------------------------+
only showing top 5 rows



# Applying different ML and statistical models on data and calculating RMSE and R2 score for each

In [3]:
from pyspark.ml.regression import IsotonicRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import lead

df = transformed.withColumn("Next_Close", lead("Close", 1).over(windowSpec))
df = df.dropna()

# Train-test split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

from pyspark.ml.feature import VectorSlicer

slicer = VectorSlicer(inputCol="features", outputCol="feature1", indices=[0])  # use the first feature
sliced_train = slicer.transform(train_data)
sliced_test = slicer.transform(test_data)

iso = IsotonicRegression(featuresCol="feature1", labelCol="Next_Close")
iso_model = iso.fit(sliced_train)
iso_predictions = iso_model.transform(sliced_test)

# Evaluate
evaluator = RegressionEvaluator(labelCol="Next_Close", predictionCol="prediction", metricName="rmse")
r2_evaluator = RegressionEvaluator(
    labelCol="Next_Close",
    predictionCol="prediction",
    metricName="r2"
)
r2 = r2_evaluator.evaluate(iso_predictions)
rmse = evaluator.evaluate(iso_predictions)
print(f"✅ RMSE using Isotonic Regression: {rmse}")
print(f"✅ R2 Score using Isotonic Regression: {r2}")

✅ RMSE using Isotonic Regression: 105.10832242725765
✅ R2 Score using Isotonic Regression: -0.0004212473517037907


In [4]:
from pyspark.ml.regression import DecisionTreeRegressor, GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import lead

# Add target column (next day close)
df = transformed.withColumn("Next_Close", lead("Close", 1).over(windowSpec))
df = df.dropna()

# Train-test split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Choose a regression model (can swap later for comparison)
regressor = DecisionTreeRegressor(featuresCol="features", labelCol="Next_Close")

# Train
model = regressor.fit(train_data)

# Predict
predictions = model.transform(test_data)

# Evaluate
evaluator = RegressionEvaluator(labelCol="Next_Close", predictionCol="prediction", metricName="rmse")
r2_evaluator = RegressionEvaluator(
    labelCol="Next_Close",
    predictionCol="prediction",
    metricName="r2"
)
rmse = evaluator.evaluate(predictions)
r2 = r2_evaluator.evaluate(predictions)
print(f"✅ RMSE using Decision Tree Regression: {rmse}")
print(f"✅ R2 Score using Decision Tree Regression: {r2}")

✅ RMSE using Decision Tree Regression: 10.469868366388567
✅ R2 Score using Decision Tree Regression: 0.990073614426227


In [5]:
from pyspark.ml.regression import DecisionTreeRegressor, GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import lead

# Add target column (next day close)
df = transformed.withColumn("Next_Close", lead("Close", 1).over(windowSpec))
df = df.dropna()

# Train-test split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Choose a regression model (can swap later for comparison)
regressor = GBTRegressor(featuresCol="features", labelCol="Next_Close")

# Train
model = regressor.fit(train_data)

# Predict
predictions = model.transform(test_data)

# Evaluate
evaluator = RegressionEvaluator(labelCol="Next_Close", predictionCol="prediction", metricName="rmse")
r2_evaluator = RegressionEvaluator(
    labelCol="Next_Close",
    predictionCol="prediction",
    metricName="r2"
)
r2 = r2_evaluator.evaluate(predictions)
rmse = evaluator.evaluate(predictions)
print(f"✅ RMSE using GBT Regression: {rmse}")
print(f"✅ R2 Score using GBT Regression: {r2}")

✅ RMSE using GBT Regression: 9.367053276512667
✅ R2 Score using GBT Regression: 0.9920546198994032


In [6]:
from pyspark.ml.regression import DecisionTreeRegressor, GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import lead

# Add target column (next day close)
df = transformed.withColumn("Next_Close", lead("Close", 1).over(windowSpec))
df = df.dropna()

# Train-test split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Choose a regression model (can swap later for comparison)
regressor = LinearRegression(featuresCol="features", labelCol="Next_Close")

# Train
model = regressor.fit(train_data)

# Predict
predictions = model.transform(test_data)

# Evaluate
evaluator = RegressionEvaluator(labelCol="Next_Close", predictionCol="prediction", metricName="rmse")
r2_evaluator = RegressionEvaluator(
    labelCol="Next_Close",
    predictionCol="prediction",
    metricName="r2"
)
r2 = r2_evaluator.evaluate(predictions)
rmse = evaluator.evaluate(predictions)
print(f"✅ RMSE using Linear Regression: {rmse}")
print(f"✅ R2 Score using Linear Regression: {r2}")

✅ RMSE using Linear Regression: 5.839494523771556
✅ R2 Score using Linear Regression: 0.996912128574625


# Using Linear Regression predictions for visualization and storing

In [8]:
# Make predictions

predictions = model.transform(test_data)

# Evaluate
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="Next_Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 5.839494523771556


In [9]:
# Select relevant columns (adjust column names as per your schema)
output_df = predictions.select("Date", "symbol", "Next_Close", "prediction")

# Convert to Pandas DataFrame
pandas_df = output_df.toPandas()

# Save to CSV
pandas_df.to_csv("predicted_stock_prices.csv", index=False)


# Creating Hive table for pattern analysis and performing query

In [10]:
df = spark.read.csv("stock_data.csv", header=True, inferSchema=True)

from pyspark.sql.functions import col
df_cleaned = df.select(
    col("symbol"),
    col("Date").alias("date"),
    col("Close").alias("close"),
    col("Volume").alias("volume")
)

df_cleaned.createOrReplaceTempView("stock_data")

#Create a Hive table and insert data into it
spark.sql("""
    CREATE TABLE IF NOT EXISTS stock_data_hive (
        symbol STRING,
        date TIMESTAMP,
        close DOUBLE,
        volume DOUBLE
    )
""")

# Insert data into Hive table
df_cleaned.write.mode("overwrite").insertInto("stock_data_hive")

In [11]:
query = """
SELECT symbol, AVG(close) as avg_close
FROM stock_data_hive
GROUP BY symbol
ORDER BY avg_close DESC
"""
result = spark.sql(query)
result.show()


+------+------------------+
|symbol|         avg_close|
+------+------------------+
|  MSFT|417.72814029526427|
|  TSLA|  266.694980028616|
|  AAPL|219.79621139465576|
| GOOGL|172.51824476994366|
+------+------------------+

