### TO-DO

#### Navigate through comp_facts dictionary locating desired value
#### Improve table extraction procedure from filings

### Set Up

In [1]:
import ealib
import logging
import pandas as pd
from collections import Counter
from typing import List
import random
from typing import Tuple
import yfinance as yf
import matplotlib.pyplot as plt
import numpy as np
from bs4 import BeautifulSoup
from typing import Callable
import os

# Required to identify with EDGAR API.
# The contact address can be overridden with the EDGAR_USER_AGENT environment
# variable, so the notebook can be shared without editing a personal e-mail in the source.
req_header = {"User-Agent": os.environ.get("EDGAR_USER_AGENT", "roberto.brera.24@outlook.com")}

# Select rate of requests (< 10)
mrps = 8

# Select desired logging level (force=True replaces any handlers already installed in the kernel)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)

# Stores information about all tickers currently available on SEC database
tickers_df = ealib.get_tickers_df(req_header)

2024-07-26 16:40:18,094 - INFO - Request to https://www.sec.gov/files/company_tickers.json returned successfully. Response code: 200


### Utility Scripts

#### Select Ticker

In [None]:
# Substring looked up in the company titles of tickers_df
query_substr =  "microvis"

# Keep the first row returned by the title search.
# NOTE(review): .iloc[0] raises when the search returns no rows — confirm what find_title_substring returns in that case.
query_ticker = ealib.find_title_substring(tickers_df, query_substr).iloc[0]
# Human-readable label for the selected company (used later, e.g. in the plot title)
comp_str = f'{query_ticker["title"]} ({query_ticker["ticker"]}, Index {query_ticker.name})'

# Retrieve main dictionaries required for this ticker
# (get_response_dict returning None is treated as a failed request; the cell only warns and carries on)
comp_mtd = ealib.get_response_dict(ealib.metadata_url(query_ticker["cik_str"]), req_header, mrps)
if comp_mtd is None:
    logging.warning(f'Failed to retrieve company metadata for ticker {query_ticker["ticker"]}')
comp_facts = ealib.get_response_dict(ealib.companyfacts_url(query_ticker["cik_str"]), req_header, mrps)
if comp_facts is None:
    logging.warning(f'Failed to retrieve company facts for ticker {query_ticker["ticker"]}')

# Display the selected row
query_ticker

#### Filter tickers by a query string contained in selected filings

In [2]:
def count_query_str_in_sel_doc(comp_mtd: dict, ticker: pd.Series, query_forms: List[str], query_str: str, req_header, mrps) -> int:
    """
    Count the case-insensitive occurrences of query_str in the primary document
    of the first filing returned by ealib.filter_filings for the given forms.

    Parameters:
        comp_mtd: company metadata dictionary; comp_mtd["filings"]["recent"] is read.
        ticker: row describing the company; its "cik_str" entry is used to build the document URL.
        query_forms: form names passed to ealib.filter_filings (e.g. ["10-k", "20-f"]).
        query_str: text to look for.
        req_header, mrps: forwarded unchanged to ealib.requests_get_wrp.

    Returns:
        The number of occurrences, or -1 when no matching filing is available
        or the document could not be downloaded.
    """
    # Restrict to filings of the requested forms filed within the last two "years"
    select_filings = ealib.filter_filings(
        pd.DataFrame.from_dict(comp_mtd["filings"]["recent"]),
        "filingDate", "form",
        query_forms=query_forms,
        max_days=2*360, # Guarantees a full year
        min_days=0*360
    )

    # No valid dataframe, or no filings available
    if select_filings is None or select_filings.empty:
        return -1

    curr_doc = select_filings.iloc[0, :] # select first descriptive filing

    response = ealib.requests_get_wrp(
        ealib.doc_url(
            ticker.get("cik_str", ""),
            curr_doc["accessionNumber"].replace("-", ""),
            curr_doc["primaryDocument"]
        ),
        req_header,
        mrps=mrps
    )

    # NOTE(review): presumably requests_get_wrp returns None on a failed request
    # (as get_response_dict does) — guard defensively and report "no data".
    if response is None:
        return -1

    return response.text.lower().count(query_str.lower())


