In [2]:
import sys
import subprocess
import pkg_resources

def install_packages():
    required_packages = [
        "numpy",
        "pandas",
        "scikit-learn",
        "joblib",
        "pyarrow",
        "fastparquet",
        "plotly",
        "matplotlib"
    ]
    
    installed_packages = {pkg.key for pkg in pkg_resources.working_set}

    for package in required_packages:
        if package.lower() not in installed_packages:
            print(f"Installing {package}...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        else:
            print(f"{package} is already installed.")
    
    print("All packages are verified")

install_packages()

numpy is already installed.
pandas is already installed.
scikit-learn is already installed.
joblib is already installed.
pyarrow is already installed.
fastparquet is already installed.
plotly is already installed.
matplotlib is already installed.
All packages are verified


In [15]:
import pandas as pd
import os
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)


In [None]:
input_dir = '..//data//assets//'
output_dir = '..//data//preprocess//'
table_dir='..//results//tables//'

In [32]:

def save_table(df: pd.DataFrame, 
               title: str = 'Table', 
               table_dir: str = '..results/tables/'):
    """
    Saves the DataFrame as a CSV file. If a file with the exact title exists,
    it will overwrite the existing file. Otherwise, it will create a new file
    with the next available number.

    Args:
        df (pd.DataFrame): DataFrame to be saved.
        title (str): Title to be used in the CSV filename.
        output_path (str): Path where the file will be saved.
    """
    os.makedirs(os.path.join(table_dir, 'csv'), exist_ok=True)
    csv_path = os.path.join(table_dir, 'csv')
    existing_files = [f for f in os.listdir(csv_path) if title in f and f.endswith('.csv')]
    if existing_files:
        file_name_csv = existing_files[0]
        csv_output_path = os.path.join(csv_path, file_name_csv)
    else:
        num = len([f for f in os.listdir(csv_path) if f.startswith('Tabela_')])
        num += 1
        file_name_csv = f"Tabela_{num}_{title}.csv"
        csv_output_path = os.path.join(csv_path, file_name_csv)

    df.to_csv(csv_output_path, index=False)
    print(f"Tabela saved as CSV: {csv_output_path}")
    
def check_data(input_dir):
    """
    Loads all .parquet files from the specified directory, processes them by:
    - Extracting the ticker symbol from the filename (everything before the first '_').
    - Creating a 'date' column with unique dates and dropping duplicates.
    - Storing the ticker, first date, last date, and the number of columns in the DataFrame for each processed file.

    Args:
        directory_path (str): The path to the directory containing the .parquet files.

    Returns:
        pd.DataFrame: A DataFrame containing the summary information (ticker, first date, last date, and shape).
    """
    parquet_files = [f for f in os.listdir(input_dir) if f.endswith('.parquet')]
    summary_data = [] 
    for parquet_file in parquet_files:
        ticker = parquet_file.split('_')[0] 
        file_path = os.path.join(input_dir, parquet_file)
        df = pd.read_parquet(file_path)
        df['date'] = df['time'].dt.strftime('%Y-%m-%d')
        df['time_of_trade'] = df['time'].dt.strftime('%H:%M:%S')
        first_date = df['date'].min()
        last_date = df['date'].max()
        first_trade =  df['time_of_trade'].min()
        last_trade = df['time_of_trade'].max()
        summary_data.append({
            'ticker': ticker,
            'first_date': first_date,
            'last_date': last_date,
            'fist_trade': first_trade,
            'last_trade': last_trade,
            'rows': df.shape[0],
            'columns': df.shape[1],
            'unique_dates': len(df.date.unique())
        })
    
    data_info_df = pd.DataFrame(summary_data)
    save_table(data_info_df, title='Visualizaçao das séries de dados escolhidas')
    return data_info_df

def elegant_inputer(df: pd.DataFrame, 
                    start_date: str = '2022-06-01 09:00:00', 
                    end_date: str = '2024-11-22 17:45:00',
                    timeframe: str = '15T', 
                    lookback: int = 180):
    """
    Generates missing time intervals for the provided date range and timeframe, 
    adds them to the original dataframe, and fills missing values with forward fill method.
    
    Parameters:
    df (DataFrame): Original dataframe containing the time and other columns.
    start_date (str): Start date and time of the range (default '2022-06-01 09:00:00').
    end_date (str): End date and time of the range (default '2024-11-22 17:45:00').
    timeframe (str): Time interval for generating timestamps (default '15T' for 15 minutes).
    lookback (int): The number of previous valid entries to use for filling missing data (default 5).
    
    Returns:
    DataFrame: A dataframe with the missing time intervals added and missing values filled.
    """
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)
    
    time_intervals = pd.date_range(start=start_date, end=end_date, freq=timeframe)
    df_aux = pd.DataFrame(time_intervals, columns=['time'])
    df_aux['date'] = df_aux['time'].dt.date
    df['date'] = df['time'].dt.date
    valid_dates = df['date'].unique()
    df_aux = df_aux[df_aux['date'].isin(valid_dates)]
    df_inputed = pd.merge(df_aux, df, on=['date', 'time'], how='left')
    df_inputed['tick_volume'].fillna(0, inplace=True)
    df_inputed['real_volume'].fillna(0, inplace=True)
    df_inputed = df_inputed.sort_values(by='time').reset_index(drop=True)
    cols_to_ffill = ['open','high',	'low',	'close', 'spread']
    df_inputed[cols_to_ffill] = df_inputed[cols_to_ffill].fillna(method='ffill',  limit=lookback)
    first_trade = start_date.time()
    last_trade = end_date.time()
    df_inputed = df_inputed[(df_inputed['time'].dt.time >= first_trade) & (df_inputed['time'].dt.time <= last_trade)]

    return df_inputed

