In [None]:
!pip install panelsplit

In [None]:
import pandas as pd
import numpy as np
from panelsplit.cross_validation import PanelSplit
from panelsplit.application import cross_val_fit_predict
from typing import Union
from sklearn.ensemble import RandomForestClassifier
from functools import reduce
import pickle

In [None]:
class TargetEngineer():

    """
    Class to generate target variables for incidence and onset under a given horizon.

    Both targets are derived from a binary "any" indicator (y_col > threshold) and look
    strictly forward: the value stored on a row summarises the next `horizon` rows of the
    same unit. Rows are therefore expected to be ordered by time within each unit.

    Args
    ----

    df: pd.DataFrame
      Dataframe with at least columns that include [unit, time, y_col].
      A private copy is stored, the caller's frame is never modified.

    unit: str
      Column (or index level name) that defines your unit. E.g. 'isocode'

    time: str
      Column that defines your time. E.g. 'period'.
      Stored for reference only; none of the computations in this class read it.

    y_col: str
      Column that defines your y variable. E.g. 'violence'
    """

    def __init__(self, df: pd.DataFrame, unit: str, time: str, y_col: str):

        self.df = df.copy()
        self.unit = unit
        self.time = time
        self.y_col = y_col

    @staticmethod
    def _check_horizon(horizon: int) -> None:

        """
        Fail early on a horizon that cannot define a forward-looking window.

        Raises
        ------

        ValueError
          If horizon is smaller than 1.
        """

        if horizon < 1:
            raise ValueError(f"horizon must be a positive integer, got {horizon!r}")

    @staticmethod
    def _future_max(x: pd.Series, h: int) -> pd.Series:

        """
        Max of x over the next h rows (current row excluded) for a single unit.

        The trailing rolling max at row t covers rows t-h+1..t; shifting it back by h
        rows makes the value stored at row t cover rows t+1..t+h. Rows with fewer than
        h following rows become NaN.
        """

        return x.rolling(window=h, min_periods=h).max().shift(-h)

    def any(self, threshold: float):

        """
        Function to compute "any" variable.

        Note that the any column is also written to self.df, so columns from earlier
        calls with other thresholds are part of the returned copy as well.

        Args
        ----

        threshold: int or float
          Threshold to apply to self.y_col. The indicator is 1 where y_col > threshold
          and 0 otherwise (missing values compare False and therefore become 0).

        Returns
        -------

        pd.DataFrame
          Copy of the stored dataframe with the any variable.

        str
          Name of the any variable.
        """

        any_col = f"any{self.y_col}_th{threshold}"
        self.df[any_col] = (self.df[self.y_col] > threshold).astype(int)
        return self.df.copy(), any_col

    def incidence(self, threshold: float, horizon: int):

        """
        Function to compute incidence target variable based on the specific threshold and horizon.

        The target is 1 if the any variable is 1 in at least one of the next `horizon`
        periods of the unit, 0 if it is 0 in all of them, and NaN for the last `horizon`
        rows of each unit (the forward window is incomplete there).

        Args
        ----

        threshold: int or float
          Threshold to apply to self.y_col.

        horizon: int
          Forecasting horizon (assumes aggregated window). Must be >= 1.

        Returns
        -------

        pd.DataFrame
          Dataframe with y_col, any_col, and target_col.

        Raises
        ------

        ValueError
          If horizon is smaller than 1.
        """

        self._check_horizon(horizon)

        #make the any variable
        df, any_col = self.any(threshold)

        #max of the any variable over the next `horizon` periods, computed per unit
        target_col = f"inc_{any_col}_h{horizon}"
        df[target_col] = df.groupby(self.unit)[any_col].transform(
            lambda x: self._future_max(x, horizon)
        )

        return df[[self.y_col, any_col, target_col]]

    def onset(self, threshold: float, horizon: int):

        """
        Function to compute onset target variable based on the specific threshold and horizon.

        Same forward-looking max as the incidence target, but only defined for rows where
        the any variable is currently 0. Rows where it is currently 1, and the last
        `horizon` rows of each unit, are NaN.

        Args
        ----

        threshold: int or float
          Threshold to apply to self.y_col.

        horizon: int
          Forecasting horizon (assumes aggregated window). Must be >= 1.

        Returns
        -------

        pd.DataFrame
          Dataframe with y_col, any_col, and target_col.

        Raises
        ------

        ValueError
          If horizon is smaller than 1.
        """

        self._check_horizon(horizon)

        #make the any variable
        df, any_col = self.any(threshold)

        def _onset(x: pd.Series, h: int) -> pd.Series:

            """
            Onset target for a single unit: forward max, masked to rows with any == 0.
            """

            #the mask is passed as a plain array so it is applied by position
            return self._future_max(x, h).where((x == 0).to_numpy())

        target_col = f"ons_{any_col}_h{horizon}"
        df[target_col] = df.groupby(self.unit)[any_col].transform(lambda x: _onset(x, horizon))

        return df[[self.y_col, any_col, target_col]]

