In [3]:
import sys
# Make the finance_dashboard project root importable from this notebook.
# NOTE(review): these are machine-specific absolute paths; consider deriving the
# root from the notebook location or an environment variable instead.
PROJECT_ROOTS = (
    '/Users/dizzydwarfus/Dev/finance_dashboard',
    'C:\\Users\\lianz\\Python\\finance_dashboard',
)

for project_root in PROJECT_ROOTS:
    # Remove every existing occurrence before appending. The previous try/except
    # skipped the second removal whenever the first one raised, so re-running the
    # cell could pile up duplicate entries.
    while project_root in sys.path:
        sys.path.remove(project_root)
    sys.path.append(project_root)

# Built-in libraries
import os
import datetime as dt
import re
from typing import Union
from dataclasses import dataclass

# Third-party libraries
from tqdm import trange
from dotenv import load_dotenv
import pandas as pd
from bs4.element import Tag

# Internal imports
from utils.secscraper.sec_class import SECData, TickerData
from utils.secscraper._dataclasses import Facts, Context, LinkLabels
from utils.secscraper._mapping import STANDARD_NAME_MAPPING
from utils.secscraper._utils import reverse_standard_mapping, get_filing_facts, translate_labels_to_standard_names, clean_values_in_facts, clean_values_in_segment, segment_breakdown_levels, split_facts_into_start_instant, get_monthly_period
from utils.database._connector import SECDatabase
from utils._logger import MyLogger

# Load variables from a local .env file into the process environment
# (presumably the source of 'mongodb_sec', read via os.getenv below — TODO confirm).
load_dotenv()

True

# Initialize variables for testing

In [None]:
# Shared handles for the test cells below: SEC client, Mongo connection, one ticker.
sec, mongo, ticker = (
    SECData(),
    SECDatabase(os.getenv('mongodb_sec')),
    TickerData(ticker='BLK'),
)

# First 10-K row of the ticker's filings table, plus the fields the scraper needs.
file = ticker.filings[ticker.filings['form'].eq('10-K')].iloc[0]
accessionNumber, folder_url, file_url = (
    file.get(field) for field in ('accessionNumber', 'folder_url', 'file_url')
)

# Parsed filing document and the index of files in the filing folder.
soup = ticker.get_file_data(file_url=file_url)
index_df = ticker.get_filing_folder_index(folder_url=folder_url)

# ticker.scrape_logger.info(
#     file.get('filingDate').strftime('%Y-%m-%d') + ': ' + folder_url)

# start_date = dt.datetime(2022, 1, 1) # after XBRL implementation

# query = {
#     'cik': ticker.cik,
#     'form': {'$in': ['10-K']},
#     'filingDate': {'$gte': start_date},
# }

# filings_to_scrape = [i for i in mongo.tickerfilings.find(query).sort('filingDate', 1)]


# Script to insert submission, filings, and facts for each filing into database

In [None]:
sec = SECData()
sic_dict = sec.get_sic_list()
mongo = SECDatabase(connection_string=os.getenv('mongodb_sec'))

# Return values of the insert helpers are collected here whenever they are not None
# (the helpers appear to return the failed item on failure — TODO confirm).
failed_submissions = []
failed_filings = []
failed_facts = []

# Number of tickers (taken from the top of sec.cik_list) processed per run.
MAX_TICKERS = 50

with trange(len(sec.cik_list['ticker'][:MAX_TICKERS]), desc='Instantiating ticker...',) as t:
    for item in t:
        # Use a dedicated name for the symbol string: the other cells in this notebook
        # expect the global `ticker` to be a TickerData instance, not a str.
        ticker_symbol = sec.cik_list['ticker'].iloc[item]  # Get ticker from cik_list
        # Bind cik from the listing up front so the error log below never hits an
        # unbound name (first iteration) or a stale value from the previous ticker.
        cik = sec.cik_list['cik_str'].iloc[item]
        t.set_postfix(ticker=ticker_symbol, cik=cik)

        # Initialize and instantiate TickerData object
        try:
            symbol = TickerData(ticker=ticker_symbol)
            cik = symbol.cik  # get cik of ticker
            symbol.submissions['lastUpdated'] = dt.datetime.now()
            # find_one returns None for an unknown SIC code; the resulting TypeError
            # is handled by the except below and the ticker is skipped.
            symbol.submissions['office'] = mongo.sicdb.find_one({'_id': symbol.submissions['sic']})['Office']
            sec.scrape_logger.info(f'{t}')
            sec.scrape_logger.info(f'\nInstantiated {symbol}...')
        except Exception as e:
            sec.scrape_logger.info(f'{t}')
            sec.scrape_logger.error(f'Failed to instantiate {ticker_symbol} with cik {cik}...{e}')
            continue

        # Filings are stored in their own collection, so detach them from the submission.
        filings = symbol.submissions.pop('filings')
        # print(filings)
        # Insert submissions to TickerData collection
        inserted_submission = mongo.insert_submission(submission=symbol._submissions)
        if inserted_submission is not None:
            failed_submissions.append(inserted_submission)

        # Insert filings to TickerFilings collection
        inserted_filing = mongo.insert_filings(cik=cik, filings=filings)
        if inserted_filing is not None:
            failed_filings.append(inserted_filing)

        # # Insert facts to Facts collection
        # for doc in filings:
        #     doc['lastUpdated'] = dt.datetime.now()

        #     if doc['form'] == '10-Q' or doc['form'] == '10-K':
        #         try:
        #             facts = symbol.get_facts_for_each_filing(doc)
        #             inserted_facts = mongo.insert_facts(accession=doc['accessionNumber'], facts=facts)
        #             if inserted_facts is not None:
        #                 failed_facts.append(inserted_facts)
        #         except Exception as e:
        #             sec.scrape_logger.error(f'TickerData().get_facts_for_each_filing() function failed for {doc["accessionNumber"]}...{e}')
        #             failed_facts.append(doc['accessionNumber'])

        sec.scrape_logger.info(f'Successfully updated {ticker_symbol}({cik})...\n')

