In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# One timestamp per calendar day of 2018.
dates = pd.date_range(start='2018-01-01', end='2018-12-31')

# X holds the day of the year for each date and is indexed by the date itself.
X = pd.Series(dates, index=dates).dt.dayofyear
print(X.head())

In [None]:
from sklearn.base import BaseEstimator
from sklearn.pipeline import TransformerMixin
from sklearn.metrics.pairwise import rbf_kernel


class RBFencoder(TransformerMixin, BaseEstimator):
    """Encode a cyclic one-dimensional feature as RBF similarities to fixed centers.

    Every output column holds the RBF-kernel similarity between the input values
    and one center. To make the encoding wrap around, the kernel is evaluated
    three times: on the raw values, on the values shifted up by the period, and
    on the values shifted down by the period. The element-wise maximum of the
    three results is returned.

    Parameters
    ----------
    gamma : float
        Forwarded to ``rbf_kernel`` as its ``gamma`` argument.
    centers : sequence of numbers
        Locations of the RBF centers; the output has one column per center.
    max_observed_value : number, optional
        Period used for the wrap-around shift. When ``None`` (the default),
        the largest center is used instead.

    Notes
    -----
    Constructor arguments are stored untouched and all derived quantities are
    computed in ``transform``. This keeps ``get_params``/``set_params`` and
    ``sklearn.base.clone`` working, which reject estimators whose ``__init__``
    modifies its parameters.
    """

    def __init__(self, gamma, centers, max_observed_value=None):
        self.gamma = gamma
        self.centers = centers
        self.max_observed_value = max_observed_value

    def fit(self, X, y=None):
        """Do nothing and return ``self``; the encoder has no learned state.

        ``y`` is optional so that ``fit_transform(X)`` and unsupervised
        pipelines can call ``fit`` without a target.
        """
        return self

    def transform(self, X, y=None):
        """Return the wrapped RBF features for ``X``.

        Parameters
        ----------
        X : array-like holding one value per sample
            A pandas Series, a list, a 1-D array or a single-column 2-D
            array/DataFrame. It is flattened into a single column.
        y : ignored

        Returns
        -------
        numpy.ndarray of shape (n_samples, n_centers)
        """
        centers = np.asarray(self.centers, dtype=float).reshape(-1, 1)

        # Explicit None check: a truthiness test would silently discard 0.
        if self.max_observed_value is None:
            period = centers.max()
        else:
            period = self.max_observed_value

        values = np.asarray(X, dtype=float).reshape(-1, 1)

        # Unshifted, shifted right by one period, shifted left by one period.
        kernels = [
            rbf_kernel(X=values + shift, Y=centers, gamma=self.gamma)
            for shift in (0, period, -period)
        ]
        return np.maximum.reduce(kernels)

In [None]:
# Twelve evenly spaced day-of-year centers, one standing in for each month.
month_centers = [int(day) for day in np.round(np.linspace(15, 350, 12))]
print(f'Centers: {month_centers}')
print(f'Number of centers: {len(month_centers)}')

# Encode every day of the year against the twelve centers, wrapping at day 365.
encoder = RBFencoder(
    gamma=0.005,
    centers=month_centers,
    max_observed_value=365,
)
rbf_features = encoder.transform(X)

print(f'X shape: {X.shape}')
print(f'Output shape: {rbf_features.shape}')

print(f'\nSample output features:\n{rbf_features[:2]}')

# Alternatively:
# from sklearn.pipeline import make_pipeline
# ppl = make_pipeline(RBFencoder(gamma=0.0005, centers=month_centers, max_observed_value=365))
# ppl.transform(X)

In [None]:
fig, ax = plt.subplots(figsize=(15, 5))

plot_features = rbf_features
# Make features sum to 1:
# plot_features = rbf_features / np.sum(rbf_features, axis=1).reshape(rbf_features.shape[0], 1)

# One scatter series per center: feature value as a function of the observed day.
for column, center in enumerate(month_centers):
    ax.scatter(X, plot_features[:, column], label=f'Proximity to day {center}', s=5)

