In [1]:
# All imports live in one block at the top of the cell so that a fresh
# "Restart Kernel -> Run All" shows every dependency up front.
import dask.dataframe as dd
import numpy as np
import pandas as pd
from bokeh.plotting import output_notebook
from dask.distributed import LocalCluster
from tqdm import tqdm

from src.types import dsf_dtype_dict

# Configure Bokeh to render inside the notebook.
output_notebook()

# Local Dask cluster and the client bound to it; the dask.dataframe
# computations below are presumably scheduled on this cluster.
cluster = LocalCluster()
client = cluster.get_client()

In [4]:
# Price fields that receive the log-return / z-score treatment in `norm`.
price_columns = ["DlyClose", "DlyLow", "DlyHigh", "DlyBid", "DlyAsk"]
# Columns actually loaded from the raw CSV: the prices plus identifier, date and volume.
columns_to_keep = [*price_columns, "PERMNO", "DlyCalDt", "DlyVol"]

In [5]:
# Lazily load every partition of the raw daily file, restricted to the
# columns the normalisation step needs.
data = dd.read_csv(
    "../../data/dsf_v2_patched.csv/*.part",
    usecols=columns_to_keep,
    parse_dates=['DlyCalDt'],
    dtype=dsf_dtype_dict,
)

In [None]:
def norm(df_input, window=50):
    """Normalise volume and price columns per PERMNO and write the result to CSV.

    For every PERMNO group:

    * ``DlyVol`` becomes the rolling z-score of ``log1p(volume)`` mapped through
      ``(z + 3) / 6``.
    * every column in ``price_columns`` becomes the rolling z-score of its
      one-step log return, clipped to [-3, 3] and mapped to [0, 1].
    * ``Close`` keeps the untouched ``DlyClose`` values.

    Previously the log returns were computed and then overwritten by a z-score
    taken on the raw prices (the second transform read from the original
    groupby), so the log-return step had no effect. Both steps are now
    composed inside a single transform.

    Args:
        df_input: Dask DataFrame holding ``PERMNO``, ``DlyCalDt``, ``DlyVol``
            and every column listed in ``price_columns``.
        window: Length of the rolling window used for the mean and standard
            deviation.

    Returns:
        None. The result is written to ``./dsf_normalized_3_with_norm_data.csv``.
    """
    df = df_input[["PERMNO", "DlyCalDt", "DlyVol", "DlyClose"]].copy()
    group = df_input.groupby('PERMNO')
    # Preserve the raw close; 'DlyClose' itself is replaced by its normalised form below.
    df['Close'] = df['DlyClose']

    def rolling_zscore(series):
        # Trailing-window z-score; the epsilon avoids division by zero on flat windows.
        rolling = series.rolling(window=window, center=False)
        return (series - rolling.mean()) / (rolling.std() + 1e-8)

    # Volume: z-score of log1p(volume), shifted/scaled with the same (z + 3) / 6 map.
    # NOTE(review): unlike the price columns, volume is not clipped to [-3, 3]
    # first, so values can fall outside [0, 1] -- confirm this is intended.
    df['DlyVol'] = group['DlyVol'].transform(
        lambda x: rolling_zscore(np.log1p(x)),
        meta=('DlyVol', 'float64')
    )
    df['DlyVol'] = (df['DlyVol'] + 3) / 6

    for col in price_columns:
        # Log return followed by the rolling z-score, done in one pass over the
        # raw prices so the z-score really is computed on the returns.
        df[col] = group[col].transform(
            lambda x: rolling_zscore(np.log(x / x.shift(1))),
            meta=(col, 'float64')
        )

        # Clip extreme values, then map [-3, 3] onto [0, 1].
        df[col] = df[col].clip(-3, 3)
        df[col] = (df[col] + 3) / 6

    df.to_csv("./dsf_normalized_3_with_norm_data.csv")

# Run the normalisation on the loaded data; as a side effect this writes
# ./dsf_normalized_3_with_norm_data.csv (read back as *.part files below).
norm(data, window=50)

In [5]:
# Reload the normalised output from its partition files.
data = dd.read_csv(
    "./dsf_normalized_3_with_norm_data.csv/*.part",
    parse_dates=['DlyCalDt'],
    dtype=dsf_dtype_dict,
)

In [10]:
# Collapse the partitioned frame into one CSV file so it can be loaded with plain pandas.
data.to_csv("./dsf_normalized_3_with_norm_single.csv", single_file=True)

['E:\\Work\\Projet Finance\\SynologyDrive\\src\\sentiment\\dsf_normalized_3_with_norm_single.csv']

In [5]:
# Load the single-file normalised dataset into memory, keeping only the
# feature columns plus the raw close.
data = pd.read_csv(
    "./dsf_normalized_3_with_norm_single.csv",
    usecols=[
        "PERMNO", "DlyCalDt",
        "DlyClose", "DlyLow", "DlyHigh", "DlyBid", "DlyAsk",
        "DlyVol", "Close",
    ],
    parse_dates=['DlyCalDt'],
    dtype=dsf_dtype_dict,
)

In [7]:
# Number of consecutive rows that make up one sample window.
window_size = 50


def is_window_valid(window_data, nan_counts):
    """Return True when no ``window_size``-long slice of ``nan_counts`` has a truthy entry.

    Slices start at every offset from 0 to ``len(window_data) - window_size``.
    When ``window_data`` is shorter than ``window_size`` nothing is scanned and
    the result is True.
    """
    last_start = len(window_data) - window_size
    for offset in range(last_start + 1):
        if nan_counts[offset:offset + window_size].any():
            return False
    return True