# Test getting labels from _lab.xml

In [None]:
# Pick one specific 10-K (position 13 in the ticker's 10-K list) and fetch its documents.
file = ticker.filings.loc[ticker.filings['form'] == '10-K'].iloc[13]
accessionNumber = file.get('accessionNumber')
folder_url = file.get('folder_url')
file_url = file.get('file_url')
soup = ticker.get_file_data(file_url=file_url)
index_df = ticker.get_filing_folder_index(folder_url=folder_url)

# Keep only the 'resource' rows of the _lab file. The explicit .copy() makes the
# filtered frame independent, so the column assignments below do not trigger
# pandas' SettingWithCopyWarning.
labels = (
    ticker.get_elements(folder_url=folder_url, index_df=index_df,
                        scrape_file_extension='_lab')
    .query("`xlink:type` == 'resource'")
    .copy()
)

# The role is the last path segment of the role URI.
labels['xlink:role'] = labels['xlink:role'].str.split('/').str[-1]

# Preserve the raw label id, then normalise it to a lower-cased 'prefix:name' key:
# strip the 'lab_' / '_en-US' decorations and join the first two '_'-separated parts.
# The .str accessors propagate missing values instead of raising inside a lambda.
labels['xlink:labelOriginal'] = labels['xlink:label']
labels['xlink:label'] = (
    labels['xlink:label']
    .str.replace('(lab_)|(_en-US)', '', regex=True)
    .str.split('_')
    .str[:2]
    .str.join(':')
    .str.lower()
)
labels['accessionNumber'] = accessionNumber

In [None]:
# Show the label rows whose original id mentions ProductOrServiceAxis.
labels[labels['xlink:labelOriginal'].str.contains('ProductOrServiceAxis')]

# Test get_filing_facts

In [None]:
sec = SECData()
mongo = SECDatabase(os.getenv('mongodb_sec'))
ticker = TickerData(ticker='MSFT')

# Form types and the inclusive filing-year window to scrape.
forms = ['10-K']
start_year = end_year = 2023

# Filings of the requested forms filed within [start_year, end_year], as a list of row dicts.
filing_available = ticker.filings.loc[
    ticker.filings['form'].isin(forms)
    & ticker.filings['filingDate'].dt.year.between(start_year, end_year)
].to_dict('records')

In [None]:
# get_filing_facts returns eight objects; unpack them in the order it yields them.
(
    all_labels,
    all_calc,
    all_defn,
    all_context,
    all_facts,
    all_metalinks,
    all_merged_facts,
    failed_folders,
) = get_filing_facts(
    ticker=ticker,
    filings_to_scrape=filing_available,
    verbose=True,
)

In [None]:
# write all_labels, all_calc, all_defn to xlsx on different sheets
# with pd.ExcelWriter(f'././data/{ticker.ticker}_all_data.xlsx') as writer:
#     all_facts.to_excel(writer, sheet_name='facts', index=False)
#     all_context.to_excel(writer, sheet_name='context', index=False)
#     all_labels.to_excel(writer, sheet_name='labels', index=False)
#     all_merged_facts.to_excel(writer, sheet_name='merged_facts', index=False)
#     all_calc.to_excel(writer, sheet_name='calc', index=False)
#     all_defn.to_excel(writer, sheet_name='defn', index=False)
#     all_metalinks.to_excel(writer, sheet_name='metalinks', index=False)

In [None]:
# Cleaning pipeline, innermost call first:
#   1. clean_values_in_facts on the merged facts
#   2. clean_values_in_segment, using all_labels as the label lookup
#   3. get_monthly_period
#   4. translate_labels_to_standard_names with STANDARD_NAME_MAPPING
# NOTE(review): the "Test Reverse Mapping" cell passes a *reversed* mapping to
# translate_labels_to_standard_names; confirm which form the helper expects.
final_df = translate_labels_to_standard_names(
    get_monthly_period(
        clean_values_in_segment(
            clean_values_in_facts(all_merged_facts),
            labels_df=all_labels,
        )
    ),
    standard_name_mapping=STANDARD_NAME_MAPPING,
)
# final_df, start_end, instant = split_facts_into_start_instant(final_df)

# Test Facts Search

In [None]:
@dataclass
class Facts:
    """Read-only view over a single fact element of a filing.

    Wraps the parsed tag and exposes its name, the lower-cased attributes
    (id, contextref, unitref, decimals) and its text through properties.

    Note: defining this class in the notebook shadows the ``Facts`` imported
    from ``utils.secscraper._dataclasses``.

    Attributes:
        fact_tag (Tag): parsed element holding the fact.
    """
    fact_tag: Tag

    @property
    def factName(self) -> Union[str, None]:
        """Get factName

        Returns:
            str: name of the underlying tag
        """
        return self.fact_tag.name

    @property
    def factId(self) -> Union[str, None]:
        """Get factId

        Returns:
            str: value of the tag's 'id' attribute, None when absent
        """
        return self.fact_tag.attrs.get('id')

    @property
    def contextRef(self) -> Union[str, None]:
        """Get contextRef

        Returns:
            str: value of the tag's 'contextref' attribute, None when absent
        """
        return self.fact_tag.attrs.get('contextref')

    @property
    def unitRef(self) -> Union[str, None]:
        """Get unitRef

        Returns:
            str: value of the tag's 'unitref' attribute, None when absent
        """
        return self.fact_tag.attrs.get('unitref')

    @property
    def decimals(self) -> Union[str, None]:
        """Get decimals

        Returns:
            str: value of the tag's 'decimals' attribute, None when absent
        """
        return self.fact_tag.attrs.get('decimals')

    @property
    def factValue(self) -> Union[str, int, None]:
        """Get factValue

        Returns:
            str: text content of the tag (not converted to a number here)
        """
        return self.fact_tag.text

    def to_dict(self) -> dict:
        """Convert facts to dict

        Returns:
            dict: dict containing facts information
        """
        return dict(factName=self.factName, factId=self.factId, contextRef=self.contextRef, unitRef=self.unitRef, decimals=self.decimals, factValue=self.factValue)

    def __repr__(self):
        return f'Facts(factName={self.factName}, factId={self.factId}, contextRef={self.contextRef}, unitRef={self.unitRef}, decimals={self.decimals}, factValue={self.factValue})'

    def _repr_html_(self) -> str:
        """HTML card used by IPython/Jupyter rich display.

        IPython looks for ``_repr_html_`` (single underscores); the previous
        ``__repr_html__`` spelling was never picked up by the notebook.
        """
        return f"""
        <div style="border: 1px solid #ccc; padding: 10px; margin: 10px;">
            <h3>Facts</h3>
            <p><strong>factName:</strong> {self.factName}</p>
            <p><strong>factId:</strong> {self.factId}</p>
            <p><strong>contextRef:</strong> {self.contextRef}</p>
            <p><strong>unitRef:</strong> {self.unitRef}</p>
            <p><strong>decimals:</strong> {self.decimals}</p>
            <p><strong>factValue:</strong> {self.factValue}</p>
        </div>
        """

    def __repr_html__(self) -> str:
        """Backward-compatible alias for ``_repr_html_``."""
        return self._repr_html_()

    def __str__(self):
        return f'''factName={self.factName}
factId={self.factId}
contextRef={self.contextRef}
unitRef={self.unitRef}
decimals={self.decimals}
factValue={self.factValue}'''


