In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import pytorch_lightning as pl


In [46]:
import pandas as pd
import numpy as np

# Build an artificial daily series for time-series forecasting:
# a sine wave plus Gaussian noise, seeded so the values are reproducible.
np.random.seed(42)

n_timesteps = 1000  # total number of timesteps
date_range = pd.date_range(start="2020-01-01", periods=n_timesteps, freq="D")

sine_wave = np.sin(np.linspace(0, 100, n_timesteps))
noise = np.random.normal(0, 0.1, n_timesteps)  # mean 0, standard deviation 0.1
data = pd.DataFrame({'date': date_range, 'value': sine_wave + noise})

data.head()

Unnamed: 0,date,value
0,2020-01-01,0.049671
1,2020-01-02,0.086107
2,2020-01-03,0.263634
3,2020-01-04,0.44811
4,2020-01-05,0.366372


In [52]:
def create_dataset(data, max_encoder_length, max_prediction_length, test_size=0.1, validation_size=0.1):
    """
    Build sliding-window (backcast, forecast) samples and split them chronologically.

    Parameters:
    - data: DataFrame with a 'value' column holding the series.
    - max_encoder_length: Number of past timesteps in each input window (backcast).
    - max_prediction_length: Number of future timesteps in each target window (forecast).
    - test_size: Fraction of the rows, taken from the end, reserved for the test split.
    - validation_size: Fraction of the remaining rows reserved for the validation split.

    Returns:
    - train_data, val_data, test_data: Lists of (X, y) tuples of 1-D arrays, where
      X has max_encoder_length values and y has max_prediction_length values.

    Notes:
    - A window is assigned to a split by the index of its first forecast step, so
      the targets of the last windows of one split reach into the next split's rows.
    - If the series is too short to hold a single window, all three lists are empty.
    """
    # Cut-off indices for the test and validation splits
    total_size = len(data)
    test_cutoff = int(total_size * (1 - test_size))
    val_cutoff = int(test_cutoff * (1 - validation_size))

    # Extract the column once instead of re-slicing the Series on every iteration
    values = data['value'].to_numpy()

    train_data, val_data, test_data = [], [], []

    # i is the index of the first forecast step. The "+ 1" keeps the final window,
    # whose target ends exactly on the last row of the series.
    for i in range(max_encoder_length, total_size - max_prediction_length + 1):
        X = values[i - max_encoder_length:i]  # history fed to the model
        y = values[i:i + max_prediction_length]  # future values to predict

        if i < val_cutoff:
            train_data.append((X, y))
        elif i < test_cutoff:
            val_data.append((X, y))
        else:
            test_data.append((X, y))

    return train_data, val_data, test_data

# Window sizes
max_encoder_length = 60  # number of timesteps fed to the model (backcast)
max_prediction_length = 20  # number of timesteps to predict (forecast)

# Build the train / validation / test windows
train_data, val_data, test_data = create_dataset(data, max_encoder_length, max_prediction_length)

# Print the first two training samples
print("Exemple d'entrée (train_data[0]):")
sample_labels = (
    ("X (backcast):", "y (forecast):"),
    ("X (backcast2):", "y (forecast2):"),
)
for sample_idx, (x_label, y_label) in enumerate(sample_labels):
    backcast_window, forecast_window = train_data[sample_idx]
    print(x_label, backcast_window)
    print(y_label, forecast_window)




Exemple d'entrée (train_data[0]):
X (backcast): [ 0.04967142  0.08610659  0.26363439  0.44811007  0.36637177  0.45645101
  0.72305935  0.72149693  0.67096635  0.83814261  0.79566964  0.8451333
  0.95666991  0.77257744  0.81319517  0.94137132  0.89824244  1.02286885
  0.88263427  0.80445314  1.05502736  0.83956859  0.81395127  0.60169445
  0.61925144  0.60755767  0.39817013  0.46250469  0.27228211  0.20726035
  0.07797576  0.22370787 -0.0629213  -0.26677787 -0.17657553 -0.47614631
 -0.42486277 -0.72894062 -0.74768075 -0.67090876 -0.68556696 -0.80349252
 -0.88519405 -0.94799298 -1.10079866 -1.05045415 -1.04026076 -0.89425825
 -0.96137087 -1.15782999 -0.92508414 -0.96238124 -0.94869617 -0.76814699
 -0.66622249 -0.60850006 -0.71083098 -0.57683522 -0.42632688 -0.27083806]
