In [None]:
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from scipy.interpolate import UnivariateSpline, CubicSpline
from statsmodels.tsa.seasonal import seasonal_decompose

In [None]:
def series_periodos(inicio, periodos, freq):
    """Build a regular ``DatetimeIndex``.

    Parameters
    ----------
    inicio : datetime-like
        First timestamp of the index.
    periodos : int
        Number of timestamps to generate.
    freq : str or pandas offset
        Spacing between consecutive timestamps.

    Returns
    -------
    pandas.DatetimeIndex
        ``periodos`` timestamps starting at ``inicio``.
    """
    return pd.date_range(inicio, periods=periodos, freq=freq)

TRASLACIÓN:

In [None]:
# Desplazamiento espacial de la serie

def traslacion(df,shift,freq):
    """Append a vertically shifted copy of every column to the original data.

    For each column the original values are followed by the same values plus
    ``shift``, so the result has twice as many rows as ``df``. The rows are
    re-indexed with a date range that starts at ``df.index[0]``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    shift : scalar
        Offset added to every value of the appended copy.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.

    Returns
    -------
    pandas.DataFrame
        Original values followed by the shifted ones; a plain copy of ``df``
        when it has no columns.
    """
    result = df.copy()
    for position, column in enumerate(df.columns):
        original = df[column]
        stacked = np.concatenate((original.values, original + shift))
        if position == 0:
            # The index is built once and shared by every column.
            new_index = series_periodos(df.index[0], len(stacked), freq)
        block = pd.DataFrame(data=stacked, index=new_index, columns=[column])
        result = block if position == 0 else result.join(block, how="outer")
    return result

ESCALADO:

In [None]:
# Multiplicación por un factor de la serie

def escalado(df,freq,factor):
    """Append a rescaled copy of every column to the original data.

    For each column the original values are followed by the same values
    multiplied by ``factor``, so the result has twice as many rows as ``df``.
    The rows are re-indexed with a date range that starts at ``df.index[0]``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    factor : scalar
        Multiplier applied to the appended copy.

    Returns
    -------
    pandas.DataFrame
        Original values followed by the scaled ones; a plain copy of ``df``
        when it has no columns.
    """
    result = df.copy()
    for position, column in enumerate(df.columns):
        original = df[column]
        stacked = np.concatenate((original.values, original * factor))
        if position == 0:
            # The index is built once and shared by every column.
            new_index = series_periodos(df.index[0], len(stacked), freq)
        block = pd.DataFrame(data=stacked, index=new_index, columns=[column])
        result = block if position == 0 else result.join(block, how="outer")
    return result

INTERPOLACIÓN:

In [None]:
# Definimos nuevos datos indicando el número de datos a generar, la frecuencia y el tipo de interpolación

def interpolacion_min_max(df,kind,num,freq):
    """Append ``num`` interpolated points taken between each column's extremes.

    Every column is interpolated over its row positions with ``interp1d`` and
    evaluated at ``num`` evenly spaced positions lying between the position of
    its minimum and the position of its maximum (whichever comes first). The
    interpolated values are appended after the original ones.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column; its index supplies the start date.
    kind : str
        Interpolation kind passed to ``interp1d`` ('linear', 'cubic',
        'quadratic').
    num : int
        Number of points to generate per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.

    Returns
    -------
    pandas.DataFrame
        ``len(df) + num`` rows indexed by a fresh date range.
    """
    table = df.reset_index()
    positions = table.index.values
    # After reset_index the first column holds the original index values.
    new_index = series_periodos(table[table.columns[0]][0], num + table.shape[0], freq)
    for position, name in enumerate(table.columns[1:], start=1):
        series = table[name]
        extremes = (series.argmin(), series.argmax())
        interpolator = interp1d(positions, series, kind=kind)
        generated = interpolator(np.linspace(min(extremes), max(extremes), num=num))
        block = pd.DataFrame(
            data=np.concatenate((series.values.reshape(-1), generated)),
            index=new_index,
            columns=[name],
        )
        result = block if position == 1 else result.join(block, how="outer")

    return result

In [None]:
# Definimos nuevos datos indicando el número de datos a generar, la frecuencia y el tipo de interpolación (lineal/cúbico).