In [None]:
sec = SECData()
mongo = SECDatabase(os.getenv('mongodb_sec'))
ticker = TickerData(ticker='RTX')
forms = ['10-K']

# First filing whose form type is in `forms`, and the fields needed to fetch it.
file = ticker.filings[ticker.filings['form'].isin(forms)].iloc[0]
accessionNumber, folder_url, file_url = (
    file.get(field) for field in ('accessionNumber', 'folder_url', 'file_url')
)

# Parsed filing document and the index of files in the filing folder.
soup = ticker.get_file_data(file_url=file_url)
index_df = ticker.get_filing_folder_index(folder_url=folder_url)

In [None]:
# Scratch list for parsed facts; it stays empty while the append below is commented out.
facts_list = []
facts = ticker.search_facts(soup=soup)
# Peek at the first fact tag only: the unconditional `break` ends the loop after one iteration.
for fact_tag in facts:
    print(fact_tag.prettify())
    print(fact_tag.text)
    # fact_tag = Facts(fact_tag=fact_tag)
    break
    # facts_list.append(Facts(fact_tag=fact_tag).to_dict())

# pd.DataFrame(facts_list)

# Test Context Search

In [None]:
@dataclass
class Context:
    """Read-only view over a single context element of a filing.

    Child elements are located by matching tag names against the regular
    expressions stored in the ``*_pattern`` fields, so callers can override
    a pattern per instance.

    Note: defining this class in the notebook shadows the ``Context`` imported
    from ``utils.secscraper._dataclasses``.

    Attributes:
        context_tag (Tag): parsed context element.
        entity_pattern (str): tag-name regex for the entity identifier.
        startDate_pattern (str): tag-name regex for the period start date.
        endDate_pattern (str): tag-name regex for the period end date.
        instant_pattern (str): tag-name regex for the instant date.
        segment_pattern (str): tag-name regex for the segment container.
        segment_breakdown_pattern (str): tag-name regex for the members inside the segment.
    """
    context_tag: Tag
    entity_pattern: str = ".*identifier.*"
    startDate_pattern: str = ".*startdate.*"
    endDate_pattern: str = ".*enddate.*"
    instant_pattern: str = ".*instant.*"
    segment_pattern: str = ".*segment.*"
    segment_breakdown_pattern: str = ".*xbrldi:.*"

    @property
    def contextId(self) -> Union[str, None]:
        """Get contextId

        Returns:
            str: value of the tag's 'id' attribute, None when absent
        """
        return self.context_tag.attrs.get('id')

    @property
    def entity(self) -> Union[str, None]:
        """Text of the first element matching ``entity_pattern``, None when absent."""
        match = self.context_tag.find(re.compile(self.entity_pattern))
        return match.text if match is not None else None

    @property
    def startDate(self) -> Union[dt.datetime, None]:
        """Period start date, None when the context has no start date."""
        return self.search_dates(self.startDate_pattern)

    @property
    def endDate(self) -> Union[dt.datetime, None]:
        """Period end date, None when the context has no end date."""
        return self.search_dates(self.endDate_pattern)

    @property
    def instant(self) -> Union[dt.datetime, None]:
        """Instant date, None when the context has no instant."""
        return self.search_dates(self.instant_pattern)

    @property
    def segment(self) -> Union[dict, None]:
        """Get segments and tags classifying the segment and store in dict

        Returns:
            dict: maps each member's 'dimension' attribute to the member's text;
                None when the context has no segment element
        """
        segment = self.context_tag.find(re.compile(self.segment_pattern))

        if segment is None:
            return None

        members = segment.find_all(re.compile(self.segment_breakdown_pattern))

        # Members sharing the same 'dimension' overwrite each other; the last one wins.
        return {member.attrs.get('dimension'): member.text for member in members}

    def search_dates(self, pattern: str) -> Union[dt.datetime, None]:
        """Search the context tag for a date element and parse it.

        Args:
            pattern (str): tag-name regex to search for

        Returns:
            Union[dt.datetime, None]: parsed date, or None when no element
                matches or its text is blank

        Raises:
            ValueError: when the text is not in '%Y-%m-%d' format
        """
        match = self.context_tag.find(re.compile(pattern))

        if match is None:
            return None

        # Surrounding whitespace (e.g. from pretty-printed XML) would make strptime fail.
        raw_date = match.text.strip()

        if raw_date == '':
            return None

        return dt.datetime.strptime(raw_date, '%Y-%m-%d')

    def to_dict(self) -> dict:
        """Convert context to dict

        Returns:
            dict: dict containing context information
        """
        return {
            'contextId': self.contextId,
            'entity': self.entity,
            'segment': self.segment,
            'startDate': self.startDate,
            'endDate': self.endDate,
            'instant': self.instant,
            'segmentLength': self.get_segment_length(),
        }

    def get_segment_length(self) -> int:
        """Get length of segment

        Uses ``segment_pattern`` (previously a hard-coded copy of its default),
        so an overridden pattern is honoured here as well as in ``segment``.

        Returns:
            int: ``len()`` of the segment element, 0 when there is no segment.
                NOTE(review): for a bs4 Tag this presumably counts direct
                children, which may include whitespace text nodes — confirm.
        """
        segment = self.context_tag.find(re.compile(self.segment_pattern))

        if segment is None:
            return 0

        return len(segment)

    def __repr__(self):
        return f'Context(contextId={self.contextId}, entity={self.entity}, segment={self.segment}, startDate={self.startDate}, endDate={self.endDate}, instant={self.instant})'

    def _repr_html_(self) -> str:
        """HTML card used by IPython/Jupyter rich display.

        IPython looks for ``_repr_html_`` (single underscores); the previous
        ``__repr_html__`` spelling was never picked up by the notebook.
        """
        return f"""
        <div style="border: 1px solid #ccc; padding: 10px; margin: 10px;">
            <h3>Context</h3>
            <p><strong>contextId:</strong> {self.contextId}</p>
            <p><strong>entity:</strong> {self.entity}</p>
            <p><strong>segment:</strong> {self.segment}</p>
            <p><strong>startDate:</strong> {self.startDate}</p>
            <p><strong>endDate:</strong> {self.endDate}</p>
            <p><strong>instant:</strong> {self.instant}</p>
        </div>
        """

    def __repr_html__(self) -> str:
        """Backward-compatible alias for ``_repr_html_``."""
        return self._repr_html_()

    def __str__(self):
        return f'''contextId={self.contextId}
entity={self.entity}
segment={self.segment}
startDate={self.startDate}
endDate={self.endDate}
instant={self.instant}'''