ax.set_xlabel('Observed day of year')
ax.set_ylabel('Feature value')
ax.legend(loc='right')
plt.show()

In [None]:
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

class RepeatingBasisFunction(TransformerMixin, BaseEstimator):
    """Encode a single cyclic column as radial basis features.

    Accepts X which has exactly one column. ``periods`` basis points are spread
    evenly over ``[0, modulo)``; each output column is ``exp(-(d / width) ** 2)``
    where ``d`` is the circular distance between the input (taken modulo
    ``modulo``) and one basis point.

    Parameters
    ----------
    periods : int, default=12
        Number of basis points, and therefore of output columns.
    modulo : number, default=365
        Length of the cycle; 0 and ``modulo`` are treated as the same position.

    Attributes
    ----------
    bases_ : numpy.ndarray of shape (periods,)
        Positions of the basis points, set by ``fit``.
    width_ : float
        ``modulo / periods``, the scale used inside the exponential.
    """

    def __init__(self, periods=12, modulo=365):
        self.periods = periods
        self.modulo = modulo

    def fit(self, X, y=None):
        """Validate ``X`` and compute the basis positions and width.

        Raises
        ------
        ValueError
            If ``X`` does not have exactly one column.
        """
        X = check_array(X, estimator=self)
        self._check_single_column(X)

        # last element is excluded because the distance between 0 and modulo is 0, so they're very close
        self.bases_ = np.linspace(0, self.modulo, self.periods + 1)[:-1]

        # curves should be wider for longer windows and narrower if we have more curves
        self.width_ = self.modulo / self.periods

        return self

    def transform(self, X):
        """Return an array of shape (n_samples, periods) with the basis features.

        Raises
        ------
        ValueError
            If ``X`` does not have exactly one column.
        """
        check_is_fitted(self, ['bases_', 'width_'])
        # check_array always hands back an ndarray, so no DataFrame handling is needed.
        X = check_array(X, estimator=self)
        self._check_single_column(X)

        X = X % self.modulo

        base_offsets = self._cycle_apply(X, self.bases_, self.modulo)

        # apply rbf function to series for each basis
        return self._rbf(base_offsets)

    @staticmethod
    def _check_single_column(X):
        """Raise ValueError unless the validated 2-D array has exactly one column."""
        if X.shape[1] != 1:
            raise ValueError(f'X should have exactly one column, it has: {X.shape[1]}')

    def _cycle_dist(self, arr: np.ndarray, base: float, modulo: float) -> np.ndarray:
        """Calculates the absolute difference between values in array and base,
        where 0 and modulo are assumed to be at the same position.

        The result is returned flattened to one dimension.
        """
        abs_diff = np.abs(arr - base)
        # The shorter of the two ways around the circle.
        return np.minimum(abs_diff, modulo - abs_diff).reshape(-1)

    def _cycle_apply(self, array, bases, modulo):
        """Return the (n_samples, n_bases) matrix of circular distances."""
        column = array.reshape(-1, 1)
        row = bases.reshape(1, -1)

        # Broadcasting a column against a row evaluates every sample/base pair
        # in one vectorised call; _cycle_dist flattens it, so restore the shape.
        flat = self._cycle_dist(column, base=row, modulo=modulo)
        return flat.reshape(column.shape[0], row.shape[1])

    def _rbf(self, arr):
        """Map distances to similarities with a Gaussian-shaped bump of scale width_."""
        return np.exp(-(arr / self.width_) ** 2)

In [None]:
import timeit

In [None]:
def f(X):
    """Build a fresh RBFencoder over the month centers and return its encoding of X."""
    return RBFencoder(
        gamma=0.005,
        centers=month_centers,
        max_observed_value=365,
    ).transform(X)

%timeit for x in range(100): f(X)

In [None]:
def g(X):
    """Fit a fresh RepeatingBasisFunction on X and return its encoding of X."""
    basis = RepeatingBasisFunction(periods=12, modulo=365)
    return basis.fit(X).transform(X)

X_2d = np.array(X).reshape(-1, 1)

%timeit for x in range(100): g(X_2d)