In [None]:
import pdfplumber
import pandas as pd
import re
import yfinance as yf
import os
import datetime
import numpy as np
import pandas_market_calendars as mcal

def extract_financial_statements(pdf_path):
    # Extract full text from PDF
    full_text = ""
    with pdfplumber.open(pdf_path) as pdf:
        for page_num, page in enumerate(pdf.pages, start=1):
            lines = page.extract_text().split('\n')
            full_text += f"\n--- Page {page_num} ---\n" + "\n".join(lines)

    # Extract text between two section markers
    def extract_section(text, start_marker, end_marker=None):
        pattern = rf"{start_marker}(.*?){end_marker if end_marker else '--- Page'}"
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        return match.group(1).strip() if match else ""

    # Convert section text into a DataFrame
    def text_to_dataframe(section_text, section_name=""):
        section_text = re.sub(r'\(\d+\)', '', section_text)
        lines = section_text.strip().split("\n")
        lines = [line.strip() for line in lines if line.strip() and not line.startswith("(")]

        if not lines:
            print(f"No lines found in section: {section_name}")
            return pd.DataFrame()
        
        # Get headers
        header_line = lines[0]
        headers = re.findall(r'(Q[1-4]-\d{4}|\d{2}-[A-Za-z]{3}-\d{2}|\d{4})', header_line)
        if not headers:
            headers = header_line.split()[-5:]  # fallback
        column_names = ['Item'] + headers
        data = []

        # Extract values and labels
        for line in lines[1:]:
            values = re.findall(r'[-−(]?\$?\(?\d[\d,\.]*\)?|—', line)
            if not values:
                continue
            value_start = line.find(values[0])
            label = line[:value_start].strip()
            cleaned_values = [v.replace(',', '').replace('−', '-').replace('$', '') for v in values]
            row = [label] + cleaned_values
            data.append(row)

        # Normalise row lengths
        for row in data:
            if len(row) < len(column_names):
                row += [""] * (len(column_names) - len(row))
            elif len(row) > len(column_names):
                row = row[:len(column_names)]

        try:
            df = pd.DataFrame(data, columns=column_names)
            df = df.iloc[:-1]
            df.columns.values[0] = "Item"
            return df
        except Exception as e:
            print(f"Failed to create DataFrame for section: {section_name}")
            raise e

    # Extract and convert each section
    statement_ops_text = extract_section(full_text, "S T A T E M E N T O F O P E R A T I O N S", "--- Page")
    balance_sheet_text = extract_section(full_text, "B A L A N C E S H E E T", "--- Page")
    cash_flow_text = extract_section(full_text, "S T A T E M E N T O F C A S H F L O W S", "--- Page")

    df_statement_ops = text_to_dataframe(statement_ops_text, "Statement of Operations")
    df_balance_sheet = text_to_dataframe(balance_sheet_text, "Balance Sheet")
    df_cash_flows = text_to_dataframe(cash_flow_text, "Cash Flow Statement")

    return df_statement_ops, df_balance_sheet, df_cash_flows