In [None]:
# Fresh handles for the context-search test: SEC client, Mongo connection, RTX ticker.
sec, mongo, ticker = (
    SECData(),
    SECDatabase(os.getenv('mongodb_sec')),
    TickerData(ticker='RTX'),
)

# First 10-K row of the filings table and the fields needed to fetch it.
file = ticker.filings[ticker.filings['form'].eq('10-K')].iloc[0]
accessionNumber, folder_url, file_url = (
    file.get(field) for field in ('accessionNumber', 'folder_url', 'file_url')
)

# Parsed filing document and the index of files in the filing folder.
soup = ticker.get_file_data(file_url=file_url)
index_df = ticker.get_filing_folder_index(folder_url=folder_url)

In [None]:
# Parse every context tag of the filing into a plain dict.
contexts = ticker.search_context(soup=soup)
context_list = [Context(context_tag=tag).to_dict() for tag in contexts[:]]

# One row per contextId; the first occurrence wins when an id repeats.
context_df = pd.DataFrame(context_list).drop_duplicates(subset=['contextId'], keep='first')

# Test processing segment

In [None]:
def segment_breakdown_levels(final_df: pd.DataFrame) -> int:
    """Return the size of the largest dict found in the 'segment' column.

    Non-dict entries are ignored. Each time a new maximum with more than one
    entry is found, that segment's items are printed.

    Args:
        final_df (pd.DataFrame): frame with a 'segment' column

    Returns:
        int: largest number of entries in any segment dict, 0 when there is none
    """
    deepest = 0
    for segment in final_df['segment']:
        if not isinstance(segment, dict):
            continue
        depth = len(segment)
        if depth <= deepest:
            continue
        deepest = depth
        if depth > 1:
            print(list(segment.items()))

    return deepest

# Largest segment breakdown among the contexts returned by get_filing_facts (`all_context`).
segment_breakdown_levels(all_context)

In [None]:
# Work on copies so this experiment leaves all_merged_facts / all_labels untouched.
merged_facts = all_merged_facts.copy()
labels_df = all_labels.copy()
from typing import Literal

def join_segments(x: dict, segment_type: Literal['key','value']) -> Union[str, None]:
    """Join the keys or the values of a segment dict into one ', '-separated string.

    Args:
        x (dict): segment dict; any non-dict input (including None) yields None
        segment_type (Literal['key', 'value']): which side of the dict to join

    Returns:
        Union[str, None]: the joined string, or None when ``x`` is not a dict or
            contains a non-string part (best effort: the problem is printed)

    Raises:
        ValueError: when ``segment_type`` is neither 'key' nor 'value'. This used
            to surface as an unbound-variable error swallowed by a blanket except.
    """
    if segment_type not in ('key', 'value'):
        raise ValueError(f"segment_type must be 'key' or 'value', got {segment_type!r}")

    if not isinstance(x, dict):
        return None

    parts = x.keys() if segment_type == 'key' else x.values()
    try:
        return ", ".join(parts)
    except TypeError as e:
        # str.join rejects non-string parts (e.g. a None from a failed label lookup).
        print(f'Error: {e} on {x}')
        return None
    
# Lookup dict: xlink:label -> labelText, built from the rows whose role is 'label'.
# Note that `labels_df` is a plain dict from here on, not a DataFrame.
labels_df = (
    labels_df
    .query("`xlink:role` == 'label'")
    .set_index('xlink:label')['labelText']
    .to_dict()
)


def _relabel_segment(segment):
    """Translate a segment's axis/member ids to label text; None for non-dict input.

    Ids without an entry in the lookup become None.
    """
    if not isinstance(segment, dict):
        return None
    return {
        labels_df.get(axis.lower()): labels_df.get(member.lower())
        for axis, member in segment.items()
    }


merged_facts['segment_modified'] = merged_facts['segment'].apply(_relabel_segment)

# Flatten the translated dict into two text columns (axes and members).
merged_facts['segmentAxis'] = merged_facts['segment_modified'].apply(join_segments, segment_type='key')
merged_facts['segmentValue'] = merged_facts['segment_modified'].apply(join_segments, segment_type='value')

