In [1]:
import pandas as pd
import numpy as np
import os

import datetime
from logging import getLogger
HOME = "/home/dimitri/epita/big_data/project/bourse/data/"
logger = getLogger(__name__)

In [None]:
import os
import pandas as pd
import datetime
import re
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

# Regex optimisée pour extraire date + heure
_date_re = re.compile(r'(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}\.\d+)')

def extract_date_hours(path):
    file_name = path.split('/')[-1]
    match = _date_re.search(file_name)
    if not match:
        return None

    date_str = match.group(1)
    time_str = match.group(2)
    return datetime.datetime.strptime(f"{date_str} {time_str}", '%Y-%m-%d %H:%M:%S.%f')


def extract_symbole(df):
    df["boursorama"] = df["symbol"]
    df["symbol"] = df["symbol"].str[3:]

def extract_identifiant_companies(df):
    df["prefix"] = df["boursorama"].str[:3]
def list_all_file():
    list_path = []
    for dir_date in os.listdir(HOME + "boursorama"):
        for file_name in os.listdir(HOME + "boursorama/" + dir_date):
            full_path = os.path.join(HOME, "boursorama", dir_date, file_name)
            list_path.append(full_path)
    return list_path

def get_df(list_path, start = 0, end = 10000):
    df_list = []
    for path in list_path:
        df_tmp = pd.read_pickle(path)
        df_tmp['date'] = extract_date_hours(path)
        df_list.append(df_tmp)
        start += 1
        if start >= end:
            break
    df = pd.concat(df_list, ignore_index=True)
    extract_symbole(df)
    extract_identifiant_companies(df)
    logger.info("All files loaded")
    return df


In [None]:
import pandas as pd
import glob
import os
import re

def load_dataset(data_path, n):
    # Print the number of files in the directory
    files = os.listdir(data_path)
    print(f"Number of files in the directory: {len(files)}")

    # Use glob to get all files that start with 'Euronext_Equities_' and end with .csv or .xlsx
    file_pattern = os.path.join(data_path, "Euronext_Equities_*.*")
    files = glob.glob(file_pattern)

    # List to store individual dataframes
    df_list = []
    counter = 0

    # Loop through each file and process based on file extension
    for file in files:
        if counter == n:
            break
        try:
            if file.lower().endswith('.csv'):
                df = pd.read_csv(file, encoding='utf-8', sep='\t')
            elif file.lower().endswith('.xlsx'):
                df = pd.read_excel(file)
            else:
                # Skip unknown file types
                print("Unsupported file type")
                continue

            # Extract the date from the filename using a regex (assuming format YYYY-MM-DD)
            date_match = re.search(r'(\d{4}-\d{2}-\d{2})', os.path.basename(file))
            if date_match:
                file_date = date_match.group(1)
                # Add the second and milisecond to the date
                file_date = file_date + " 00:00:00.000000"
                file_date = datetime.datetime.strptime(file_date, '%Y-%m-%d %H:%M:%S.%f')
                # Add the date as a new column (as datetime type)
                df['date'] = file_date
            else:
                # Optionally log or handle files without a proper date in the filename
                df['file_date'] = pd.NaT

            # Append the dataframe to the list
            df_list.append(df)
            counter += 1
        except Exception as e:
            print(f"Error processing file {file}: {e}")

    return df_list

