In [18]:
import yaml
import json
import pandas as pd
import requests
from sqlalchemy import create_engine
from typing import Dict, List

# Load the ETL configuration from the YAML file.
with open("etl_config.yaml") as stream:
    try:
        etl_config = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        # Nothing below can run without a parsed config. Report the parse
        # error and re-raise it, instead of continuing and failing on the
        # next statement with an unrelated NameError for `etl_config`.
        print(exc)
        raise

# Create the database connection from the URL stored in the config.
constring_main = etl_config['source']['load']['type']['Database']['test']['url']
engine = create_engine(constring_main)

# Convert an SEC API response into a DataFrame.
def sec_response_to_df(sec_response):
    """Build a DataFrame from the JSON body of ``sec_response``.

    Args:
        sec_response: Any object exposing a ``json()`` method that returns a
            mapping (for example a ``requests`` response).

    Returns:
        A DataFrame created with ``orient='index'``: every top-level key of
        the decoded JSON becomes a row label.
    """
    payload = sec_response.json()
    return pd.DataFrame.from_dict(payload, orient='index')

# Flatten the nested facts mapping into a list of records.
def parcing_facts(data_dict: Dict) -> List:
    """Flatten ``{table: {'units': {unit: [entry, ...]}}}`` into flat records.

    Args:
        data_dict: Mapping from a table (concept) name to a mapping that may
            hold a ``'units'`` key. ``'units'`` maps a unit name to a list of
            entry dicts.

    Returns:
        One dict per entry: a copy of the entry with two extra keys, ``'unit'``
        (the unit name) and ``'table'`` (the concept name). If an entry already
        has one of these keys, the value set here wins. Records keep the
        iteration order of the input.
    """
    records = []
    for table, concept in data_dict.items():
        # A concept with a missing or null 'units' mapping contributes no
        # records. Skipping it avoids a KeyError that would make the caller
        # lose every other concept of the same company.
        units = concept.get('units') or {}
        for unit_type, entries in units.items():
            for entry in entries or []:
                records.append({**entry, 'unit': unit_type, 'table': table})
    return records

# Download and flatten SEC company facts for a list of CIKs.
def process_ciks(
    cik_list: List[str],
    user_agent: str = "email@address.com",
    timeout: float = 30.0,
) -> pd.DataFrame:
    """Fetch the 'us-gaap' company facts of every CIK and stack them.

    Processing is best effort: a CIK whose request or parsing fails is
    reported with ``print`` and skipped, so one bad company does not abort
    the whole batch.

    Args:
        cik_list: CIK identifiers. Each value is converted to the 10-digit,
            zero-padded string used in the request URL, so ``320193`` and
            ``'0000320193'`` are equivalent.
        user_agent: Value sent in the ``User-Agent`` request header.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        One DataFrame with a row per fact entry. It holds the entry's own
        fields plus ``unit``, ``table`` and ``cik``. ``start``, ``end`` and
        ``filed`` are converted to datetimes where present, and unparseable
        values become ``NaT``. The frame is empty when nothing was retrieved.
    """
    headers = {'User-Agent': user_agent}
    all_data = []

    for cik in cik_list:
        # Normalise to the 10-digit, zero-padded form used in the URL.
        cik = str(cik).strip().zfill(10)
        try:
            # The timeout stops one stalled connection from blocking the
            # whole batch.
            response = requests.get(
                f'https://data.sec.gov/api/xbrl/companyfacts/CIK{cik}.json',
                headers=headers,
                timeout=timeout,
            )
            # Turn HTTP errors (404, 403, 429, ...) into an explicit
            # exception instead of trying to decode an error page as JSON.
            response.raise_for_status()
            facts = response.json().get('facts', {})

            # Only the 'us-gaap' taxonomy is processed.
            if 'us-gaap' not in facts:
                print(f"No 'us-gaap' data for CIK {cik}")
                continue

            df = pd.DataFrame(parcing_facts(facts['us-gaap']))
            if df.empty:
                # Nothing to contribute; do not append a frame with no rows.
                print(f"No 'us-gaap' data for CIK {cik}")
                continue

            # Convert the date columns; invalid values are coerced to NaT.
            for col in ['start', 'end', 'filed']:
                if col in df.columns:
                    df[col] = pd.to_datetime(df[col], errors='coerce')

            # Tag every row with the company it belongs to.
            df['cik'] = cik
            all_data.append(df)

        except Exception as e:
            # Deliberate best effort: report the failure, go to the next CIK.
            print(f"Error processing CIK {cik}: {e}")
            continue

    # Combine the per-company frames into a single DataFrame.
    if all_data:
        return pd.concat(all_data, ignore_index=True)
    return pd.DataFrame()  # empty DataFrame when there is no data

# Example usage
cik_list = ['0001045810', '0000320193']  # CIKs to fetch, as 10-digit zero-padded strings
final_df = process_ciks(cik_list)
print(final_df)


           start        end           val                  accn    fy  fp  \