y (forecast): [-0.32156113 -0.1947211  -0.18753659 -0.09650172  0.2041621   0.35709357
  0.31061132  0.51132656  0.53618328  0.51954808  0.69839259  0.88761939
  0.79444912  1.01072246  0.6399574   1.02276614  0.97850784  

In [55]:
import torch
from torch.utils.data import Dataset, DataLoader

# Custom Dataset wrapping pre-built (backcast, forecast) windows
class TimeseriesDataset(Dataset):
    """Expose a sequence of (X, y) window pairs as float32 tensors.

    Parameters:
    - data: indexable collection of (X, y) pairs.
    - max_encoder_length: length of the input window; stored, not otherwise used here.
    - max_prediction_length: length of the target window; stored, not otherwise used here.
    """

    def __init__(self, data, max_encoder_length, max_prediction_length):
        self.max_encoder_length = max_encoder_length
        self.max_prediction_length = max_prediction_length
        self.data = data

    def __len__(self):
        """Return the number of (X, y) samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return sample idx as a pair of float32 tensors.

        Both tensors keep the shape of the stored arrays; no channel axis is added.
        """
        window, target = self.data[idx]
        return (
            torch.tensor(window, dtype=torch.float32),
            torch.tensor(target, dtype=torch.float32),
        )

# Wrap each split in a TimeseriesDataset
train_dataset, val_dataset, test_dataset = (
    TimeseriesDataset(split, max_encoder_length, max_prediction_length)
    for split in (train_data, val_data, test_data)
)

# One DataLoader per split; shuffle=False keeps the samples in their original order
batch_size = 64
train_dataloader, val_dataloader, test_dataloader = (
    DataLoader(dataset, batch_size=batch_size, shuffle=False)
    for dataset in (train_dataset, val_dataset, test_dataset)
)

# Peek at the first two training batches
for batch_number, (X_batch, y_batch) in enumerate(train_dataloader, start=1):
    # Shapes are (batch, max_encoder_length) and (batch, max_prediction_length);
    # there is no trailing channel axis.
    print("X_batch shape:", X_batch.shape)
    print("y_batch shape:", y_batch.shape)
    print(X_batch[0][0:2])
    if batch_number == 2:
        break

X_batch shape: torch.Size([64, 60])
y_batch shape: torch.Size([64, 20])
tensor([0.0497, 0.0861])
X_batch shape: torch.Size([64, 60])
y_batch shape: torch.Size([64, 20])
tensor([0.2042, 0.3571])


In [113]:
# Two rows of three knots, with a channel axis inserted -> shape (2, 1, 3)
knots = torch.tensor([[1.0, 2.0, 3.0],
                      [4.0, 5.0, 6.0]]).unsqueeze(1)

# Linearly upsample each row from 3 to 6 points, then drop the channel axis again
forecast = torch.squeeze(F.interpolate(knots, size=6, mode="linear"), 1)

print(forecast.size())




torch.Size([2, 6])


In [61]:
# A 2-D tensor: two rows of three values
y = torch.tensor([[1.0, 2.0, 3.0],
                  [4.0, 5.0, 6.0]])
# With ceil_mode=True the trailing partial window is kept, so length 3 pools to length 2
pooling_layer = nn.MaxPool1d(2, stride=2, ceil_mode=True)

pooling_layer(y).shape



torch.Size([2, 2])

In [32]:
# Max-pool with window 3 and stride 3 (non-overlapping windows): length 50 -> 16
m = nn.MaxPool1d(3, stride=3)
# Named pool_input rather than input so the builtin input() is not shadowed in the kernel
pool_input = torch.randn(20, 16, 50)
output = m(pool_input)
output.size()

torch.Size([20, 16, 16])

In [None]:
class Block(nn.Module):
    """One N-HiTS-style block: max-pool the input, run an MLP, then interpolate
    the predicted knots into a forecast and a backcast.

    Parameters:
    - input_size: length L of the input (backcast) window.
    - horizon: length H of the forecast window.
    - hidden_sizes: non-empty sequence of MLP layer widths, applied in order.
    - kernel_size: max-pooling kernel size (the stride defaults to the kernel size).
    - expressiveness_ratio: ratio r; the block predicts ceil(r * H) knots (at least 1)
      for the forecast and the same number for the backcast.
    - num_layer: stored on the instance; the MLP depth is set by hidden_sizes.

    Raises:
    - ValueError: if hidden_sizes is empty or kernel_size is not in [1, input_size].
    """

    def __init__(self, input_size, horizon, hidden_sizes, kernel_size, expressiveness_ratio, num_layer):
        super().__init__()

        hidden_sizes = list(hidden_sizes)
        if not hidden_sizes:
            raise ValueError("hidden_sizes must contain at least one layer width")
        if not 1 <= kernel_size <= input_size:
            raise ValueError("kernel_size must be between 1 and input_size")

        self.L = input_size
        self.H = horizon
        self.r_l = expressiveness_ratio
        self.k_l = kernel_size
        self.num_layer = num_layer
        self.kernel_size = kernel_size
        self.expressiveness_ratio = expressiveness_ratio
        # Number of knots must be a positive int for nn.Linear; the rounding absorbs
        # float noise in r * H before taking the ceiling.
        self.theta_size = max(1, int(np.ceil(round(self.r_l * self.H, 6))))

        # MaxPool layer for multi-rate signal sampling
        self.maxpool = nn.MaxPool1d(kernel_size=self.k_l)
        # With stride == kernel_size, no padding and floor mode, the pooled length is L // k
        pooled_size = self.L // self.k_l

        # MLP: pooled input -> hidden_sizes[0] -> ... -> hidden_sizes[-1], ReLU after each layer
        widths = [pooled_size, *hidden_sizes]
        layers = []
        for in_features, out_features in zip(widths[:-1], widths[1:]):
            layers.append(nn.Linear(in_features, out_features))
            layers.append(nn.ReLU())
        self.mlp = nn.Sequential(*layers)

        self.linear_f = nn.Linear(hidden_sizes[-1], self.theta_size)
        self.linear_b = nn.Linear(hidden_sizes[-1], self.theta_size)

    def forward(self, x):
        """Compute the block's forecast and backcast.

        Parameters:
        - x: tensor of shape (batch, input_size).

        Returns:
        - forecast: tensor of shape (batch, horizon).
        - backcast: tensor of shape (batch, input_size).

        Raises:
        - ValueError: if x is not 2-D or its last dimension differs from input_size.
        """
        if x.dim() != 2 or x.size(-1) != self.L:
            raise ValueError(
                f"expected input of shape (batch, {self.L}), got {tuple(x.shape)}"
            )

        # Apply max pooling (multi-rate sampling); the temporary channel axis makes
        # the batch dimension explicit for MaxPool1d.
        x_pooled = self.maxpool(x.unsqueeze(1)).squeeze(1)
        h = self.mlp(x_pooled)

        # Linear interpolation needs (batch, channels, length), hence the channel axis
        theta_f = self.linear_f(h).unsqueeze(1)
        theta_b = self.linear_b(h).unsqueeze(1)

        # Drop the channel axis so the outputs line up with (batch, length) inputs and targets
        forecast = F.interpolate(theta_f, size=self.H, mode="linear").squeeze(1)
        backcast = F.interpolate(theta_b, size=self.L, mode="linear").squeeze(1)

        return forecast, backcast