def get_df_euronext(n):
    # Define the path to the directory containing the files
    data_path = HOME + "euronext"

    df_list = load_dataset(data_path, n)

    # Concatenate all dataframes into one robust dataframe
    if df_list:
        combined_df = pd.concat(df_list, ignore_index=True)
    else:
        combined_df = pd.DataFrame()

    # Standardize column names: trim whitespace, lower case, and replace spaces with underscores
    combined_df.columns = combined_df.columns.str.strip().str.lower().str.replace(' ', '_')

    # Define a mapping of equivalent column names to merge differences between CSV and XLSX files
    column_synonyms = {
        'ticker': ['ticker', 'symbol'],
        'name': ['company', 'company_name', 'name'],
        'price': ['price', 'closing_price', 'last_price', 'last'],
        'currency': ['currency', 'trading_currency'],
        'open': ['open', 'open_price'],
        'high': ['high', 'high_price'],
        'low': ['low', 'low_price'],
        'last_trade_time': ['last_trade_time', 'last_trade_mic_time', 'last_date/time']
    }

    # Merge equivalent columns
    for canonical, synonyms in column_synonyms.items():
        # Find which of the synonym columns are present in the dataframe
        cols_present = [col for col in synonyms if col in combined_df.columns]
        if len(cols_present) > 1:
            # Merge the columns: use the first non-null value among the columns
            combined_df[canonical] = combined_df[cols_present[0]].combine_first(combined_df[cols_present[1]])
            for col in cols_present[2:]:
                combined_df[canonical] = combined_df[canonical].combine_first(combined_df[col])
            # Drop the extra synonym columns, keeping the canonical one
            for col in cols_present:
                if col != canonical:
                    combined_df.drop(columns=col, inplace=True)
        elif len(cols_present) == 1 and cols_present[0] != canonical:
            # Rename the column to the canonical name
            combined_df.rename(columns={cols_present[0]: canonical}, inplace=True)

    # Remove the closing price column if it exists
    if 'closing_price_datetime' in combined_df.columns:
        combined_df.drop(columns='closing_price_datetime', inplace=True)

    # Optionally, drop duplicates
    combined_df.drop_duplicates(inplace=True)

    # Remove the rows where the isin value is NaN
    combined_df = combined_df[~combined_df['isin'].isna()]

    # Add a new column 'pea' based on the currency column
    if 'currency' in combined_df.columns:
        combined_df['pea'] = combined_df['currency'].apply(
            lambda x: True if isinstance(x, str) and x.upper() == 'EUR' else False)
    else:
        combined_df['pea'] = False

    # Reset the index of the dataframe
    combined_df.reset_index(drop=True, inplace=True)

    #   Replace invalid values with NaN
    combined_df["high"] = combined_df["high"].replace('-', np.nan)
    combined_df["low"] = combined_df["low"].replace('-', np.nan)

    # Convert columns to float
    combined_df["high"] = combined_df["high"].astype(float)
    combined_df["low"] = combined_df["low"].astype(float)
    
    # Trier les données par 'isin' et 'date' pour garantir l'ordre correct
    if 'isin' in combined_df.columns and 'date' in combined_df.columns:
        combined_df.sort_values(by=['isin', 'date'], inplace=True)

    # Ajouter une colonne 'close' qui correspond à la valeur 'open' du jour suivant pour chaque 'isin'
    if 'open' in combined_df.columns:
        combined_df['close'] = combined_df.groupby('isin')['open'].shift(-1)
    else:
        combined_df['close'] = pd.NA

    # Réinitialiser l'index après avoir ajouté la colonne 'close'
    combined_df.reset_index(drop=True, inplace=True)

    return combined_df


In [4]:
dir = os.listdir(HOME + "boursorama")
i = 0
for dir_date in dir :
    list_file_path = os.listdir(HOME + "boursorama/" + dir_date)
    print(len(list_file_path))

44893
24931
25887
28177
28056
15054


In [5]:
list_path = list_all_file()
len(list_path)

166998

In [7]:
df_boursorama = get_df(list_path, 0, 5000)
df_boursorama.head(10)

