## BitM - ETL

In [1]:
!pip install pandas==2.2.3
!pip install returns

Defaulting to user installation because normal site-packages is not writeable
Collecting pandas==2.2.3
  Downloading pandas-2.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (89 kB)
Collecting numpy>=1.26.0 (from pandas==2.2.3)
  Downloading numpy-2.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (62 kB)
Collecting pytz>=2020.1 (from pandas==2.2.3)
  Downloading pytz-2025.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas==2.2.3)
  Downloading tzdata-2025.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.7/12.7 MB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading numpy-2.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.1/16.1 MB[0m [31m11.9 MB/s

In [2]:
from returns.io import IO, IOSuccess, IOFailure, IOResult

In [3]:
def alpha(number: int) -> IOResult[int, int]:
    """Wrap ``number`` in ``IOSuccess`` when it is below 10, otherwise in ``IOFailure``."""
    if number < 10:
        return IOSuccess(number)
    return IOFailure(number)


In [4]:
# Run alpha over a mix of values below and above the threshold of 10.
result = [alpha(value) for value in (1, 2, 3, 11, 12, 13)]

In [6]:
# Show which container class each alpha() call produced.
for item in result:
    print(type(item))

<class 'returns.io.IOSuccess'>
<class 'returns.io.IOSuccess'>
<class 'returns.io.IOSuccess'>
<class 'returns.io.IOFailure'>
<class 'returns.io.IOFailure'>
<class 'returns.io.IOFailure'>


In [14]:
# Build a successful container holding the string 'aa'.
epsilom = IOSuccess('aa')

In [None]:
# Display the container's repr.
epsilom

In [13]:
# NOTE(review): `unwrap` is referenced without parentheses, so `qq` holds the
# bound method itself rather than the unwrapped value — confirm whether
# `epsilom.unwrap()` was intended.
qq  = epsilom.unwrap

In [12]:
# Inspect what kind of object `qq` is.
print(type(qq))

<class 'method'>


In [16]:
import pandas as pd
import os
import re
import numpy as np

In [17]:
# Directory scanned for the historical CSV files (relative to the working directory).
hist_path = 'data/hist/'

In [18]:
# Directory scanned for the daily CSV files (relative to the working directory).
daily_path = 'data/daily/'

In [19]:
def get_files(path):
    """Return the names of the CSV files found directly inside ``path``.

    Args:
        path: Directory to scan.

    Returns:
        Bare file names (not full paths) ending in ``.csv``, sorted
        alphabetically. ``os.listdir`` gives no ordering guarantee, so sorting
        keeps the processing order stable between runs and machines.

    Raises:
        OSError: Propagated from ``os.listdir`` (e.g. when ``path`` does not exist).
    """
    return sorted(name for name in os.listdir(path) if name.endswith('.csv'))

In [20]:
def process_files_in_dir(path, cols, fn):
    """Read every CSV in ``path``, transform each one with ``fn`` and stack the results.

    Args:
        path: Directory whose ``.csv`` files are loaded (listed via ``get_files``).
        cols: Names given to the first ``len(cols)`` columns of each file.
        fn: Callable invoked as ``fn(file_name, frame)``; its return value is
            what gets concatenated.

    Returns:
        One DataFrame with a fresh integer index, or ``None`` when the
        directory contains no CSV files.
    """
    csv_names = get_files(path)
    if not csv_names:
        return None

    frames = []
    for csv_name in csv_names:
        # NOTE(review): header=1 together with explicit `names` makes pandas
        # start reading data after the second line of the file — confirm the
        # exports really carry an extra leading line before the header row.
        raw = pd.read_csv(
            os.path.join(path, csv_name),
            sep=';',
            names=cols,
            header=1,
            usecols=range(len(cols)),
        )
        frames.append(fn(csv_name, raw))

    return pd.concat(frames, ignore_index=True)

### Historical Processing

In [21]:
# Names assigned to the leading columns of each historical CSV.
column_names = ['Fecha', 'Nemotécnico', 'Precio cierre']

In [22]:
def rename_cols(df):
    """Return a copy of ``df`` with the historical headers renamed to Date / Close / Stock."""
    english_names = {'Fecha': 'Date', 'Precio cierre': 'Close', 'Nemotécnico': 'Stock'}
    return df.rename(columns=english_names)

In [23]:
def cast_close(df):
    """Convert the ``Close`` column of a historical frame to floats.

    Every comma is removed from the text, and a value that is exactly ``'-'``
    is read as ``0``.
    """
    as_text = df['Close'].astype(str)
    without_commas = as_text.str.replace(',', '')
    return without_commas.replace('-', '0').astype(float)

In [24]:
def cast_date(df):
    """Parse the ``Date`` column of ``df`` into pandas datetimes."""
    return pd.to_datetime(df['Date'])

In [25]:
def process_hist(file, df):
    """Normalise the frame read from one historical CSV.

    Args:
        file: Name of the source file; only used in the progress message.
        df: Frame with the columns 'Fecha', 'Nemotécnico' and 'Precio cierre'.

    Returns:
        A new frame with the columns Date (datetime), src (always 'hist'),
        Stock and Close (float), in that order.
    """
    print(f'processing file {file}')
    # The mapping is spelled out here instead of going through the
    # notebook-level `rename_cols`, because that name is rebound for the daily
    # files further down. Re-running this function after that cell would leave
    # 'Fecha' and 'Precio cierre' unrenamed and fail on df['Close'].
    df = df.rename(columns={'Fecha': 'Date', 'Precio cierre': 'Close', 'Nemotécnico': 'Stock'})
    df['Close'] = cast_close(df)
    # NOTE(review): the date format is inferred by pandas — confirm the files
    # use an unambiguous format.
    df['Date'] = cast_date(df)
    # Tag the origin so rows can be prioritised when sources are merged.
    df.insert(1, 'src', 'hist')
    return df

In [26]:
# Load and normalise every CSV under hist_path into a single frame.
dfHist = process_files_in_dir(hist_path, column_names, process_hist)

processing file BCOLOMBIA_20240613_055318.csv
processing file BOGOTA_20240613_114646.csv
processing file CELSIA_20240614_101420.csv
processing file CEMARGOS_20240613_114831.csv
processing file CORFICOLCF_20240614_101233.csv
processing file ECOPETROL_20240702_101105.csv
processing file GEB_20240613_055444.csv
processing file GRUBOLIVAR_20240614_101320.csv
processing file GRUPOARGOS_20240613_114334.csv
processing file GRUPOSURA_20240613_114143.csv
processing file ISA_20240613_055521.csv
processing file NUTRESA_20240613_115147.csv
processing file PFAVAL_20240613_114405.csv
processing file PFBCOLOM_20240613_055405.csv
processing file PFCORFICOL_20240614_101155.csv
processing file PFDAVVNDA_20240613_114741.csv
processing file PFGRUPOARG_20240613_114232.csv
processing file PFGRUPSURA_20240613_055607.csv
processing file PROMIGAS_20240614_101116.csv


In [27]:
# Persist the historical frame as Parquet in the current working directory.
dfHist.to_parquet('data.parquet')

### Daily Processing

In [28]:
# Names assigned to the leading columns of each daily CSV.
# NOTE: this rebinds `column_names`, which held the historical column list until here.
column_names = ['Nemotécnico', 'Último precio']

In [29]:
def rename_cols(df):
    """Return a copy of ``df`` with the daily headers renamed to Close / Stock.

    NOTE: this reuses the name of the historical renamer defined earlier in the
    notebook, so from this cell on ``rename_cols`` refers to the daily mapping.
    """
    english_names = {'Último precio': 'Close', 'Nemotécnico': 'Stock'}
    return df.rename(columns=english_names)

In [30]:
def cast_close_daily(df):
    """Convert the ``Close`` column of a daily frame to floats.

    Every comma is turned into a dot, and a value that is exactly ``'-'`` is
    read as ``0``.
    """
    as_text = df['Close'].astype(str)
    dotted = as_text.str.replace(',', '.')
    return dotted.replace('-', '0').astype(float)

In [31]:
def add_date(file_name):
    """Derive an ISO date string from a daily file name.

    Searches ``file_name`` for ``RVLocal_YYYYMMDD.csv`` and returns
    ``'YYYY-MM-DD'``. When the pattern is not present, the placeholder
    ``'1900-01-01'`` is returned instead.
    """
    found = re.search(r'RVLocal_(\d{4})(\d{2})(\d{2})\.csv', file_name)
    if found is None:
        return '1900-01-01'
    # The three captured groups are year, month and day, in that order.
    return '-'.join(found.groups())

In [32]:
def process_daily(filename, df):
    """Normalise the frame read from one daily CSV.

    Args:
        filename: Name of the source file; the trading date is taken from it
            via ``add_date``.
        df: Frame with the columns 'Nemotécnico' and 'Último precio'.

    Returns:
        A new frame with the columns Date (datetime), src (always 'daily'),
        Stock and Close (float), in that order.
    """
    # The mapping is spelled out here instead of going through the
    # notebook-level `rename_cols`: that name is defined twice in the notebook
    # (historical and daily), so which mapping it holds depends on the cell
    # that ran last.
    df = df.rename(columns={'Último precio': 'Close', 'Nemotécnico': 'Stock'})
    df['Close'] = cast_close_daily(df)
    # Daily files carry no date column, so the date from the file name is
    # prepended as the first column.
    df.insert(0, 'Date', add_date(filename))
    df['Date'] = cast_date(df)
    # Tag the origin so rows can be prioritised when sources are merged.
    df.insert(1, 'src', 'daily')
    return df

In [33]:
# Load and normalise every CSV under daily_path into a single frame.
dfDaily = process_files_in_dir(daily_path, column_names, process_daily)

In [34]:
# Stack both sources. Sorting 'src' in descending order places 'hist' ahead of
# 'daily', so keep='first' retains the historical row whenever the same
# (Stock, Date) pair appears in both.
dfStockTemp = (
    pd.concat([dfHist, dfDaily], ignore_index=True)
    .sort_values(by=['Stock', 'Date', 'src'], ascending=[True, True, False])
    .drop_duplicates(subset=['Stock', 'Date'], keep='first')
)

In [35]:
# Peek at the last rows of the combined frame.
dfStockTemp.tail()

Unnamed: 0,Date,src,Stock,Close
2431,2024-06-26,daily,TERPEL,0.0
2455,2024-06-27,daily,TERPEL,9010.0
2498,2024-06-28,daily,TERPEL,9000.0
2525,2024-07-02,daily,TERPEL,9010.0
2368,2024-06-24,daily,VALSIMESA,0.0