def interpolacion_normal(df,kind,num,freq):
    """Append ``num`` interpolated points spanning the whole series.

    Every column is interpolated over its row positions with ``interp1d`` and
    evaluated at ``num`` evenly spaced positions between the first and the
    last row. The interpolated values are appended after the original ones.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column; its index supplies the start date.
    kind : str
        Interpolation kind passed to ``interp1d`` ('linear', 'cubic',
        'quadratic').
    num : int
        Number of points to generate per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.

    Returns
    -------
    pandas.DataFrame
        ``len(df) + num`` rows indexed by a fresh date range.
    """
    table = df.reset_index()
    positions = table.index.values
    # After reset_index the first column holds the original index values.
    new_index = series_periodos(table[table.columns[0]][0], num + table.shape[0], freq)
    for position, name in enumerate(table.columns[1:], start=1):
        series = table[name]
        interpolator = interp1d(positions, series, kind=kind)
        generated = interpolator(np.linspace(0, table.shape[0] - 1, num=num))
        block = pd.DataFrame(
            data=np.concatenate((series.values.reshape(-1), generated)),
            index=new_index,
            columns=[name],
        )
        result = block if position == 1 else result.join(block, how="outer")

    return result

In [None]:
def interpolate(data):
    """Insert the midpoint between every pair of consecutive values.

    The input is converted to a NumPy array first, so elements are always
    accessed by position. This matters for pandas Series whose index is not
    integer-based (e.g. dates), where ``series[i]`` is not a reliable
    positional lookup.

    Parameters
    ----------
    data : array-like
        One-dimensional sequence of numeric values (list, ndarray, Series).

    Returns
    -------
    numpy.ndarray
        Array of length ``2 * len(data) - 1`` with the original values at the
        even positions and the midpoints at the odd ones. Inputs with fewer
        than two values are returned as an array copy, unchanged.
    """
    values = np.asarray(data)
    if values.size < 2:
        # Nothing to interpolate between; also covers empty input.
        return values.copy()

    midpoints = (values[:-1] + values[1:]) / 2
    interleaved = np.empty(2 * values.size - 1, dtype=midpoints.dtype)
    interleaved[0::2] = values
    interleaved[1::2] = midpoints
    return interleaved

In [None]:
# Añadimos datos que sean el punto de medio entre dos datos consecutivos

def punto_medio(df,freq):
    """Densify every column by inserting midpoints between consecutive values.

    Each column is passed through ``interpolate`` and the result is
    re-indexed with a date range starting at ``df.index[0]``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.

    Returns
    -------
    pandas.DataFrame
        One row per original value plus one per inserted midpoint.
    """
    for position, column in enumerate(df.columns):
        densified = interpolate(df[column])
        if position == 0:
            # The index is built once and shared by every column.
            new_index = series_periodos(df.index[0], len(densified), freq)
        block = pd.DataFrame(data=densified, index=new_index, columns=[column])
        result = block if position == 0 else result.join(block, how="outer")
    return result

In [None]:
def spline_interpolation_linear(data, num, s=1, k=3):
    """Evaluate a smoothing spline fitted to ``data`` at ``num`` points.

    The spline is fitted against the positions ``0 .. len(data) - 1`` and
    evaluated at ``num`` evenly spaced positions over that same range.

    Parameters
    ----------
    data : array-like
        One-dimensional values to fit.
    num : int
        Number of points to evaluate.
    s : float, default 1
        Smoothing factor passed to ``UnivariateSpline``.
    k : int, default 3
        Degree of the spline. The default keeps the cubic fit this function
        has always produced despite its name; pass ``k=1`` for a genuinely
        piecewise-linear spline.

    Returns
    -------
    numpy.ndarray
        The ``num`` spline values.
    """
    x = np.arange(len(data))
    spline = UnivariateSpline(x, data, s=s, k=k)
    x_new = np.linspace(0, len(data) - 1, num=num)
    return spline(x_new)

def spline_interpolation_cubic(data, num):
    """Evaluate an interpolating cubic spline of ``data`` at ``num`` points.

    The spline passes through ``data`` at positions ``0 .. len(data) - 1`` and
    is evaluated at ``num`` evenly spaced positions over that same range.

    Parameters
    ----------
    data : array-like
        One-dimensional values to interpolate.
    num : int
        Number of points to evaluate.

    Returns
    -------
    numpy.ndarray
        The ``num`` interpolated values.
    """
    n_points = len(data)
    fitted = CubicSpline(np.arange(n_points), data)
    return fitted(np.linspace(0, n_points - 1, num=num))