In [None]:
# Screen every ticker: count occurrences of query_str in its selected filing
query_forms = ["10-k", "20-f"]
query_str = " lidar "
out_file = "matches.txt"
res_df = pd.DataFrame(columns=tickers_df.iloc[0].keys())
qso = Counter()  # occurrence count (as str) -> number of tickers; '-1' means no data

for index, curr_ticker in tickers_df.iterrows():
    # Retrieve company data for curr_ticker.
    # A loop-local name is used so the comp_mtd of the selected query_ticker is not overwritten.
    curr_mtd = ealib.get_response_dict(ealib.metadata_url(curr_ticker["cik_str"]), req_header, mrps)
    if curr_mtd is None:
        # Report the ticker that actually failed (not the globally selected one)
        logging.warning(f'Failed to retrieve company metadata for ticker {curr_ticker["ticker"]}')
        qso['-1'] += 1
        continue

    count = count_query_str_in_sel_doc(curr_mtd, curr_ticker, query_forms, query_str, req_header, mrps)

    if count > 0:
        print(f"Found matching form with count {count}")
        res_df.loc[len(res_df)] = curr_ticker
        # Append immediately so matches survive an interrupted run
        with open(out_file, 'a') as file:
            file.write(f'{curr_ticker["ticker"]}\n')  # Adding a newline character for formatting

    qso[f'{count}'] += 1


print(qso)
res_df


In [None]:
res_df = pd.read_excel("lidar_screening.xlsx")

res_df = res_df.drop(columns="Unnamed: 0")


def _usd_rate(ticker):
    """Exchange rate from the ticker's quote currency to USD, or None when it cannot be determined."""
    currency = ealib.yf_info(ticker, "currency")
    return None if currency is None else ealib.exch_rate(currency, "USD")


def _market_cap_usd(ticker):
    """Market capitalisation converted to USD; NaN when either the cap or the rate is unavailable."""
    cap = ealib.yf_info(ticker, "marketCap")
    rate = _usd_rate(ticker)
    return np.nan if cap is None or rate is None else cap * rate


def _last_price_usd(ticker):
    """Most recent close converted to USD; NaN when there is no price history or no rate."""
    closes = yf.Ticker(ticker).history(period="1d")["Close"]
    rate = _usd_rate(ticker)
    return np.nan if closes.empty or rate is None else closes.iloc[-1] * rate


# Use .apply to generate new columns; missing data yields NaN instead of aborting the whole run
res_df["Market Cap (USD)"] = res_df["ticker"].apply(_market_cap_usd)

res_df["Last price (USD)"] = res_df["ticker"].apply(_last_price_usd)

res_df.to_excel("lidar and ADAS screenings.xlsx")

#### Download Select Filings

In [None]:
# Download the filings selected by filter_filings for query_ticker into comp_dir.
# NOTE(review): comp_mtd is whichever metadata dictionary was fetched last — make sure it belongs to query_ticker before running.
# NOTE(review): comp_dir is hard-coded ("FuboTV 10-Q") — confirm it matches the selected ticker.
ealib.download_company_filings(
    req_header, mrps, 
    comp_dir="FuboTV 10-Q", 
    select_filings=ealib.filter_filings(
        pd.DataFrame.from_dict(comp_mtd["filings"]["recent"]), 
        "filingDate", "form", 

        # NOTE(review): "10" presumably matches every form whose name contains it (10-K, 10-Q, ...) — confirm in filter_filings
        query_forms=["10"], 
        max_days=2*360,
        min_days=0*360
        
        ),
    ticker=query_ticker, 
    write_txt=False, 
    write_pdf=True
)

#### Extracting tabular data from downloaded html files

##### Pre-processing html with bs (not efficient)

In [None]:
# First identify all the tags and tag attributes present in the file
filename =  "Cresud 20-F htmls/20-F/2024.txt"

with open(filename, 'r') as file:
    input_html = file.read()

soup = BeautifulSoup(input_html, 'html.parser')

# Every tag in the document, collected once and reused for both surveys
all_tags = soup.find_all(True)