In [None]:
class FeatureEngineer:

    """
    This is a class that contains general methods that can be applied to a DataFrame to create new features. Examples of such methods include creating lagged variables, rolling min/mean/max/sum and weighted rolling mean/sum.
    The methods in this class are designed to be used in a pipeline to create new features for a given DataFrame.

    Every public method works on a copy of the input DataFrame and requires its index to be
    monotonically increasing (rolling results are written back by position, so rows must
    already be ordered by group and time).

    Attributes:
    -----------
    groupby_cols : Union[str, list]
        A str or list of columns to group by

    Methods:
    --------

    lag(input_df:pd.DataFrame, y_col:str, lags:list):
        This is a method that creates lagged variables for a given column in a DataFrame.

    rolling_sum(input_df:pd.DataFrame, y_col:str, windows:list, closed = None, return_logs = False):
        This is a method that creates the rolling sum of specified windows for a given column in a DataFrame.

    rolling_mean(input_df:pd.DataFrame, y_col:str, windows:list, closed = None, return_logs = False):
        This is a method that creates the rolling mean of specified windows for a given column in a DataFrame.

    rolling_min(input_df:pd.DataFrame, y_col:str, windows:list, closed = None, return_logs = False):
        This is a method that creates the rolling min of specified windows for a given column in a DataFrame.

    rolling_max(input_df:pd.DataFrame, y_col:str, windows:list, closed = None, return_logs = False):
        This is a method that creates the rolling max of specified windows for a given column in a DataFrame.

    rolling_std(input_df:pd.DataFrame, y_col:str, windows:list, closed = None, return_logs = False):
        This is a method that creates the rolling standard deviation of specified windows for a given column in a DataFrame.

    weighted_rolling_sum(input_df:pd.DataFrame, y_col:str, windows:list, closed = None, return_logs = False, alpha = 0.8):
        This is a method that creates the weighted rolling sum of specified windows for a given column in a DataFrame.

    weighted_rolling_mean(input_df:pd.DataFrame, y_col:str, windows:list, closed = None, return_logs = False, alpha = 0.8):
        This is a method that creates the weighted rolling mean of specified windows for a given column in a DataFrame.

    since(input_df:pd.DataFrame, y_col:str, thresholds:list, shift_knowledge:int = None):
        This is a method that counts the number of periods since a variable has been above a given threshold.

    ongoing(input_df:pd.DataFrame, y_col:str, thresholds:list, shift_knowledge:int = None):
        This is a method that represents a sequential count of the number of periods for which a variable has been above a given threshold.

    Notes:
    -------
    Be very careful with NAs when using the since and ongoing methods.
    The way we are computing things here (i.e. using a > th condition) means they are treated as a 0/False.
    """

    def __init__(self, groupby_cols: Union[str, list]):

        self.groupby_cols = groupby_cols

    def _index_check(self, df: pd.DataFrame):

        """
        This is a method that checks if the index of a DataFrame is sorted correctly.

        Args:
        -----
        :param df: The DataFrame to check.

        Returns:
        --------
        :return: None. The DataFrame is only inspected, never re-sorted.

        Raises:
        -------
        :raises AssertionError: If the index is not monotonically increasing.
        """

        # Raised explicitly (instead of using an assert statement) so the check is still
        # performed when Python runs with -O; the exception type is unchanged.
        if not df.index.is_monotonic_increasing:
            raise AssertionError("The index of the DataFrame should be monotonically increasing.")

    def _rolling_feature(self, input_df: pd.DataFrame, y_col: str, windows: list, suffix: str, aggregate, closed, return_logs: bool):

        """
        This is the shared implementation behind all rolling_* and weighted_rolling_* methods.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column to roll over.
        :param windows: A list of window sizes.
        :param suffix: The statistic label used in the new column name (y_col + '_' + suffix + window).
        :param aggregate: A callable that receives the grouped rolling object and returns the aggregated result.
        :param closed: Passed through to pandas' rolling(closed=...).
        :param return_logs: If True, store log1p of the statistic under 'ln_' + column name and drop the raw column.

        Returns:
        --------
        :return: A copy of input_df with one new column per window.
        """

        df = input_df.copy()

        self._index_check(df)

        for w in windows:
            col_name = f"{y_col}_{suffix}{w}"
            rolled = df.groupby(self.groupby_cols)[y_col].rolling(w, min_periods=1, closed=closed)
            # The grouped result carries the group key as an extra index level, so it is
            # written back by position; this is why the index must already be sorted.
            df[col_name] = aggregate(rolled).values
            if return_logs:
                df['ln_' + col_name] = np.log1p(df[col_name])
                df = df.drop(col_name, axis=1)
        return df

    def lag(self, input_df: pd.DataFrame, y_col: str, lags: list):

        """
        This is a method that creates lagged variables for a given column in a DataFrame.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create lagged variables.
        :param lags: A list of lag values to create.

        Returns:
        --------
        :return: A copy of the original DataFrame with the lagged variables (y_col + '_basic_lag' + lag) appended.
        """

        df = input_df.copy()

        self._index_check(df)

        for lag in lags:
            df[f"{y_col}_basic_lag{lag}"] = df.groupby(self.groupby_cols)[y_col].shift(lag)
        return df

    def rolling_sum(self, input_df: pd.DataFrame, y_col: str, windows: list, closed=None, return_logs=False):

        """
        This is a method that creates the rolling sum of specified windows for a given column in a DataFrame.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create rolling variables.
        :param windows: A list of windows to generate a rolling sum for.
        :param closed: A string indicating the side of the window interval to close on. Closed = 'left' omits the current observation.
        :param return_logs: A boolean indicating whether to return the log (log1p) of the rolling sum instead of the raw value.

        Returns:
        --------
        :return: A copy of the original DataFrame with the rolling sum variables appended.

        """

        return self._rolling_feature(input_df, y_col, windows, 'rolling_sum', lambda rolled: rolled.sum(), closed, return_logs)

    def rolling_mean(self, input_df: pd.DataFrame, y_col: str, windows: list, closed=None, return_logs=False):

        """
        This is a method that creates the rolling mean of specified windows for a given column in a DataFrame.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create rolling variables.
        :param windows: A list of windows to generate a rolling mean for.
        :param closed: A string indicating the side of the window interval to close on. Closed = 'left' omits the current observation.
        :param return_logs: A boolean indicating whether to return the log (log1p) of the rolling mean instead of the raw value.

        Returns:
        --------
        :return: A copy of the original DataFrame with the rolling mean variables appended.

        """

        return self._rolling_feature(input_df, y_col, windows, 'rolling_mean', lambda rolled: rolled.mean(), closed, return_logs)

    def rolling_min(self, input_df: pd.DataFrame, y_col: str, windows: list, closed=None, return_logs=False):

        """
        This is a method that creates the rolling min of specified windows for a given column in a DataFrame.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create rolling variables.
        :param windows: A list of windows to generate a rolling min for.
        :param closed: A string indicating the side of the window interval to close on. Closed = 'left' omits the current observation.
        :param return_logs: A boolean indicating whether to return the log (log1p) of the rolling min instead of the raw value.

        Returns:
        --------
        :return: A copy of the original DataFrame with the rolling min variables appended.

        """

        return self._rolling_feature(input_df, y_col, windows, 'rolling_min', lambda rolled: rolled.min(), closed, return_logs)

    def rolling_max(self, input_df: pd.DataFrame, y_col: str, windows: list, closed=None, return_logs=False):

        """
        This is a method that creates the rolling max of specified windows for a given column in a DataFrame.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create rolling variables.
        :param windows: A list of windows to generate a rolling max for.
        :param closed: A string indicating the side of the window interval to close on. Closed = 'left' omits the current observation.
        :param return_logs: A boolean indicating whether to return the log (log1p) of the rolling max instead of the raw value.

        Returns:
        --------
        :return: A copy of the original DataFrame with the rolling max variables appended.

        """

        return self._rolling_feature(input_df, y_col, windows, 'rolling_max', lambda rolled: rolled.max(), closed, return_logs)

    def rolling_std(self, input_df: pd.DataFrame, y_col: str, windows: list, closed=None, return_logs=False):

        """
        This is a method that creates the rolling standard deviation of specified windows for a given column in a DataFrame.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create rolling variables.
        :param windows: A list of windows to generate a rolling standard deviation for.
        :param closed: A string indicating the side of the window interval to close on. Closed = 'left' omits the current observation.
        :param return_logs: A boolean indicating whether to return the log (log1p) of the rolling standard deviation instead of the raw value.

        Returns:
        --------
        :return: A copy of the original DataFrame with the rolling standard deviation variables appended.

        """

        return self._rolling_feature(input_df, y_col, windows, 'rolling_std', lambda rolled: rolled.std(), closed, return_logs)

    def _create_exponential_weights(self, window_size, alpha=0.8):

        """
        This is a method that enables generating "rolling" exponential weights for a given window size.

        Args:
        -----
        :param window_size: The size of the window for which weights are calculated.
        :param alpha: The decay factor for weights, defaults to 0.8.
                    The raw weight of an observation k steps back is alpha ** k, so a LOWER alpha
                    discounts older observations faster.

        Returns:
        -----
        :return: A numpy array of weights that sums to 1, ordered so that the last element
                 (applied to the last value of the window) carries the largest weight.
        """

        weights = alpha ** np.arange(window_size)
        normalized_weights = weights / weights.sum()
        return normalized_weights[::-1]

    def weighted_rolling_sum(self, input_df: pd.DataFrame, y_col: str, windows: list, closed=None, return_logs=False, alpha=0.8):

        """
        This is a method that creates the weighted rolling sum of specified windows for a given column in a DataFrame.

        The weights come from _create_exponential_weights and are normalized to sum to 1
        within each (possibly truncated) window.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create weighted rolling variables.
        :param windows: A list of windows to generate a weighted rolling sum for.
        :param closed: A string indicating the side of the window interval to close on. Closed = 'left' omits the current observation.
        :param return_logs:  A boolean indicating whether to return the log (log1p) of the weighted rolling sum instead of the raw value.
        :param alpha: The decay factor for weights, defaults to 0.8. A lower alpha discounts older observations faster.

        Returns:
        -----
        :return: A copy of the original DataFrame with the weighted rolling sum variables appended.
        """

        def weighted_sum(values):
            return np.sum(self._create_exponential_weights(len(values), alpha) * values)

        return self._rolling_feature(input_df, y_col, windows, 'weighted_rolling_sum', lambda rolled: rolled.apply(weighted_sum, raw=True), closed, return_logs)

    def weighted_rolling_mean(self, input_df: pd.DataFrame, y_col: str, windows: list, closed=None, return_logs=False, alpha=0.8):
        """
        This is a method that creates the weighted rolling mean of specified windows for a given column in a DataFrame.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create weighted rolling variables.
        :param windows: A list of windows to generate a weighted rolling mean for.
        :param closed: A string indicating the side of the window interval to close on. Closed = 'left' omits the current observation.
        :param return_logs:  A boolean indicating whether to return the log (log1p) of the weighted rolling mean instead of the raw value.
        :param alpha: The decay factor for weights, defaults to 0.8. A lower alpha discounts older observations faster.

        Returns:
        --------
        :return: A copy of the original DataFrame with the weighted rolling mean variables appended.
        """

        def weighted_mean(values):
            # NOTE(review): the weights already sum to 1, so the weighted sum is itself a
            # weighted average; dividing by the window length on top of that shrinks the
            # value further. Kept as-is so existing feature values do not change - confirm intent.
            return np.sum(self._create_exponential_weights(len(values), alpha) * values) / len(values)

        return self._rolling_feature(input_df, y_col, windows, 'weighted_rolling_mean', lambda rolled: rolled.apply(weighted_mean, raw=True), closed, return_logs)

    def _threshold_count_feature(self, input_df: pd.DataFrame, y_col: str, thresholds: list, label: str, counter, shift_knowledge):

        """
        This is the shared implementation behind since and ongoing.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column to threshold.
        :param thresholds: A list of thresholds; the binary indicator is y_col > threshold.
        :param label: 'since' or 'ongoing'; used in the new column names and to select the returned columns.
        :param counter: A callable applied per group to the binary indicator, returning one count per row.
        :param shift_knowledge: If not None, the number of periods by which the new columns are shifted within each group.

        Returns:
        --------
        :return: A DataFrame with y_col followed by every column whose name contains label.
        """

        df = input_df.copy()

        self._index_check(df)

        for th in thresholds:
            binary_col = f"{y_col}_above{th}"
            count_col = f"{y_col}_{label}_{th}"

            df[binary_col] = (df[y_col] > th).astype(int)
            df[count_col] = df.groupby(self.groupby_cols)[binary_col].transform(counter)

            if shift_knowledge is not None:
                #in case we need to shift by one since we don't know the y_col in current period
                df[binary_col] = df.groupby(self.groupby_cols)[binary_col].shift(shift_knowledge)
                df[count_col] = df.groupby(self.groupby_cols)[count_col].shift(shift_knowledge)

        # Columns are selected by substring, so input columns whose name already contains
        # the label are returned as well.
        return df[[y_col, *[c for c in df.columns if label in c]]]

    def _count_since(self, x: pd.Series):
        """
        This is a method that counts the number of periods since a variable has been 1.

        The count is 0 on every row where x is 1 and increases by 1 on each following row
        where it is not. A series that starts with 0 starts counting at 1.

        :param x: A pandas Series containing the binary (0/1) target variable.

        Returns:
        - y (list): A list containing the number of periods since the target variable has been 1.
        """

        y = []
        for n, flag in enumerate(x):
            if flag == 0 and n == 0:
                y.append(1) # if it starts with no flows
            elif flag == 1:
                y.append(0) # reset to 0 if flows
            else:
                y.append(y[n-1]+1) # add 1 if no flows
        return y

    def since(self, input_df: pd.DataFrame, y_col: str, thresholds: list, shift_knowledge: int = None):

        """
        This is a method that counts the number of periods since a variable has been above a given threshold.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create the count since variable.
        :param thresholds: A list of thresholds to count since.
        :param shift_knowledge: An integer defining by how many periods to shift the count since variable.

        Returns:
        --------
        :return: A DataFrame with y_col and the count since variables (y_col + '_since_' + threshold).
        """

        return self._threshold_count_feature(input_df, y_col, thresholds, 'since', self._count_since, shift_knowledge)

    def _count_ongoing(self, x: pd.Series):
        """
        This is a method that generates a sequential count of the periods for which a variable has been 1.

        The count increases by 1 on every consecutive row where x is 1 and is 0 on every
        row where it is not.

        :param x: A pandas Series containing the binary (0/1) target variable.

        Returns:
        - y (list): A list containing the sequential count of the periods for which the target variable has been 1.
        """

        y = []
        episode_counter = 0
        for n, flag in enumerate(x):
            if flag == 0 and n == 0:
                y.append(episode_counter) # if it starts with no flows
            elif flag == 1:
                episode_counter += 1
                y.append(episode_counter) # if there are flows
            else:
                y.append(0) # reset to 0 if no flows
                episode_counter = 0
        return y

    def ongoing(self, input_df: pd.DataFrame, y_col: str, thresholds: list, shift_knowledge: int = None):

        """
        This is a method that represents a sequential count of the number of periods for which a variable has been above a given threshold.

        Args:
        -----
        :param input_df: The DataFrame containing the data.
        :param y_col: The name of the column for which to create the ongoing variable.
        :param thresholds: A list of thresholds to count episodes above.
        :param shift_knowledge: An integer defining by how many periods to shift the ongoing variable.

        Returns:
        --------
        :return: A DataFrame with y_col and the ongoing variables (y_col + '_ongoing_' + threshold).
        """

        return self._threshold_count_feature(input_df, y_col, thresholds, 'ongoing', self._count_ongoing, shift_knowledge)