In [None]:
def interpolacion_spline(df,tipo,num,freq,s):
    """Append ``num`` spline-generated points to every column.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    tipo : {'linear', 'cubic'}
        'linear' delegates to ``spline_interpolation_linear`` (smoothing
        spline with factor ``s``); 'cubic' delegates to
        ``spline_interpolation_cubic``.
    num : int
        Number of points to generate per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    s : float
        Smoothing factor, only used when ``tipo == 'linear'``.

    Returns
    -------
    pandas.DataFrame
        ``len(df) + num`` rows indexed by a date range starting at
        ``df.index[0]``.

    Raises
    ------
    ValueError
        If ``tipo`` is not one of the supported values.
    """
    # Fail early with a clear message instead of hitting an unbound local
    # further down when the option is misspelled.
    if tipo not in ('linear', 'cubic'):
        raise ValueError(f"tipo must be 'linear' or 'cubic', got {tipo!r}")

    indice = series_periodos(df.index[0], num + df.shape[0], freq)
    for position, column in enumerate(df.columns):
        y = df[column]
        if tipo == 'linear':
            y_new = spline_interpolation_linear(y, num, s)
        else:
            y_new = spline_interpolation_cubic(y, num)
        block = pd.DataFrame(
            data=np.concatenate((y.values.reshape(-1), y_new)),
            index=indice,
            columns=[column],
        )
        df_int = block if position == 0 else df_int.join(block, how="outer")
    return df_int

SAMPLING:

In [None]:
# Randomly sampling with replacement

def sampling(df,size,freq):
    """Append ``size`` noisy bootstrap samples to every column.

    Each new value is drawn with replacement from the column and perturbed
    with Gaussian noise of mean 0 and standard deviation 0.5. NumPy's global
    random state is reseeded to 1 on every call, so results are reproducible
    and the caller's random stream is reset as a side effect.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    size : int
        Number of samples to append per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.

    Returns
    -------
    pandas.DataFrame
        ``len(df) + size`` rows indexed by a date range starting at
        ``df.index[0]``.
    """
    np.random.seed(1)
    new_index = series_periodos(df.index[0], size + df.shape[0], freq)
    for position, column in enumerate(df.columns):
        original = df[column]
        resampled = np.random.choice(original, size=size, replace=True)
        jitter = np.random.normal(0, 0.5, size)
        block = pd.DataFrame(
            data=np.concatenate((original, resampled + jitter)),
            index=new_index,
            columns=[column],
        )
        result = block if position == 0 else result.join(block, how="outer")
    return result

TRANSFORMACIONES MATEMÁTICAS:

In [None]:
# Aplicamos log
def agregar_log(df):
    """Return a new frame with ``log(1 + x)`` applied to every column.

    ``np.log1p`` is used rather than a plain logarithm, so zeros map to zero.
    The input frame is left untouched.
    """
    return df.apply(np.log1p)

In [None]:
# Aplicamos la raíz cuadrada 
def agregar_sqrt(df):
    """Return a new frame with the square root of every column.

    The input frame is left untouched.
    """
    return df.apply(np.sqrt)

In [None]:
# Aplicamos la exponencial
def agregar_exp(df,factor):
    """Return a new frame with ``exp(x / factor)`` applied to every column.

    Dividing by ``factor`` before exponentiating lets the caller keep the
    result within a manageable range. The input frame is left untouched.
    """
    return df.apply(lambda column: np.exp(column / factor))

In [None]:
# Aplicamos el seno
def agregar_sin(df):
    """Return a new frame with the sine of every column.

    The input frame is left untouched.
    """
    return df.apply(np.sin)

In [None]:
# Aplicamos sen + cos

def agregar_trig(df):
    """Return a new frame with ``cos(x) + sin(x)`` applied to every column.

    The input frame is left untouched.
    """
    return df.apply(lambda column: np.cos(column) + np.sin(column))

In [None]:
# Aplicamos sigmoide

def agregar_sigmoid(df):
    """Return a new frame with the logistic sigmoid of every column.

    Each value ``x`` becomes ``1 / (1 + exp(-x))``. The input frame is left
    untouched.
    """
    return df.apply(lambda column: 1 / (1 + np.exp(-column)))
   

In [None]:
# Aplicamos operaciones matemáticas

def agregar_matematica(df,freq,funcion,factor=1):
    """Append a mathematically transformed copy of every column.

    The result holds the original values followed by the transformed ones,
    so it has twice as many rows as ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    funcion : {'sqrt', 'log', 'exp', 'sin', 'cos', 'trig', 'sigmoide'}
        Transformation to apply. 'log' is ``log(1 + x)``, 'exp' is
        ``exp(x / factor)``, 'trig' is ``cos(x) + sin(x)`` and 'sigmoide' is
        ``1 / (1 + exp(-x))``.
    factor : scalar, default 1
        Divisor applied before exponentiating; only used by 'exp'.

    Returns
    -------
    pandas.DataFrame
        ``2 * len(df)`` rows indexed by a date range starting at
        ``df.index[0]``.

    Raises
    ------
    ValueError
        If ``funcion`` is not one of the supported names.
    """
    transforms = {
        'sqrt': np.sqrt,
        'log': np.log1p,
        'exp': lambda values: np.exp(values / factor),
        'sin': np.sin,
        'cos': np.cos,
        'trig': lambda values: np.cos(values) + np.sin(values),
        'sigmoide': lambda values: 1 / (1 + np.exp(-values)),
    }
    # Resolve the transformation once, up front, so a misspelled name fails
    # with a clear message instead of an unbound local inside the loop.
    if funcion not in transforms:
        raise ValueError(
            f"funcion must be one of {sorted(transforms)}, got {funcion!r}"
        )
    transform = transforms[funcion]

    indice = series_periodos(df.index[0], 2 * df.shape[0], freq)
    for position, column in enumerate(df.columns):
        data = df[column]
        block = pd.DataFrame(
            data=np.concatenate((data, transform(data))),
            index=indice,
            columns=[column],
        )
        df_transf = block if position == 0 else df_transf.join(block, how="outer")
    return df_transf