# The dict-valued columns are no longer needed once flattened.
merged_facts = merged_facts.drop(columns=['segment', 'segment_modified'])

# Test merging labels with labels from xbrl us-gaap xsd document

In [None]:
# URLs of the 2024 us-gaap and srt element schema (.xsd) files on xbrl.fasb.org.
# NOTE(review): neither constant is referenced in the cells visible here.
xbrl_us_gaap = 'http://xbrl.fasb.org/us-gaap/2024/elts/us-gaap-2024.xsd'
xbrl_srt = 'http://xbrl.fasb.org/srt/2024/elts/srt-std-2024.xsd'

In [None]:
# Copy first: rewriting 'id' on sec.us_gaap_tags itself would alter the shared frame,
# and re-running the cell would then transform already-transformed ids
# (an id with a further '_' gains another ':' on every run).
labels = sec.us_gaap_tags.copy()

# 'prefix_Name' -> 'prefix:name' so the ids line up with the lower-cased factName values.
labels['id'] = labels['id'].str.split('_', n=1).str.join(':').str.lower()
merged_fact_with_label = all_facts.merge(labels, how='left', left_on='factName', right_on='id')

# Fact names that found no matching taxonomy id.
merged_fact_with_label.loc[merged_fact_with_label['id'].isnull(), 'factName'].tolist()

In [None]:
# One record per taxonomy id with its character length.
label_len_list = [
    {'label_name': label, 'label_len': len(label)}
    for label in labels['id']
]

# Longest ids first.
pd.DataFrame(label_len_list).sort_values(by='label_len', ascending=False)

# Test Reverse Mapping

In [None]:
# Standard metric name -> list of label texts that should be translated to it.
# Note: this assignment shadows the STANDARD_NAME_MAPPING imported from
# utils.secscraper._mapping for every cell executed afterwards.
#
# Changes from the previous draft:
#   * 'Net sales' moved from 'Net Income' to 'Revenue' (net sales is a revenue line).
#   * Placeholder [''] lists replaced by []: when the mapping is reversed, every ''
#     alias collapses onto a single key, so an empty label would be translated to
#     an arbitrary standard name.
# NOTE(review): 'Income Taxes Paid, Net' is a cash-flow item; it may belong under
# 'Cash Taxes Paid' rather than 'Income Tax Expense' — confirm before relying on it.
STANDARD_NAME_MAPPING = {
    'Revenue': [
        'Revenue from Contract with Customer, Excluding Assessed Tax',
        'Revenues',
        'Revenue, Net',
        'Sales Revenue Net',
        'Sales Revenue, Net',
        'Net sales',
    ],
    'Cost of Goods or Services': [
        'Cost of Goods and Services Sold',
        'Cost Of Goods And Services Sold',
    ],
    'Gross Profit': [],
    'Operating Expenses': [],
    'Operating Income': [],
    'Interest Expense': [],
    'Income Before Tax': [],
    'Income Tax Expense': [
        'Income Taxes Paid, Net',
        'Income Taxes Paid Net',
    ],
    'Net Income': [
        'Net Income (Loss)',
        'Net income',
        'Net Income Loss',
    ],
    'Cash and Cash Equivalents': [],
    'Short Term Investments': [],
    'Total Cash': [],
    'Net Receivables': [],
    'Inventory': [],
    'Other Current Assets': [],
    'Total Current Assets': [],
    'Long Term Investments': [],
    'Property Plant and Equipment': [],
    'Goodwill': [],
    'Intangible Assets': [],
    'Other Assets': [],
    'Total Assets': [],
    'Accounts Payable': [],
    'Short/Current Long Term Debt': [],
    'Other Current Liabilities': [],
    'Total Current Liabilities': [],
    'Long Term Debt': [],
    'Other Liabilities': [],
    'Total Liabilities': [],
    'Common Stock': [],
    'Retained Earnings': [],
    'Treasury Stock': [],
    'Capital Surplus': [],
    'Shareholder Equity': [],
    'Net Tangible Assets': [],
    'Total Stockholders Equity': [],
    'Net Cash Flow': [],
    'Net Cash Flow-Operating': [],
    'Net Cash Flows-Investing': [],
    'Net Cash Flows-Financing': [],
    'Effect of Exchange Rate Changes': [],
    'Net Change in Cash': [],
    'Cash Interest Paid': [],
    'Cash Taxes Paid': [],
    'Depreciation and Amortization': [],
    'Capital Expenditures': [],
    'Change in Working Capital': [],
    'Free Cash Flow': [],
    'Free Cash Flow per Share': [],
    'Operating Cash Flow per Share': [],
    'Cash per Share': [],
    'Book Value per Share': [],
    'Tangible Book Value per Share': [],
    'Shareholders Equity per Share': [],
    'Interest Debt per Share': [],
    'Market Cap': [],
    'Enterprise Value': [],
    'PE Ratio': [],
    'Price to Sales Ratio': [],
    'POCF Ratio': [],
    'PFCF Ratio': [],
    'PB Ratio': [],
    'PTB Ratio': [],
    'EV to Sales': [],
    'Enterprise Value over EBITDA': [],
    'EV to Operating cash flow': [],
}

# Flip the mapping with reverse_standard_mapping
# (presumably alias -> standard name — TODO confirm against the helper).
reversed_mapping = reverse_standard_mapping(standard_name_mapping=STANDARD_NAME_MAPPING)

# NOTE(review): the "Test get_filing_facts" pipeline passes STANDARD_NAME_MAPPING
# un-reversed to this same function; confirm which form it expects.
translated_df = translate_labels_to_standard_names(merged_facts=all_merged_facts, standard_name_mapping=reversed_mapping)

# Bare expression: displayed as the cell output.
translated_df

# Parse using GPT (test)

In [None]:
# First context element of the filing currently held in `soup`. Uses `ticker`, as
# the context-search cell does, rather than `symbol`, which only exists after the
# database-insert loop has been run.
context = ticker.search_context(soup=soup)[0]