def process_data(input_dir: str, 
                 output_dir: str):
    """
    Processes all .parquet files in the specified directory, concatenating DataFrames by aligning on the 'time' column
    and appending the ticker as a suffix to each column name.

    Args:
        input_dir (str): Path to the directory containing .parquet files.
        output_dir (str): Path to save the processed DataFrame.
    """
    parquet_files = [f for f in os.listdir(input_dir) if f.endswith('.parquet')]
    processed_df = pd.DataFrame()
    for parquet_file in parquet_files:
        ticker = parquet_file.split('_')[0]
        file_path = os.path.join(input_dir, parquet_file)
        df = pd.read_parquet(file_path)
        df = elegant_inputer(df)
        df.drop(columns='date', inplace=True)
        df = df.rename(columns=lambda col: f"{col}_{ticker}" if col != "time" else col)
        processed_df = pd.concat([processed_df, df], axis=1)

    processed_df = processed_df.loc[:, ~processed_df.columns.duplicated()]
    os.makedirs(output_dir, exist_ok=True)
    processed_df.to_parquet(f'{output_dir}/data.parquet')
    processed_df.to_csv(f'{output_dir}/data.csv', index=False)
    return processed_df

In [30]:
data = check_data(input_dir)
data



Tabela saved as CSV: ..results/tables/csv\Tabela_1_Visualizaçao das séries de dados escolhidas.csv


Unnamed: 0,ticker,first_date,last_date,fist_trade,last_trade,rows,columns,unique_dates
0,BGI$,2022-06-01,2024-11-22,09:00:00,17:45:00,17680,10,622
1,CCM$,2022-06-01,2024-11-22,09:00:00,17:45:00,18615,10,622
2,GOLD11,2022-06-01,2024-11-22,10:00:00,19:30:00,18122,10,622
3,IBOV,2022-06-01,2024-11-22,08:45:00,18:15:00,18128,10,622
4,ICF$,2022-06-01,2024-11-22,09:00:00,17:15:00,15490,10,622
5,IVVB11,2022-06-01,2024-11-22,10:00:00,19:30:00,18128,10,622


In [33]:
data = process_data(input_dir=input_dir, output_dir=output_dir)
data.head()


Unnamed: 0,time,open_BGI$,high_BGI$,low_BGI$,close_BGI$,tick_volume_BGI$,spread_BGI$,real_volume_BGI$,open_CCM$,high_CCM$,...,tick_volume_ICF$,spread_ICF$,real_volume_ICF$,open_IVVB11,high_IVVB11,low_IVVB11,close_IVVB11,tick_volume_IVVB11,spread_IVVB11,real_volume_IVVB11
0,2022-06-01 09:00:00,313.1,316.52,313.1,315.66,26.0,1.0,28.0,123.34,123.74,...,27.0,1.0,15.0,,,,,0.0,,0.0
1,2022-06-01 09:15:00,315.76,315.96,313.2,313.2,59.0,1.0,91.0,123.3,123.33,...,14.0,1.0,20.0,,,,,0.0,,0.0
2,2022-06-01 09:30:00,313.05,313.25,311.99,312.49,27.0,1.0,41.0,123.28,123.39,...,21.0,1.0,21.0,,,,,0.0,,0.0
3,2022-06-01 09:45:00,313.05,313.25,311.99,312.49,0.0,1.0,0.0,123.28,123.53,...,32.0,1.0,39.0,,,,,0.0,,0.0
4,2022-06-01 10:00:00,313.3,313.95,313.1,313.95,23.0,1.0,30.0,123.3,123.32,...,75.0,1.0,96.0,214.64,215.11,214.0,214.95,227.0,1.0,2113.0


In [8]:
data.isna().sum()

time                  0
open_BGI$             0
high_BGI$             0
low_BGI$              0
close_BGI$            0
tick_volume_BGI$      0
spread_BGI$           0
real_volume_BGI$      0
open_CCM$             0
high_CCM$             0
low_CCM$              0
close_CCM$            0
tick_volume_CCM$      0
spread_CCM$           0
real_volume_CCM$      0
open_GOLD11           4
high_GOLD11           4
low_GOLD11            4
close_GOLD11          4
tick_volume_GOLD11    0
spread_GOLD11         4
real_volume_GOLD11    0
open_IBOV             4
high_IBOV             4
low_IBOV              4
close_IBOV            4
tick_volume_IBOV      0
spread_IBOV           4
real_volume_IBOV      0
open_ICF$             0
high_ICF$             0
low_ICF$              0
close_ICF$            0
tick_volume_ICF$      0
spread_ICF$           0
real_volume_ICF$      0
open_IVVB11           4
high_IVVB11           4
low_IVVB11            4
close_IVVB11          4
tick_volume_IVVB11    0
spread_IVVB11   