TÉCNICAS ESTADÍSTICAS:

In [None]:
def estadist(df,freq,num,tipo):
    """Append ``num`` copies of a summary statistic to every column.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    num : int
        Number of values to append per column.
    tipo : {1, 2, 3}
        Statistic to repeat: 1 = mean, 2 = median, 3 = mode (the first one
        returned by ``Series.mode`` when there are several).

    Returns
    -------
    pandas.DataFrame
        ``len(df) + num`` rows indexed by a date range starting at
        ``df.index[0]``.

    Raises
    ------
    ValueError
        If ``tipo`` is not 1, 2 or 3.
    """
    statistics = {
        1: lambda series: series.mean(),
        2: lambda series: series.median(),
        3: lambda series: series.mode().iloc[0],
    }
    # Validate before doing any work so an unsupported code fails with a
    # clear message instead of an unbound local inside the loop.
    if tipo not in statistics:
        raise ValueError(f"tipo must be 1 (mean), 2 (median) or 3 (mode), got {tipo!r}")
    statistic = statistics[tipo]

    indice = series_periodos(df.index[0], num + df.shape[0], freq)
    for position, column in enumerate(df.columns):
        data = df[column]
        filler = np.full(num, statistic(data), dtype=float)
        block = pd.DataFrame(
            data=np.concatenate((data, filler)),
            index=indice,
            columns=[column],
        )
        df_transf = block if position == 0 else df_transf.join(block, how="outer")

    return df_transf

In [None]:
# Devuelve df con datos añadidos calculados a partir de una distribución normal con la media y desviación de los datos pasados 

def normal(df,freq,size):
    """Append ``size`` Gaussian draws matching each column's mean and spread.

    For every column, new values are sampled from a normal distribution whose
    mean and standard deviation are ``np.mean`` and ``np.std`` of that column.
    NumPy's global random state is reseeded to 1 on every call, so results
    are reproducible and the caller's random stream is reset as a side effect.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    size : int
        Number of values to append per column.

    Returns
    -------
    pandas.DataFrame
        ``len(df) + size`` rows indexed by a date range starting at
        ``df.index[0]``.
    """
    np.random.seed(1)
    new_index = series_periodos(df.index[0], size + df.shape[0], freq)
    for position, column in enumerate(df.columns):
        original = df[column]
        center = np.mean(original)
        spread = np.std(original)
        draws = np.random.normal(center, spread, size=size)
        block = pd.DataFrame(
            data=np.concatenate((original.values, draws)),
            index=new_index,
            columns=[column],
        )
        result = block if position == 0 else result.join(block, how="outer")
    return result

In [None]:
# Calcula nuevos datos usando: media + z * desv donde la media y las desv son las de los datos pasados y z = raiz (-2 * log u1) cos(2 pi u2) tal que u1,u2 son dos randoms entre 0 e 1
def box_muller_transform(mean, std_dev, size=100):
    """Draw normally distributed values with the Box-Muller transform.

    Two independent uniform samples ``u1`` and ``u2`` are turned into a
    standard normal variate ``z = sqrt(-2 ln u1) * cos(2 pi u2)``, which is
    then scaled by ``std_dev`` and shifted by ``mean``. The draws come from
    NumPy's global random state.

    Parameters
    ----------
    mean : float
        Mean of the target distribution.
    std_dev : float
        Standard deviation of the target distribution.
    size : int, default 100
        Number of values to generate.

    Returns
    -------
    numpy.ndarray
        ``size`` samples.
    """
    # Keep this draw order: u1 first, then u2.
    uniform_radius = np.random.rand(size)
    uniform_angle = np.random.rand(size)
    radius = np.sqrt(-2 * np.log(uniform_radius))
    angle = 2 * np.pi * uniform_angle
    return mean + radius * np.cos(angle) * std_dev

