In [1]:
import pandas as pd
import numpy as np
import os

In [67]:
def clean(location):
    """
    Load one CSV export and reduce it to a single `Value` column indexed by date.

    Parameters
    ----------
    location : str or path-like
        Path of the CSV file. It must contain a `Date` column and a
        `PX_LAST` column; every other column is dropped.

    Returns
    -------
    pandas.DataFrame
        One column named `Value` (the former `PX_LAST`), indexed by the
        parsed `Date` column. Row order is left as found in the file.

    Raises
    ------
    FileNotFoundError
        If `location` does not exist (a message is printed first).
    KeyError
        If the `Date` or `PX_LAST` column is missing.
    """
    def read(location):
        """ read the csv, parse the dates, and error out if the file is missing """
        try:
            df = pd.read_csv(location)
        except FileNotFoundError:
            # Only a genuinely missing file gets this message; the error is
            # re-raised so the caller does not continue with `None`.
            print(f'file not found at {location}')
            raise
        # Parsing happens outside the try block so a bad or missing `Date`
        # column surfaces as its own error instead of "file not found".
        df['Date'] = pd.to_datetime(df['Date'])
        return df

    def subset_columns(dataframe, select=('Date', 'PX_LAST')):
        """ removes irrelevant columns imported from bloomberg """
        # `list(select)` because `.loc` treats a tuple as a single key.
        return dataframe.loc[:, list(select)]

    def initialize_columns(dataframe):
        """ renames columns and sets time as index """
        df = dataframe.copy()
        # Positional rename: the two selected columns become Date / Value.
        df.columns = ['Date', 'Value']
        return df.set_index('Date')

    return initialize_columns(subset_columns(read(location)))

In [128]:
def get_value_data(files, columns):
    """
    Build one wide table of values from several CSV files.

    Every path in `files` is passed through `clean`; the resulting frames are
    placed side by side with an outer join on their index, the rows are
    sorted by that index, and the columns are relabelled positionally with
    `columns` (so `columns[i]` names the data read from `files[i]`).
    """
    cleaned_frames = []
    for path in files:
        cleaned_frames.append(clean(path))

    combined = pd.concat(cleaned_frames, axis=1, join='outer')
    combined = combined.sort_index()
    combined.columns = columns
    return combined

def to_stationary(series, threshold=10):
    """
    Convert a series of levels into period-over-period changes.

    The transformation depends on the largest observed value:

    * below `threshold` the series is treated as a yield (presumably quoted
      in percentage points) and the result is the first difference
      divided by 100;
    * otherwise it is treated as a total-return style level and the result
      is the simple percentage change `x_t / x_{t-1} - 1`.

    Parameters
    ----------
    series : pandas.Series
        Numeric values; missing values are ignored when finding the maximum.
    threshold : float, default 10
        Cut-off separating the two cases.

    Returns
    -------
    pandas.Series
        Same index as `series`; the first element is always NaN.
    """
    previous = series.shift(1)
    # `Series.max()` skips NaN and yields NaN for an all-missing column, so the
    # comparison is simply False there instead of raising on an empty input.
    if series.max() < threshold:
        return (series - previous) / 100
    return series / previous - 1

