In [None]:
from bs4 import BeautifulSoup
import requests
import plotly.express as px
import plotly.graph_objects as go
import numpy as np
import pandas as pd
import random
import os

# Setup

In [None]:
# Only change folder_path
folder_path = r"C:\Users\Nikolai\Downloads\archive"

# Every other location is derived from folder_path
symbol_path, stocks_folder, etfs_folder = (
    os.path.join(folder_path, leaf)
    for leaf in ("symbols_valid_meta.csv", "stocks", "etfs")
)

# Load the symbol directory
# Column definitions: https://www.nasdaqtrader.com/trader.aspx?id=symboldirdefs
symbols = pd.read_csv(symbol_path)

symbols.head(5)

In [None]:
# Boolean mask - True where the row is flagged as an Exchange-Traded Fund
query = symbols["ETF"].eq("Y")

# Exchange-Traded Funds only
etfs = symbols[query].reset_index(drop=True).copy()
# Every remaining row is treated as a stock
stocks = symbols[~query].reset_index(drop=True).copy()

In [None]:
# Securities used throughout the notebook, keyed by ticker symbol
# NOTE: "Common Stock" means that a stock can be bought or sold by investors or traders
my_securities = {
    symbol: f"{issuer} - Common Stock"
    for symbol, issuer in (
        ("MSFT", "Microsoft Corporation"),
        ("AAPL", "Apple Inc."),
        ("NVDA", "NVIDIA Corporation"),
        ("TSLA", "Tesla, Inc."),
        ("AMZN", "Amazon.com, Inc."),
        ("NFLX", "Netflix, Inc."),
    )
}

In [None]:
stock_symbols = list(my_securities)
# Pick one symbol at random
# NOTE: no seed is set, so the chosen symbol can differ between runs
demo = random.choice(stock_symbols)

# NOTE: each CSV file is named after its symbol
file_path = os.path.join(stocks_folder, demo + ".csv")
data = pd.read_csv(file_path).assign(symbol=demo)

# Parse the dates into a datetime column
data = data.assign(Date=pd.to_datetime(data["Date"]))

print(f"Loaded data for {demo}")
# Column names, dtypes and non-null counts of the loaded dataset
data.info()

# Statistics

In [None]:
# Work on an independent copy holding only the columns used for the statistics
stats = data.loc[:, ["Date", "symbol", "Adj Close"]].copy()
stats.tail()

In [None]:
# Daily period return: fractional change of "Adj Close" versus the previous row
# NOTE: multiply the column by 100 to read it as PERCENTAGES (%)
stats["1day"] = (
    stats["Adj Close"]
    .pct_change()
    .fillna(0)  # the first row has no previous value to compare against
)
stats

In [None]:
def calculate_returns(data: pd.DataFrame, calculation_period: str) -> pd.Series:
    """
    Calculate the compounded return to date within a calendar period:
    month to date (mtd) / quarter to date (qtd) / year to date (ytd) /
    inception to date (itd).

    For every row the result is the product of (1 + "1day") over all rows of
    the same period whose "Date" is on or before that row's "Date", minus 1.
    For "itd" the whole dataset is one single period.

    Parameters:
        - data - the raw data; must contain the columns "Date" (datetime)
                 and "1day" (daily returns as decimals)
        - calculation_period - the period to calculate the data for
                               (case-insensitive).
                               The available options are - mtd, qtd, ytd, itd

    Output:
        - Either the mtd, qtd, ytd or itd column depending on the selected
          calculation_period. The numbers are on a daily basis (decimals),
          indexed like `data` and named after the calculation period.

    Raises:
        - ValueError - when calculation_period is not one of the options
    """
    # pandas period frequency per option; None means "no period split" (ITD)
    period_map = {
        "mtd": "M",
        "qtd": "Q",
        "ytd": "Y",
        "itd": None
    }

    calculation_period = calculation_period.lower()

    # Stop with an explicit error instead of silently returning nothing
    if calculation_period not in period_map:
        options = ", ".join(period_map)
        raise ValueError(
            f"Unknown calculation_period '{calculation_period}'. Available options: {options}"
        )

    selected_period = period_map[calculation_period]

    # Work positionally (0..n-1) so any index on `data` is handled
    frame = data[["Date", "1day"]].reset_index(drop=True)
    # Missing returns count as "no change", so they do not break the product
    frame["growth"] = (frame["1day"] + 1).fillna(1.0)
    # Compounding must follow chronological order
    frame = frame.sort_values("Date", kind="stable")

    if selected_period is None:
        # ITD - a single running product over the whole history
        frame["compounded"] = frame["growth"].cumprod()
    else:
        # MTD / QTD / YTD - the running product restarts in every period
        buckets = frame["Date"].dt.to_period(selected_period)
        frame["compounded"] = frame.groupby(buckets)["growth"].cumprod()

    # Rows sharing the same date all receive the value that includes every one of them
    frame["compounded"] = frame.groupby("Date")["compounded"].transform("last")

    # Back to the caller's row order and index
    result = frame["compounded"].sort_index() - 1
    result.index = data.index
    result.name = calculation_period
    return result

