In [None]:
#Step1
#Get stock data from Nasdaq API(two ways) 
#I uploaded the zipped file with three csv files with the data we use in this code.
#WIKIPRICE.csv 1.8GB is the stock price data. ticker.csv is the ticker symbol for the companies. company_info_enriched.csv contains the companies' information.
#ticker.csv is downloaded from Nasdaq website containing all tickers of our stock data. Link: https://static.quandl.com/coverage/WIKI_PRICES.csv
#So you do not have to do this.

In [None]:
#First way we can download load zip of csv and unzip it on your laptop
#This way is much faster

In [None]:
import nasdaqdatalink
nasdaqdatalink.ApiConfig.api_key = "iLo48Zo6WHLSxmhjqGf7"

In [None]:
#WIKIPRICE.zip is the file name

In [None]:
nasdaqdatalink.export_table('WIKI/PRICES', filename='../Downloads/WIKIPRICE.zip')

In [None]:
#Second Way(Optional) (This takes about one hour to run)
#This read each year data from the API and get them into csv.

In [None]:
import nasdaqdatalink
import pandas as pd


nasdaqdatalink.ApiConfig.api_key = 'VfubDDwJnctmbAGG6A9x'

all_data = []  

for year in range(1962, 2018):  
    print(f"Fetching data for {year}...")
    try:
        df = nasdaqdatalink.get_table(
            'WIKI/PRICES',
            date={'gte': f'{year}-01-01', 'lte': f'{year}-12-31'},
            paginate=True
        )
        all_data.append(df)
        print(f"{year} complete: {len(df)} rows")
    except Exception as e:
        print(f"Error fetching {year}: {e}")


print("Fetching data for 2018...")
df_2018 = nasdaqdatalink.get_table(
    'WIKI/PRICES',
    date={'gte': '2018-01-01', 'lte': '2018-04-11'},
    paginate=True
)
all_data.append(df_2018)
print("2018 complete:", len(df_2018), "rows")


combined_df = pd.concat(all_data, ignore_index=True)


combined_df.to_csv("wiki_prices.csv", index=False)

print("All done! Final row count:", len(combined_df))


In [None]:
#Step2
#Download the company inforamtion data from yfinance

In [None]:
#!pip install yfinance

In [None]:
#First we use the dataset containing the ticker name to get the ticker list
#ticker.csv is downloaded from Nasdaq website containing all tickers of our stock data. Link: https://static.quandl.com/coverage/WIKI_PRICES.csv
#And check each ticker information in yfinance
#We create 

In [None]:
import pandas as pd
import yfinance as yf

# Load your ticker list from CSV
df = pd.read_csv("ticker.csv")
tickers = df['ticker'].unique().tolist()

In [None]:
#This code also return the missing ticker in yfinance, and we can add the information for them in the future

In [None]:
company_info = []

for symbol in tickers:
    try:
        ticker_obj = yf.Ticker(symbol)
        info = ticker_obj.info
        company_info.append({
            "ticker": symbol,
            "longName": info.get("longName"),
            "shortName":info.get("shortName"),
            "sector": info.get("sector"),
            "industry": info.get("industry"),
            "country": info.get("country"),
            "city":info.get("city"),
            "address":info.get("address"),
            "phone":info.get("phone"),
            "website":info.get("website"),
            "exchange": info.get("exchange"),
            "marketCap": info.get("marketCap"),
            "fullTimeEmployees":info.get("fullTimeEmployees"),
            "longBusinessSummary":info.get("longBusinessSummary")
        })
    except Exception as e:
        company_info.append({
            "ticker": symbol,
            "error": str(e)
        })

# Convert to DataFrame and save
result_df = pd.DataFrame(company_info)
result_df.to_csv("company_info_enriched.csv", index=False)
print("Company metadata saved to 'company_info_enriched.csv'")

In [None]:
#Get Stock Data into pgAdmin4(this may take two hours)
#Connect the pgAdmin4

In [26]:
import psycopg, os

print('Connecting to the PostgreSQL database...')
conn = psycopg.connect(
    host="localhost",
    port='5432',
    dbname="Final Project",
    user="postgres",
    password="123")
cur = conn.cursor()

Connecting to the PostgreSQL database...


In [6]:
#!pip install pandas sqlalchemy psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp312-cp312-macosx_14_0_arm64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.10-cp312-cp312-macosx_14_0_arm64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m205.3 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
#Step3
#Read the stock data store in csv
#You need to change the csv name 

In [22]:
import pandas as pd

# Load CSV
df = pd.read_csv("../Downloads/WIKEPRICE.csv")
df['date'] = pd.to_datetime(df['date'])  # Ensure 'date' is parsed properly



✅ Data loaded successfully!


In [None]:
#Build SQL structure