def box_muller(df,freq,size):
    """Append ``size`` Box-Muller normal draws to every column.

    For every column, new values are generated by ``box_muller_transform``
    using the mean and (population) standard deviation of the column's
    underlying NumPy array. NumPy's global random state is reseeded to 1 on
    every call.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    size : int
        Number of values to append per column.

    Returns
    -------
    pandas.DataFrame
        ``len(df) + size`` rows indexed by a date range starting at
        ``df.index[0]``.
    """
    np.random.seed(1)
    new_index = series_periodos(df.index[0], size + df.shape[0], freq)
    for position, column in enumerate(df.columns):
        values = df[column].values
        draws = box_muller_transform(values.mean(), values.std(), size)
        block = pd.DataFrame(
            data=np.concatenate((values, draws)),
            index=new_index,
            columns=[column],
        )
        result = block if position == 0 else result.join(block, how="outer")
    return result

RUIDO ARMÓNICO:

In [None]:
# Añadimos ruido armónico a la muestra con cierta amplitud y frecuencia

def add_harmonic_noise(df,freq,size):
    """Append ``size`` bootstrap samples perturbed by each column's main harmonic.

    For every column the dominant oscillation is estimated from its real FFT
    and a sinusoid with that frequency and amplitude is added to ``size``
    values drawn with replacement from the column. NumPy's global random
    state is reseeded to 1 on every call.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    size : int
        Number of values to append per column.

    Returns
    -------
    pandas.DataFrame
        ``len(df) + size`` rows indexed by a date range starting at
        ``df.index[0]``; a plain copy of ``df`` when it has no columns.
    """
    np.random.seed(1)
    df_harm = df.copy()
    # Sample positions of the generated points, in units of one observation.
    time = np.arange(size)
    for position, column in enumerate(df.columns):
        data = df[column]
        values = data.to_numpy(dtype=float)
        n_obs = len(values)

        if n_obs > 1:
            magnitudes = np.abs(np.fft.rfft(values))
            frequencies = np.fft.rfftfreq(n_obs, d=1.0)
            # Skip bin 0 (the DC term): for a series with non-zero mean it
            # dominates the spectrum and would select frequency 0, i.e. a
            # sinusoid that is identically zero.
            dominant = 1 + int(np.argmax(magnitudes[1:]))
            frequency = frequencies[dominant]
            # Rescale the FFT magnitude to the amplitude of the underlying
            # sinusoid (2|X_k|/N for a one-sided spectrum).
            amplitude = 2.0 * magnitudes[dominant] / n_obs
        else:
            # Too few observations to estimate any oscillation.
            frequency = 0.0
            amplitude = 0.0

        harmonic_noise = amplitude * np.sin(2 * np.pi * frequency * time)
        data_augmented = np.random.choice(data, size=size, replace=True) + harmonic_noise
        datos = np.concatenate((data.values, data_augmented))
        if position == 0:
            # The index is built once and shared by every column.
            indice = series_periodos(df.index[0], len(datos), freq)
        block = pd.DataFrame(data=datos, index=indice, columns=[column])
        df_harm = block if position == 0 else df_harm.join(block, how="outer")

    return df_harm

DUPLICADO + PERTURBACIÓN:

In [None]:
# Duplicar algunos datos y añadir ruido
def duplicate_and_perturb(data, duplication_factor=0.3, perturbation_std=0.05):
    """Randomly follow some points with a noisy duplicate.

    Each point is kept and, with probability ``duplication_factor``, followed
    by a copy perturbed with Gaussian noise of standard deviation
    ``perturbation_std``. NumPy's global random state is reseeded to 8 on
    every call, so for fixed arguments the positions chosen for duplication
    depend only on the number of points, not on their values.

    Parameters
    ----------
    data : iterable of numbers
        Values to process, in order.
    duplication_factor : float, default 0.3
        Probability of duplicating each point.
    perturbation_std : float, default 0.05
        Standard deviation of the noise added to each duplicate.

    Returns
    -------
    numpy.ndarray
        Original points interleaved with their noisy duplicates.
    """
    np.random.seed(8)
    augmented = []
    for value in data:
        augmented.append(value)
        if not (np.random.rand() < duplication_factor):
            continue
        augmented.append(value + np.random.normal(0, perturbation_std))
    return np.array(augmented)

In [None]:
# Duplicamos algunos datos añadiendole cierto ruido.

