<a href="https://colab.research.google.com/github/Jacob-Rose-BU/Alternative-Investments---Assette-Capstone-Project/blob/main/yfinance_Source.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


**Please note:** after conversations with the Business Advisor on Friday (prior to checkpoint submission), any sections discussing Holdings in this file are no longer valid. The documentation will not be officially updated until this is confirmed with the Business Advisor at the next meeting. Holdings will be created in the Snowflake database. Review the Holdings file for the SQL code (Fund Creation & Holding Allocation Pipeline).

# **ESG Equity Fact Sheet - YFinance Data Pipeline**

This notebook pulls real financial data using the Yahoo Finance API via the yfinance library. It generates tables for Security Master, ESG data, and historical price performance for U.S. equities. The extracted data is prepared to be loaded into Snowflake for downstream use in ESG fund fact sheets.


### **Execution Instructions**

**To run this notebook:**
1. Update your Snowflake credentials in the environment or connection file.
2. Run the notebook sequentially from top to bottom.

### **File Roadmap**
Pull S&P 500, NASDAQ 100, Dow Jones tickers <br>
Pull yfinance data for valid tickers <br>
Extract and clean ESG and performance history <br>
Push to Snowflake

**Output:** 3 tables in Snowflake (security_master, esg_stock_data, stock_performance_history)


### **Next Steps**
#### **yfinance**
- Improve ESG completeness check (what to do when ESG data is not given in yfinance - maybe pull in ESG API)
- Add performance benchmark (SPESG & SUSL)

#### **Snowflake SQL Documentation**
- Write code for Holdings creation in Snowflake (Friday Conversation w/ Corey)
- Create documentation for reusable steps for top 10 holdings by weight
- Create documentation for reusable steps for fund level ESG score aggregation
- Create documentation for reusable steps for fund performance versus benchmark

### **Future Improvement:**
- Automate periodic data refresh
- Add additional tickers
- Backfill daily performance

# **Connect to Snowflake**


To load data into Snowflake, we established a secure connection using credentials stored in a .env file. This connection allows us to push data directly from Python. The pipeline is designed to check if tables already exist, create them if needed, and merge new data while avoiding duplicates. This setup enables seamless integration between our local data processing and Snowflake's cloud warehouse, supporting scalable, centralized storage for downstream analytics like ESG reporting and fact sheet generation.
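
Before opening a connection, it can help to confirm that every expected setting is actually present. The sketch below is one possible way to do that, not part of the original pipeline: the helper name `find_missing_settings` is hypothetical, and the key list simply mirrors the variables this notebook reads with `os.getenv`.

```python
import os

# Keys this notebook reads via os.getenv when building the connection.
REQUIRED_KEYS = [
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_ROLE",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
]

def find_missing_settings(env=None, required=REQUIRED_KEYS):
    """Return the required keys that are absent or blank in `env` (hypothetical helper)."""
    env = os.environ if env is None else env
    return [key for key in required if not str(env.get(key, "")).strip()]

# Example with an in-memory mapping instead of the real environment.
example_env = {"SNOWFLAKE_USER": "demo_user", "SNOWFLAKE_ACCOUNT": "demo_account"}
missing = find_missing_settings(example_env)
print("Missing settings:", missing)
```

Calling `find_missing_settings()` with no arguments after `load_dotenv(".env")` would check the real environment; failing early on a non-empty result tends to give a clearer message than a connection error.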

In [1]:
#load the .env file
from google.colab import files
files.upload()

Saving .env.txt to .env.txt


{'.env.txt': b'SNOWFLAKE_ACCOUNT=assette-ssappoc\nSNOWFLAKE_USER=CRYSTALL\nSNOWFLAKE_PASSWORD=<redacted>\nSNOWFLAKE_ROLE=AST_ALTERNATIVES_DB_RW\nSNOWFLAKE_WAREHOUSE=AST_BU_WH\nSNOWFLAKE_DATABASE=AST_ALTERNATIVES_DB\nSNOWFLAKE_SCHEMA=DBO'}

In [2]:
#rename the file if needed
import os

if os.path.exists(".env.txt"):
    os.rename(".env.txt", ".env")
    print("Renamed .env.txt to .env")
else:
    print("File not found. Make sure you uploaded .env.txt.")

Renamed .env.txt to .env


In [3]:
!pip install snowflake-connector-python python-dotenv

Collecting snowflake-connector-python
  Downloading snowflake_connector_python-3.16.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (71 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting boto3>=1.24 (from snowflake-connector-python)
  Downloading boto3-1.39.15-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore>=1.24 (from snowflake-connector-python)
  Downloading botocore-1.39.15-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3>=1.24->snowflake-connector-python)
  Downloading jmespath-1.0.1-py3-none-a

In [4]:
import os
from dotenv import load_dotenv
import snowflake.connector

# Load .env file data
load_dotenv(".env")


True

In [5]:
#use .env parameters to connect to Snowflake
def get_snowflake_connection():
    return snowflake.connector.connect(
        user=os.getenv("SNOWFLAKE_USER"),
        password=os.getenv("SNOWFLAKE_PASSWORD"),
        account=os.getenv("SNOWFLAKE_ACCOUNT"),
        role=os.getenv("SNOWFLAKE_ROLE"),
        warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
        database=os.getenv("SNOWFLAKE_DATABASE"),
        schema=os.getenv("SNOWFLAKE_SCHEMA")
    )
#connection - connection is authenticated
connection = get_snowflake_connection()
#lets me run SQL commands
cursor = connection.cursor()


In [6]:
import pandas as pd  # needed by map_dtype_to_snowflake below
from snowflake.connector.pandas_tools import write_pandas

def safe_quote(col: str) -> str:
    """
    Ensures column names are safely quoted for Snowflake SQL syntax.
    Replaces internal quotes and wraps the name in double quotes.
    """
    col = str(col).replace('"', '""').strip()
    return f'"{col}"'

def map_dtype_to_snowflake(dtype):
    """
    Maps pandas dtypes to Snowflake SQL data types.
    """
    if pd.api.types.is_float_dtype(dtype):
        return "FLOAT"
    elif pd.api.types.is_integer_dtype(dtype):
        return "NUMBER"
    else:
        return "VARCHAR"

def load_to_snowflake_merge(df, table_name, conn, unique_keys):
    """
    Uploads DataFrame to Snowflake with type inference and merge logic.
    """
    cur = conn.cursor()
    df_cols = df.columns.tolist()
    temp_table = f"{table_name}_STAGING"

    # Step 1: Infer column types and create table if needed
    col_defs = ", ".join([
        f"{safe_quote(col)} {map_dtype_to_snowflake(df[col].dtype)}"
        for col in df_cols
    ])
    cur.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({col_defs})")

    # Step 2: Add any missing columns to the main table
    cur.execute(f"DESC TABLE {table_name}")
    existing_cols = {row[0].upper() for row in cur.fetchall()}
    for col in df_cols:
        if col.upper() not in existing_cols:
            col_type = map_dtype_to_snowflake(df[col].dtype)
            cur.execute(f"ALTER TABLE {table_name} ADD COLUMN {safe_quote(col)} {col_type}")

    # Step 3: Create staging table
    cur.execute(f"CREATE OR REPLACE TABLE {temp_table} ({col_defs})")
    write_pandas(conn, df, temp_table)

    # Step 4: Merge without duplication
    on_clause = " AND ".join([f"t.{safe_quote(col)} = s.{safe_quote(col)}" for col in unique_keys])
    insert_cols = ", ".join([safe_quote(col) for col in df_cols])
    insert_vals = ", ".join([f"s.{safe_quote(col)}" for col in df_cols])

    merge_stmt = f"""
        MERGE INTO {table_name} t
        USING {temp_table} s
        ON {on_clause}
        WHEN NOT MATCHED THEN
            INSERT ({insert_cols}) VALUES ({insert_vals})
    """
    cur.execute(merge_stmt)

    # Step 5: Clean up
    cur.execute(f"DROP TABLE IF EXISTS {temp_table}")
    cur.close()
    conn.close()

    print(f"{table_name} updated. Duplicates prevented using keys: {unique_keys}")


In [7]:
#test the connection
cursor.execute("SELECT CURRENT_USER(), CURRENT_ROLE(), CURRENT_DATABASE(), CURRENT_DATE;")

for row in cursor:
    print(row)

('CRYSTALL', 'AST_ALTERNATIVES_DB_RW', 'AST_ALTERNATIVES_DB', datetime.date(2025, 7, 29))


In [8]:
#close SQL cursor
cursor.close()
#close connection to snowflake
connection.close()

# **Securities List**

This code builds a clean, verified list of stocks by scraping 3 major US equity indices from Wikipedia: the S&P 500, Dow Jones Industrial Average, and NASDAQ 100. Each index list is retrieved with BeautifulSoup. The extracted tickers are combined into a single list, cleaned to conform to the expected format, and deduplicated across the 3 indexes. To ensure only valid tickers are included, a function was defined to check that Yahoo Finance returns metadata.
<br> <br>
The indices chosen (S&P 500, Dow Jones Industrial Average, NASDAQ 100) represent large, liquid, and well-known US companies. They are likely to be included in popular retail and institutional funds, making them a reasonable starting point for building fund simulations and a security master. The decision to limit the scope to these indices was intentional: by focusing on high-confidence symbols, the code minimizes errors and avoids excessive querying that could trigger rate limits or bans from the Yahoo Finance API.
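
The combine, clean, and deduplicate step can be sketched on its own with a few made-up inputs. This is an illustration only: `normalize_tickers` is a hypothetical helper, the sample lists are not real index constituents lists, and the upper-casing and whitespace stripping are assumptions that go slightly beyond the single `.` to `-` replacement used in this notebook.

```python
def normalize_tickers(*ticker_lists):
    """Merge ticker lists, convert to Yahoo-style symbols, and drop duplicates (hypothetical helper)."""
    cleaned = set()
    for tickers in ticker_lists:
        for raw in tickers:
            # Wikipedia writes share classes with a dot (BRK.B); Yahoo Finance uses a hyphen (BRK-B).
            symbol = raw.strip().upper().replace(".", "-")
            if symbol:  # skip blank cells
                cleaned.add(symbol)
    return sorted(cleaned)

# Small made-up samples standing in for the three scraped lists.
sp500_sample = ["AAPL", "BRK.B", "MSFT"]
dow_sample = ["AAPL", "MSFT "]
nasdaq_sample = ["msft", "GOOG", ""]

print(normalize_tickers(sp500_sample, dow_sample, nasdaq_sample))
```

Cleaning each symbol before adding it to the set means that variants such as `MSFT ` and `msft` collapse into one entry, which would not happen if deduplication ran first.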

In [None]:
import yfinance as yf
import pandas as pd
import time
from bs4 import BeautifulSoup
import requests

#get tickers from sp500
def get_sp500_tickers():
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    soup = BeautifulSoup(requests.get(url).text, "lxml")
    table = soup.find("table", {"id": "constituents"})
    return [row.find_all("td")[0].text.strip() for row in table.find_all("tr")[1:]]

#get tickers from Dow Jones Industrial Average
def get_dow_tickers():
    url = "https://en.wikipedia.org/wiki/Dow_Jones_Industrial_Average"
    soup = BeautifulSoup(requests.get(url).text, "lxml")
    table = soup.find("table", {"id": "constituents"})
    return [
        row.find_all("td")[1].find("a").text.strip()
        for row in table.find_all("tr")[1:]
        if len(row.find_all("td")) >= 2
    ]

#get tickers from NASDAQ-100
def get_nasdaq100_tickers():
    url = "https://en.wikipedia.org/wiki/NASDAQ-100"
    soup = BeautifulSoup(requests.get(url).text, "lxml")
    table = soup.find("table", {"id": "constituents"})
    return [
        row.find_all("td")[0].text.strip()
        for row in table.find_all("tr")[1:]
        if len(row.find_all("td")) >= 1
    ]


#validate that these are real tickers
def is_valid_ticker(ticker):
    try:
        info = yf.Ticker(ticker).info
        return "shortName" in info
    except Exception:  # avoid a bare except so KeyboardInterrupt still stops the run
        return False

#pull all tickers from all of the above sources
sp500 = get_sp500_tickers()
dow = get_dow_tickers()
nasdaq = get_nasdaq100_tickers()

#combine and make tickers unique
all_tickers = sorted(set(sp500 + dow + nasdaq))

#data cleaning
all_tickers = [t.replace('.', '-') for t in all_tickers]


# **Insert yFinance Data into Snowflake Tables**


This code builds 3 key financial datasets for a list of securities: a security master table, ESG scores, and 10 years of historical price performance. Using the Yahoo Finance API, the script loops through each ticker and retrieves metadata such as company name, sector, industry, market cap, and trading exchange. It also attempts to fetch ESG-related metrics (if available) and daily price history over the past 10 years. All of this information is stored in separate DataFrames. If a ticker fails to return valid metadata or historical price data, it is logged in a failed ticker list. Once the data is collected, the script standardizes data formatting and pushes each DataFrame directly to Snowflake using a merge strategy that avoids duplicates based on defined unique keys. <br> <br>
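
To make the merge behaviour concrete, the sketch below reproduces the "insert only when not matched" rule in pandas rather than in Snowflake. It is an approximation for illustration: `rows_to_insert` is a hypothetical helper and the two small tables are made up. The actual load still goes through the `MERGE` statement in `load_to_snowflake_merge`.

```python
import pandas as pd

def rows_to_insert(existing, incoming, unique_keys):
    """Return incoming rows whose key values are not already present in `existing`."""
    flagged = incoming.merge(
        existing[unique_keys].drop_duplicates(),
        on=unique_keys,
        how="left",
        indicator=True,  # adds a _merge column: "both" or "left_only"
    )
    keep = flagged["_merge"] == "left_only"
    return flagged.loc[keep, incoming.columns].reset_index(drop=True)

# Made-up stand-ins for the target table and a new batch.
existing = pd.DataFrame({"symbol": ["AAPL", "MSFT"], "sector": ["Technology", "Technology"]})
incoming = pd.DataFrame({"symbol": ["MSFT", "GOOG"], "sector": ["Technology", "Communication Services"]})

new_rows = rows_to_insert(existing, incoming, unique_keys=["symbol"])
print(new_rows)
```

Because only unmatched keys are inserted, rows that already exist are left untouched. A changed value for an existing key (for example an updated market cap) would therefore not be refreshed by this strategy.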
Initially, the code included additional tables such as price snapshots, fundamentals, and analyst estimates. We decided to remove these from the pipeline because they aren't directly used in our target deliverable, the fund fact sheet. While they may be useful in broader portfolio analytics or internal risk assessments, they were out of scope for this specific task. <br>
The security master table is foundational to the pipeline, as it centralizes all core attributes about the securities in our dataset. It ensures consistency and enables future joins with holdings, ESG metrics, and price data. We also collect daily price history over a 10-year horizon to support fund-level performance analysis, including quarter-over-quarter or year-over-year changes. Ideally, this time range should be extended further to reflect real-world investment horizons more accurately. However, we limited the query to 10 years to avoid triggering API rate limits or blocks from Yahoo Finance. A long-term enhancement would be to periodically refresh historical data and build a more robust price history system over time. <br> <br>
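
As a rough illustration of how daily closes can feed quarter-over-quarter and year-over-year figures, the sketch below takes the last close in each period and computes the percent change between periods. The prices are synthetic, the symbol `DEMO` is made up, and `period_returns` is a hypothetical helper. The column names `Date`, `symbol`, and `Close` follow the price history collected in this notebook.

```python
import pandas as pd

# Synthetic daily closes for one made-up symbol: business days over two calendar years.
dates = pd.bdate_range("2023-01-02", "2024-12-31")
prices = pd.DataFrame({
    "Date": dates,
    "symbol": "DEMO",
    "Close": [100.0 + 0.1 * i for i in range(len(dates))],
})

def period_returns(df, freq):
    """Percent change between the last close of consecutive periods ("Q" or "Y")."""
    closes = df.set_index("Date")["Close"]
    period_end = closes.groupby(closes.index.to_period(freq)).last()
    return period_end.pct_change().dropna()

quarterly = period_returns(prices, "Q")  # quarter-over-quarter
yearly = period_returns(prices, "Y")     # year-over-year
print(quarterly)
print(yearly)
```

With several symbols in one table, the same function could be applied per symbol, for example through `groupby("symbol")`.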
While this current version is designed to run once and populate Snowflake, future iterations could introduce automated checks. For example, if a security appears in holdings but is missing from the security master, the system should automatically fetch and populate its metadata from Yahoo Finance. This would ensure the pipeline remains dynamic and scalable as fund compositions evolve.
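
The automated check described here could start as a simple set difference. The sketch below is an assumption about how that might look, not existing pipeline code: `symbols_missing_from_master` is a hypothetical helper, and both DataFrames (including the `fund` column and fund names) are made up for the example.

```python
import pandas as pd

def symbols_missing_from_master(holdings, security_master):
    """Return sorted symbols that appear in holdings but not in the security master."""
    held = set(holdings["symbol"].dropna())
    known = set(security_master["symbol"].dropna())
    return sorted(held - known)

# Made-up holdings and security master extracts.
holdings = pd.DataFrame({
    "fund": ["FUND_A", "FUND_A", "FUND_B"],
    "symbol": ["AAPL", "NVDA", "TSLA"],
})
security_master = pd.DataFrame({"symbol": ["AAPL", "MSFT"]})

to_fetch = symbols_missing_from_master(holdings, security_master)
print(to_fetch)
```

The resulting list could then be passed through the same ticker loop used below, so that only the missing securities are requested from Yahoo Finance.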

In [None]:
import yfinance as yf
import datetime
import pandas as pd
import time

# Initialize tables
security_master = []
esg = []
performance = []
failed_tickers = []

# --- Pull 10 years of historical price data for each ticker ---
start_date = (datetime.datetime.today() - datetime.timedelta(days=365 * 10)).strftime('%Y-%m-%d')
end_date = datetime.datetime.today().strftime('%Y-%m-%d')

# Replace this with your real ticker list
# all_tickers = ['AAPL', 'TSLA', 'GOOG', 'INVALID1', 'INVALID2']
# Assume all_tickers is defined above

for symbol in all_tickers:
    try:
        t = yf.Ticker(symbol)
        info = t.info

        # Validate info is usable
        # Skip invalid tickers that have no 'shortName' (likely delisted or bad symbol)

        if not info or "shortName" not in info:
            print(f"No valid info for {symbol}")
            failed_tickers.append(symbol)
            continue

        # --- Security Master ---
        security_master.append({
            "symbol": symbol,
            "shortName": info.get("shortName"),
            "name": info.get("longName"),
            "sector": info.get("sector"),
            "industry": info.get("industry"),
            "exchange": info.get("exchange"),
            "currency": info.get("currency"),
            "country": info.get("country"),
            "market_cap": info.get("marketCap")
        })

        # --- ESG Data ---
        sustainability = t.sustainability
        if sustainability is not None and not sustainability.empty:
            row = sustainability.transpose()
            esg.append({
                "symbol": symbol,
                "esgPerformance": row.get("esgPerformance", {}).values[0] if "esgPerformance" in row else None,
                "totalEsg": row.get("totalEsg", {}).values[0] if "totalEsg" in row else None,
                "environmentScore": row.get("environmentScore", {}).values[0] if "environmentScore" in row else None,
                "socialScore": row.get("socialScore", {}).values[0] if "socialScore" in row else None,
                "governanceScore": row.get("governanceScore", {}).values[0] if "governanceScore" in row else None,
                "highestControversy": row.get("highestControversy", {}).values[0] if "highestControversy" in row else None
            })

        # --- Performance History ---
        hist = t.history(start=start_date, end=end_date)
        if hist.empty:
            print(f"No price history for {symbol}")
            failed_tickers.append(symbol)
            continue
        hist = hist.reset_index()
        hist["symbol"] = symbol
        performance.append(hist)

        # Optional: sleep to avoid hitting rate limits
        time.sleep(1)

    except Exception as e:
        print(f"Error with {symbol}: {e}")
        failed_tickers.append(symbol)

# Convert to DataFrames
df_security_master = pd.DataFrame(security_master)
df_esg = pd.DataFrame(esg)
df_performance = pd.concat(performance, ignore_index=True) if performance else pd.DataFrame()

# Output summary
print(f"\nFinished processing {len(all_tickers)} tickers.")
print(f"Total failed tickers: {len(failed_tickers)}")
print(failed_tickers)


ERROR:yfinance:HTTP Error 404: 
ERROR:yfinance:HTTP Error 404: 
ERROR:yfinance:HTTP Error 404: 
ERROR:yfinance:HTTP Error 404: 
ERROR:yfinance:HTTP Error 404: 
ERROR:yfinance:HTTP Error 404: 



Finished processing 517 tickers.
Total failed tickers: 0
[]


In [None]:
#convert the collected records into DataFrames
df_security_master = pd.DataFrame(security_master)
df_esg = pd.DataFrame(esg)
df_performance = pd.concat(performance, ignore_index=True) if performance else pd.DataFrame()

#keep only the calendar date; skip when no price history was collected
if not df_performance.empty:
    df_performance["Date"] = pd.to_datetime(df_performance["Date"]).dt.date


In [None]:
#connect to Snowflake and load the data, skipping rows whose keys were already loaded
conn = get_snowflake_connection()
load_to_snowflake_merge(df_security_master, "SECURITY_MASTER", conn, unique_keys=["symbol"])

conn = get_snowflake_connection()
load_to_snowflake_merge(df_esg, "ESG_STOCK_DATA", conn, unique_keys=["symbol"])

conn = get_snowflake_connection()
if not df_performance.empty:
    load_to_snowflake_merge(df_performance, "STOCK_PERFORMANCE_HISTORY", conn, unique_keys=["Date", "symbol"])


SECURITY_MASTER updated. Duplicates prevented using keys: ['symbol']
ESG_STOCK_DATA updated. Duplicates prevented using keys: ['symbol']


  write_pandas(conn, df, temp_table)


STOCK_PERFORMANCE_HISTORY updated. Duplicates prevented using keys: ['Date', 'symbol']


In [None]:
df_esg.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 511 entries, 0 to 510
Data columns (total 7 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   symbol              511 non-null    object 
 1   esgPerformance      511 non-null    object 
 2   totalEsg            511 non-null    float64
 3   environmentScore    509 non-null    float64
 4   socialScore         509 non-null    float64
 5   governanceScore     509 non-null    float64
 6   highestControversy  511 non-null    float64
dtypes: float64(5), object(2)
memory usage: 28.1+ KB


# yFinance Benchmark Indexes

There is no ESG score for indexes.
We chose one index for each fund focus.
Some column fields were initially hardcoded; they were moved into a config so the code is repeatable and can be applied to other areas. It may also be better to reference currency information from the currency table instead of `currency_full_name`.

Add this information to the 2-3 tables that are available for benchmarks.

Make its own table in Snowflake. Do a join between benchmark tables and

In [8]:
import yfinance as yf
import pandas as pd
from datetime import datetime

# ------------------- CONFIG -------------------
#update as benchmarks are chosen, added on
benchmark_map = {
    "ENRG": "Environment_ENRG",
    "SHE":  "Social_SHE",
    "VOTE": "Governance_VOTE",
    "ESGD": "LowControversy_ESGD",
    "EFIV": "TotalESG_EFIV"
}

price_field = "Close"
# map price field → PERFORMANCEDATATYPE
price2datatype = {
    "Close":     "EOD Price",
    "Adj Close": "EOD Price",
    "Open":      "BOD Price",
    "High":      "HIGH Price",
    "Low":       "LOW Price"
}

# Optional lookup to turn currency codes into full names
currency_full_name = {
    "USD": "US Dollar",
    "EUR": "Euro",
    "GBP": "British Pound",
    "CAD": "Canadian Dollar",
    "JPY": "Japanese Yen"
}

# ------------------- DATE RANGE -------------------
end   = datetime.today()
start = pd.Timestamp(end) - pd.DateOffset(years=10)  # also works when today is Feb 29

performance_records = []
general_info_records = []

for symbol, category in benchmark_map.items():
    print(f"Downloading: {symbol}")
    etf = yf.Ticker(symbol)

    # Pull 10-year history 📈
    hist = etf.history(start=start, end=end)

    # Get currency from yfinance; default to USD if missing
    cur_code = etf.info.get("currency", "USD")
    cur_name = currency_full_name.get(cur_code, cur_code)

    perf_datatype = price2datatype.get(price_field, "EOD Price")  # fallback matches the values in price2datatype

    # --- PERFORMANCE rows ---
    for date, row in hist.iterrows():
        performance_records.append({
            "BENCHMARKCODE":            category.upper(),
            "PERFORMANCEDATATYPE":      perf_datatype,
            "CURRENCYCODE":             cur_code,
            "CURRENCY":                 cur_name,
            "PERFORMANCEFREQUENCY":     "DAILY",
            "VALUE":                    round(row[price_field], 4),
            "HISTORYDATE1":             date.date(),
            "HISTORYDATE":              pd.to_datetime(date)
        })

    # --- GENERAL INFO rows ---
    general_info_records.append({
        "BENCHMARKCODE":            category.upper(),
        "SYMBOL":                   symbol,
        "NAME":                     etf.info.get("longName", f"{symbol} ETF"),
        # price2datatype values look like "BOD Price", so compare on the prefix
        "ISBEGINOFDAYPERFORMANCE":  perf_datatype.startswith("BOD")
    })

# ------------------- DATAFRAMES -------------------
df_perf   = pd.DataFrame(performance_records)
df_general = pd.DataFrame(general_info_records)

print(df_perf.head())
print(df_general)

# ------------------- SAVE CSVs -------------------
df_perf.to_csv("benchmark_performance.csv", index=False)
df_general.to_csv("benchmark_general_info.csv", index=False)


Downloading: ENRG
Downloading: SHE
Downloading: VOTE
Downloading: ESGD
Downloading: EFIV
      BENCHMARKCODE PERFORMANCEDATATYPE CURRENCYCODE   CURRENCY  \
0  ENVIRONMENT_ENRG                 EOD          USD  US Dollar   
1        SOCIAL_SHE                 EOD          USD  US Dollar   
2        SOCIAL_SHE                 EOD          USD  US Dollar   
3        SOCIAL_SHE                 EOD          USD  US Dollar   
4        SOCIAL_SHE                 EOD          USD  US Dollar   

  PERFORMANCEFREQUENCY    VALUE HISTORYDATE1               HISTORYDATE  
0                DAILY  25.0000   2025-01-06 2025-01-06 00:00:00-05:00  
1                DAILY  45.0149   2016-03-08 2016-03-08 00:00:00-05:00  
2                DAILY  44.9921   2016-03-09 2016-03-09 00:00:00-05:00  
3                DAILY  44.9807   2016-03-10 2016-03-10 00:00:00-05:00  
4                DAILY  45.7225   2016-03-11 2016-03-11 00:00:00-05:00  
         BENCHMARKCODE SYMBOL                                NAME  \
0

In [None]:
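#connect to Snowflake and load the benchmark performance data, skipping rows that were already loaded
#NOTE: "BENCHMARK_PERFORMANCE" is a placeholder table name; df_perf has no "symbol" column, so it is keyed on benchmark, data type, and date
conn = get_snowflake_connection()
load_to_snowflake_merge(df_perf, "BENCHMARK_PERFORMANCE", conn, unique_keys=["BENCHMARKCODE", "PERFORMANCEDATATYPE", "HISTORYDATE1"])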