# Distinct tag names and distinct attribute names used anywhere in the file
unique_tag_names = {tag.name for tag in all_tags}
unique_tag_attrs = {attr_name for tag in all_tags for attr_name in tag.attrs.keys()}

# Print unique tag names and attributes
print("Unique Tag Names:", unique_tag_names)
print("Unique Attributes Used in Tags:", unique_tag_attrs)

In [None]:
# Then identify the tags / attributes that we want to remove
# tags_to_unwrap: tags to be replaced by their contents
tags_to_unwrap = ['span', 'div', 'a', 'br', 'img']
# tags_to_remove: tags to be deleted together with their contents
tags_to_remove = ['meta', 'title', 'link', 'script', 'style', 'head', 'body', 'html', 'ix:nonfraction', 'ix:nonnumeric']
# attributes_to_remove: attribute names to strip from every remaining tag
attributes_to_remove = ['style', 'class', 'id', 'name', 'xmlns:xbrli', 'xmlns:xsi', 'xlink:href', 'colspan', 'rowspan', 'href', 'src', 'alt', 'title', 'scheme', 'arcrole', 'role', 'type', 'xmlns', 'xml:lang']

In [None]:

# Parse and clean up the html from unnecessary tags, storing to separate file
example_file_path = "html_table_example.html"
cleaned_html_filepath = 'cleaned_table.html'

# Simplified lists (these replace any broader lists defined in an earlier cell)
tags_to_unwrap = ['span']
attributes_to_remove = ['style']
tags_to_remove = []

with open(example_file_path, 'r') as file:
    full_html = file.read()

soup = BeautifulSoup(full_html, 'html.parser')

# Remove unwanted tags, and unnecessary attributes from all remaining tags
for tag in soup.find_all(True):
    # decompose() also destroys the descendants of a tag, but those descendants are
    # still present in this pre-built list: skip them instead of touching dead objects.
    if getattr(tag, "decomposed", False):
        continue
    if tag.name in tags_to_remove:
        tag.decompose()  # Removes the tag and its contents completely from the soup
    else:
        # Clean up attributes
        tag.attrs = {key: value for key, value in tag.attrs.items() if key not in attributes_to_remove}

# Unwrap specified tags (the tag is replaced by its contents)
for tag_name in tags_to_unwrap:
    for tag in soup.find_all(tag_name):
        tag.unwrap()

# Drop table cells that carry neither text nor attributes
for tr in soup.find_all('tr'):
    # Iterate over each <td> in the row
    for td in tr.find_all('td'):
        # Check if <td> is empty: no text and no attributes
        if not td.get_text(strip=True) and not td.attrs:
            # Remove the empty <td>
            td.decompose()

# Convert cleaned HTML to string
clean_html = str(soup)

# Save the cleaned HTML
with open(cleaned_html_filepath, 'w') as file:
    file.write(clean_html)


In [None]:
def subcolumns(df: pd.DataFrame, col: pd.Series) -> List:
    """
    Build a keep-mask over the columns of df relative to the reference column col.

    An entry is False when the corresponding column of df is a "subcolumn" of col:
    it agrees with col wherever both hold a value, and it holds strictly more NaN
    values than col. Every other entry (including col compared with itself) is True.

    Parameters:
        df: dataframe whose columns are compared against col.
        col: reference column, expected to share df's row index.

    Returns:
        A list of Python bools, one per column of df, in column order.
    """
    col_missing = col.isna()
    col_nan_count = col_missing.sum()

    mask = []
    for _, dcol in df.items():
        dcol_missing = dcol.isna()
        # Compatible: no row where both columns hold a value and the values differ
        compatible = bool((col_missing | dcol_missing | (col == dcol)).all())
        # `not` instead of `~`: bitwise inversion of a plain Python bool is always truthy
        is_subcolumn = compatible and bool(dcol_missing.sum() > col_nan_count)
        mask.append(not is_subcolumn)
    return mask