In [129]:
class Data:
    """
    Wrapper around a dataframe of values that exposes several return and
    gap-filling transformations.

    The frame is expected to be indexed by date in ascending order, as
    produced by `get_value_data`; the "previous row" used by the return
    methods is then the previous date.

    Attributes
    ----------
    data : pandas.DataFrame
        The value data handed to the constructor, stored as-is.
    """

    def __init__(self, value_data):
        self.data = value_data

    def get_ffill_data(self):
        """ Forward-fill gaps, then drop rows that still contain a missing value. """
        return self.data.ffill().dropna()

    def get_pct_return(self):
        """ Simple change versus the previous row: `x_t / x_{t-1} - 1`. """
        # shift(1) pairs each row with the one before it, matching
        # `to_stationary`; shift(-1) would divide by the *next* row instead.
        return self.data / self.data.shift(1) - 1

    def get_standard_return(self):
        """ Apply `to_stationary` to every column, leaving gaps untouched. """
        return self.data.apply(to_stationary, axis=0)

    def get_true_return(self):
        """ Forward-fill gaps first, then apply `to_stationary` to every column. """
        filled = self.data.ffill()
        return filled.apply(to_stationary, axis=0)

    @staticmethod
    def __mle_normal_params__(series):
        """
        Maximum-likelihood normal parameters of the observed values.

        Returns `(mu, var)` where `var` uses `ddof=0`; both are NaN when the
        series has no observations.
        """
        mu = series.mean()
        var = series.var(ddof=0)
        return mu, var

    @staticmethod
    def __bounded_normal__(mu, var, size=None, width=1.5):
        """
        Draw from N(mu, var) and clip the draw(s) to `mu +/- width * sd`.

        `size=None` returns a single value; otherwise an array of that size.
        Draws come from numpy's global random state.
        """
        sd = np.sqrt(var)
        sample = np.random.normal(mu, sd, size=size)
        # Both bounds are measured from mu, not from zero.
        return np.clip(sample, mu - width * sd, mu + width * sd)

    def _fill_missing(self, sampler):
        """ Replace each column's missing entries with draws from `sampler(mu, var, n)`. """
        def interpolate(series):
            mu, var = self.__mle_normal_params__(series)
            # One independent draw per row, so gaps do not all share one value.
            draws = sampler(mu, var, len(series))
            filled = np.where(series.isna(), draws, series)
            # Returning a Series keeps the original index on the applied result.
            return pd.Series(filled, index=series.index, name=series.name)
        return self.data.apply(interpolate, axis=0)

    def get_mle_interpolate_return(self):
        """
        Fill missing entries of each column with random draws from a normal
        distribution fitted (by MLE) to that column's observed values.

        Results depend on numpy's global random state; seed it for
        reproducible output.
        """
        return self._fill_missing(
            lambda mu, var, n: np.random.normal(mu, np.sqrt(var), size=n)
        )

    def get_mle_interpolate_bounded_return(self):
        """
        Same as `get_mle_interpolate_return`, but every draw is clipped to
        within 1.5 standard deviations of the column mean.
        """
        return self._fill_missing(
            lambda mu, var, n: self.__bounded_normal__(mu, var, size=n)
        )

In [113]:
def initialize_data(directory=r'../data/final_csv', columns=None):
    """
    Read every file in `directory` and wrap the combined values in `Data`.

    Parameters
    ----------
    directory : str, default '../data/final_csv'
        Folder whose regular files are all passed to `get_value_data`;
        sub-directories are skipped.
    columns : list of str, optional
        Column labels, one per file, applied positionally. Defaults to the
        eight labels used by this project.

    Returns
    -------
    Data
        Wrapper around the dataframe returned by `get_value_data`.
    """
    if columns is None:
        columns = ['us_agg', 'jpy', 'global_agg', 'gb', 'em_agg', 'ger', 'euro_agg', 'us']

    # NOTE(review): labels are matched to files purely by position, and
    # `os.listdir` returns names in arbitrary order. Confirm the pairing on
    # each machine, or switch to an explicit filename -> label mapping.
    candidates = (os.path.join(directory, name) for name in os.listdir(directory))
    files = [path for path in candidates if os.path.isfile(path)]

    value_data = get_value_data(files, columns)
    return Data(value_data)

In [114]:
# Summarize the helpers defined above and what each one returns.
print("Imported function `get_value_data(files, columns)` -> dataframe of values")
print("Imported function `to_stationary(series)` -> series of returns")
print("Imported function `initialize_data()` -> `Data` object wrapping the value dataframe")

Imported function `get_value_data(files, columns)` -> dataframe of values
Imported function `get_value_data(files, columns)` -> dataframe of returns
Imported function `initialize_data()` -> tuple of value and returns data