In [20]:
createCmd = """ CREATE TABLE stock_prices (
    ticker VARCHAR(5),
    date DATE,
    open NUMERIC,
    high NUMERIC,
    low NUMERIC,
    close NUMERIC,
    volume BIGSERIAL,
    ex_dividend NUMERIC,
    split_ratio NUMERIC,
    adj_open NUMERIC,
    adj_high NUMERIC,
    adj_low NUMERIC,
    adj_close NUMERIC,
    adj_volume BIGSERIAL,
    PRIMARY KEY (ticker, date)
)
            """
    
cur.execute(createCmd)
conn.commit()

In [None]:
#Get the data from df into pgAdmin4

In [28]:
from tqdm import tqdm

insert_query = """
    INSERT INTO stock_prices (
        ticker, date, open, high, low, close, volume,
        ex_dividend, split_ratio, adj_open, adj_high,
        adj_low, adj_close, adj_volume
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

for row in tqdm(df.itertuples(index=False), total=len(df)):
    cur.execute(insert_query, tuple(row))
conn.commit()

100%|█████████████████████████| 15389314/15389314 [1:06:00<00:00, 3885.27it/s]


In [None]:
#Step4
#Get Company Information into MongoDB

In [None]:
import pandas as pd
from pymongo import MongoClient

# Step 1: Load your enriched CSV
df = pd.read_csv("../Downloads/company_info_enriched.csv")

# Step 2: Convert DataFrame to dictionary
records = df.to_dict(orient='records')

# Step 3: Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["stock_data"]  # You can name this anything
collection = db["company_info"]  # Target collection

# Step 4: Insert data into MongoDB
collection.insert_many(records)

print("Company metadata successfully inserted into MongoDB.")

In [None]:
#Step5
#Use flask to build the website and insert spark into it

In [None]:
from flask import Flask, request, render_template, render_template_string
import psycopg
from pymongo import MongoClient


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, log, when
from pyspark.sql.window import Window
import pandas as pd
# Initialize Spark session
spark = SparkSession \
    .builder \
    .appName("Intro to Apache Spark") \
    .config("spark.cores.max", "4") \
    .config('spark.executor.memory', '8G') \
    .config('spark.driver.maxResultSize', '8g') \
    .config('spark.kryoserializer.buffer.max', '512m') \
    .config("spark.driver.cores", "4") \
    .getOrCreate()

sc = spark.sparkContext

print("Using Apache Spark Version", spark.version)


In [None]:
# Connect to PostgreSQL (pgAdmin)
pg_conn = psycopg.connect(
    host="localhost",
    dbname="Final Project",
    user="postgres",
    password="123",
    port="5432"
)
pg_cursor = pg_conn.cursor()

# Connect to MongoDB
mongo_client = MongoClient("mongodb://localhost:27017/")
mongo_db = mongo_client["stock_data"]
mongo_collection = mongo_db["company_info"]


In [None]:
app = Flask(__name__)

In [None]:
form_html = """
<!DOCTYPE html>
<html>
<head>
    <title>Stock Search</title>
    <style>
        .checkbox-group {
            display: flex;
            flex-wrap: wrap;
            gap: 15px;
        }
    </style>
</head>
<body>
    <h2>Search Stock Info</h2>
    <form method="POST" action="/search">
        <label>Ticker Symbol:</label>
        <input type="text" name="ticker" required><br><br>

        <label>Start Date:</label>
        <input type="date" name="start_date"><br><br>

        <label>End Date:</label>
        <input type="date" name="end_date"><br><br>

        <label>Select Columns (ticker & date are always included):</label><br>
        <div class="checkbox-group">
            <label><input type="checkbox" name="columns" value="open"> open</label>
            <label><input type="checkbox" name="columns" value="high"> high</label>
            <label><input type="checkbox" name="columns" value="low"> low</label>
            <label><input type="checkbox" name="columns" value="close"> close</label>
            <label><input type="checkbox" name="columns" value="volume"> volume</label>
            <label><input type="checkbox" name="columns" value="ex_dividend"> ex_dividend</label>
            <label><input type="checkbox" name="columns" value="split_ratio"> split_ratio</label>
            <label><input type="checkbox" name="columns" value="adj_open"> adj_open</label>
            <label><input type="checkbox" name="columns" value="adj_high"> adj_high</label>
            <label><input type="checkbox" name="columns" value="adj_low"> adj_low</label>
            <label><input type="checkbox" name="columns" value="adj_close"> adj_close</label>
            <label><input type="checkbox" name="columns" value="adj_volume"> adj_volume</label>
        </div><br>

        <label>Select Derived Values to Calculate (select used value to calculate first):</label><br>
        <div class="checkbox-group">
            <label><input type="checkbox" name="calcs" value="daily_return"> Daily Return</label>
            <label><input type="checkbox" name="calcs" value="adj_daily_return"> Adjusted Daily Return</label>
            <label><input type="checkbox" name="calcs" value="high_low_range"> High-Low Range</label>
            <label><input type="checkbox" name="calcs" value="log_return"> Log Return</label>
            <label><input type="checkbox" name="calcs" value="price_change"> Price Change</label>
        </div><br>

        <input type="submit" value="Search">
    </form>