# Look each child up once; any of them can be missing (for example, a context
# without startdate/enddate children would make `.text` fail on None).
identifier_tag = context.find('identifier')
start_tag = context.find('startdate')
end_tag = context.find('enddate')

# Hand-built reference payload for the GPT parsing experiment; missing parts are None.
data = {
    'id': context['id'],
    'entity': {
        'identifier': {
            'scheme': identifier_tag['scheme'] if identifier_tag is not None else None,
            'value': identifier_tag.text if identifier_tag is not None else None,
        }
    },
    'period': {
        'startDate': start_tag.text if start_tag is not None else None,
        'endDate': end_tag.text if end_tag is not None else None,
    }
}

In [None]:
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.output_parsers import XMLOutputParser
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessage,
    HumanMessagePromptTemplate,
)
from langchain.callbacks import get_openai_callback
import json

# Single model instance with temperature=0. The previous second
# `llm = ChatOpenAI()` before the loop silently discarded this setting.
llm = ChatOpenAI(temperature=0)

parser = XMLOutputParser(tags=['id', 'entity', 'period'])
# NOTE(review): the system prompt is a fixed SystemMessage, not a prompt template,
# so "{format_instructions}" is probably sent to the model literally — confirm.
# NOTE(review): the parser describes an XML output format while the loop below
# decodes the reply with json.loads; decide on one format before wiring the
# instructions in for real.
template = ChatPromptTemplate.from_messages(
    [
        SystemMessage(
            content=(
                "You are a helpful assistant that parses XML files for a company's financial statements from the SEC Edgar database."
                "The XML content will be provided by the user."
                "You will parse the output and return it in the json format."
                "{format_instructions}"
            )
        ),
        HumanMessagePromptTemplate.from_template("{xml}"),
    ]
)

context_list = []
total_cost = 0
total_tokens = 0

# Computed once and reused for every context.
format_instructions = parser.get_format_instructions()

with trange(len(contexts), desc='Scraping contexts...') as t:
    for i in t:
        # One callback per request so cost and token usage accumulate per context.
        with get_openai_callback() as cb:
            t.set_postfix(context=contexts[i].attrs.get('id'))
            output = llm(template.format_messages(format_instructions=format_instructions, xml=contexts[i]))
            total_cost += cb.total_cost
            total_tokens += cb.total_tokens
            # Raises json.JSONDecodeError if the model reply is not valid JSON.
            context_list.append(json.loads(output.content))


# Test Plots

In [None]:
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
# Use plotly's 'browser' renderer as the default for figures shown from this notebook.
pio.renderers.default = 'browser'
# Bare expression: displays the renderer configuration as the cell output.
pio.renderers

In [None]:
# Rows on the business-segment axis, ordered for plotting.
# NOTE(review): the regex 'Sales Revenue |' ends in an empty alternative, so it
# matches every string label; the labelText condition does not narrow the rows
# to sales revenue. Tighten the pattern once the intended label is decided.
metric_df = (
    final_df
    .loc[lambda df: df['labelText'].str.contains('Sales Revenue |')
         & (df['segmentAxis'] == 'Statement Business Segments Axis')]
    .sort_values(by=['labelText', 'segmentAxis', 'segmentValue', 'startDate', 'endDate'])
)
print(metric_df['segmentValue'].unique())

# Narrow to one segment member and keep the last row for each label/segment/period.
metric_df = (
    metric_df
    .loc[lambda df: df['segmentValue'] == 'Europe Member']
    .drop_duplicates(
        subset=['labelText', 'segmentAxis', 'segmentValue', 'startDate', 'endDate'],
        keep='last',
    )
)
metric_df

In [None]:
# List the distinct labelText values available in final_df.
final_df['labelText'].unique()

In [None]:
# One line per labelText, plotted against the period end date.
fig = px.line(
    metric_df,
    x='endDate',
    y='factValue',
    color='labelText',
    line_group='labelText',
    #   hover_data={'change': ':,'},
)

# Marker overlay for the individual observations (one trace, hidden from legend and hover).
fig.add_trace(go.Scatter(
    x=metric_df['endDate'],
    y=metric_df['factValue'],
    mode='markers',
    # marker=dict(
    #     color=metric_df['color'].map(
    #         {'increase': 'green', 'decrease': 'red', 'neutral': 'grey'}),
    #     size=15,
    #     symbol=metric_df['color'].map(
    #         {'increase': 'triangle-up', 'decrease': 'triangle-down', 'neutral': 'circle'})
    # ),
    hoverinfo='skip',
    showlegend=False,
))
# for trace in fig.data:
#     print(trace)

# Titles, fonts and hover behaviour.
# NOTE(review): the legend entries come from labelText, so the 'Segment Axis'
# legend title may be misleading.
fig.update_layout(
    title='Metrics over time',
    xaxis_title='End Date',
    yaxis_title='Value',
    legend_title='Segment Axis',
    font={
        'family': 'Courier New, monospace',
        'size': 18,
        'color': 'RebeccaPurple',
    },
    hovermode='x unified',
)

# Auto-range both axes; the figure returned by the chain is the cell output.
fig.update_xaxes(autorange=True).update_yaxes(autorange=True, rangemode="tozero")

# Test Insert Facts to TickerFilings Collection

Insert when AccessionNumber is the same
- DECIDE: only insert merged facts
- DECIDE: or insert raw facts, context, labels before merging

In [1]:
import sys
# Make the finance_dashboard project root importable from this notebook.
# NOTE(review): these are machine-specific absolute paths; consider deriving the
# root from the notebook location or an environment variable instead.
PROJECT_ROOTS = (
    '/Users/dizzydwarfus/Dev/finance_dashboard',
    'C:\\Users\\lianz\\Python\\finance_dashboard',
)

for project_root in PROJECT_ROOTS:
    # Remove every existing occurrence before appending. The previous try/except
    # skipped the second removal whenever the first one raised, so re-running the
    # cell could pile up duplicate entries.
    while project_root in sys.path:
        sys.path.remove(project_root)
    sys.path.append(project_root)