Unnamed: 0,last,volume,symbol,name,date,boursorama,prefix
0,58.01,0,ABBV,ABBVIE,2019-02-28 13:32:01.512834,1rPABBV,1rP
1,37.29,405421,AC,ACCOR,2019-02-28 13:32:01.512834,1rPAC,1rP
2,46.02,0,ACNV,ACCOR,2019-02-28 13:32:01.512834,1rPACNV,1rP
3,171.7,22674,ADP,ADP,2019-02-28 13:32:01.512834,1rPADP,1rP
4,10.86,3512926,AF,AIR FRANCE - KLM,2019-02-28 13:32:01.512834,1rPAF,1rP
5,109.65,163127,AI,AIR LIQUIDE,2019-02-28 13:32:01.512834,1rPAI,1rP
6,113.24,285724,AIR,AIRBUS,2019-02-28 13:32:01.512834,1rPAIR,1rP
7,54.8,2613,AKA,AKKA TECHNOLOGIES,2019-02-28 13:32:01.512834,1rPAKA,1rP
8,3.46,0,ALUNV,ALCATEL I15,2019-02-28 13:32:01.512834,1rPALUNV,1rP
9,3.5,0,ALU,ALCATEL-LUCENT,2019-02-28 13:32:01.512834,1rPALU,1rP


In [None]:
len(df_boursorama)

299669

In [8]:
df_euronext = get_df_euronext(999)
df_euronext.head()

Number of files in the directory: 775


Unnamed: 0,name,isin,ticker,market,open,high,low,time_zone,volume,turnover,date,currency,price,last_trade_time,pea,close
0,SCHLUMBERGER,AN8068571086,SLB,Euronext Paris,16.9,17.5,15.45,CET,158448,2609873.0,2020-05-01,EUR,15.6,30/04/20 17:35,True,15.5
1,SCHLUMBERGER,AN8068571086,SLB,Euronext Paris,15.5,15.9,13.85,CET,167582,2413145.1,2020-05-04,EUR,14.4,04/05/20 17:35,True,15.0
2,SCHLUMBERGER,AN8068571086,SLB,Euronext Paris,15.0,15.9,15.0,CET,123769,1908206.3,2020-05-05,EUR,15.5,05/05/20 17:38,True,15.3
3,SCHLUMBERGER,AN8068571086,SLB,Euronext Paris,15.3,15.5,14.7,CET,66775,1003210.2,2020-05-06,EUR,15.25,06/05/20 17:35,True,15.1
4,SCHLUMBERGER,AN8068571086,SLB,Euronext Paris,15.1,15.55,14.8,CET,46711,717229.05,2020-05-07,EUR,15.45,07/05/20 17:35,True,16.25


In [21]:
def merge_dataset(df_boursorama, df_euronext, delete_name_alone=True):
    # delete_name_alone : delete the rows when the name is only in bourso and not in euronext
    if delete_name_alone:
        # Filter rows where the name exists in both datasets
        df_boursorama2 = df_boursorama[df_boursorama['name'].isin(df_euronext['name'])].copy()
    else:
        df_boursorama2 = df_boursorama.copy()

    # Get all unique columns from both datasets
    all_columns = list(set(df_euronext.columns).union(set(df_boursorama2.columns)))

    # Ensure both dataframes have the same columns
    for col in all_columns:
        if col not in df_euronext.columns:
            df_euronext[col] = np.nan
        if col not in df_boursorama2.columns:
            df_boursorama2[col] = np.nan

    # Concatenate the two datasets
    df = pd.concat([df_boursorama2, df_euronext], ignore_index=True)

    # Fill missing 'isin' values in df with the mapping from df_euronext
    isin_mapping = df_euronext.set_index('name')['isin'].to_dict()
    df['isin'] = df['isin'].fillna(df['name'].map(isin_mapping))

    # Remove duplicates based on all columns
    

    # Optionally, you can remove duplicates based on specific columns (e.g., 'name' and 'isin')
    # df = df.drop_duplicates(subset=['name', 'isin'])

    # TODO: Fill the other variables with the euronext values
    # TODO: Fill the 'symbol' value in the euronext with the bourso value

    return df

In [22]:
df = merge_dataset(df_boursorama, df_euronext, delete_name_alone=True)
df.head()

