In [1]:
import pandas as pd
import numpy as np

In [2]:
def cleanCrosswalk(cw_df: pd.DataFrame) -> pd.DataFrame:
    """ Validate and tidy the cm <-> coinapi crosswalk.

    Both id columns must be free of duplicates. The coinapi id column is
    renamed to 'asset_ca', only the two id columns are kept (cm first), and the
    rows are ordered by the cm id with a fresh 0..n-1 index.

    Args:
        cw_df: crosswalk with the columns 'asset_cm' and 'asset_coinapi'.

    Returns:
        A new DataFrame with the columns ['asset_cm', 'asset_ca'].

    Raises:
        AssertionError: if either id column contains a repeated value.
    """
    # each id column must map one-to-one, so neither may repeat a value
    uniqueness_checks = (
        ('asset_cm', "asset_cm must be unique"),
        ('asset_coinapi', "asset_coinapi must be unique"),
    )
    for column, message in uniqueness_checks:
        assert cw_df[column].is_unique, message

    # rename -> keep the two id columns in a fixed order -> order rows by cm id
    renamed = cw_df.rename(columns={'asset_coinapi': 'asset_ca'})
    id_columns = renamed[['asset_cm', 'asset_ca']]
    return id_columns.sort_values(by='asset_cm', ignore_index=True)

In [3]:
def _fillAssetGaps(asset_df: pd.DataFrame, asset, value_cols: list, max_gap_hours: int) -> pd.DataFrame:
    """ Put one asset's rows on a complete hourly grid and forward fill the gaps.

    Args:
        asset_df (pd.DataFrame): rows of a single asset with a unique 'date' column.
        asset: the asset id written into the 'asset_ca' column of added rows.
        value_cols (list): columns that must be populated for a row to be kept.
        max_gap_hours (int): maximum number of consecutive missing hours to fill.

    Returns:
        (pd.DataFrame): the asset's rows with 'date' as the first column.
    """
    asset_df = asset_df.set_index('date')

    # complete hourly index between the asset's first and last observation; a
    # Timedelta is used because the 'H' string alias is deprecated in recent pandas
    hourly_index = pd.date_range(start=asset_df.index.min(), end=asset_df.index.max(),
                                 freq=pd.Timedelta(hours=1))
    asset_df = asset_df.reindex(hourly_index)

    # label the newly added rows (assigned back instead of a chained inplace fill,
    # which does not reach the frame under copy-on-write)
    asset_df['asset_ca'] = asset_df['asset_ca'].fillna(asset)

    # forward fill at most max_gap_hours consecutive hours; a longer gap is only
    # filled for its first max_gap_hours hours and the remainder is dropped below
    asset_df = asset_df.ffill(limit=max_gap_hours)
    asset_df = asset_df.dropna(subset=value_cols)

    # the reindexed index has no name, so name it before turning it into a column
    return asset_df.rename_axis('date').reset_index()