# Built-in libraries
import os
import datetime as dt
import re
from typing import Union
from dataclasses import dataclass

# Third-party libraries
from tqdm import trange
from dotenv import load_dotenv
import pandas as pd
from bs4.element import Tag

# Internal imports
from utils.secscraper.sec_class import SECData, TickerData
from utils.secscraper._dataclasses import Facts, Context, LinkLabels
from utils.secscraper._mapping import STANDARD_NAME_MAPPING
from utils.secscraper._utils import reverse_standard_mapping, get_filing_facts, translate_labels_to_standard_names, clean_values_in_facts, clean_values_in_segment, segment_breakdown_levels, split_facts_into_start_instant, get_monthly_period
from utils.database._connector import SECDatabase
from utils._logger import MyLogger

load_dotenv()

list.remove(x): x not in list


True

In [2]:
# Load one already-stored filing from the `tickerfilings` collection and expose it as
# `filing_available`, the list the scraping cell below iterates over.
# sec = SECData()
mongo = SECDatabase(connection_string=os.getenv('mongodb_sec'))
# NOTE(review): `ticker` is commented out here, but the scraping cell below calls
# ticker.scrape_logger / ticker.get_file_data / ticker.search_facts. On a fresh kernel
# that cell raises NameError unless a TickerData instance is created first -- confirm.
# ticker = TickerData(ticker='AAPL')
# forms = ['10-K', '10-Q']
# start_year = 2009
# end_year = 2023
# filing_available = ticker.filings[(ticker.filings['form'].isin(forms)) & (
#     ticker.filings['filingDate'].dt.year >= start_year) & (ticker.filings['filingDate'].dt.year <= end_year)]
# filing_available = filing_available.to_dict('records')

accessionNumber = "0000320193-23-000077"
filing = mongo.tickerfilings.find_one({'accessionNumber': accessionNumber})
# find_one returns None when no document matches; keep the list empty in that case so
# the scraping loop never calls .get() on None.
filing_available = [] if filing is None else [filing]
filing_available