0     2016-05-02 2016-07-31  9.000000e+06  0001045810-16-000300  2016  Q2   
1     2016-02-01 2016-10-30  9.000000e+06  0001045810-16-000353  2016  Q3   
2     2015-01-26 2015-10-25  2.163000e+01  0001045810-15-000173  2015  Q3   
3     2016-02-01 2016-07-31  4.206000e+01  0001045810-16-000300  2016  Q2   
4     2016-02-01 2016-10-30  4.206000e+01  0001045810-16-000353  2016  Q3   
...          ...        ...           ...                   ...   ...  ..   
45584 2023-10-01 2023-12-30  6.687800e+07  0000320193-24-000006  2024  Q1   
45585 2023-10-01 2024-03-30  6.286500e+07  0000320193-24-000069  2024  Q2   
45586 2023-12-31 2024-03-30  5.885300e+07  0000320193-24-000069  2024  Q2   
45587        NaT 2022-09-24  3.905300e+10  0000320193-23-000106  2023  FY   
45588        NaT 2023-09-30  4.690600e+10  0000320193-23-000106  2023  FY   

       form      filed      frame        unit  \
0      10-Q 2016-08-23   C

In [19]:
from core.etl import tickers
# Run the extract step of the project's `tickers` ETL module.
tickers_extract = tickers.extract()
# sec_response_to_df calls .json() on its argument, so extract() presumably
# returns an HTTP response object — TODO confirm against core.etl.tickers.
tickers_raw_data= sec_response_to_df(tickers_extract)
tickers_transformed_data = tickers.transform(tickers_raw_data)
# Keep the cik_str values of the first 500 rows as the list of CIKs to download.
list_ciks = tickers_transformed_data.cik_str.head(500).to_list()

In [20]:
# Display the frame returned by process_ciks(cik_list).
final_df

Unnamed: 0,start,end,val,accn,fy,fp,form,filed,frame,unit,table,cik
0,2016-05-02,2016-07-31,9.000000e+06,0001045810-16-000300,2016,Q2,10-Q,2016-08-23,CY2016Q2,USD,AcceleratedShareRepurchaseProgramAdjustment,0001045810
1,2016-02-01,2016-10-30,9.000000e+06,0001045810-16-000353,2016,Q3,10-Q,2016-11-22,,USD,AcceleratedShareRepurchaseProgramAdjustment,0001045810
2,2015-01-26,2015-10-25,2.163000e+01,0001045810-15-000173,2015,Q3,10-Q,2015-11-18,,USD/shares,AcceleratedShareRepurchasesFinalPricePaidPerShare,0001045810
3,2016-02-01,2016-07-31,4.206000e+01,0001045810-16-000300,2016,Q2,10-Q,2016-08-23,,USD/shares,AcceleratedShareRepurchasesFinalPricePaidPerShare,0001045810
4,2016-02-01,2016-10-30,4.206000e+01,0001045810-16-000353,2016,Q3,10-Q,2016-11-22,,USD/shares,AcceleratedShareRepurchasesFinalPricePaidPerShare,0001045810
...,...,...,...,...,...,...,...,...,...,...,...,...
45584,2023-10-01,2023-12-30,6.687800e+07,0000320193-24-000006,2024,Q1,10-Q,2024-02-02,CY2023Q4,shares,IncrementalCommonSharesAttributableToShareBase...,0000320193
45585,2023-10-01,2024-03-30,6.286500e+07,0000320193-24-000069,2024,Q2,10-Q,2024-05-03,,shares,IncrementalCommonSharesAttributableToShareBase...,0000320193
45586,2023-12-31,2024-03-30,5.885300e+07,0000320193-24-000069,2024,Q2,10-Q,2024-05-03,CY2024Q1,shares,IncrementalCommonSharesAttributableToShareBase...,0000320193
45587,NaT,2022-09-24,3.905300e+10,0000320193-23-000106,2023,FY,10-K,2023-11-03,CY2022Q3I,USD,OtherAssetsMiscellaneousNoncurrent,0000320193


In [21]:
# Download the facts for every CIK in list_ciks.
# NOTE(review): the name `all` shadows Python's builtin all() for the rest of
# the session; a rename has to be applied to every later cell that uses it.
all = process_ciks(list_ciks)

KeyboardInterrupt: 

In [None]:
# (rows, columns) of the frame held in `all`.
all.shape


(8003042, 12)

In [None]:
# Write the frame to the 'raw_tickers' table through `engine`.
# if_exists='replace' drops an existing table of that name before inserting;
# index=False keeps the DataFrame index out of the table.
all.to_sql('raw_tickers', con=engine, if_exists='replace', index=False)

42

In [None]:
# Rows whose `cik` is one of the example CIKs; `@cik_list` refers to the
# Python variable of that name.
sample_ = all.query('cik in @cik_list')