def cleanPanel(df: pd.DataFrame, cw_df: pd.DataFrame,
               start_date: str = '2016-07-01', end_date: str = '2023-01-02',
               max_gap_hours: int = 31*24) -> pd.DataFrame:
    """ Clean the coinapi panel.

    Steps: validate the date column, keep rows inside [start_date, end_date],
    keep assets listed in the crosswalk, drop duplicated (date, asset) rows,
    blank out-of-range values and forward fill them within each asset, put
    every asset on a complete hourly grid, and rename the value columns.

    Values are forward filled per asset in date order, so a missing value is
    never taken from a different asset. Rows at the start of an asset that have
    no earlier observation to fill from are dropped.

    Args:
        df (pd.DataFrame): raw panel data with the columns 'date', 'asset_id',
            'usd_per_token_coinapi', 'usd_volume_coinapi', 'trades_coinapi'
            and 'usd_per_token_ref'.
        cw_df (pd.DataFrame): cleaned cm to ca crosswalk with an 'asset_ca' column.
        start_date (str): first datetime to keep (inclusive).
        end_date (str): last datetime to keep (inclusive).
        max_gap_hours (int): maximum number of consecutive missing hours that
            are forward filled for an asset.

    Returns:
        (pd.DataFrame): cleaned panel data sorted by date then asset.

    Raises:
        AssertionError: if a validation fails or no rows are left after filtering.
    """
    value_cols = ['usd_per_token_coinapi', 'usd_volume_coinapi', 'trades_coinapi', 'usd_per_token_ref']
    thresholds = {'usd_per_token_coinapi': 1e9, 'usd_volume_coinapi': 1e12,
                  'trades_coinapi': 1e9, 'usd_per_token_ref': 1e9}

    # date col (dtype check instead of looking at the first value so an empty frame does not raise IndexError)
    assert 0 == df['date'].isnull().sum()
    assert pd.api.types.is_datetime64_any_dtype(df['date']), "date is not the correct type."
    assert (df['date'].dt.minute == 0).all(), "all datetimes must be top of the hour"

    # cut down to relevant dates
    in_window = (df['date'] >= start_date) & (df['date'] <= end_date)
    df = df.loc[in_window]

    # asset col
    df = df.rename(columns={'asset_id': 'asset_ca'})
    assert 0 == df['asset_ca'].isnull().sum()
    df = df[df['asset_ca'].isin(cw_df['asset_ca'])]

    # drop duplicated datetime and asset
    df = df.drop_duplicates(subset=['date', 'asset_ca'])

    # order rows by asset then date so forward fills run in time order; this also
    # yields a fresh frame with a unique index that is safe to assign into
    df = df.sort_values(by=['asset_ca', 'date']).reset_index(drop=True)

    # fill missing ref price with market price
    df['usd_per_token_ref'] = df['usd_per_token_ref'].fillna(df['usd_per_token_coinapi'])

    # for price and volume columns, blank values above the threshold and fill
    # from the previous observation of the same asset
    for col, thresh in thresholds.items():
        in_range = df[col].mask(df[col] > thresh)
        df[col] = in_range.groupby(df['asset_ca'], observed=True).ffill()

    # rows still missing a value have no earlier observation for their asset
    df = df.dropna(subset=value_cols)
    assert not df.empty, "no panel rows left after filtering"

    # ensure in range, no missing, and convert type down
    for col, thresh in thresholds.items():
        assert 0 == df[col].isnull().sum()
        assert not ((df[col] < 0) | (df[col] > thresh)).any()
    df = df.astype({col: 'float32' for col in value_cols})

    # add any missing datetimes per asset; frames are collected and concatenated
    # once instead of growing a DataFrame inside the loop
    asset_frames = [_fillAssetGaps(asset_df, asset, value_cols, max_gap_hours)
                    for asset, asset_df in df.groupby('asset_ca', sort=False, observed=True)]
    df = pd.concat(asset_frames, ignore_index=True)

    # Confirm no missings in the df
    assert df.isnull().sum().sum() == 0

    # Reset column names
    df = df.rename(columns={'usd_per_token_coinapi': 'usd_per_token_ca',
                            'usd_volume_coinapi': 'usd_volume_ca',
                            'trades_coinapi': 'trades_volume_ca',
                            'usd_per_token_ref': 'usd_ref_price_ca'})

    # ensure no duplicates by date and asset
    assert not df.duplicated(subset=['date', 'asset_ca']).any()

    # Sort by date then asset and reset index
    df = df.sort_values(by=['date', 'asset_ca']).reset_index(drop=True)

    return df