def duplicados(df,freq,duplication_factor=0.3,perturbation_std=0.05):
    """Augment every column by randomly inserting noisy duplicates.

    Each column is processed by ``duplicate_and_perturb``. That helper
    reseeds the random generator on every call, so all columns get
    duplicates at the same positions and therefore the same length, which
    the shared index relies on.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    duplication_factor : float, default 0.3
        Probability of duplicating each point.
    perturbation_std : float, default 0.05
        Standard deviation of the noise added to each duplicate.

    Returns
    -------
    pandas.DataFrame
        Augmented series indexed by a date range starting at ``df.index[0]``.
    """
    np.random.seed(1)
    for position, column in enumerate(df.columns):
        augmented = duplicate_and_perturb(df[column], duplication_factor, perturbation_std)
        if position == 0:
            # The index is built once and shared by every column.
            new_index = series_periodos(df.index[0], len(augmented), freq)
        block = pd.DataFrame(data=augmented, index=new_index, columns=[column])
        result = block if position == 0 else result.join(block, how="outer")

    return result

COMBINACIÓN LINEAL:

In [None]:
# Calculamos nuevos datos como combinación lineal de los otros 
def linear_combinations(data,num_datos, n_combinations):
    """Extend a series with noisy convex combinations of its latest values.

    Each new point is a random convex combination (non-negative weights that
    sum to 1) of the last ``n_combinations`` points, including points
    generated earlier in the same call, plus Gaussian noise of standard
    deviation 0.5. Draws come from NumPy's global random state.

    Parameters
    ----------
    data : one-dimensional array-like
        Starting values.
    num_datos : int
        Number of points to append.
    n_combinations : int
        Size of the trailing window that is combined.

    Returns
    -------
    numpy.ndarray
        ``data`` followed by the ``num_datos`` generated points.
    """
    series = list(data)
    for _ in range(num_datos):
        window = series[-n_combinations:]
        # Keep this draw order: weights first, then the noise term.
        weights = np.random.rand(n_combinations)
        weights /= np.sum(weights)
        new_point = np.dot(weights, window)
        new_point += np.random.normal(0, 0.5)
        series.append(new_point)
    return np.array(series)

In [None]:
def agregar_comb(df,freq,size,window_size):
    """Extend every column with ``size`` points from ``linear_combinations``.

    NumPy's global random state is reseeded to 1 on every call, so results
    are reproducible and the caller's random stream is reset as a side
    effect.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    size : int
        Number of points to append per column.
    window_size : int
        Number of trailing points combined to produce each new point.

    Returns
    -------
    pandas.DataFrame
        ``len(df) + size`` rows indexed by a date range starting at
        ``df.index[0]``.
    """
    np.random.seed(1)
    for position, column in enumerate(df.columns):
        extended = linear_combinations(df[column].values, size, window_size)
        if position == 0:
            # The index is built once and shared by every column.
            new_index = series_periodos(df.index[0], len(extended), freq)
        block = pd.DataFrame(data=extended, index=new_index, columns=[column])
        result = block if position == 0 else result.join(block, how="outer")
    return result


DESCOMPOSICIÓN