def find_valid_windows(df, window=None, feature_columns=None, horizon_start=15, horizon_stop=25):
    """Select non-overlapping, NaN-free windows that are followed by a lower close.

    A row position ``i`` is a candidate start when the maximum of ``Close`` over
    rows ``[i + window + horizon_start, i + window + horizon_stop)`` is strictly
    below ``Close[i]``. The look-ahead range is truncated at the end of the
    frame but must contain at least one row. Candidates are scanned in order;
    one is accepted when none of the ``feature_columns`` has a NaN in rows
    ``[i, i + window)``, after which the next accepted start is at least
    ``window`` rows later.

    NOTE(review): the future maximum is compared with the close at the *start*
    of the window, not at its last row -- confirm this is the intended label.

    Args:
        df: DataFrame for one security with a ``Close`` column and the feature
            columns.
        window: Window length; defaults to the module-level ``window_size``.
        feature_columns: Columns that must be NaN-free inside a window; defaults
            to ``price_columns + ['DlyVol']``.
        horizon_start: Offset (in rows after the window) where the look-ahead
            range begins.
        horizon_stop: Offset (exclusive) where the look-ahead range ends.

    Returns:
        List of index labels of ``df`` marking the first row of each selected
        window.

    Raises:
        ValueError: If the look-ahead range is empty or starts before 0.
    """
    if window is None:
        window = window_size
    if feature_columns is None:
        feature_columns = price_columns + ['DlyVol']
    if not 0 <= horizon_start < horizon_stop:
        raise ValueError("expected 0 <= horizon_start < horizon_stop")

    n_rows = len(df)
    if n_rows < window:
        return []

    # Only starts with at least one row in their look-ahead range can qualify.
    first_future = window + horizon_start
    n_candidates = n_rows - first_future
    if n_candidates <= 0:
        return []

    close = df['Close'].to_numpy(dtype=float, na_value=np.nan)

    # Sliding maximum over the look-ahead range. Padding with -inf reproduces
    # the truncated slices near the end of the series; NaN still propagates,
    # which makes the comparison below False for those starts.
    horizon_len = horizon_stop - horizon_start
    padded = np.concatenate((close[first_future:], np.full(horizon_len - 1, -np.inf)))
    future_max = np.lib.stride_tricks.sliding_window_view(padded, horizon_len).max(axis=1)

    # Starts whose whole look-ahead range stays below the starting close.
    decline_starts = np.flatnonzero(future_max < close[:n_candidates])

    # Prefix sum of "row has a NaN" so each window check is O(1).
    row_has_nan = df[list(feature_columns)].isna().to_numpy().any(axis=1)
    nan_before = np.concatenate(([0], np.cumsum(row_has_nan)))

    valid_indices = []
    next_allowed = 0
    for start in decline_starts:
        if start < next_allowed:
            # Would overlap the previously accepted window.
            continue
        if nan_before[start + window] == nan_before[start]:
            valid_indices.append(df.index[start])
            next_allowed = start + window

    return valid_indices


def build_df(df_input, output_path="./final_stocks_negative.csv"):
    """Extract every valid window per PERMNO and write them to one CSV.

    For each PERMNO group (in order of first appearance) the window starts
    returned by ``find_valid_windows`` are cut into ``window_size``-row frames.
    All frames are concatenated, numbered with a 1-based ``window_id``, stripped
    of the ``Close`` column and written without the index.

    When no window is found at all, a CSV containing only the header is
    written instead of failing inside ``pd.concat``.

    Args:
        df_input: DataFrame with a ``PERMNO`` column, a ``Close`` column and
            the columns ``find_valid_windows`` inspects.
        output_path: Destination CSV path.

    Returns:
        None.
    """
    windows = []

    # sort=False keeps groups in order of first appearance, matching the
    # order of ``df_input['PERMNO'].unique()``.
    grouped = df_input.groupby('PERMNO', sort=False)

    for _, group_df in tqdm(grouped, desc="Traitement des PERMNO"):
        valid_indices = find_valid_windows(group_df)
        if not valid_indices:
            continue

        # Translate the returned index labels back to row positions in the group.
        valid_starts = group_df.index.get_indexer(valid_indices)
        # NOTE(review): on a frame with mixed column dtypes to_numpy() yields an
        # object array, so the rebuilt windows carry object columns. Kept as-is
        # so the text written to the CSV does not change.
        group_array = group_df.to_numpy()

        for start_idx in valid_starts:
            stop_idx = start_idx + window_size
            windows.append(pd.DataFrame(
                group_array[start_idx:stop_idx],
                index=group_df.index[start_idx:stop_idx],
                columns=group_df.columns
            ))

    # Build the final DataFrame.
    if windows:
        final_df = pd.concat(windows, axis=0)
    else:
        # pd.concat raises on an empty list; fall back to an empty frame with
        # the same columns so a header-only file is produced.
        final_df = df_input.iloc[0:0].copy()

    # Every window has exactly window_size rows, so integer division numbers them.
    final_df['window_id'] = np.arange(len(final_df)) // window_size + 1
    final_df = final_df.drop(["Close"], axis=1)
    final_df.to_csv(output_path, index=False)

In [None]:
# Build the window dataset from the in-memory frame; the result is written to
# ./final_stocks_negative.csv rather than returned.
build_df(data)