def discard_subcolumns(df : pd.DataFrame) -> pd.DataFrame:
    """
    Drop every column that subcolumns() flags as a subcolumn of some column of df.

    subcolumns() is evaluated once per column; a column is kept only when all of
    those masks keep it. Returns the dataframe restricted to the kept columns.
    """
    # One keep-mask per reference column
    per_column_masks = [subcolumns(df, reference) for _, reference in df.items()]
    # A position survives only if every mask agrees (element-wise AND across the masks)
    keep = [all(flags) for flags in zip(*per_column_masks)]
    return df.loc[:, keep]

def _to_number(x):
    """Convert an Excel-style numeric string to a number; return any other value untouched."""
    if not isinstance(x, str):
        return x
    # A lone em dash is assumed to mean zero
    if x.strip() == '—':
        return 0
    # "1,234" -> "1234", "(1,234)" -> "-1234"
    candidate = x.replace(',', '').replace('(', '-').replace(')', '')
    try:
        return pd.to_numeric(candidate)
    except (ValueError, TypeError):
        # Not a number: keep the ORIGINAL text, so labels such as "Total (in tons)" are not mangled
        return x


def clean_cols(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean a raw table extracted from a filing.

    Steps:
        1. Remove rows and columns that are entirely NaN.
        2. Remove duplicate columns.
        3. Discard NaN "subcolumns" (see discard_subcolumns).
        4. Convert Excel-style number strings (thousands separators, parentheses for
           negatives, em dash for zero) to numbers; non-numeric text is left as is.

    Returns the cleaned dataframe; the input dataframe is not modified.
    """
    # (1) Remove fully NaN rows and columns
    res = df.dropna(how='all').dropna(axis=1, how='all')

    # (2) Remove duplicate columns (duplicated() works on rows, hence the transpose)
    df_transposed = res.T
    df_transposed = df_transposed[~df_transposed.duplicated()]
    res = df_transposed.T

    # (3) Discard NaN subcolumns
    res = discard_subcolumns(res)

    # (4) Convert Excel-style numbering to interpretable numbers
    # TODO: report cells that could not be converted (warning)
    res = res.map(_to_number)

    # TODO: other preprocessing steps
    return res

def df_contains_substr(df: pd.DataFrame, query_str: str) -> bool:
    """Return True when the string form of any cell of df contains query_str, ignoring case."""
    needle = query_str.lower()
    for _, column in df.items():
        for value in column:
            if needle in str(value).lower():
                return True
    return False




##### Extract tables with pd.read_html()

In [None]:
""" Extract tables from a list of files """
# Files to read tables from (annotated as a list of paths, not a single string)
filenames: List[str] =  [
    #f"AdecoAgro 20-F html files/20-F/202{i}.html" for i in range(5)
    "BrasilAgro 20-F html/20-F/2024.txt"
]

# Every table found in every file, in file order
tables : List[pd.DataFrame] = []

for file in filenames:
    # pd.read_html returns one DataFrame per <table> element; extend in place instead of rebuilding the list
    tables.extend(pd.read_html(file))

len(tables)

##### Filter tables based on query string containment, then apply cleaning funcs

In [None]:
# First query into raw table text
# Two alternative keyword groups; a table is kept when it satisfies either group
queries: List[str] = [
    "Jato", "Xingu", "Nova", "ETH"
]
q2: List[str] = [
    "Regalito", "Unagro", "Panamby"
]


# NOTE(review): df_contains_substr_all is not defined in the visible part of this notebook —
# presumably it checks that a table contains every string of the list; confirm it is defined before this cell runs.
query_tables = [df for df in tables if df_contains_substr_all(df, queries) or df_contains_substr_all(df, q2)]

# Then clean smaller list of resulting tables
# NOTE(review): clean_tabs is not defined in the visible part of this notebook either —
# presumably it applies the given function to each table; confirm.
cleaned_tables = clean_tabs(query_tables, clean_cols)
len(cleaned_tables)

##### Indexing the tables (multi-indexing)

In [None]:
# Survey the structure of the tables: how many tables have each row count / column count
row_counter = Counter(f'{tab.shape[0]}' for tab in cleaned_tables)
col_counter = Counter(f'{tab.shape[1]}' for tab in cleaned_tables)

print(f'Row Counter: {row_counter}')
print(f'Col Counter: {col_counter}')
    

In [None]:
# Pick one cleaned table uniformly at random and show which one was chosen
table_num = random.randrange(len(cleaned_tables))
print(table_num)

ex_tab = cleaned_tables[table_num]
ex_tab

In [None]:
# Check dates: try to parse every cell as a datetime; with errors='coerce' unparseable cells become NaT
ex_tab.map(lambda val: pd.to_datetime(val, errors='coerce'))

In [None]:
""" Assume first rows are headers for a hierarchical index """
""" Proceed until NOT number OR datetime to dynamically find col_layers"""
# Number of leading rows that form the hierarchical column header
col_layers = 2

# Fix column headers: build a MultiIndex from the first col_layers rows
header = pd.MultiIndex.from_arrays([ex_tab.iloc[i] for i in range(col_layers)])
# Drop exactly the rows consumed as headers (not a hard-coded 2), by position.
# set_axis returns a new frame, so the table stored in cleaned_tables keeps its original columns.
ex_tab = ex_tab.iloc[col_layers:].set_axis(header, axis=1)

# TO-DO: find way to drop nans from headers/columns safely
ex_tab

In [None]:
""" Set (row) index to first column """
# Use the first column as the row index (rebinding the name instead of mutating in place)
ex_tab = ex_tab.set_index(ex_tab.columns[0])

In [None]:
# Display the example table in its current state
ex_tab

In [None]:
# Select one column through its two header levels.
# NOTE(review): the labels are hard-coded and only exist for some tables — table_num is chosen at random above.
ex_tab["Farming", "Rice"]

##### Other processing

In [None]:
# Concatenate with index
# TODO: Improve this very basic refinement method
output_tabs : List[pd.DataFrame] = []
for query_tab in query_tables:
    # First two rows of the table (kept as header context)
    header_rows = query_tab.iloc[:2, :]
    # Rows whose first column (label 0) contains the first query string
    first_col_matches = query_tab[0].str.contains(queries[0], na=False)
    output_tabs.extend([header_rows, query_tab[first_col_matches]])

# Save to Excel
combined_tabs = pd.concat(output_tabs)
combined_tabs.to_excel("EBITDA_adj.xlsx")

##### Selecting desired columns from list of dataframes, and saving each one to excel

In [None]:
# Index (into cleaned_tables) of the table to preview
# NOTE(review): despite its name, `col` selects a table, not a column
col = 1
cleaned_tables[col].head()

In [None]:
# Start displaying, and selecting desired columns

# desired_cols[i] lists the column labels to keep from cleaned_tables[i]
desired_cols=[
    [0, 2, 4, 7, 11, 14, 16], 
    [0, 2, 4, 7, 11, 14, 16],
]

# Preview for a select table
cleaned_tables[col].loc[:, desired_cols[col] ]

In [None]:
# Save to Excel
# NOTE(review): desired_cols must hold one entry per table in cleaned_tables, otherwise desired_cols[i] raises IndexError
with pd.ExcelWriter('brasilagro- land segments.xlsx', engine='openpyxl') as writer:
    for i, df in enumerate(cleaned_tables):
        # Write each DataFrame to a separate sheet, naming sheets as 'Sheet1', 'Sheet2', etc.
        df.loc[:, desired_cols[i]].to_excel(writer, sheet_name=f'Sheet{i+1}')

In [None]:
# Read the first table of the example html file and run it through clean_cols
filepath = "html_tests/html_table_example.html"

table = pd.read_html(filepath)[0]

clean_table = clean_cols(table)
clean_table

##### Further refining methods

In [None]:
# Reindex columns according to the date row
# NOTE(review): example_table is not defined in the visible part of this notebook — presumably a cleaned table such as clean_table; confirm.
date_row = 1

# Store only first column associated with each date 
# TODO: Handle this more robustly: we are not guaranteed that the first column encountered will carry the data!
# if more columns have the same desired column header, keep only the one column with least NaN values
example_table = example_table.loc[:, ~example_table.iloc[date_row].duplicated()]

# Reset column headers to desired one
example_table.columns = example_table.iloc[date_row]
# Drop the header row and every row above it
example_table = example_table.drop(example_table.index[:date_row+1])

example_table

In [None]:
# Move production up and delete Nan value, reindex
# TODO: Generalize this operation as a row re-index, analogously to the column above
# The first column header is replaced by the value found in the first row of that column
nan_col = example_table.columns[0]
repl_val = example_table[nan_col].iloc[0]

copy_column_headers = example_table.columns.to_list()
copy_column_headers[0] = repl_val
example_table.columns = copy_column_headers

# TODO: Hardcoded: either write method to recognize such rows, or just keep them as units (would not be a problem!)
example_table = example_table.drop(example_table.index[0]) # drop (in tons) (in tons) etc. row
# The renamed first column becomes the row index
example_table = example_table.set_index(repl_val) 
example_table.index.name = None  # Remove index name for better displaying
example_table


In [None]:
# Looking for datetimes: parse every cell, unparseable cells become NaT.
# DataFrame.map replaces the deprecated applymap and matches the other cells of this notebook.
example_table.map(lambda val: pd.to_datetime(val, errors='coerce'))

#### Obtaining a company fact using yf

In [None]:
# Start by making a yfinance Ticker object for the selected company
curr_yticker = yf.Ticker(query_ticker["ticker"])

In [None]:
# Search for particular info keys using search functions
# NOTE(review): only the value of the last expression is displayed by the notebook;
# unless the helpers print their results, the first two calls produce no visible output.
ealib.find_dict_key_substr(curr_yticker.info, ["currency"])
ealib.find_dict_key_substr(curr_yticker.info, ["cap"])
ealib.find_keys_containing_all_substrs(curr_yticker.info, ["cash", "operating"])

In [None]:
# Use encapsulated function to search safely for the company fact, returning None if not found
# NOTE(review): only the value of the last expression is displayed by the notebook; the first two results are discarded.
ealib.yf_info(query_ticker["ticker"], "marketCap")
ealib.yf_info(query_ticker["ticker"], "currency")
ealib.yf_info(query_ticker["ticker"], "operatingCashflow")

In [None]:
# Generate info series for a df of tickers (can be directly stored on source df)
# Only the first 100 tickers are queried
first_100_symbols = tickers_df["ticker"].iloc[:100]
marketCap_series = first_100_symbols.apply(lambda symbol: ealib.yf_info(symbol, "marketCap"))
marketCap_series

##### Historical Series for company fact

In [None]:
# Start with historical series for shares outstanding

# Period covered by the price download below
start_year: int = 2018
end_year: int   = 2023

stock_data = yf.download(query_ticker["ticker"], start=f"{start_year}-01-01", end=f"{end_year}-12-31")
# NOTE(review): presumably a single current value rather than a history — confirm against yf_info
shares_out = ealib.yf_info(query_ticker["ticker"], "sharesOutstanding")

# Empty frame sharing the dates of the downloaded prices; not populated in this cell
shares_data = pd.DataFrame(index=stock_data.index)
shares_out

In [None]:
# Yearly average closing price of the selected ticker over start_year..end_year
ticker: str = query_ticker["ticker"]

# Fetch historical price data for the given period
price_data = yf.download(ticker, start=f"{start_year}-01-01", end=f"{end_year}-12-31")

# Resample the data to yearly frequency and calculate the mean closing price for each year
# ('YE' is the year-end alias already used by the exchange-rate cells of this notebook)
yearly_data = price_data['Close'].resample('YE').mean()

# Convert to DataFrame; the label names the ticker, so the cell no longer depends on
# from_currency / to_currency, which are only defined further down the notebook
yearly_avg_df = yearly_data.reset_index()
yearly_avg_df.columns = ['Year', f'Average Close Price ({ticker})']

# Display the DataFrame
yearly_avg_df

#### Saved Refined Query Strings for Company Facts

In [None]:
# Company-fact names that identify shares outstanding
shares_outstanding_query_str = ["NumberOfSharesOutstanding", "EntityCommonStockSharesOutstanding", "CommonStockSharesOutstanding"]
# Company-fact names that identify operating cash flow
ocf_query_str = ["NetCashProvidedByUsedInOperatingActivities", "CashFlowsFromUsedInOperatingActivities"]
# To use with sufficient = True

#### Interactive Company Fact Search

In [None]:
### Company facts notes
# The string below only records fact names of interest; it has no effect when the cell runs
""" 
"NoncurrentPortionOfNoncurrentNotesAndDebenturesIssued"
"CurrentNotesAndDebenturesIssuedAndCurrentPortionOfNoncurrentNotesAndDebenturesIssued"
"""

In [None]:
# Output switches
print_all_comp_fact_df = False
display_short_comp_fact_df = True

# Broad search: use while the exact fact name is not yet known
query_fact_substr = ["cash", "equiv"]
sufficient = False

# Refined search: uncomment once the exact fact names are known
# query_fact_substr = ["NumberOfSharesOutstanding", "EntityCommonStockSharesOutstanding", "CommonStockSharesOutstanding"] # Refined list
# sufficient = True

# Navigate company facts dictionary and find all matches to query company fact
res = ealib.comp_facts_df(comp_facts, query_fact_substr, sufficient)

for units, as_key, match_fact, comp_fact_df in res:
    logging.info(f'Found company fact matching {query_fact_substr}: as_key = {as_key}, match_fact = {match_fact}, units = {units}')
    if print_all_comp_fact_df:
        logging.info(f'\n\t{comp_fact_df}')

# If only one company fact required, select closest match by choosing shortest matching string
if res:
    shortest_match = min(res, key=lambda match: len(match[2]))
    res_units, res_as_key, res_match_fact, res_comp_fact_df = shortest_match
    logging.info(f'Shortest company fact match:\n\t res_as_key = {res_as_key}, res_match_fact = {res_match_fact}, res_units = {res_units}')
    if display_short_comp_fact_df:
        display(res_comp_fact_df)

#### Analyze presence of selected company fact across tickers_df

In [None]:
# Search company fact with refined query string 
query_fact_substr = ["DepreciationAndAmortisation", "DepreciationDepletionAndAmortization"] 
sufficient = True
print_short_comp_fact_df = False
num_tests = 100

# Histogram: number of matching facts (as str) -> number of sampled tickers; 'failed reqs' counts failed downloads
len_counter = Counter()
for _ in range(num_tests):
    # Sample over the rows that actually exist (row 0 included) instead of a hard-coded range
    random_number = random.randrange(len(tickers_df))
    curr_ticker = tickers_df.iloc[random_number]
    # Loop-local names: comp_str, comp_facts and res_* of the selected query_ticker are
    # used by the graphing and burn-rate cells and must not be overwritten here
    curr_str = f'ticker number: {random_number}; ticker: {curr_ticker["ticker"]}; company title: {curr_ticker["title"]}'

    # Request comp facts dictionary
    curr_facts = ealib.get_response_dict(ealib.companyfacts_url(curr_ticker["cik_str"]), req_header, mrps)
    if curr_facts is None:
        logging.warning(f'Failed request when attempting to retrieve company facts for {curr_str}')
        len_counter['failed reqs'] += 1
        continue  # nothing to search in; do not pass None on

    # Extract desired company fact
    matches = ealib.comp_facts_df(
        curr_facts,
        query_fact_substr, 
        sufficient,
    )
    # Record the number of matches
    len_counter[f'{len(matches)}'] += 1

    for units, as_key, match_fact, comp_fact_df in matches:
        logging.info(f'Found company fact matching {query_fact_substr}: as_key = {as_key}, match_fact = {match_fact}, units = {units} with {curr_str}')
    if matches: # Now extract tuple with shortest match_fact
        best_units, best_as_key, best_match_fact, best_comp_fact_df = min(matches, key=lambda x: len(x[2]))
        logging.info(f'Shortest match fact tuple selected:\n\t res_as_key = {best_as_key}, res_match_fact = {best_match_fact}, res_units = {best_units} for test with {curr_str}')
        if print_short_comp_fact_df:
            logging.info(best_comp_fact_df)

len_counter


#### Graphing a Company Fact Over Time

##### Select Filings for Graph's Values

In [None]:
"""
    Run Interactive Company Fact Search first, establishing following variables
    res_units, res_as_key, res_match_fact, res_comp_fact_df
"""

# First check the filings on which graph will be based
filing_date_col = "end"   # column of res_comp_fact_df used as the date (x axis)
y_axis_col = "val"        # column of res_comp_fact_df plotted on the y axis
max_days = 360

# NOTE(review): query_forms=[""] presumably matches every form — confirm in filter_filings
select_filings_df = ealib.filter_filings(res_comp_fact_df, filing_date_col=filing_date_col, form_col="form", query_forms=[""], max_days=max_days)
select_filings_df

##### Plot Graph

In [None]:
# Line plot (with markers) of the selected company fact over time
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(select_filings_df[filing_date_col], select_filings_df[y_axis_col], marker='o')
ax.set_title(f'{res_match_fact} for {comp_str}')
ax.set_xlabel('Filing end date')
ax.set_ylabel(f'Reported Number of {res_match_fact}')
ax.grid(True)  # grid for easier readability
plt.setp(ax.get_xticklabels(), rotation=45)  # tilt the date labels
plt.show()

#### Calculating cash burn rate

In [None]:
""" 
Calculates average cash burn daily rate, according to all SEC filing data available over the specified period
"""
# Filing window and date column used for the burn-rate estimate
min_days = 0
max_days = 360
query_forms = [""]
filing_date_col = "filed"

# Fact names that identify operating cash flow
ocf_fact_names = ["NetCashProvidedByUsedInOperatingActivities", "CashFlowsFromUsedInOperatingActivities"]
res = ealib.comp_facts_df(comp_facts, ocf_fact_names, True)

if res:
    # Keep the match with the shortest fact name
    shortest_match = min(res, key=lambda match: len(match[2]))
    res_units, res_as_key, res_match_fact, res_comp_fact_df = shortest_match
    ocf_df_filt = ealib.filter_filings(
        res_comp_fact_df,
        filing_date_col=filing_date_col,
        form_col="form",
        query_forms=query_forms,
        max_days=max_days,
        min_days=min_days,
    )
    burn_rate = ealib.ocf_average_daily_burn_rate(ocf_df_filt)
    display(f'Average daily OCF burn rate {burn_rate}, with units {res_units} and company fact {res_match_fact}, under accounting standards {res_as_key}')


#### Converting between currencies

In [None]:
def forex_ticker(from_currency: str, to_currency: str) -> str:
    """Build the ticker symbol for a currency pair: both codes joined, followed by "=X"."""
    return "{}{}=X".format(from_currency, to_currency)

In [None]:
# Currency pair to look up
# NOTE(review): both currencies are "USD", so the symbol requested is "USDUSD=X" — confirm this is intended
from_currency = "USD"
to_currency = "USD"

# Previous close of the pair, or None if yf_info cannot find it
ealib.yf_info(forex_ticker(from_currency, to_currency), "previousClose")

##### Mean exchange rate for a given period

In [None]:
# Mean exchange rate throughout the year
start_year: int = 2018
end_year: int   = 2023

from_currency: str = "ARS"
to_currency: str = "USD"

# Fetch historical data for the given period
data = yf.download(forex_ticker(from_currency, to_currency), start=f"{start_year}-01-01", end=f"{end_year}-12-31")

# Resample the data to yearly frequency and calculate the mean exchange rate for each year
# ('YE' groups by calendar year, labelled with the year-end date)
yearly_data = data['Close'].resample('YE').mean()

# Convert to DataFrame
yearly_avg_df = yearly_data.reset_index()
yearly_avg_df.columns = ['Year', f'Average Exchange Rate ({from_currency}/{to_currency})']

# Display the DataFrame
yearly_avg_df

In [None]:
# Closing-of-year exchange rate: last available close of each calendar year
yearly_data = data['Close'].resample('YE').last()
yearly_avg_df = yearly_data.reset_index()
# Label the column as what it holds (a year-end close, not an average)
yearly_avg_df.columns = ['Year', f'Year-End Exchange Rate ({from_currency}/{to_currency})']
yearly_avg_df