In [None]:
def extract_financial_summary(pdf_path):
    # Extract full text from the pdf
    full_text = ""
    with pdfplumber.open(pdf_path) as pdf:
        for page_num, page in enumerate(pdf.pages, start=1):
            lines = page.extract_text().split('\n')
            full_text += f"\n--- Page {page_num} ---\n" + "\n".join(lines)

    # Extract section between markers
    def extract_section(text, start_marker, end_marker=None):
        pattern = rf"{start_marker}(.*?){end_marker if end_marker else '--- Page'}"
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        return match.group(1).strip() if match else ""

    # Convert extracted section into a dataframe
    def text_to_dataframe(section_text, section_name=""):
        section_text = re.sub(r'\(\d+\)', '', section_text)  # Remove footnote references
        lines = section_text.split("\n")

        if not lines or len(lines) < 2:
            return pd.DataFrame()

        headers = lines[1]

        # Extract header phrase and date labels
        header_parts = re.split(r'(\sQ[1-4]-\d{4}|\sYoY)', headers)
        header_parts = [part.strip() for part in header_parts if part.strip()]

        first_phrase = ' '.join(header_parts[0].split())
        date_labels = header_parts[1:]
        column_names = [first_phrase] + date_labels

        data = []

        # Extract rows
        for line in lines:
            values = re.findall(r'[-−(]?\$?\(?\d[\d,\.]*\)?|—', line)
            if not values:
                continue

            value_start = line.find(values[0])
            label = line[:value_start].strip()
            cleaned_values = [v.replace(',', '').replace('−', '-').replace('$', '').strip() for v in values]
            row = [label] + cleaned_values[:len(column_names) - 1]
            data.append(row)

        # Normalize row lengths
        for row in data:
            if len(row) < len(column_names):
                row += [""] * (len(column_names) - len(row))
            elif len(row) > len(column_names):
                row = row[:len(column_names)]

        try:
            return pd.DataFrame(data, columns=column_names)
        except Exception as e:
            raise e

    # Extract and convert the "FINANCIAL SUMMARY" section
    summary_text = extract_section(full_text, "F I N A N C I A L S U M M A R Y", "--- Page")
    df_financial_summary = text_to_dataframe(summary_text, "Financial Summary")

    # Clean up the dataframe
    if not df_financial_summary.empty:
        df_financial_summary = df_financial_summary.drop(0).reset_index(drop=True)
        df_financial_summary = df_financial_summary.iloc[:-1]  # remove last row
        df_financial_summary.columns.values[0] = "Item"

    return df_financial_summary

In [None]:
ticker = yf.Ticker("TSLA")

quarter_ends = {
    "Q4-2021": "2021-12-31",
    "Q1-2022": "2022-03-31",
    "Q2-2022": "2022-06-30",
    "Q3-2022": "2022-09-30"
}

history = ticker.history(start="2021-10-01", end="2022-10-01")

# Extract closing prices for quarter-end dates
closing_prices = {}
for quarter, date in quarter_ends.items():
    # Some dates may fall on weekends, so get the nearest previous trading day
    while date not in history.index.strftime("%Y-%m-%d"):
        date = (pd.to_datetime(date) - pd.Timedelta(days=1)).strftime("%Y-%m-%d")
    closing_prices[quarter] = round(history.loc[date]["Close"], 2)

In [201]:
def get_value_by_label(dataset, keyword1, period, keyword2=None):
    keywords = [keyword1]
    if keyword2:
        keywords.append(keyword2)
    
    for keyword in keywords:
        match = dataset[dataset["Item"].str.strip().str.lower() == keyword.strip().lower()]
        if not match.empty:
            return float(match[period].values[0])
    
    raise ValueError(f"Item exactly matching '{keyword1}'" + (f" or '{keyword2}'" if keyword2 else "") + " not found.")

def get_EPS(dataset, period):

    eps = get_value_by_label(dataset, "EPS attributable to common stockholders, diluted (non-GAAP)", period)
    eps = float(eps)

    if period in pre_split_quarters:
        return eps/3
    else:
        return eps

def get_Revenue_Growth(dataset, period1, period2):
    curr_rev = get_value_by_label(dataset, "Total revenues", period2)
    past_rev = get_value_by_label(dataset, "Total revenues", period1)

    curr_rev = float(curr_rev)
    past_rev = float(past_rev)

    growth = (curr_rev - past_rev) / past_rev * 100

    return growth

def get_Gross_Margin(dataset, period):
    gross_profit = get_value_by_label(dataset, "Total gross profit", period)
    curr_rev = get_value_by_label(dataset, "Total revenues", period)

    gross_profit = float(gross_profit)
    curr_rev = float(curr_rev)

    gross_margin = (gross_profit / curr_rev) * 100

    return gross_margin

def get_PE_Ratio(dataset, period, share_prices):
    share_price = share_prices.get(period)
    eps = get_EPS(dataset, period)

    share_price = float(share_price)

    pe = share_price / eps

    return pe

def get_ROE(dataset1, dataset2, period1, period2):
    # dataset1 should be financial summary
    # dataset2 should be balance sheet
    # period1 should be Q4-2021
    # period2 should be 31-Dec-21

    net_income = get_value_by_label(dataset1, "Net income attributable to common stockholders (non-GAAP)", period1)
    total_shareholder_equity = get_value_by_label(dataset2, "Total stockholders' equity", period2)

    net_income = float(net_income)
    total_shareholder_equity = float(total_shareholder_equity)

    roe = (net_income / total_shareholder_equity) * 100

    return roe