[{'_id': ObjectId('652701c66814d04902654baf'),
  'accessionNumber': '0000320193-23-000077',
  'acceptanceDateTime': datetime.datetime(2023, 8, 3, 18, 4, 43),
  'act': '34',
  'cik': '0000320193',
  'fileNumber': '001-36743',
  'file_url': 'https://www.sec.gov/Archives/edgar/data/0000320193/000032019323000077/0000320193-23-000077.txt',
  'filingDate': datetime.datetime(2023, 8, 4, 0, 0),
  'filmNumber': '231141522',
  'folder_url': 'https://www.sec.gov/Archives/edgar/data/0000320193/000032019323000077',
  'form': '10-Q',
  'isInlineXBRL': 1,
  'isXBRL': 1,
  'items': '',
  'lastUpdated': datetime.datetime(2024, 1, 30, 23, 45, 37, 448000),
  'primaryDocDescription': '10-Q',
  'primaryDocument': 'aapl-20230701.htm',
  'reportDate': datetime.datetime(2023, 7, 1, 0, 0),
  'size': 5939898,
  'facts': [{'factName': 'us-gaap:revenueremainingperformanceobligationexpectedtimingofsatisfactionstartdateaxis.domain',
    'factId': None,
    'contextRef': None,
    'unitRef': None,
    'decimals': No

In [None]:
# Scrape facts, context, metalinks, labels, calculations and definitions for every
# filing in `filing_available`, then merge each filing's facts with its context and
# labels. Results are exposed as the `all_*` DataFrames; every failed step is logged
# and recorded in `failed_folders`.
# NOTE(review): this cell uses `ticker` (scrape_logger, get_file_data, search_facts, ...),
# which must already exist in the kernel -- confirm it is created in an earlier cell.
SUPPORTED_FORMS = ('10-Q', '10-K')
EARLIEST_FILING_DATE = dt.datetime(2009, 1, 1)

# Per-filing frames are collected in lists and concatenated once after the loop
# instead of growing a DataFrame with pd.concat on every iteration.
labels_frames = []
calc_frames = []
defn_frames = []
context_frames = []
facts_frames = []
metalinks_frames = []
merged_facts_frames = []
failed_folders = []
fact_update_requests = []


def _record_failure(action, file, error):
    """Log a failed step for `file` and remember it in `failed_folders`.

    action: short description of the step, e.g. 'scrape facts'.
    file: the filing record (mapping) currently being processed.
    error: the exception (or message) describing what went wrong.
    """
    failed_url = file.get('folder_url')
    message = f'Failed to {action} for {failed_url}...{error}'
    ticker.scrape_logger.error(message)
    failed_folders.append(dict(folder_url=failed_url, accessionNumber=file.get('accessionNumber'),
                               error=message, filingDate=file.get('filingDate')))


def _concat_frames(frames):
    """Concatenate the collected frames, or return an empty DataFrame if there are none."""
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


for file in filing_available:
    # Only 10-Q / 10-K filings dated 2009 or later are processed. The previous test
    # `form != '10-Q' or form != '10-K'` was always true, so the form was never checked.
    if file.get('form') not in SUPPORTED_FORMS or file.get('filingDate') < EARLIEST_FILING_DATE:
        continue

    accessionNumber = file.get('accessionNumber')
    folder_url = file.get('folder_url')
    file_url = file.get('file_url')
    ticker.scrape_logger.info(
        file.get('filingDate').strftime('%Y-%m-%d') + ': ' + folder_url)

    soup = ticker.get_file_data(file_url=file_url)

    # Reset per-filing results so a failed step can never silently reuse the
    # previous filing's data in the merge below.
    facts_list = []
    facts_df = None
    context_df = None
    labels = None

    try:  # Scrape facts
        facts_list = [Facts(fact_tag=fact_tag).to_dict()
                      for fact_tag in ticker.search_facts(soup=soup)]
        facts_df = pd.DataFrame(facts_list)
        # if facts_list != []:
        #     facts_df['accessionNumber'] = accessionNumber
        #     fact_update_requests.append(mongo.create_facts_update_request(accessionNumber=accessionNumber, facts=facts_list))

        facts_df['accessionNumber'] = accessionNumber
        facts_frames.append(facts_df)
    except Exception as e:
        _record_failure('scrape facts', file, e)
        continue  # nothing to merge without facts

    if len(facts_list) == 0:
        ticker.scrape_logger.info(
            f'No facts found for {ticker.ticker}({ticker.cik})-{folder_url}...\n')
        continue

    try:  # Scrape context
        context_list = [Context(context_tag=tag).to_dict()
                        for tag in ticker.search_context(soup=soup)]
        context_df = pd.DataFrame(context_list)
        context_df['accessionNumber'] = accessionNumber
        context_frames.append(context_df)
    except Exception as e:
        _record_failure('scrape context', file, e)

    index_df = ticker.get_filing_folder_index(folder_url=folder_url)

    try:  # Scrape metalinks
        metalinks = ticker.get_metalinks(
            folder_url=folder_url + '/MetaLinks.json')
        metalinks['accessionNumber'] = accessionNumber
        metalinks_frames.append(metalinks)
    except Exception as e:
        _record_failure('scrape metalinks', file, e)

    try:  # Scrape labels
        # .copy() so the column assignments below write to an independent frame
        # rather than to a slice of the query result's parent.
        labels = ticker.get_elements(folder_url=folder_url, index_df=index_df,
                                     scrape_file_extension='_lab')\
            .query("`xlink:type` == 'resource'").copy()
        # Keep only the last path segment of the role URI
        labels['xlink:role'] = labels['xlink:role'].str.split('/').str[-1]
        labels['xlink:labelOriginal'] = labels['xlink:label']
        # Strip the 'lab_' / '_en-US' markers, keep the first two '_'-separated parts
        # joined by ':' and lower-case them; the merge below matches this against factName.
        labels['xlink:label'] = labels['xlink:label']\
            .str.replace('(lab_)|(_en-US)', '', regex=True)\
            .str.split('_')\
            .apply(lambda x: ':'.join(x[:2]))\
            .str.lower()
        labels['accessionNumber'] = accessionNumber
        labels_frames.append(labels)
    except Exception as e:
        labels = None  # never merge against a half-processed label frame
        _record_failure('scrape labels', file, e)

    try:  # Scrape calculations
        calc = ticker.get_elements(folder_url=folder_url, index_df=index_df,
                                   scrape_file_extension='_cal')\
            .query("`xlink:type` == 'arc'").copy()
        calc['accessionNumber'] = accessionNumber
        calc_frames.append(calc)
    except Exception as e:
        _record_failure('scrape calc', file, e)

    try:  # Scrape definitions
        defn = ticker.get_elements(folder_url=folder_url, index_df=index_df,
                                   scrape_file_extension='_def')\
            .query("`xlink:type` == 'arc'").copy()
        defn['accessionNumber'] = accessionNumber
        defn_frames.append(defn)
    except Exception as e:
        _record_failure('scrape defn', file, e)

    ticker.scrape_logger.info(
        f'Merging facts with context and labels. Current facts length: {len(facts_list)}...')
    try:
        if context_df is None or labels is None:
            raise ValueError('context or labels were not scraped for this filing')
        merged_facts = facts_df.merge(context_df, how='left', left_on='contextRef', right_on='contextId')\
            .merge(labels.query("`xlink:role` == 'label'"), how='left', left_on='factName', right_on='xlink:label')
        merged_facts = merged_facts.drop(
            ['accessionNumber_x', 'accessionNumber_y'], axis=1)

        ticker.scrape_logger.info(
            f'Successfully merged facts with context and labels. Merged facts length: {len(merged_facts)}...')
    except Exception as e:
        _record_failure('merge facts with context and labels', file, e)
        continue  # do not append a stale or undefined merged_facts

    merged_facts_frames.append(merged_facts)

    ticker.scrape_logger.info(
        f'Successfully scraped {ticker.ticker}({ticker.cik})-{folder_url}...\n')

all_labels = _concat_frames(labels_frames)
all_calc = _concat_frames(calc_frames)
all_defn = _concat_frames(defn_frames)
all_context = _concat_frames(context_frames)
all_facts = _concat_frames(facts_frames)
all_metalinks = _concat_frames(metalinks_frames)
all_merged_facts = _concat_frames(merged_facts_frames)

# Keep only facts that matched a label, reduced to the reporting columns. Skipped when
# nothing was merged, because the empty frame has no 'labelText' column to filter on.
if not all_merged_facts.empty:
    all_merged_facts = all_merged_facts.loc[all_merged_facts['labelText'].notnull(), [
        'labelText', 'segment', 'startDate', 'endDate', 'instant', 'factValue', 'unitRef']]


In [None]:
# Write the queued fact updates to the tickerfilings collection in one round trip.
# PyMongo's bulk_write raises InvalidOperation when given an empty request list, so
# the write is skipped (result is None) when nothing was queued.
bulk_write_result = (
    mongo.tickerfilings.bulk_write(fact_update_requests)
    if fact_update_requests else None
)
bulk_write_result

In [None]:
# Print every 10-K filing whose `lastUpdated` is at or after the cutoff, newest
# filingDate first. The cutoff is the current local time with its minute set to 48.
now = dt.datetime.now().replace(minute=48)
updated_filter = {'lastUpdated': {"$gte": now}, 'form': {'$in': ['10-K']}}
newest_first = [('filingDate', -1)]
updated_filings = mongo.tickerfilings.find(updated_filter, sort=newest_first)
for filing in updated_filings:
    print(filing)

In [None]:
# Look up a single document in the tickerfilings collection by accession number.
# Despite the name, the query targets a filing document, not an individual fact.
random_fact = mongo.tickerfilings.find_one({'accessionNumber': '0001193125-10-238044'})