In [4]:
def cleanMacro(m_df: pd.DataFrame, start_date: str = '2016-07-01',
               end_date: str = '2023-01-02') -> pd.DataFrame:
    """ Cleans and preprocesses a macro data DataFrame.
    This function checks the date column, filters by date, removes duplicated datetimes,
    validates the range of the price columns and converts them to float32, checks that every
    hour between the first and last datetime is present, renames columns, and sorts by date.

    Missing values in the price columns are not checked: a NaN passes the range
    validation and is returned unchanged.

    Args:
        m_df (pd.DataFrame): A DataFrame containing macro data, with the following columns:
            - date (np.datetime64): Datetime of the data point.
            - usd_per_usdc (float): USD price per USDC.
            - usd_per_usdt (float): USD price per USDT.
        start_date (str): first datetime to keep (inclusive).
        end_date (str): last datetime to keep (inclusive).

    Returns:
        pd.DataFrame: The cleaned macro data with the columns 'usd_per_usdc_ca' and
        'usd_per_usdt_ca', sorted by date with a fresh index. The input is not modified.

    Raises:
        AssertionError: If the date column has missing values, the wrong type, or datetimes
            off the top of the hour; if no rows fall inside the date window; if a price is
            negative or above its threshold; or if an hour is missing between the first and
            last datetime.
    """
    # date col (dtype check instead of looking at the first value so an empty frame does not raise IndexError)
    assert 0 == m_df['date'].isnull().sum()
    assert pd.api.types.is_datetime64_any_dtype(m_df['date']), "date is not the correct type."
    assert (m_df['date'].dt.minute == 0).all(), "all datetimes must be top of the hour"

    # cut down to relevant dates
    in_window = (m_df['date'] >= start_date) & (m_df['date'] <= end_date)
    m_df = m_df.loc[in_window]

    # drop duplicated datetime
    m_df = m_df.drop_duplicates(subset=['date'])
    assert not m_df.empty, "no macro rows inside the requested date window"

    # for price columns, ensure in range and convert type down; the cast builds a
    # new frame rather than assigning into the filtered slice
    thresholds = {'usd_per_usdc': 1e5, 'usd_per_usdt': 1e5}
    for col, thresh in thresholds.items():
        assert not ((m_df[col] < 0) | (m_df[col] > thresh)).any()
    m_df = m_df.astype({col: 'float32' for col in thresholds})

    # Ensure all dates are present; a Timedelta is used for the frequency because
    # the 'H' string alias is deprecated in recent pandas
    full_date_range = pd.date_range(start=m_df['date'].min(), end=m_df['date'].max(),
                                    freq=pd.Timedelta(hours=1))
    assert len(m_df) == len(full_date_range)

    # Reset column names
    m_df = m_df.rename(columns={'usd_per_usdc': 'usd_per_usdc_ca',
                                'usd_per_usdt': 'usd_per_usdt_ca'})

    # ensure no duplicates by date
    assert not m_df.duplicated(subset=['date']).any()

    # Sort by date and reset index
    m_df = m_df.sort_values(by=['date']).reset_index(drop=True)

    return m_df

In [5]:
if __name__ == "__main__":
    # Input locations: the cm -> coinapi crosswalk plus the raw hourly panel and macro pickles
    CW_IN_FP, PANEL_IN_FP, MACRO_IN_FP = (
        '../data/derived/cm_to_coinapi_cw.pkl',
        '../data/raw/coinapi_panel_hourly.pkl',
        '../data/raw/coinapi_macro_hourly.pkl',
    )

    # Output locations for the cleaned panel, crosswalk and macro data
    PANEL_OUT_FP, CW_OUT_FP, MACRO_OUT_FP = (
        '../data/derived/ca_panel.pkl',
        '../data/derived/ca_cm_cw.pkl',
        '../data/derived/ca_macro.pkl',
    )

    # Load the three inputs (panel, crosswalk, macro - read in that order)
    df, cw_df, m_df = (pd.read_pickle(in_fp) for in_fp in (PANEL_IN_FP, CW_IN_FP, MACRO_IN_FP))

    # The crosswalk is cleaned first because the panel cleaning takes the cleaned crosswalk
    cw_df = cleanCrosswalk(cw_df)
    df = cleanPanel(df, cw_df)
    m_df = cleanMacro(m_df)

    # Persist the cleaned panel, crosswalk and macro data
    df.to_pickle(PANEL_OUT_FP)
    cw_df.to_pickle(CW_OUT_FP)
    m_df.to_pickle(MACRO_OUT_FP)