Unnamed: 0,last,volume,symbol,name,date,boursorama,prefix,close,market,price,time_zone,open,last_trade_time,high,currency,pea,low,turnover,isin,ticker
0,37.29,405421,AC,ACCOR,2019-02-28 13:32:01.512834,1rPAC,1rP,,,,,,,,,,,,FR0000120404,
1,46.02,0,ACNV,ACCOR,2019-02-28 13:32:01.512834,1rPACNV,1rP,,,,,,,,,,,,FR0000120404,
2,171.7,22674,ADP,ADP,2019-02-28 13:32:01.512834,1rPADP,1rP,,,,,,,,,,,,FR0010340141,
3,109.65,163127,AI,AIR LIQUIDE,2019-02-28 13:32:01.512834,1rPAI,1rP,,,,,,,,,,,,FR0000120073,
4,113.24,285724,AIR,AIRBUS,2019-02-28 13:32:01.512834,1rPAIR,1rP,,,,,,,,,,,,NL0000235190,


| **Table**     | **Colonne**   | **Type**           | **Description** |
|--------------|-------------|------------------|--------------|
| **companies** | id        | SMALLINT (PK)   | Identifiant unique de l'entreprise |
|             | name        | VARCHAR         | Nom de l'entreprise |
|             | mid        | SMALLINT        | Identifiant du marché (référence vers `markets.id`) |
|             | symbol     | VARCHAR         | Symbole boursier de l'entreprise |
|             | isin       | CHAR(12)        | Code ISIN (International Securities Identification Number) |
|             | boursorama | VARCHAR         | Identifiant de l'entreprise sur Boursorama |
|             | euronext   | VARCHAR         | Identifiant de l'entreprise sur Euronext |
|             | pea       | BOOLEAN         | Indique si l'action est éligible au Plan d'Épargne en Actions (PEA) |
|             | sector1   | VARCHAR         | Secteur principal de l'entreprise |
|             | sector2   | VARCHAR         | Secteur secondaire de l'entreprise |
|             | sector3   | VARCHAR         | Secteur tertiaire de l'entreprise |


In [81]:
from time import time

In [36]:
import numpy as np
import pandas as pd

def clean_numeric_column_fast(series: pd.Series) -> pd.Series:
    # Convertir en tableau de chaînes une fois pour toutes
    arr = np.char.asarray(series.astype(str).values)

    # Remplacer "," par "." et supprimer les espaces
    arr = np.char.replace(arr, ',', '.')
    arr = np.char.replace(arr, ' ', '')

    # Mettre NaN pour les simples tirets
    mask_dash = arr == '-'
    arr[mask_dash] = 'nan'

    # Convertir en float de façon vectorisée
    return pd.to_numeric(arr, errors='coerce')

In [27]:
from timescaledb_model import initial_markets_data

def populate_markets():
    # Convert initial_markets_data to a DataFrame for easier manipulation
    initial_data = pd.DataFrame(
        initial_markets_data,
        columns=["id", "name", "alias", "boursorama", "euronext", "sws"]
    )

    # Create the markets DataFrame
    df_markets = pd.DataFrame()
    df_markets["id"] = initial_data["id"]
    df_markets["name"] = initial_data["name"]
    df_markets["alias"] = initial_data["alias"]

    # Map boursorama prefixes to the corresponding markets
    df_markets["boursorama"] = initial_data["boursorama"]

    # Map euronext tickers to the corresponding markets
    df_markets["euronext"] = np.nan

    # Fill the "sws" column with data from initial_markets_data
    df_markets["sws"] = initial_data["sws"]


    return df_markets

Voici un tableau explicatif des colonnes de chaque table :  

| **Table**     | **Colonne**   | **Type**           | **Description** |
|--------------|-------------|------------------|--------------|
| **markets** | id          | SMALLINT (PK)   | Identifiant unique du marché |
|             | name        | VARCHAR         | Nom du marché |
|             | alias       | VARCHAR         | Alias du marché |
|             | boursorama  | VARCHAR         | Préfixe du marché sur Boursorama |
|             | sws        | VARCHAR         | Nom du marché sur Simply Wall Street |
|             | euronext    | VARCHAR         | Nom du marché sur Euronext |