# Load data

In [None]:
#add path to data and where you will save outputs
#both default to the current working directory: with an empty string the
#f"{data_path}/..." paths built below would point at the filesystem root ("/ucdp.csv")
data_path = "."
output_path = "."

In [None]:
#read the data
ucdp = pd.read_csv(f"{data_path}/ucdp.csv", index_col=0)

#set index to our id_vars and sort it
#note: sorting is CRITICAL when using group by operations
ucdp = ucdp.set_index(['isocode', 'period']).sort_index()

#no need for population for demonstration. Keep it if you want a per capita target definition.
ucdp = ucdp.drop(columns=["population"])
ucdp['fatalities_UCDP'] = ucdp['fatalities_UCDP'].astype(int)
#rename column for convenience
ucdp = ucdp.rename(columns={"fatalities_UCDP": "violence"})

#display the prepared panel (a bare expression is only rendered when it is the last statement of a cell)
ucdp

# Construct panel

## Construct target

In [None]:
#build the target engineer on the UCDP panel: units are countries ('isocode'),
#time is 'period' and the outcome to threshold is 'violence'
te = TargetEngineer(
    df=ucdp,
    unit='isocode',
    time='period',
    y_col="violence",
)

In [None]:
#set threshold and horizon to construct the target
threshold = 0
horizon = 3

