In [1]:
import numpy as np
from pandas import DataFrame
import pandas as pd
from numpy import ndarray




In [2]:

class DataLoader(object):

    cols = ["be_me", "ret_12_1", "market_equity", "ret_1_0", "rvol_252d", "beta_252d", "qmj_safety", "rmax1_21d",
            "chcsho_12m", "ni_me", "eq_dur", "ret_60_12", "ope_be", "gp_at", "ebit_sale", "at_gr1", "sale_gr1",
            "at_be", "cash_at", "age", "z_score"]

    cols1 = ["permno", "date", "ret_exc_lead1m", "be_me", "ret_12_1", "market_equity", "ret_1_0", "rvol_252d",
             "beta_252d", "qmj_safety", "rmax1_21d", "chcsho_12m", "ni_me", "eq_dur", "ret_60_12", "ope_be",
             "gp_at", "ebit_sale", "at_gr1", "sale_gr1", "at_be", "cash_at", "age", "z_score"]

    def __init__(self, csv_file_path: str):
        self.data: DataFrame = pd.read_csv(csv_file_path)

    def slice(self, start: int, end: int) -> DataFrame:
        """
        Slice a dataload to look for data within the certain time period
        :param start: slice start, inclusive. In form of YYYYMMDD (19900000)
        :param end: slice end, exclusive.
        :return: a pandas dataframe after slicing
        """
        data = self.data[(self.data["date"] >= start) & (self.data["date"] < end)]
        data = data.dropna(subset=['me', 'ret_exc_lead1m'])
        # exclude nano caps
        data = data.loc[data['size_grp'] != 'nano']
        # delete observation with more than 5 out of the 21 characteristics missing
        data["missing_num"] = data[DataLoader.cols].isna().sum(1)
        data = data.loc[data['missing_num'] <= 5]

        # impute the missing characteristics by replacing them with the cross-sectional median
        for i in DataLoader.cols:
            data[i] = data[i].astype(float)
            data[i] = data[i].fillna(data.groupby('date')[i].transform('median'))

        data = data[DataLoader.cols1]
        data = data.dropna()

        # rank transformation
        # each characteristic is transformed into the cross-sectional rank
        for i in DataLoader.cols:
            data[i] = data.groupby("date")[i].rank(pct=True)

        data.sort_values(by=['date', 'permno'], inplace=True)

        return data

    @staticmethod
    def get_x(df: DataFrame) -> ndarray: return df[DataLoader.cols].to_numpy()

    @staticmethod
    def get_y(df: DataFrame) -> ndarray: return df["ret_exc_lead1m"].to_numpy()

    @staticmethod
    def get_y_quantiles(df: DataFrame) -> ndarray:
        raw_y = df["ret_exc_lead1m"].to_numpy()
        def calculate_single_quantile_fn(y):
            return (raw_y[:] < y).sum() / raw_y.shape
        calculate_single_quantile = np.vectorize(calculate_single_quantile_fn)
        return np.apply_along_axis(calculate_single_quantile, 0, raw_y)



In [3]:
data = DataLoader("data/usa.csv")

  self.data: DataFrame = pd.read_csv(csv_file_path)


In [9]:
import numpy as np
import cvxpy as cp
from numpy import ndarray
try:
    from sklearn import r2_score
except ImportError:
    from sklearn.metrics import r2_score