In [None]:
# One to-date return column per horizon:
# mtd = Month to date, qtd = Quarter to date, ytd = Year to date, itd = Inception to date
stats = stats.assign(**{
    horizon: calculate_returns(stats, horizon)
    for horizon in ("mtd", "qtd", "ytd", "itd")
})

stats.tail(5)

In [None]:
# Month to Date only: keep the last row of every month and hide the other columns
(
    stats
    .assign(period=stats["Date"].dt.to_period("M"))
    .drop_duplicates(subset="period", keep="last")
    .drop(columns=["1day", "qtd", "ytd", "itd", "period", "Adj Close"])
)

## [Annual Return](https://www.investopedia.com/terms/a/annual-return.asp)

In [None]:
def annual_return(daily_returns: pd.Series, trading_days: int = 252) -> float:
    """
    Calculate the annual (annualized) return

    The daily returns are compounded into a total growth factor, which is then
    scaled to one year: growth ** (trading_days / periods) - 1.

    Parameters:
        - daily_returns - the percent change column on a daily basis (1day).
                          Its first entry is the placeholder for the first
                          day (no previous price), so the number of return
                          periods is one less than the number of rows.
        - trading_days - number of trading days per year used for the
                         annualization (defaults to 252)

    Output:
        - the annual return as a decimal; NaN when there are fewer than two
          rows, because no return period exists to annualize
    """
    periods = daily_returns.shape[0] - 1
    if periods < 1:
        return float("nan")

    # Total growth factor over the whole history, e.g. 1.25 for +25%
    growth = (daily_returns + 1).product()

    # Exponentiate the growth FACTOR (not the return), then convert back to a return
    return growth ** (trading_days / periods) - 1

In [None]:
# Express the annual return as a percentage with two decimals
percent = round(100 * annual_return(stats["1day"]), 2)
print(f"{demo} generates {percent}% annually.")

## [Volatility](https://www.investopedia.com/terms/v/volatility.asp)

In [None]:
def volatility(daily_returns: pd.Series) -> float:
    """
    Calculate the stock volatility as the standard deviation of the returns

    Parameters:
        - daily_returns - the percent change column on a daily basis (1day)

    Output:
        - the volatility (same unit as the input, i.e. per day and in decimals)
    """
    # ddof=0 is numpy's default: the population standard deviation
    spread = np.std(daily_returns.copy(), ddof=0)
    return spread

In [None]:
# Express the volatility as a percentage with two decimals
percent = round(100 * volatility(stats["1day"]), 2)
print(f"{demo}'s volatility is {percent}%.")

# Visualization

In [None]:
# Independent (deep) copy so the plotting columns added later do not touch `data`
viz_data = data.copy(deep=True)

## [Candlestick](https://www.investopedia.com/terms/c/candlestick.asp)

In [None]:
# Map the price columns onto the candlestick's open/high/low/close arguments
ohlc = {field.lower(): viz_data[field] for field in ("Open", "High", "Low", "Close")}
candlestick = go.Candlestick(x=viz_data["Date"], name="Candle", **ohlc)

fig = go.Figure(data=[candlestick])

# Update the plot layout: the title is the full security name of the demo symbol
fig.update_layout(
    title=my_securities[demo],
    yaxis_title="Price",
    hovermode="x unified",
)
fig.show()

