# Setup
Configuration, Logger, Counter and Downloader

In [3]:
import os
import logging
import pathlib

import numpy as np
import pandas as pd

from core.config import Config, split_in_chunks
from data.download import LSEGDataDownloader

# Location of the data-library configuration; presumably read by the LSEG
# client behind LSEGDataDownloader -- TODO confirm.
os.environ.update({"RD_LIB_CONFIG_PATH": "/Configuration"})

# Project settings; everything below (logging, directories) is derived from it.
config = Config()

# Root-logger setup: records go to the configured log file as
# "<timestamp> <LEVEL> <message>" with second-resolution timestamps.
_log_settings = dict(
    filename=config.log_file,
    encoding="utf-8",
    level=config.log_level,
    format='%(asctime)s %(levelname)-8s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logging.basicConfig(**_log_settings)
logger = logging.getLogger()

# Directories used by the download cells below.
# NOTE(review): the static path comes from `filtered_dir_static` while the
# raw and historic paths come from `full_dir_*` -- confirm this is intended.
RAW_DATA_PATH: pathlib.Path = config.full_dir_raw_data
STATIC_DATA_PATH: pathlib.Path = config.filtered_dir_static
HISTORIC_DATA_PATH: pathlib.Path = config.full_dir_historic
# To change the path to which the features are loaded, edit it under post_init in the config.

## Downloading time-series data

In [4]:
from core.exceptions import DataValidationError, DataDownloadError

# Download the historic data chunk by chunk and write one CSV per company
# into HISTORIC_DATA_PATH.
#
# Each chunk is tried up to 100 times:
#   * DataValidationError: the company reported by the error is removed from
#     the chunk (in place), appended to `config.removed_companies_file`, and
#     the chunk is retried.
#   * any other Exception: the failure is printed AND logged with its
#     traceback, then the chunk is retried.
#   * a chunk that has become empty is skipped instead of being retried.
# If all attempts are used up, DataDownloadError is raised.
with LSEGDataDownloader(config) as downloader:
    for i, company_chunk in enumerate(config.companies_historic_chunks):
        for attempt in range(100):
            if not company_chunk:
                # Nothing left to download; indexing company_chunk[0] below
                # would only raise IndexError on every remaining attempt.
                logger.warning(
                    "Skipping historic chunk %d: no companies left in it.", i
                )
                break
            try:
                print(f"Downloading historic data: for {company_chunk[0]} to {company_chunk[-1]}")
                if len(config.historic_features) >= 1000:
                    # Large feature sets are requested in feature chunks.
                    standardized_histordict: dict[str, pd.DataFrame] = (
                        downloader.download_historic_in_chunks(
                            companies=company_chunk,
                            features=config.historic_chunks,
                            raw_data_dir=RAW_DATA_PATH,
                        )
                    )
                else:
                    standardized_histordict = downloader.download_historic_from(
                        companies=company_chunk,
                        features=config.historic_features,
                        raw_data_dir=RAW_DATA_PATH,
                        iteration=i,
                    )
                for key, new_df in standardized_histordict.items():
                    new_df.to_csv(HISTORIC_DATA_PATH / f"company-{key}.csv")
                break
            except DataValidationError as e:
                # Query the offending company once and reuse the value.
                rejected = e.__getcompanies__()
                company_chunk.remove(rejected)
                with open(config.removed_companies_file, "a", encoding="utf-8") as file:
                    file.write(f"\n{rejected}")
                print(f"Removed {rejected} Try again.")
            except Exception:
                print(f"Error downloading historic data: {company_chunk}")
                # Keep the traceback; a bare print hides why the retries fail.
                logger.exception(
                    "Historic download failed for chunk %d (attempt %d of 100)",
                    i,
                    attempt + 1,
                )
        else:
            raise DataDownloadError(
                "Too many attempts to download historic data",
                company_chunk,
                len(config.historic_features)
            )

KeyboardInterrupt: 

## Downloading static data

In [None]:
from core.exceptions import DataValidationError, DataDownloadError

#skipped [1,3,4]
# NOTE(review): the note above presumably lists chunk indices skipped in an
# earlier run -- confirm or remove.
#
# Download the static data chunk by chunk and write one CSV per company into
# STATIC_DATA_PATH. Each chunk is tried up to 100 times; on a
# DataValidationError the company reported by the error is removed from the
# chunk (in place), appended to `config.removed_companies_file`, and the
# chunk is retried. Any other exception propagates. If all attempts are used
# up, DataDownloadError is raised.
with LSEGDataDownloader(config) as downloader:
    for i, company_chunk in enumerate(config.companies_static_chunks):
        for attempt in range(100):
            if not company_chunk:
                # Every company was removed; company_chunk[0] below would
                # raise an uncaught IndexError, so skip the chunk instead.
                logger.warning(
                    "Skipping static chunk %d: no companies left in it.", i
                )
                break
            try:
                print(f"Downloading static data: for {company_chunk[0]} to {company_chunk[-1]}")
                statdict: dict[str, pd.DataFrame] = downloader.download_static_from(
                    companies=company_chunk,
                    features=config.static_features,
                    raw_data_dir=RAW_DATA_PATH,
                )
                for name, frame in statdict.items():
                    frame.to_csv(STATIC_DATA_PATH / f"company-{name}.csv")
                break
            except DataValidationError as e:
                # Query the offending company once and reuse the value.
                rejected = e.__getcompanies__()
                company_chunk.remove(rejected)
                with open(config.removed_companies_file, "a", encoding="utf-8") as file:
                    file.write(f"\n{rejected}")
                print(f"Removed {rejected} Try again.")
        else:
            raise DataDownloadError("Too many attempts to download static data", company_chunk)

# Other

In [None]:
import re

# Compare which companies have a CSV in the "historic" and "static" dataset
# directories and collect the names that are missing on either side.

# "company-<name>.csv": the dot is escaped and the suffix anchored so that
# only a real ".csv" ending is stripped from the captured name.
_COMPANY_FILE_PATTERN = re.compile(r"company-+(.*)\.csv$")


def _company_names(file_names: list[str]) -> list[str]:
    """Return the company name embedded in each ``company-<name>.csv`` file name.

    File names that do not follow the naming scheme are skipped with a
    warning instead of raising ``IndexError``.
    """
    names: list[str] = []
    for file_name in file_names:
        match = _COMPANY_FILE_PATTERN.search(file_name)
        if match is None:
            logger.warning("Ignoring unexpected file name: %s", file_name)
            continue
        names.append(match.group(1))
    return names


dir1 = config.dataset_dir / "historic"
dir2 = config.dataset_dir / "static"
files1: list[str] = [file.name for file in dir1.glob("*.csv")]
names1: list[str] = _company_names(files1)
files2: list[str] = [file.name for file in dir2.glob("*.csv")]
names2: list[str] = _company_names(files2)

# Sets give O(1) membership tests; the comprehensions keep the original order.
_known1 = set(names1)
_known2 = set(names2)
not_in1: list[str] = [name for name in names2 if name not in _known1]
not_in2: list[str] = [name for name in names1 if name not in _known2]
not_in_both: list[str] = not_in1 + not_in2

In [None]:
def is_unique(s: pd.Series) -> bool:
    """Return True when every value in *s* equals its first value.

    Intended for ``DataFrame.apply(..., axis=1)``, where each row arrives as
    a Series. Empty input is treated as trivially uniform instead of raising
    ``IndexError``. Comparison uses ``==``, so a NaN never equals anything
    (including another NaN) and input containing NaN yields False.
    """
    a: np.ndarray = s.to_numpy()
    if a.size == 0:
        return True
    # Broadcast the first element against the whole array.
    return bool((a[0] == a).all())
# Row mask: True where `is_unique` reports that all values of the row equal
# the row's first value.
# NOTE(review): this keeps the rows whose values are all the same, while the
# variable name suggests the opposite -- confirm whether the mask should be
# negated.
_same_value_rows = all_static_frame.apply(is_unique, axis=1)
without_same_results: pd.DataFrame = all_static_frame[_same_value_rows]