In [28]:
def populate_companies(df, df_markets):
    df_companies = pd.DataFrame()

    df_companies["isin"] = df["isin"].values
    df_companies["name"] = df["name"].values

    # Merge df with df_markets on the 'prefix' and 'boursorama' columns, with explicit suffixes
    merged_df = df.merge(
        df_markets[['boursorama', 'id', 'name']],
        how='left',
        left_on='prefix',
        right_on='boursorama',
        suffixes=('_action', '_market')  # Explicit suffixes to avoid conflicts
    )

    # Extract the 'id' and 'name_market' columns from the merged dataframe
    df_companies["mid"] = merged_df["id"].fillna(-1).astype(int)  # Ensure 'mid' is an integer
    df_companies["symbol"] = df["symbol"].values
    df_companies["boursorama"] = df["boursorama"].values
    df_companies["id"] = np.arange(len(df_companies))

    df_companies["euronext"] = df["ticker"].values
    eligible_pea = ["Bourse de Milan", "Mercados Espanoles", "Amsterdam", "Paris", "Deutsche Borse", "Bruxelle"]

    # Use the 'name_market' column to determine eligibility for PEA
    df_companies["pea"] = merged_df["name_market"].apply(lambda name: name in eligible_pea if pd.notna(name) else False)

    df_companies["sector1"] = ""  # TODO
    df_companies["sector2"] = ""  # TODO
    df_companies["sector3"] = ""  # TODO

    # Drop duplicates based on all columns except 'id'
    df_companies = df_companies.loc[:, ~df_companies.columns.isin(['id'])].drop_duplicates().reset_index(drop=True)

    # Reassign unique IDs after dropping duplicates
    df_companies["id"] = np.arange(len(df_companies))

    return df_companies

Voici un tableau explicatif des colonnes de chaque table :  

| **Table**     | **Colonne**   | **Type**           | **Description** |
|--------------|-------------|------------------|--------------|
| **stocks**  | date      | TIMESTAMPTZ     | Date et heure de la cotation |
|             | cid       | SMALLINT        | Identifiant de l'entreprise (référence vers `companies.id`) |
|             | value     | FLOAT4          | Valeur de l'action à cet instant |
|             | volume    | FLOAT4          | Volume échangé à cet instant |


In [44]:
def populate_stocks(df_boursorama: pd.DataFrame, df_companies: pd.DataFrame, save_path: str = "daystocks.parquet"):
    df_stocks = pd.DataFrame()

    # Reset index to avoid ambiguity with 'symbol', without adding the old index as a column
    df_boursorama = df_boursorama.reset_index(drop=True)
    df_companies = df_companies.reset_index(drop=True)

    # Merge df_boursorama with df_companies on the 'symbol' column
    merged_df = df_boursorama.merge(
        df_companies[['symbol', 'id']],
        how='left',
        left_on='symbol',
        right_on='symbol',
        suffixes=('', '_company')  # Avoid suffix conflicts
    )

    # Populate the stocks dataframe
    df_stocks["date"] = merged_df["date"]
    df_stocks["cid"] = merged_df["id"].fillna(-1).astype(int)  # Ensure 'cid' is an integer
    df_stocks["value"] = clean_numeric_column_fast(merged_df["last"])
    df_stocks["volume"] = merged_df["volume"]


    # 🔐 Sauvegarde ultra rapide
    df_stocks.to_parquet(save_path, index=False)
    print(f"DataFrame sauvegardé dans {save_path}")

    return df_stocks


In [40]:
import pandas as pd
from time import time