class LassoQuantileModel(object):

    def __init__(self, data_loader: DataLoader, lambda_value: float):
        self.data_loader = data_loader
        self.lambda_value = lambda_value
        self.beta = np.zeros((len(self.data_loader.cols)))
        self.intercept = 0.0
        self.objective = 0.0

    def _cp_loss_fn(self, X: np.array, Y: np.array, beta: cp.Variable, intercept: cp.Variable):
        raw_predicted_y = X @ beta + intercept
        return (1.0 / X.shape[0]) * (cp.norm2(raw_predicted_y - Y)**2)

    def _objective_fn(self, X, Y, beta, intercept, lambda_value):
        return self._cp_loss_fn(X, Y, beta, intercept) + lambda_value * cp.norm1(beta)

        # train with sklearn
    def fit(self, start: int, end: int) -> None:
        df = self.data_loader.slice(start, end)
        x_train = self.data_loader.get_x(df)
        y_train = self.data_loader.get_y_quantiles(df)

        beta = cp.Variable(len(self.data_loader.cols))
        intercept = cp.Variable(1)
        problem = cp.Problem(cp.Minimize(self._objective_fn(x_train, y_train, beta, intercept, self.lambda_value)))
        problem.solve(solver=cp.SCS)

        self.beta = beta.value
        self.intercept = intercept.value
        self.objective = problem.objective

    @classmethod
    def validate(cls, data_loader: DataLoader,
                 train_start: int,
                 train_end: int,
                 validate_start: int,
                 validate_end: int,
                 lambda_values: list):
        """
        Tune hyperparameters using a validation set
        :param validate_end:
        :param validate_start:
        :param train_end:
        :param train_start:
        :param data_loader: DataLoader object
        :param lambda_values: List of lambda values to conduct grid search
        """
        validate_df = data_loader.slice(validate_start, validate_end)
        y_validate = data_loader.get_y(validate_df)

        best_model = None
        best_lambda = None
        best_r2 = -float('inf')

        for lambda_val in lambda_values:
            model = LassoQuantileModel(data_loader, lambda_val)
            model.fit(train_start, train_end)
            y_pred = model.predict(validate_start, validate_end)

            r2 = r2_score(y_validate, y_pred)  # Calculate R-squared

            if r2 > best_r2:
                best_lambda = lambda_val
                best_r2 = r2
                best_model = model

        return best_model, best_r2, best_lambda

    # def _to_quantiles(self, raw_y: np.array) -> np.array:
    #     def calculate_single_quantile_fn(y):
    #         return (raw_y[:] < y).sum() / raw_y.shape
    #     calculate_single_quantile = np.vectorize(calculate_single_quantile_fn)
    #     return np.apply_along_axis(calculate_single_quantile, 0, raw_y)

    def _to_quantiles(self, raw_y: np.array) -> np.array:
        max = raw_y.max()
        min = raw_y.min()
        rng = max - min
        return (raw_y - min) / rng

    def predict(self, start: int, end: int) -> ndarray:
        # Slice the data for the prediction period
        df = self.data_loader.slice(start, end)
        x_pred = self.data_loader.get_x(df)
        raw_y_pred = x_pred @ self.beta + self.intercept
        return self._to_quantiles(raw_y_pred)

    def evaluate(self, start: int, end: int) -> float:
        """
        Give evaluation metric of a trained/fitted model on a given test/validation period
        :param start: period start year
        :param end: period end year
        :return: an evaluation metric as floating number
        """
        df = self.data_loader.slice(start, end)
        y_actual = self.data_loader.get_y_quantiles(df)
        y_pred = self.predict(start, end)
        # Calculate R-squared
        r2 = r2_score(y_actual, y_pred)
        return r2

# model = LassoQuantileModel(data, 0.001)
# model.fit(19800101, 19950101)
# model.evaluate(19950101, 20000101)

best_model, best_r2, best_lambda = LassoQuantileModel.validate(data, 19800101, 19950101, 19950101, 20000101, [0, 0.0000001, 0.00001, 0.001])
best_r2, best_lambda

(-11.389144479935478, 0.001)

In [10]:
r2_score(data.get_y_quantiles(data.slice(20000101, 20010101)), best_model.predict(20000101, 20010101))

-0.17552299445653174

In [6]:
data.get_y_quantiles(data.slice(20000101, 20010101))

array([0.19097305, 0.44310458, 0.80138028, ..., 0.60767947, 0.55999583,
       0.73098021])

In [8]:
best_model.predict(20000101, 20010101)

array([0.44756141, 0.50342833, 0.40124911, ..., 0.63957787, 0.77478834,
       0.3178959 ])