In [56]:
import pandas as pd
import numpy as np
from euroclear.config import RAW_DATA_DIR, INTERIM_DATA_DIR

In [9]:
df_transactions = pd.read_parquet(f'{RAW_DATA_DIR}/transactions_small/')

In [4]:
df_world = pd.read_parquet(f'{RAW_DATA_DIR}/world_data/')

In [5]:
df_securities = pd.read_parquet(f'{RAW_DATA_DIR}/securities/')

In [6]:
df_daily = pd.read_parquet(f'{RAW_DATA_DIR}/daily_quotation')

In [7]:
df_forex = pd.read_parquet(f'{RAW_DATA_DIR}/forex_ex/')

In [8]:
df_accounts = pd.read_parquet(f'{RAW_DATA_DIR}/accounts/')

In [11]:
df_transactions.columns

Index(['receive_security', 'deliver_security', 'transaction_timestamp', 'isin',
       'unit_delivered', 'market_value', 'currency_code'],
      dtype='object')

In [26]:
def safe_div(a: pd.Series, b: pd.Series) -> pd.Series:
    return np.divide(a, b, out=np.full_like(a, np.nan, dtype=np.float64), where=(b!=0) & (~pd.isna(b)))

In [13]:
def normalize_date(s: pd.Series) -> pd.Series:
    return pd.to_datetime(s, errors='coerce').dt.normalize()

In [14]:
tx = (
    df_transactions.copy()
    .rename(columns={'currency_code': 'tx_currency_code'})
    .assign(
        transaction_date=normalize_date(df_transactions['transaction_timestamp']),
    )
)

In [17]:
df_securities.columns

Index(['isin', 'currency_code', 'instrument_type', 'issuer',
       'issuer_country_code'],
      dtype='object')

In [18]:
df_sec = (
    df_securities.copy()
    .rename(columns={'currency_code': 'security_currency_code'})
    .drop_duplicates(subset=['isin'])
)

In [19]:
tx = tx.merge(df_sec, on='isin', how='left', validate='m:1')

In [21]:
df_forex.columns

Index(['currency_code', 'equivalent_eur'], dtype='object')

In [22]:
df_fx = (
    df_forex.copy()
    .rename(columns={'equivalent_eur': 'tx_equivalent_eur'})
    .drop_duplicates(subset=['currency_code'])
)

In [23]:
tx = (
    tx.merge(df_fx, left_on='tx_currency_code', right_on='currency_code', how='left', validate='m:1')
    .drop(columns=['currency_code'])
)

In [24]:
tx.columns

Index(['receive_security', 'deliver_security', 'transaction_timestamp', 'isin',
       'unit_delivered', 'market_value', 'tx_currency_code',
       'transaction_date', 'security_currency_code', 'instrument_type',
       'issuer', 'issuer_country_code', 'tx_equivalent_eur'],
      dtype='object')

In [27]:
tx['market_value_eur'] = safe_div(tx['market_value'], tx['tx_equivalent_eur'])

In [31]:
df_dq = df_daily.copy()
df_dq['calendar_date'] = normalize_date(df_dq['calendar_date'])
df_dq = df_dq.drop_duplicates(subset=['isin', 'calendar_date'])

In [34]:
tx = (
    tx.merge(
        df_dq.rename(columns={'calendar_date': 'transaction_date'}),
        on=['isin', 'transaction_date'],
        how='left',
        validate='m:1'
    )
)

In [37]:
df_world.columns

Index(['country', 'country_code', 'currency_name', 'currency_code',
       'country_risk'],
      dtype='object')

In [40]:
df_country_issuer = df_world.rename(
    columns={'country': 'issuer_country', 'country_risk': 'issuer_country_risk', 'country_code': 'issuer_country_code'}
)[['issuer_country_code', 'issuer_country', 'issuer_country_risk']].drop_duplicates(subset='issuer_country_code')

In [41]:
tx = (
    tx.merge(
        df_country_issuer,
        on='issuer_country_code',
        how='left',
        validate='m:1'
    )
)

In [42]:
df_accounts.columns

Index(['account_id', 'company_name', 'address', 'country'], dtype='object')

In [43]:
df_acc = df_accounts.rename(columns={'country': 'account_country'}).copy()

In [44]:
df_receive = df_acc.rename(
    columns={
            "account_id": "receive_account_id",
            "company_name": "receive_company",
            "address": "receive_address",
            "account_country": "receive_country",
        }
)[['receive_account_id', 'receive_company', 'receive_address', 'receive_country']].drop_duplicates(subset='receive_account_id')

In [45]:
tx = tx.merge(
    df_receive,
    left_on='receive_security',
    right_on='receive_account_id',
    how='left',
    validate='m:1'
)

In [47]:
df_deliver = df_acc.rename(
    columns={
        "account_id": "deliver_account_id",
        "company_name": "deliver_company",
        "address": "deliver_address",
        "account_country": "deliver_country",
    }
)[
    ["deliver_account_id", "deliver_company", "deliver_address", "deliver_country"]
].drop_duplicates(subset=["deliver_account_id"])
tx = tx.merge(
    df_deliver,
    left_on="deliver_security",
    right_on="deliver_account_id",
    how="left",
    validate="m:1",
)

In [48]:
df_world.columns

Index(['country', 'country_code', 'currency_name', 'currency_code',
       'country_risk'],
      dtype='object')

In [50]:
tx = tx.merge(
    df_world.rename(
        columns={
            'country_code': 'receive_country_code',
            'country_risk': 'receive_country_risk',
            'country': 'receive_country'
        }
    )[['receive_country', 'receive_country_code', 'receive_country_risk']].drop_duplicates(subset='receive_country'),
    on='receive_country',
    how='left',
    validate='m:1'
)

In [51]:
tx = tx.merge(
    df_world.rename(
        columns={
            'country_code': 'deliver_country_code',
            'country_risk': 'deliver_country_risk',
            'country': 'deliver_country'
        }
    )[['deliver_country', 'deliver_country_code', 'deliver_country_risk']].drop_duplicates(subset='deliver_country'),
    on='deliver_country',
    how='left',
    validate='m:1'
)

In [52]:
preferred = [
        "transaction_timestamp", "transaction_date", "isin",
        "unit_delivered", "market_value", "market_value_eur",
        "tx_currency_code", "tx_equivalent_eur", "real_quotation",
        "security_currency_code", "instrument_type", "issuer",
        "issuer_country_code", "issuer_country", "issuer_country_risk",
        "receive_security", "receive_account_id", "receive_company",
        "receive_address", "receive_country", "receive_country_code", "receive_country_risk",
        "deliver_security", "deliver_account_id", "deliver_company",
        "deliver_address", "deliver_country", "deliver_country_code", "deliver_country_risk",
    ]

In [53]:
len(preferred)

29

In [54]:
tx.shape

(415517, 29)

In [57]:
cols = [c for c in preferred if c in tx.columns] + [c for c in tx.columns if c not in preferred]
tx.loc[:, cols].to_parquet(INTERIM_DATA_DIR / 'tx.parquet')

In [58]:
tx['unite_price_tx'] = safe_div(tx['market_value'], tx['unit_delivered'])

In [59]:
tx['unit_price_eur'] = safe_div(tx['market_value_eur'], tx['unit_delivered'])

In [60]:
tx['weekday'] = tx['transaction_date'].dt.dayofweek
tx['is_weekend'] = tx['weekday'] >= 5