#get incidence and onset target dataframes
#the target column names follow TargetEngineer's naming scheme and are derived from the
#parameters above, so they stay correct when threshold or horizon are changed
inc_df = te.incidence(threshold=threshold, horizon=horizon)
inc_target_col = f"inc_any{te.y_col}_th{threshold}_h{horizon}"
ons_df = te.onset(threshold=threshold, horizon=horizon)
ons_target_col = f"ons_any{te.y_col}_th{threshold}_h{horizon}"

#save target and onset dfs
inc_df.to_csv(f'{output_path}/incidence.csv')
ons_df.to_csv(f'{output_path}/onset.csv')

## Construct features

In [None]:
#instantiate the feature engineer; all features are computed per country
fe = FeatureEngineer(groupby_cols='isocode')

#settings shared by the feature blocks below
y_col = 'violence' #same for all feature engineering
windows = [1, 3, 12, 36, 60] #for rolling mean
thresholds = [0] #for since and ongoing

#each entry is indexed like ucdp; the raw 'violence' column is dropped from the
#engineered frames so that only the new feature columns remain
features = [
    fe.rolling_mean(input_df=ucdp, y_col=y_col, windows=windows).drop(columns = ['violence']),
    fe.since(input_df=ucdp, y_col=y_col, thresholds=thresholds).drop(columns = ['violence']),
    fe.ongoing(input_df=ucdp, y_col=y_col, thresholds=thresholds).drop(columns = ['violence']),
    #text topic features, read from disk and indexed the same way
    pd.read_csv(f'{data_path}/topics.csv', index_col=0).set_index(['isocode', 'period']),
]