def get_DE(dataset, period):

    total_liabilities = float(get_value_by_label(dataset, "Total liabilities and equity", period)) - float(get_value_by_label(dataset, "Total stockholders' equity", period))

    total_shareholder_equity = get_value_by_label(dataset, "Total stockholders' equity", period)

    total_shareholder_equity = float(total_shareholder_equity)
    total_liabilities = float(total_liabilities)

    de = total_liabilities / total_shareholder_equity

    return de

def get_BS(dataset1, dataset2, period1, period2):
    # dataset1 should be balance sheet
    # detaset2 should be statement of operations

    total_shareholder_equity = get_value_by_label(dataset1, "Total stockholders' equity", period2)

    weighted_average_shares = get_value_by_label(dataset2, "Diluted", period1)

    total_shareholder_equity = float(total_shareholder_equity)
    weighted_average_shares = float(weighted_average_shares)

    if period1 in pre_split_quarters:
        bs = total_shareholder_equity / (weighted_average_shares * 3)
    else:
        bs = total_shareholder_equity / weighted_average_shares

    return bs

def get_PB(dataset1, dataset2, period1, period2, share_prices):
    share_price = share_prices.get(period1)
    bs = get_BS(dataset1, dataset2, period1, period2)

    pb = share_price / bs

    return pb

pre_split_quarters = ["Q4-2021", "Q1-2022", "Q2-2022"]

quarter_to_date = {
    "Q4-2021": "31-Dec-21",
    "Q1-2022": "31-Mar-22",
    "Q2-2022": "30-Jun-22",
    "Q3-2022": "30-Sep-22",
    "Q4-2022": "31-Dec-22"
}

In [None]:
# Set start dates
start_date = "2021-10-01"
end_date = "2022-09-30"

# Set paths
output_path = "/Users/tanyikchen/Library/CloudStorage/OneDrive-UniversityofNottinghamMalaysia/y3/DIA/Datasets/TSLA_FA.csv"

pdf_folder = "/Users/tanyikchen/Library/CloudStorage/OneDrive-UniversityofNottinghamMalaysia/y3/DIA/Annual Reports/TSLA"
pdf_files = [
    "TSLA-Q4-2021-Update.pdf",
    "TSLA-Q1-2022-Update.pdf",
    "TSLA-Q2-2022-Update.pdf"
]

quarter_to_date = {
    "Q4-2021": "31-Dec-21",
    "Q1-2022": "31-Mar-22",
    "Q2-2022": "30-Jun-22"
}

# Forming date dataframe
trading_days = pd.date_range(start=start_date, end=end_date, freq="B")
daily_df = pd.DataFrame({"Date": trading_days})
tsla_prices = yf.download("TSLA", start=start_date, end=end_date)

# Loop through files in folder
fa_data = []

for pdf_file in pdf_files:
    quarter = pdf_file.split("-")[1] + "-" + pdf_file.split("-")[2]
    report_date = quarter_to_date[quarter]
    pdf_path = os.path.join(pdf_folder, pdf_file)

    # Extract tables
    df_statement_ops, df_balance_sheet, df_cash_flows = extract_financial_statements(pdf_path)
    df_financial_summary = extract_financial_summary(pdf_path)

    # Parse metadata
    q_label, year = quarter.split("-")
    last_year = str(int(year) - 1)
    same_quarter_last_year = f"{q_label}-{last_year}"
    effective_date = pd.to_datetime(f"{year}-{'01' if q_label == 'Q1' else '04' if q_label == 'Q2' else '07' if q_label == 'Q3' else '10'}-01")

    # Compute fundamental indicators
    eps = get_EPS(df_financial_summary, quarter)
    rev_growth = get_Revenue_Growth(df_financial_summary, same_quarter_last_year, quarter)
    gross_margin = get_Gross_Margin(df_financial_summary, quarter)
    pe = get_PE_Ratio(df_financial_summary, quarter, closing_prices)
    roe = get_ROE(df_financial_summary, df_balance_sheet, quarter, report_date)
    de = get_DE(df_balance_sheet, report_date)
    pb = get_PB(df_balance_sheet, df_statement_ops, quarter, report_date, closing_prices)
    bs = get_BS(df_balance_sheet, df_statement_ops, quarter, report_date)

    fa_data.append({
        "Quarter": quarter,
        "Effective_Date": effective_date,
        "EPS": eps,
        "Revenue_Growth": rev_growth,
        "Gross_Margin": gross_margin,
        "P/E": pe,
        "ROE": roe,
        "D/E": de,
        "P/B": pb,
        "B/S": bs
    })

