### Process Portfolio Composition Data

In [None]:
import pandas as pd
import dask.dataframe as dd
# Use plotly as the plotting backend for pandas `.plot` calls.
pd.options.plotting.backend = 'plotly'
import numpy as np
# Render floats with three decimal places when frames are displayed.
pd.set_option('display.float_format', '{:.3f}'.format)

In [None]:
def preprocess_data(ddf):
    """Select, rename and date-sort the portfolio-composition columns.

    ``requested_date`` is renamed to ``Date``, the frame is narrowed to the
    schema columns below, sorted ascending by ``Date`` and finally passed
    through an identity ``map_partitions`` step that declares the expected
    dtypes via ``meta``.

    Args:
        ddf: Frame exposing the dask DataFrame API used here (``rename``,
            column selection, ``sort_values`` and ``map_partitions``).

    Returns:
        The lazy result of ``map_partitions`` over the sorted frame.
    """
    # Column name -> dtype advertised to dask. The insertion order of this
    # mapping is also the order of the selected columns.
    schema = {
        "total_advance_subscription": "int64",
        "nav": "int64",
        "total_units": "int64",
        "net_unit_change": "int64",
        "nav_per_unit": "float64",
        "creation_unit": "int64",
        "equity_value_basket": "int64",
        "cash_component": "int64",
        "fund_purchase": "string",
        "fund_redemption": "string",
        "Date": "datetime64[ns]",
    }

    def _copy_partition(pdf: pd.DataFrame) -> pd.DataFrame:
        # NOTE(review): partitions are only copied, never cast, so the dtypes
        # in `schema` are declared to dask rather than enforced here.
        return pdf.copy()

    expected_meta = pd.DataFrame(
        {column: pd.Series(dtype=kind) for column, kind in schema.items()}
    )

    renamed = ddf.rename(columns={"requested_date": "Date"})
    selected = renamed[list(schema)]
    by_date = selected.sort_values(by='Date', ascending=True)
    return by_date.map_partitions(_copy_partition, meta=expected_meta)

def get_cleaned_pcf_data(df):
    """Return the portfolio-composition frame with a fresh index and fixed column order.

    The index is replaced by a default 0..n-1 range, ``Date`` is converted
    with ``pd.to_datetime`` and only the columns listed below are kept, in
    that order.

    Args:
        df: pandas DataFrame containing at least the listed columns.

    Returns:
        A new DataFrame; the frame passed in is left untouched.
    """
    column_order = (
        'Date', 'total_units', 'nav', 'nav_per_unit', 'net_unit_change',
        'creation_unit', 'total_advance_subscription', 'equity_value_basket',
        'cash_component', 'fund_purchase', 'fund_redemption',
    )
    reindexed = df.reset_index(drop=True)
    reindexed['Date'] = pd.to_datetime(reindexed['Date'])
    return reindexed[list(column_order)]

In [None]:
# Read every CSV matching the portfolio_composition glob into a single dask DataFrame.
ddf = dd.read_csv('./fubon_weight_data/portfolio_composition/*.csv')

In [None]:
# Build the lazy preprocessing graph and materialise it as a pandas DataFrame.
preprocessed_ddf = preprocess_data(ddf)
df = preprocessed_ddf.compute()

# Reset the index, parse Date and fix the column order, then index by Date.
df = get_cleaned_pcf_data(df)
df = df.set_index('Date')

# Write the cleaned file to the `cleaned` directory, which is where the
# creation-basket step reads `fubon_portfolio_composition.csv` from.
# Writing it next to the raw inputs would make the
# `portfolio_composition/*.csv` glob pick up this output on the next run.
df.reset_index().to_csv('/home/duc/Desktop/feature_engineer/fubon_weight_data/cleaned/fubon_portfolio_composition.csv', index=False)

### Process Creation Basket Data

In [None]:
def preprocess_data(ddf):
    """Rename the raw creation-basket columns and keep only the ones used downstream.

    NOTE: this definition replaces the earlier ``preprocess_data`` from the
    portfolio-composition section once it is executed.

    Args:
        ddf: Frame supporting ``rename(columns=...)`` and list-based column
            selection (a dask DataFrame in this notebook).

    Returns:
        The renamed frame restricted to the columns listed below, in that order.
    """
    column_aliases = {
        'data_date': 'Date',
        'stock_code': 'stock',
        'total_units': 'total_outstanding_units',
        'net_asset_value': 'nav',
    }
    # Per-holding fields, then fund-level totals, then FX rates and cash balances.
    holding_cols = ["Date", "stock", "shares", "market_value", "weight_pct"]
    fund_cols = [
        "total_market_value", "total_weight", "total_outstanding_units",
        "nav", "nav_per_unit",
    ]
    fx_cash_cols = [
        "usd_to_twd", "usd_to_vnd", "cash_twd", "cash_usd", "cash_vnd",
        "payables_twd",
    ]

    renamed = ddf.rename(columns=column_aliases)
    return renamed[holding_cols + fund_cols + fx_cash_cols]