In [None]:
#combine all feature blocks on their index; the inner join keeps only the
#rows that are present in every block
X = reduce(
    lambda merged, block: merged.merge(block, left_index=True, right_index=True, how='inner'),
    features,
)

#save features
X.to_csv(f'{output_path}/features.csv')

# Predictions

In [None]:
#set key panelsplit parameters
#one period per test fold, 24 folds in total
test_size = 1
n_splits = 24
#the targets look `horizon` periods ahead, so horizon - 1 periods are left out between
#train and test - presumably to keep not-yet-observable labels out of training;
#confirm against PanelSplit's definition of gap
gap = horizon - 1

## Incidence

In [None]:
#instantiate panelsplit on the periods of the incidence frame
inc_ps = PanelSplit(
    periods=inc_df.index.get_level_values('period'),
    n_splits=n_splits,
    test_size=test_size,
    gap=gap,
)

#frame that will hold the predictions: the test observations of the incidence frame,
#left-joined with the 'violence_since_0' feature
inc_final_preds = inc_ps.gen_test_labels(
    pd.merge(inc_df, X['violence_since_0'], left_index=True, right_index=True, how='left')
)

In [None]:
%%timeit

inc_preds, fitted_estimators_inc = cross_val_fit_predict(
    estimator=RandomForestClassifier(max_depth=4, max_features=0.2, min_samples_leaf=100, n_jobs=-1, random_state=42),
    X=X,
    y=inc_df[inc_target_col],
    cv=inc_ps,
    method='predict_proba',
    drop_na_in_y=True
)