# Construct time series dataframe
fa_df = pd.DataFrame(fa_data).sort_values("Effective_Date")
daily_df["Effective_Date"] = daily_df["Date"].apply(
    lambda x: fa_df[fa_df["Effective_Date"] <= x]["Effective_Date"].max()
)
final_df = pd.merge(daily_df, fa_df, on="Effective_Date", how="left").drop(columns=["Effective_Date", "Quarter"])
final_df["Date"] = final_df["Date"].dt.date

# Pre-calculated values for Q3 2022
q3_fundamentals = {
    "EPS": 1.05,
    "Revenue_Growth": 55.94969834,
    "Gross_Margin": 25.08623101,
    "P/E": 252.6190476,
    "ROE": 9.169155103,
    "D/E": 0.867606835,
    "P/B": 23.08315977,
    "B/S": 11.49106113
}

q3_effective_date = pd.to_datetime("2022-07-01")
fa_df = pd.concat([
    fa_df,
    pd.DataFrame([{
        "Quarter": "Q3-2022",
        "Effective_Date": q3_effective_date,
        **q3_fundamentals
    }])
]).sort_values("Effective_Date").reset_index(drop=True)

q3_mask = (final_df["Date"] >= datetime.date(2022, 7, 1)) & (final_df["Date"] <= datetime.date(2022, 9, 30))
for col, val in q3_fundamentals.items():
    final_df.loc[q3_mask, col] = val

# Get the NYSE calendar
nyse = mcal.get_calendar("NYSE")
trading_days = nyse.schedule(start_date=start_date, end_date=end_date)
trading_dates = trading_days.index.normalize()  # Normalize to remove time part

# Create time series dataframe based on NYSE calendar
expanded_rows = []

# Iterate through each trading date to build the time series
for date in trading_dates:
    # Find the corresponding financial data based on the closest effective date before the trading date
    effective_row = fa_df[fa_df["Effective_Date"] <= date].iloc[-1]
    
    # Create a row for this trading date, combining the date with the financial data
    row = {"Date": date}
    row.update({k: v for k, v in effective_row.items() if k not in ["Effective_Date", "Quarter"]})
    
    expanded_rows.append(row)

# Build final dataframe
fundamental_df = pd.DataFrame(expanded_rows)
fundamental_df = fundamental_df.sort_values("Date").reset_index(drop=True)

# Output and export
fundamental_df.to_csv(output_path, index=False)
print(fundamental_df)

[*********************100%***********************]  1 of 1 completed


          Date       EPS  Revenue_Growth  Gross_Margin         P/E       ROE  \
0   2021-10-01  0.846667       64.919955     27.354817  416.055118  9.536586   
1   2021-10-04  0.846667       64.919955     27.354817  416.055118  9.536586   
2   2021-10-05  0.846667       64.919955     27.354817  416.055118  9.536586   
3   2021-10-06  0.846667       64.919955     27.354817  416.055118  9.536586   
4   2021-10-07  0.846667       64.919955     27.354817  416.055118  9.536586   
..         ...       ...             ...           ...         ...       ...   
247 2022-09-26  1.050000       55.949698     25.086231  252.619048  9.169155   
248 2022-09-27  1.050000       55.949698     25.086231  252.619048  9.169155   
249 2022-09-28  1.050000       55.949698     25.086231  252.619048  9.169155   
250 2022-09-29  1.050000       55.949698     25.086231  252.619048  9.169155   
251 2022-09-30  1.050000       55.949698     25.086231  252.619048  9.169155   

          D/E        P/B        B/S  
0