In [None]:
def descomp(df,size,freq,tipo,period=12):
    """Extend every column by extrapolating its trend and seasonal components.

    Each column is decomposed with ``seasonal_decompose``. The trend is
    continued linearly using the average step of the estimated trend, the
    last seasonal cycle is repeated, and both are recombined (added or
    multiplied, depending on ``tipo``) to produce ``size`` new values. The
    residual component is not used.

    Parameters
    ----------
    df : pandas.DataFrame
        Input series, one per column.
    size : int
        Number of values to append per column.
    freq : str or pandas offset
        Frequency forwarded to ``series_periodos`` for the new index.
    tipo : {'additive', 'multiplicative'}
        Decomposition model, also used to recombine trend and seasonality.
    period : int, default 12
        Length of the seasonal cycle, in observations.

    Returns
    -------
    pandas.DataFrame
        ``len(df) + size`` rows indexed by a date range starting at
        ``df.index[0]``.

    Raises
    ------
    ValueError
        If ``tipo`` is not one of the supported models.
    """
    # Fail early with a clear message instead of hitting an unbound local
    # inside the loop when the model name is misspelled.
    if tipo not in ("additive", "multiplicative"):
        raise ValueError(
            f"tipo must be 'additive' or 'multiplicative', got {tipo!r}"
        )

    indice = series_periodos(df.index[0], size + df.shape[0], freq)
    # Multipliers 1..size for the linear trend extrapolation.
    pasos = np.arange(1, size + 1)

    for position, column in enumerate(df.columns):
        data = df[column]
        descomposicion = seasonal_decompose(data, model=tipo, period=period)

        # The trend estimate has NaNs at its edges; work with the valid part.
        tendencia_valida = descomposicion.trend.dropna()
        tasa_cambio_promedio = tendencia_valida.diff().dropna().mean()
        tendencia_futura = tendencia_valida.iloc[-1] + pasos * tasa_cambio_promedio

        # Repeat the last full seasonal cycle until `size` values are covered.
        ultimo_ciclo = descomposicion.seasonal.iloc[-period:].to_numpy()
        estacionalidad_extrapolada = np.tile(ultimo_ciclo, size // period + 1)[:size]

        if tipo == "additive":
            prediccion = tendencia_futura + estacionalidad_extrapolada
        else:
            prediccion = tendencia_futura * estacionalidad_extrapolada

        block = pd.DataFrame(
            data=np.concatenate((data, prediccion)),
            index=indice,
            columns=[column],
        )
        df_desc = block if position == 0 else df_desc.join(block, how="outer")
    return df_desc

MODELOS DE PREDICCIÓN:

In [None]:
from skforecast.Sarimax import Sarimax
from skforecast.ForecasterSarimax import ForecasterSarimax
from skforecast.model_selection_sarimax import backtesting_sarimax
from skforecast.model_selection_sarimax import grid_search_sarimax
from skforecast.ForecasterAutoreg import ForecasterAutoreg
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, ExtraTreesRegressor
from skforecast.model_selection import grid_search_forecaster
from sklearn.metrics import mean_squared_error
from skforecast.ForecasterAutoregDirect import ForecasterAutoregDirect
from sklearn.linear_model import Ridge
from prophet import Prophet
from sklearn.preprocessing import StandardScaler

In [None]:
# Definición de modelo autorregresivos con búsqueda de parámetros realizada por grid search devolviendo la predicción
def prediccion_sarimax(datos,datos_train,columna,size):
    """Grid-search a SARIMAX configuration and return its backtesting predictions.

    A grid search over ARIMA orders and seasonal orders is run on
    ``datos[columna]``; the configuration in the first row of the results is
    then re-evaluated with ``backtesting_sarimax`` and the resulting
    predictions are returned.

    Parameters
    ----------
    datos : pandas.DataFrame
        Full data set; ``datos[columna]`` is the series passed to both the
        grid search and the backtesting.
    datos_train : sized
        Only its length is used: 80% of it becomes ``initial_train_size``.
    columna : hashable
        Column of ``datos`` to model.
    size : int
        The backtesting horizon is ``size + 12`` steps per fold.

    Returns
    -------
    The predictions object returned by ``backtesting_sarimax``.
    """
    
    # Grid search
    forecaster = ForecasterSarimax(
                    regressor=Sarimax(
                                    order=(1, 1, 1), # Placeholder replaced in the grid search
                                    maxiter=500
                                )
                )

    param_grid = {
        'order': [(0, 1, 0), (0, 1, 1), (1, 1, 0), (1, 1, 1), (2, 1, 1), (1 ,1 ,2), ( 2, 1, 2),(0, 0, 0), (0, 0, 1), (1, 0, 0), (1, 0, 1), (2, 0, 1), (1 ,0 ,2), ( 2, 0, 2) ],
        'seasonal_order': [(0, 0, 0, 0), (0, 1, 0, 12), (1, 1, 1, 12)],
        'trend': [None]
    }

    resultados_grid = grid_search_sarimax(
                            forecaster            = forecaster,
                            y                     = datos[columna],
                            param_grid            = param_grid,
                            steps                 = 12,
                            refit                 = True,
                            metric                = 'mean_absolute_error',
                            initial_train_size    = int(len(datos_train)*0.8),
                            fixed_train_size      = False,
                            return_best           = False,
                            n_jobs                = 'auto',
                            suppress_warnings_fit = True,
                            verbose               = False,
                            show_progress         = True
                    )
    
    # Label of the first row of the results.
    # NOTE(review): presumably the results are sorted best-first, making this
    # the best configuration - confirm against the skforecast documentation.
    r=resultados_grid.index[0]

    # Backtesting predictions with the best model according to the grid search
    # ==============================================================================
    forecaster_1 = ForecasterSarimax( regressor=Sarimax(order=resultados_grid.order[r], seasonal_order=resultados_grid.seasonal_order[r], maxiter=500),
                    )

    # Only the predictions are returned; the metric (metrica_m1) is not used.
    metrica_m1, predicciones_m1 = backtesting_sarimax(
                                            forecaster            = forecaster_1,
                                            y                     = datos[columna],
                                            initial_train_size    = int(len(datos_train)*0.8),
                                            steps                 = size+12,
                                            metric                = 'mean_absolute_error',
                                            refit                 = True,
                                            n_jobs                = "auto",
                                            suppress_warnings_fit = True,
                                            verbose               = False,
                                            show_progress         = True
                                        )

    
    return predicciones_m1

In [None]:
# Entrenamiento del modelo forecaster autorregresivo
def prediccion_backtesting_forecasterAutoreg(datos_train,column,size,steps,param_grid,lags_grid,forecaster):
    """Tune ``forecaster`` with a grid search and predict ``size`` steps ahead.

    Parameters
    ----------
    datos_train : pandas.DataFrame
        Training data; ``datos_train[column]`` is the series searched over
        and 80% of its length is used as ``initial_train_size``.
    column : hashable
        Column of ``datos_train`` to model.
    size : int
        Number of steps to predict after the search.
    steps : int
        Horizon used by the grid search.
    param_grid : dict
        Regressor hyper-parameter grid passed to ``grid_search_forecaster``.
    lags_grid : list
        Candidate lag configurations passed to ``grid_search_forecaster``.
    forecaster : skforecast forecaster
        Forecaster to tune.
        NOTE(review): with ``return_best=True`` the search presumably refits
        this object in place with the best configuration - confirm against
        the skforecast documentation.

    Returns
    -------
    The value returned by ``forecaster.predict(steps=size)``.
    """

    # The search results table is not used afterwards; the call is made for
    # its effect on `forecaster` (see the note in the docstring).
    resultados_grid = grid_search_forecaster(
                        forecaster         = forecaster,
                        y                  = datos_train[column],
                        param_grid         = param_grid,
                        lags_grid          = lags_grid,
                        steps              = steps,
                        refit              = False,
                        metric             = 'mean_squared_error',
                        initial_train_size = int(len(datos_train)*0.8),
                        fixed_train_size   = False,
                        return_best        = True,
                        n_jobs             = 'auto',
                        verbose            = False
                    )

    # Predictions
    # ==============================================================================
    predicciones = forecaster.predict(steps=size)

    return predicciones

In [None]:
# Entrenamiento del modelo forecaster autorregresivo directo con regresor lineal con penalización Ridge devolviendo las predicciones
def predicciones_backtesting_forecasterAutoregDirect(datos_train,column,steps,param_grid,lags_grid,forecaster):
    """Tune ``forecaster`` with a grid search and return its default predictions.

    Parameters
    ----------
    datos_train : pandas.DataFrame
        Training data; ``datos_train[column]`` is the series searched over
        and 80% of its length is used as ``initial_train_size``.
    column : hashable
        Column of ``datos_train`` to model.
    steps : int
        Horizon used by the grid search.
    param_grid : dict
        Regressor hyper-parameter grid passed to ``grid_search_forecaster``.
    lags_grid : list
        Candidate lag configurations passed to ``grid_search_forecaster``.
    forecaster : skforecast forecaster
        Forecaster to tune.
        NOTE(review): with ``return_best=True`` the search presumably refits
        this object in place with the best configuration - confirm against
        the skforecast documentation.

    Returns
    -------
    The value returned by ``forecaster.predict()``.
    """

    # The search results table is not used afterwards; the call is made for
    # its effect on `forecaster` (see the note in the docstring).
    resultados_grid = grid_search_forecaster(
                        forecaster         = forecaster,
                        y                  = datos_train[column],
                        param_grid         = param_grid,
                        lags_grid          = lags_grid,
                        steps              = steps,
                        refit              = False,
                        metric             = 'mean_squared_error',
                        initial_train_size = int(len(datos_train)*0.8),
                        fixed_train_size   = False,
                        return_best        = True,
                        n_jobs             = 'auto',
                        verbose            = False
                    )

    # Predictions
    # ==============================================================================
    # No horizon is passed here.
    # NOTE(review): presumably a direct forecaster then predicts every step
    # it was configured for - confirm against the skforecast documentation.
    predicciones = forecaster.predict()
 
    return predicciones


In [None]:
# Definimos el modelo de predicción Prophet: se ajusta con los datos de entrenamiento y devuelve las predicciones para los `size` periodos siguientes
def pred_prophet_prediccion(data_train,column,size,frequ):
    """Fit Prophet on the training data and forecast ``size`` periods ahead.

    The index of ``data_train`` is moved into a column renamed to ``'ds'``
    and ``column`` is renamed to ``'y'``, the names Prophet is fitted on
    here. The caller's frame is not modified.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Training data whose index holds the timestamps.
    column : hashable
        Column of ``data_train`` to forecast.
    size : int
        Number of future periods to predict.
    frequ : str or pandas offset
        Frequency passed to ``make_future_dataframe``.

    Returns
    -------
    numpy.ndarray
        The ``yhat`` values located after the first ``len(data_train)`` rows
        of the forecast, i.e. the out-of-sample predictions.
    """
    training = data_train.reset_index()
    training = training.rename(columns={training.columns[0]: 'ds', column: 'y'})

    model = Prophet()
    model.fit(training)

    horizon = model.make_future_dataframe(periods=size, freq=frequ)
    predicted = model.predict(horizon)['yhat']

    # Drop the in-sample part of the forecast.
    return predicted[len(training):].values