inc_final_preds['preds'] = inc_preds[:, 1]

inc_final_preds.to_csv(f'{output_path}/incidence_predictions.csv')
with open(f'{output_path}/fitted_estimators_inc.pkl', 'wb') as f:
    pickle.dump(fitted_estimators_inc, f)


## Onset

In [None]:
#instantiate panelsplit on the periods of the onset frame
ons_ps = PanelSplit(
    periods = ons_df.index.get_level_values('period'),
    n_splits=n_splits,
    test_size=test_size,
    gap = gap
)

#frame that will hold the predictions: the test observations of the ONSET frame,
#left-joined with the 'violence_since_0' feature
ons_final_preds = ons_ps.gen_test_labels(ons_df.merge(X['violence_since_0'], left_index=True, right_index=True, how='left'))

#fit one random forest per split on the onset target column
#(y must be the single target column, not the whole onset dataframe)
ons_preds, fitted_estimators_ons = cross_val_fit_predict(
    estimator=RandomForestClassifier(max_depth=4, max_features=0.2, min_samples_leaf=100, n_jobs=-1, random_state=42),
    X=X,
    y=ons_df[ons_target_col],
    cv=ons_ps,
    method='predict_proba',
    drop_na_in_y=True
)

#second column of predict_proba = probability of the positive class (sklearn orders columns by class label)
#NOTE(review): the onset target is NaN wherever violence is already ongoing; confirm that
#cross_val_fit_predict still returns one prediction per row of ons_final_preds
ons_final_preds['preds'] = ons_preds[:, 1]

#save predictions and the fitted onset estimators (kept separate from the incidence estimators)
ons_final_preds.to_csv(f'{output_path}/onset_predictions.csv')
with open(f'{output_path}/fitted_estimators_ons.pkl', 'wb') as f:
    pickle.dump(fitted_estimators_ons, f)