def populate_daystocks(df_euronext: pd.DataFrame, df_companies: pd.DataFrame):
    # Merge des deux DataFrames
    merged_df = df_euronext.merge(
        df_companies[['isin', 'id']],
        how='left',
        on='isin'
    )

    # Initialisation du df final
    df_daystocks = pd.DataFrame()
    df_daystocks["date"] = merged_df["date"]
    df_daystocks["cid"] = merged_df["id"].fillna(-1).astype(int)

    # Colonnes à nettoyer
    numeric_cols = ["open", "close", "high", "low", "volume"]
    tps = time()
    df_daystocks[numeric_cols] = merged_df[numeric_cols].apply(clean_numeric_column_fast)
    print("Time to clean numeric columns:", time() - tps)

    # Calculs dérivés (mean et std après clean)
    df_daystocks["mean"] = (df_daystocks["high"] + df_daystocks["low"]) / 2
    df_daystocks["std"] = df_daystocks["high"] - df_daystocks["low"]


    return df_daystocks


In [31]:
df_markets = populate_markets()
df_markets.head(1)

Unnamed: 0,id,name,alias,boursorama,euronext,sws
0,1,New York,nyse,,,


In [32]:
df_companies = populate_companies(df, df_markets)
df_companies.head(1)

Unnamed: 0,isin,name,mid,symbol,boursorama,euronext,pea,sector1,sector2,sector3,id
0,FR0000120404,ACCOR,6,AC,1rPAC,,True,,,,0


In [41]:
df_daystocks = populate_daystocks(df_euronext, df_companies)
df_daystocks.head()

Time to clean numeric columns: 6.07834267616272


Unnamed: 0,date,cid,open,close,high,low,volume,mean,std
0,2020-05-01,108,16.9,15.5,17.5,15.45,158448.0,16.475,2.05
1,2020-05-01,272,16.9,15.5,17.5,15.45,158448.0,16.475,2.05
2,2020-05-04,108,15.5,15.0,15.9,13.85,167582.0,14.875,2.05
3,2020-05-04,272,15.5,15.0,15.9,13.85,167582.0,14.875,2.05
4,2020-05-05,108,15.0,15.3,15.9,15.0,123769.0,15.45,0.9


In [45]:
df_stocks = populate_stocks(df_boursorama, df_companies)
df_stocks.head(10)

DataFrame sauvegardé dans daystocks.parquet


Unnamed: 0,date,cid,value,volume
0,2019-02-28 13:32:01.512834,-1,58.01,0
1,2019-02-28 13:32:01.512834,0,37.29,405421
2,2019-02-28 13:32:01.512834,1,46.02,0
3,2019-02-28 13:32:01.512834,2,171.7,22674
4,2019-02-28 13:32:01.512834,-1,10.86,3512926
5,2019-02-28 13:32:01.512834,3,109.65,163127
6,2019-02-28 13:32:01.512834,4,113.24,285724
7,2019-02-28 13:32:01.512834,5,54.8,2613
8,2019-02-28 13:32:01.512834,-1,3.46,0
9,2019-02-28 13:32:01.512834,-1,3.5,0


In [43]:
df_daystocks = pd.read_parquet("daystocks.parquet")
df_daystocks.head(10)

Unnamed: 0,date,cid,value,volume
0,2019-02-28 13:32:01.512834,-1,58.01,0
1,2019-02-28 13:32:01.512834,0,37.29,405421
2,2019-02-28 13:32:01.512834,1,46.02,0
3,2019-02-28 13:32:01.512834,2,171.7,22674
4,2019-02-28 13:32:01.512834,-1,10.86,3512926
5,2019-02-28 13:32:01.512834,3,109.65,163127
6,2019-02-28 13:32:01.512834,4,113.24,285724
7,2019-02-28 13:32:01.512834,5,54.8,2613
8,2019-02-28 13:32:01.512834,-1,3.46,0
9,2019-02-28 13:32:01.512834,-1,3.5,0


In [97]:
df_companies.head(3)

Unnamed: 0,isin,name,mid,symbol,boursorama,id,euronext,pea,sector1,sector2,sector3
0,FR0000120404,ACCOR,6,AC,1rPAC,0,,True,,,
1,FR0000120404,ACCOR,6,ACNV,1rPACNV,1,,True,,,
2,FR0010340141,ADP,6,ADP,1rPADP,2,,True,,,