def get_cleaned_pcf_data(df):
    """Clean the creation-basket rows: parse dates, shorten tickers, sort.

    NOTE: this definition replaces the earlier ``get_cleaned_pcf_data`` from
    the portfolio-composition section once it is executed.

    Steps:
      * ``Date`` is converted with ``pd.to_datetime``.
      * ``stock`` keeps only the text before the first space.
      * rows are sorted by ``stock`` and then by ``Date`` so the order inside
        each stock is chronological and deterministic (sorting on ``stock``
        alone leaves the order of equal keys unspecified).
      * the index is reset to 0..n-1.

    Args:
        df: pandas DataFrame with at least ``Date`` and ``stock`` columns.

    Returns:
        A new, cleaned DataFrame. The input frame is not modified.
    """
    # Work on a copy so the caller's frame is not mutated as a side effect.
    cleaned = df.copy()
    cleaned['Date'] = pd.to_datetime(cleaned['Date'])
    # `.str[0]` (instead of `expand=True` + `[0]`) also works on an empty frame.
    cleaned['stock'] = cleaned['stock'].str.split(' ', n=1).str[0]
    cleaned = cleaned.sort_values(by=['stock', 'Date'], ascending=[True, True])
    return cleaned.reset_index(drop=True)

In [None]:
# Read every raw creation-basket CSV, keep/rename the needed columns and
# materialise the result as a pandas DataFrame before cleaning it.
ddf = dd.read_csv('./fubon_weight_data/raw/*.csv')
ddf_preproc = preprocess_data(ddf)
df = ddf_preproc.compute()
df = get_cleaned_pcf_data(df)

# Left-join the cleaned portfolio-composition rows that share the same
# Date / nav / nav_per_unit; basket rows without a match keep NaN there.
pcf = pd.read_csv(
    '/home/duc/Desktop/feature_engineer/fubon_weight_data/cleaned/fubon_portfolio_composition.csv',
    parse_dates=['Date'],
)
df = pd.merge(df, pcf, how='left', on=['Date', 'nav', 'nav_per_unit'])

# weight_pct is divided by 100 first; the scaled value feeds allocated_value.
df['weight_pct'] = df['weight_pct'].div(100)
df['allocated_value'] = df['weight_pct'].mul(df['equity_value_basket'])
df['price_per_share'] = df['market_value'].div(df['shares'])

# Whole shares affordable with the allocated value, rounded to the nearest 100.
df['quantity'] = (
    np.floor(df['allocated_value'].div(df['price_per_share']))
    .div(100)
    .round()
    .mul(100)
)

df['net_primary_lot'] = df['net_unit_change'].div(df['creation_unit'])
# Evaluated left to right: lots * basket value / usd_to_twd * usd_to_vnd.
df['primary_flow_vnd'] = (
    df['net_primary_lot']
    .mul(df['equity_value_basket'])
    .div(df['usd_to_twd'])
    .mul(df['usd_to_vnd'])
)

# Spot-check of the quantities for a single date (the result is not stored).
df.query('Date == "2025-09-18"')[['quantity']]

df.to_csv('/home/duc/Desktop/feature_engineer/fubon_weight_data/cleaned/fubon_creation_basket_all.csv', index=False)

# NOTE(review): this section mixes relative paths and absolute paths under
# /home/duc/Desktop/feature_engineer; they only refer to the same files when
# the working directory is that folder.
tmp = pd.read_csv('fubon_weight_data/cleaned/fubon_creation_basket_all.csv')

tmp = tmp.rename(
    columns={
        'quantity': 'volume',
        'weight_pct': 'weight'
    }
)

use_cols = [
    'Date',
    'stock',
    'volume',
    'weight'
]

tmp = tmp[use_cols]

# `shift(1)` below lags by row position inside each stock, so the rows must be
# in chronological order per stock first; the saved file is only guaranteed to
# be ordered by stock. Date is parsed for the sort key only, so the stored
# Date text (and therefore the CSV output format) is unchanged.
tmp = tmp.sort_values(
    ['stock', 'Date'],
    key=lambda col: pd.to_datetime(col) if col.name == 'Date' else col,
)

# Lag volume/weight by one report per stock, so each report date carries the
# previous report's values, to match the PCF transaction date.
lagged = tmp.groupby('stock')[['volume', 'weight']].shift(1)
save_df = pd.DataFrame(
    {
        'Date': tmp['Date'],
        'stock': tmp['stock'],
        'volume': lagged['volume'],
        'weight': lagged['weight'],
    }
)

# Wide layout: one row per Date, columns are (field, stock) pairs.
(save_df.set_index(['Date', 'stock']).unstack(level=1)
.to_csv('fubon_weight_data/cleaned/shift_report_date/fubon_fund_weight.csv')
)

# Re-read the wide file and stack the stock level back into rows to get the
# long layout (one row per Date/stock).
(pd.read_csv('/home/duc/Desktop/feature_engineer/fubon_weight_data/cleaned/shift_report_date/fubon_fund_weight.csv', header=[0, 1], index_col=0)
 .stack(level=1, future_stack=True)
 .reset_index()
 .to_csv('/home/duc/Desktop/feature_engineer/fubon_weight_data/cleaned/shift_report_date/fubon_fund_weight_long.csv',index=False))