## [Moving Average](https://www.investopedia.com/terms/m/movingaverage.asp)

In [None]:
# 50, 100 and 200 day moving averages of the adjusted close,
# stored as the columns "50 MA", "100 MA" and "200 MA"
for window in (50, 100, 200):
    viz_data[f"{window} MA"] = viz_data["Adj Close"].rolling(window).mean()

In [None]:
fig = go.Figure(data = [candlestick])

# One overlay line per moving-average column ("50 MA", "100 MA", "200 MA"),
# each with its own colour
for window, colour in ((50, "black"), (100, "red"), (200, "green")):
    moving_avg_trace = go.Scatter(
        x = viz_data["Date"],
        y = viz_data[f"{window} MA"],
        line_color = colour,
        name = f"Moving Avg. {window}",
        opacity = 0.5,
    )
    fig.add_trace(moving_avg_trace)

fig.update_layout(hovermode = "x unified", title_text = "Moving Average for 50, 100 and 200 period")
fig.show()

## [Double Bollinger Bands](https://www.investopedia.com/terms/b/bollingerbands.asp)

In [None]:
# Bollinger Band inputs over a 20-row window:
# the rolling mean is the middle line, the rolling standard deviation sets the band width
moving_avg = viz_data["Adj Close"].rolling(20).mean()
std = viz_data["Adj Close"].rolling(20).std()

In [None]:
fig = go.Figure(data = [candlestick])

# Upper Bollinger Band: moving average plus two standard deviations
upper_band = go.Scatter(x = viz_data["Date"], y = (moving_avg + (std * 2)), line_color = "black", name = "Upper Band", opacity=0.5, line = {"dash": "dash"})
fig.add_trace(upper_band)

# Lower Bollinger Band: moving average minus two standard deviations
lower_band = go.Scatter(x = viz_data["Date"], y = (moving_avg - (std * 2)), line_color = "black", name = "Lower Band", opacity=0.5, line = {"dash": "dash"})
fig.add_trace(lower_band)

fig.update_layout(hovermode = "x unified", title_text = "Double Bollinger Bands")
fig.show()

## [S&P 500 Components](https://en.wikipedia.org/wiki/List_of_S%26P_500_companies)

In [None]:
# Fetch the S&P 500 Components from website
url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"

# A timeout keeps the cell from hanging forever; raise_for_status() stops on
# HTTP errors instead of parsing an error page.
# NOTE(review): an identifying User-Agent is sent because the site may reject
# the library's default one - adjust the value to your project.
response = requests.get(
    url,
    headers={"User-Agent": "sp500-notebook/1.0 (educational use)"},
    timeout=30,
)
response.raise_for_status()

# Name the parser explicitly so the result does not depend on which parsers are installed
html = BeautifulSoup(response.content, "html.parser")

In [None]:
# Turn the HTML table with id "constituents" into a list of row dictionaries
row_iter = iter(html.find(id="constituents").find_all("tr"))

# Header: the first row that yields any <th> cells provides the column names
# (lower-cased, with spaces and dashes turned into underscores)
columns = []
for table_row in row_iter:
    columns = [
        th.text.strip().lower().replace(" ", "-").replace("-", "_")
        for th in table_row.find_all("th")
    ]
    if columns:
        break

# Every row after the header holds data - dictionary format: column: value
table_data = [
    dict(zip(columns, [td.text.strip() for td in table_row.find_all("td")]))
    for table_row in row_iter
]

data = pd.DataFrame(table_data)

data.head()

In [None]:
# Fix columns
data = data.assign(
    # Keep the first four-digit year from left to right (e.g. "2013 (1888)" -> 2013,
    # "1881/1894" -> 1881). astype(str) keeps this cell re-runnable once the
    # column has already been converted to integers.
    founded = data["founded"].astype(str).str.extract(r"(\d{4})", expand=False).astype(int),
    # Parse the textual dates into datetimes
    date_added = pd.to_datetime(data["date_added"])
)

In [None]:
# Number of companies per GICS sector: give every row a count of 1 and sum per sector
sectors = (
    data
    .assign(count=1)
    .groupby("gics_sector", as_index=False)["count"]
    .sum()
)