</body>
</html>
"""

In [None]:
@app.route("/", methods=["GET"])
def home():
    return render_template_string(form_html)


In [None]:
from flask import request
import pandas as pd
import numbers

@app.route("/search", methods=["POST"])
def search():
    pg_conn.rollback()  # reset any failed transactions

    ticker = request.form.get("ticker", "").upper()
    start_date = request.form.get("start_date")
    end_date = request.form.get("end_date")
    selected_columns = request.form.getlist("columns")
    selected_calcs = request.form.getlist("calcs")

    # Add required columns for selected calculations
    required_map = {
        "daily_return": "close",
        "adj_daily_return": "adj_close",
        "high_low_range": ["high", "low"],
        "log_return": "close",
        "price_change": "close"
    }
    for calc in selected_calcs:
        req = required_map.get(calc)
        if isinstance(req, list):
            for r in req:
                if r not in selected_columns:
                    selected_columns.append(r)
        elif req and req not in selected_columns:
            selected_columns.append(req)

    def quote(col):
        return f'"{col}"' if "-" in col else col

    all_columns = ["ticker", "date"] + selected_columns
    column_sql = ", ".join(quote(c) for c in all_columns)

    query = f"SELECT {column_sql} FROM stock_prices WHERE ticker = %s"
    params = [ticker]
    if start_date:
        query += " AND date >= %s"
        params.append(start_date)
    if end_date:
        query += " AND date <= %s"
        params.append(end_date)
    query += " ORDER BY date DESC"

    try:
        pg_cursor.execute(query, tuple(params))
        stock_data = pg_cursor.fetchall()
    except Exception as e:
        return f"<h3>SQL Error:</h3><pre>{e}</pre><br><a href='/'>Back</a>"

    company_info = mongo_collection.find_one({"ticker": ticker})
    html = f"<h2>Results for {ticker}</h2>"

    # --- Company Info ---
    if company_info:
        html += "<h3>Company Info</h3><ul>"
        for key, value in company_info.items():
            if key != "_id":
                if isinstance(value, str) and value.startswith("http"):
                    html += f"<li><strong>{key}:</strong> <a href='{value}' target='_blank'>{value}</a></li>"
                else:
                    html += f"<li><strong>{key}:</strong> {value}</li>"
        html += "</ul>"
    else:
        html += "<p>No company info found in MongoDB.</p>"

    used_spark = False

    if stock_data:
        pdf = pd.DataFrame(stock_data, columns=all_columns)

        if selected_calcs:
            from pyspark.sql.window import Window
            from pyspark.sql.functions import col, lag, log

            sdf = spark.createDataFrame(pdf)
            window_spec = Window.partitionBy("ticker").orderBy("date")

            if "daily_return" in selected_calcs and "close" in pdf.columns:
                sdf = sdf.withColumn("daily_return",
                    (col("close") - lag("close", 1).over(window_spec)) / lag("close", 1).over(window_spec))
            if "adj_daily_return" in selected_calcs and "adj_close" in pdf.columns:
                sdf = sdf.withColumn("adj_daily_return",
                    (col("adj_close") - lag("adj_close", 1).over(window_spec)) / lag("adj_close", 1).over(window_spec))
            if "high_low_range" in selected_calcs and "high" in pdf.columns and "low" in pdf.columns:
                sdf = sdf.withColumn("high_low_range", col("high") - col("low"))
            if "log_return" in selected_calcs and "close" in pdf.columns:
                sdf = sdf.withColumn("log_return",
                    log(col("close") / lag("close", 1).over(window_spec)))
            if "price_change" in selected_calcs and "close" in pdf.columns:
                sdf = sdf.withColumn("price_change", col("close") - lag("close", 1).over(window_spec))

            pdf_final = sdf.toPandas()
            all_columns = list(pdf_final.columns)
            stock_data = pdf_final.values.tolist()
            used_spark = True

    # --- HTML Table ---
    if stock_data:
        html += "<h3>Stock Price Data</h3><table border='1'><tr>"
        for column in all_columns:
            html += f"<th>{column}</th>"
        html += "</tr>"

        for row in stock_data:
            html += "<tr>"
            for val in row:
                try:
                    if isinstance(val, numbers.Real):
                        formatted_val = f"{val:.4f}" if used_spark else f"{val:.2f}"
                    else:
                        formatted_val = str(val)
                except Exception:
                    formatted_val = str(val)
                html += f"<td>{formatted_val}</td>"
            html += "</tr>"
        html += "</table>"
    else:
        html += "<p>No stock price data found.</p>"

    html += "<br><a href='/'>Back to Search</a>"
    return html


In [None]:
app.run(port=5000)