fig = px.pie(sectors, values="count", names="gics_sector", title="S&P 500 Sectors")
fig.show()

In [None]:
# Get company weights

# Change this path
html_path = r"C:\Users\Nikolai\Downloads\archive\S&P 500 Companies by Weight.html"

# Read the raw bytes inside a context manager: the file is always closed, and
# BeautifulSoup detects the document's encoding itself instead of relying on
# the platform's default text encoding
with open(html_path, "rb") as html_file:
    html = BeautifulSoup(html_file.read(), "html.parser")


In [None]:
# Turn the weights table into a list of row dictionaries
weight_rows = iter(
    html.find("table", class_="table table-hover table-borderless table-sm").find_all("tr")
)

# Header: the first row that yields any <th> cells provides the column names
columns = []
for table_row in weight_rows:
    columns = [th.text.strip() for th in table_row.find_all("th")]
    if columns:
        break

# Every row after the header holds data - dictionary format: column: value
table_data = [
    dict(zip(columns, [td.text.strip() for td in table_row.find_all("td")]))
    for table_row in weight_rows
]

# Keep only the needed columns and rename them: source name -> notebook name
name_mappings = {
    "Symbol": "symbol",
    "Portfolio%": "weight"
}
weights = pd.DataFrame(table_data)[list(name_mappings)].rename(columns=name_mappings)

weights.head()

In [None]:
# Add the weights to the Wikipedia data
# Dropping any existing "weight" column first keeps this cell re-runnable:
# otherwise a second run would create weight_x / weight_y and fail below
data = data.drop(columns="weight", errors="ignore").merge(weights, how="left", on="symbol")
# Convert weights to decimal floats ("%" is removed as a literal character, not a regex)
data["weight"] = data["weight"].str.replace("%", "", regex=False).astype(float) / 100

In [None]:
# Total index weight per GICS sector
sectors = data.groupby("gics_sector", as_index=False)["weight"].sum()

fig = px.pie(
    sectors,
    values="weight",
    names="gics_sector",
    title="S&P 500 Components weight",
)
fig.show()

In [None]:
# Get the countries: the second ", "-separated part of the headquarters location
data["country"] = data["headquarters_location"].str.split(", ").str.get(1)

country_code_url = "https://raw.githubusercontent.com/plotly/datasets/master/2014_world_gdp_with_codes.csv"
# Load the lookup table and make its column names lowercase
country_codes = pd.read_csv(country_code_url).rename(columns=str.lower)

# Attach the country code to every company; the GDP column is not needed
data = (
    data
    .merge(country_codes, how="left", on="country")
    .drop(columns="gdp (billions)")
)

# NOTE: UK and US states are missing countries
data.head()

In [None]:
# NOTE: Missing codes are based on the country_codes variable

# Fills missing codes with USA (US states do not match any country name)
data["code"] = data["code"].fillna("USA")

# Fix the UK
data.loc[data["country"] == "UK", "code"] = "GBR"

# Fix Georgia: the US state shares its name with the country Georgia, so the
# join on "country" can assign that country's code instead of leaving it
# missing. NOTE(review): assumes every "Georgia" headquarters here is the US
# state - confirm against the data.
data.loc[data["country"] == "Georgia", "code"] = "USA"

In [None]:
# Total index weight per country code, smallest first
world_map_data = (
    data
    .groupby("code", as_index=False)["weight"]
    .sum()
    .sort_values("weight")
    .reset_index(drop=True)
)

# Visualize world map - colour intensity is the weight in percent (two decimals)
world_map = go.Choropleth(
    locations=world_map_data["code"],
    z=(world_map_data["weight"] * 100).round(2),
    colorscale="Reds",
    marker_line_width=0.5,
    colorbar_title="Weights",
)

fig = go.Figure(data=world_map)
fig.update_layout(
    title_text="S&P 500 Country Weights",
    geo={"showframe": False, "showcoastlines": False},
    annotations=[{"showarrow": False, "text": ""}],
)

fig.show()

In [None]:
# Same numbers as the map, as a table: weights in percent, largest first
(
    world_map_data
    .assign(weight=(world_map_data["weight"] * 100).round(2))
    .sort_values("weight", ascending=False)
    .reset_index(drop=True)
)