In [56]:
from faker import Faker
import random
import datetime

# Seed every random source used for data generation so that
# Restart Kernel -> Run All reproduces the same samples.
SEED = 42
random.seed(SEED)  # drives random.choice(date_formats) in generate_dates
Faker.seed(SEED)   # seeds the random generator shared by Faker instances

fake = Faker()

# Formats the synthetic *input* dates are rendered in
# (examples assume English month names).
date_formats = [
    "%d.%m.%Y",   # e.g. 31.12.2020
    "%B %d, %Y",  # e.g. December 31, 2020
    "%b %d, %Y",  # e.g. Dec 31, 2020
    "%d/%m/%Y",   # e.g. 31/12/2020
]

# Generate random dates in various formats
def generate_dates(n):
    """Build ``n`` synthetic (input, target) date-string pairs.

    Every pair describes one random date twice: first rendered with a format
    drawn at random from ``date_formats``, then as ``YYYY-MM-DD``.

    Args:
        n: number of pairs to generate.

    Returns:
        A list of ``(date_str, target_str)`` tuples.
    """
    iso_format = "%Y-%m-%d"
    # Lazy generator: each date is drawn right before it is formatted.
    random_days = (fake.date_object() for _ in range(n))
    return [
        (day.strftime(random.choice(date_formats)), day.strftime(iso_format))
        for day in random_days
    ]

# Build the corpus: 10000 (input, target) date-string pairs.
dates = generate_dates(n=10_000)

In [145]:
import torch

class DateTokenizer:
    """Character-level tokenizer whose vocabulary is built from date pairs.

    Ids 0-3 are reserved for the special tokens ``[PAD]``, ``[UNK]``,
    ``[BOS]`` and ``[EOS]``; every distinct character found in ``dates``
    receives the following ids in sorted order.
    """

    def __init__(self, dates):
        """Collect the vocabulary.

        Args:
            dates: iterable of indexable pairs whose items 0 and 1 are the
                input string and the target string.
        """
        self.pad_token = "[PAD]"
        self.unk_token = "[UNK]"
        self.bos_token = "[BOS]"
        self.eos_token = "[EOS]"
        self.pad_id = 0
        self.unk_id = 1
        self.bos_id = 2
        self.eos_id = 3

        self.special_tokens_id = [self.pad_id, self.unk_id, self.bos_id, self.eos_id]
        self.special_tokens = [self.pad_token, self.unk_token, self.bos_token, self.eos_token]

        # Single pass over `dates`, so one-shot iterables work as well.
        charset = set()
        for pair in dates:
            charset.update(pair[0])
            charset.update(pair[1])

        self.tokens = self.special_tokens + sorted(charset)

        self.char2idx = {char: idx for idx, char in enumerate(self.tokens)}
        self.idx2char = {idx: char for idx, char in enumerate(self.tokens)}

    def tokenize(self, text):
        """Split ``text`` into characters wrapped in ``[BOS]`` ... ``[EOS]``.

        Characters outside the vocabulary are replaced by ``[UNK]``.
        """
        # Dict lookup is O(1); scanning the `tokens` list was O(vocab) per char.
        known = self.char2idx
        return (
            [self.bos_token]
            + [char if char in known else self.unk_token for char in text]
            + [self.eos_token]
        )

    def encode(self, text):
        """Return the list of token ids for ``text`` (BOS/EOS included)."""
        return [self.char2idx[token] for token in self.tokenize(text)]

    def decode(self, encoded, remove_special_tokens=False):
        """Turn a sequence of token ids back into a string.

        Args:
            encoded: list of ints or a 1-D ``torch.Tensor`` (subclasses such
                as ``nn.Parameter`` included).
            remove_special_tokens: when True, truncate at the first ``[EOS]``
                and drop every special token from the result.

        Raises:
            KeyError: if an id is not part of the vocabulary.
        """
        # isinstance (not `type(...) ==`) so tensor subclasses are converted too.
        if isinstance(encoded, torch.Tensor):
            encoded = encoded.tolist()

        if remove_special_tokens:
            eos_idx = encoded.index(self.eos_id) if self.eos_id in encoded else len(encoded)
            encoded = encoded[:eos_idx]
            encoded = [token for token in encoded if token not in self.special_tokens_id]

        return "".join(self.idx2char[idx] for idx in encoded)

        

In [146]:
# NOTE(review): leftover scratch cell. It only evaluates the torch.Tensor class
# object and defines nothing, so it can be deleted from the notebook.
torch.Tensor

torch.Tensor

In [147]:
# Build a character-level tokenizer from the generated date pairs.
# NOTE(review): the training cell further down builds `tokenizer` again from the same `dates`.
tokenizer = DateTokenizer(dates)

In [153]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence


class DateDataset(Dataset):
    """Dataset of tokenized (input date, target date) pairs for seq2seq training.

    Each item is a triple of 1-D id tensors:
    ``(encoder_input, decoder_input, decoder_target)``, where the decoder
    input is the encoded target without its last token and the decoder target
    is the encoded target without its first token.
    """

    def __init__(self, dates, tokenizer=None):
        """
        Args:
            dates: indexable collection of ``(date_str, target_str)`` pairs.
            tokenizer: object providing ``encode(text)`` and ``pad_id``; when
                omitted, a ``DateTokenizer`` is built from ``dates``.
        """
        self.dates = dates
        self.tokenizer = DateTokenizer(dates) if tokenizer is None else tokenizer

    def __len__(self):
        return len(self.dates)

    def __getitem__(self, idx):
        source_text, target_text = self.dates[idx]

        source_ids = self.tokenizer.encode(source_text)
        target_ids = self.tokenizer.encode(target_text)

        # Shift the target by one position: the decoder reads target_ids[:-1]
        # and is trained to predict target_ids[1:].
        return (
            torch.tensor(source_ids),
            torch.tensor(target_ids[:-1]),
            torch.tensor(target_ids[1:]),
        )

    def collate_fn(self, batch):
        """Pad each of the three sequence groups in ``batch`` to a common length."""
        pad_id = self.tokenizer.pad_id

        def pad(sequences):
            return pad_sequence(sequences, batch_first=True, padding_value=pad_id)

        enc_inputs, dec_inputs, dec_targets = zip(*batch)
        return pad(enc_inputs), pad(dec_inputs), pad(dec_targets)


# Example usage: no tokenizer is passed, so DateDataset builds its own from `dates`.
# Each item is a triple (encoder input ids, decoder input ids, decoder target ids).
dataset = DateDataset(dates)
print(dataset[0])

(tensor([ 2, 19, 38, 39,  4, 10, 12,  5,  4, 10, 18, 18, 12,  3]), tensor([ 2, 10, 18, 18, 12,  6,  9, 13,  6, 10, 12]), tensor([10, 18, 18, 12,  6,  9, 13,  6, 10, 12,  3]))


In [None]:
import torch.nn as nn
import pytorch_lightning as pl

class EncoderLSTM(nn.Module):
    """Token embedding followed by a batch-first LSTM.

    ``forward`` returns the per-step LSTM outputs together with the final
    hidden and cell states.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, x, hidden=None, cell=None):
        """Encode a batch of token ids.

        Args:
            x: integer tensor whose first dimension is the batch.
            hidden, cell: optional initial LSTM states. Both must be given to
                be used; if either is missing, both start from zeros.

        Returns:
            ``(output, hidden, cell)`` as produced by the LSTM.
        """
        state_given = hidden is not None and cell is not None
        if not state_given:
            device = x.device
            hidden, cell = (state.to(device) for state in self.init_hidden(x.shape[0]))

        output, (hidden, cell) = self.lstm(self.embedding(x), (hidden, cell))
        return output, hidden, cell

    def init_hidden(self, batch_size):
        """Return zero ``(h_0, c_0)`` of shape (num_layers, batch_size, hidden_size)."""
        state_shape = (self.lstm.num_layers, batch_size, self.lstm.hidden_size)
        return torch.zeros(state_shape), torch.zeros(state_shape)

class DecoderLSTM(nn.Module):
    """Token embedding -> batch-first LSTM -> linear projection to vocab logits."""

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            embedding_dim, hidden_size, num_layers=num_layers, batch_first=True
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None, cell=None):
        """Decode a batch of token ids into per-step vocabulary logits.

        Args:
            x: integer tensor whose first dimension is the batch.
            hidden, cell: optional initial LSTM states. Both must be given to
                be used; if either is missing, both start from zeros.

        Returns:
            ``(logits, hidden, cell)`` where ``logits`` has ``vocab_size``
            features in its last dimension.
        """
        if hidden is None or cell is None:
            hidden, cell = self.init_hidden(x.shape[0])
            hidden = hidden.to(x.device)
            cell = cell.to(x.device)

        embedded = self.embedding(x)
        lstm_out, (hidden, cell) = self.lstm(embedded, (hidden, cell))

        # Project LSTM features to vocabulary logits. Previously `fc` was
        # created but never applied, so callers got hidden_size features.
        logits = self.fc(lstm_out)

        return logits, hidden, cell

    def init_hidden(self, batch_size):
        """Return zero ``(h_0, c_0)`` of shape (num_layers, batch_size, hidden_size)."""
        state_shape = (self.lstm.num_layers, batch_size, self.lstm.hidden_size)
        h_0 = torch.zeros(state_shape)
        c_0 = torch.zeros(state_shape)
        return h_0, c_0

class DateLSTM(pl.LightningModule):
    """Encoder-decoder LSTM that rewrites a date string into ``YYYY-MM-DD``.

    The encoder's final (hidden, cell) state initialises the decoder. During
    training each batch is decoded either with teacher forcing or
    autoregressively, chosen at random with ``teacher_forcing_probability``.
    Validation and test batches are always decoded autoregressively so their
    loss is comparable between runs.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, tokenizer, num_layers = 2, learning_rate = 0.001, teacher_forcing_probability = 0.5) -> None:
        """
        Args:
            vocab_size: number of tokens (embedding rows / output logits).
            embedding_dim: size of the token embeddings.
            hidden_size: LSTM hidden size, shared by encoder and decoder.
            tokenizer: tokenizer used for the data; its ``pad_id`` / ``bos_id``
                are used when present (falling back to 0 and 2).
            num_layers: number of stacked LSTM layers.
            learning_rate: Adam learning rate.
            teacher_forcing_probability: chance that a training batch is
                decoded with teacher forcing.
        """
        super().__init__()

        self.encoder = EncoderLSTM(vocab_size, embedding_dim, hidden_size, num_layers)
        self.decoder = DecoderLSTM(vocab_size, embedding_dim, hidden_size, num_layers)
        self.tokenizer = tokenizer
        self.learning_rate = learning_rate
        self.teacher_forcing_probability = teacher_forcing_probability

        # Take the special ids from the tokenizer instead of hard-coding them,
        # so the loss mask always matches the padding value used for batching.
        self.pad_id = getattr(tokenizer, "pad_id", 0)
        self.bos_id = getattr(tokenizer, "bos_id", 2)

    def forward(self, enc_input_batch, sos_index=2, dec_input_batch=None, teacher_forcing=False, out_length=1):
        """Encode the inputs and decode ``out_length`` steps.

        Args:
            enc_input_batch: integer tensor of encoder token ids, batch first.
            sos_index: id fed to the decoder at the first autoregressive step.
            dec_input_batch: decoder input ids; required with teacher forcing.
            teacher_forcing: feed ``dec_input_batch`` in one pass instead of
                feeding back the model's own predictions.
            out_length: number of autoregressive steps (ignored with teacher
                forcing).

        Returns:
            The decoder outputs for every step, concatenated along dim 1.

        Raises:
            ValueError: if ``teacher_forcing`` is set without ``dec_input_batch``.
        """
        _, hidden, cell = self.encoder(enc_input_batch)

        if teacher_forcing:
            if dec_input_batch is None:
                raise ValueError("dec_input_batch is required when teacher_forcing=True")
            decoder_output, _, _ = self.decoder(dec_input_batch, hidden, cell)
            return decoder_output

        batch_size = enc_input_batch.shape[0]
        device = enc_input_batch.device

        if out_length <= 0:
            # Nothing to decode: return an empty (batch, 0, vocab) tensor.
            return torch.empty(batch_size, 0, self.decoder.fc.out_features, device=device)

        decoder_input = torch.full((batch_size, 1), sos_index, dtype=torch.int64, device=device)
        step_outputs = []

        for _ in range(out_length):
            # Pass BOTH states: the decoder resets to zeros when either one is
            # missing, which previously threw away the encoder context (and
            # all decoding history) at every step.
            step_output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            step_outputs.append(step_output)
            # Greedy decoding: the most likely token becomes the next input.
            decoder_input = torch.argmax(step_output, dim=-1)

        return torch.cat(step_outputs, dim=1)

    def forward_batch(self, batch, teacher_forcing=None):
        """Compute the padding-masked cross-entropy loss for one batch.

        Args:
            batch: ``(inputs, decoder_inputs, decoder_targets)`` tensors.
            teacher_forcing: force the decoding mode; ``None`` samples it with
                ``teacher_forcing_probability``.
        """
        if teacher_forcing is None:
            teacher_forcing = random.random() < self.teacher_forcing_probability

        inputs, decoder_inputs, decoder_targets = batch
        outputs = self(
            inputs,
            sos_index=self.bos_id,
            dec_input_batch=decoder_inputs,
            teacher_forcing=teacher_forcing,
            out_length=decoder_targets.shape[1],
        )

        # Flatten (batch, steps, vocab) -> (batch * steps, vocab); padded
        # target positions are excluded from the loss.
        loss = nn.functional.cross_entropy(
            outputs.reshape(-1, outputs.size(-1)),
            decoder_targets.reshape(-1),
            ignore_index=self.pad_id,
        )

        return loss

    def training_step(self, batch, batch_idx):
        loss = self.forward_batch(batch)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        # Always free-running: a randomly chosen decoding mode would make
        # val_loss jump between two different quantities from batch to batch.
        loss = self.forward_batch(batch, teacher_forcing=False)
        self.log('val_loss', loss)

    def test_step(self, batch, batch_idx):
        # Same decoding mode as validation, matching how the model is used at inference.
        loss = self.forward_batch(batch, teacher_forcing=False)
        self.log('test_loss', loss)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)

In [173]:
from torch.utils.data import random_split

class DateDataModule(pl.LightningDataModule):
    """Lightning data module serving an 80/10/10 train/val/test split of ``dates``."""

    def __init__(self, dates, tokenizer = None, batch_size=32, seed=42):
        """
        Args:
            dates: collection of ``(date_str, target_str)`` pairs.
            tokenizer: tokenizer shared by all three datasets; built from
                ``dates`` when omitted.
            batch_size: batch size of every dataloader.
            seed: seed of the generator used for the split, so the split is
                reproducible.
        """
        super().__init__()
        self.dates = dates
        self.batch_size = batch_size
        self.seed = seed

        if tokenizer is None:
            tokenizer = DateTokenizer(dates)

        self.tokenizer = tokenizer

        # Filled in by setup().
        self.train_dataset = None
        self.val_dataset = None
        self.test_dataset = None

    def setup(self, stage=None):
        """Split the data once.

        ``setup`` is invoked again for every Trainer stage (fit, test, ...).
        Re-running an unseeded random split each time let test samples
        overlap with samples already seen during training, so the split is
        performed only on the first call and with a seeded generator.
        """
        if self.train_dataset is not None:
            return

        generator = torch.Generator().manual_seed(self.seed)
        train_dates, val_dates, test_dates = random_split(
            self.dates, [0.8, 0.1, 0.1], generator=generator
        )
        self.train_dataset = DateDataset(train_dates, tokenizer=self.tokenizer)
        self.val_dataset = DateDataset(val_dates, tokenizer=self.tokenizer)
        self.test_dataset = DateDataset(test_dates, tokenizer=self.tokenizer)

    def train_dataloader(self):
        # Reshuffle the training samples every epoch.
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            collate_fn=self.train_dataset.collate_fn,
        )

    def val_dataloader(self):
        return DataLoader(
            self.val_dataset,
            batch_size=self.batch_size,
            collate_fn=self.val_dataset.collate_fn,
        )

    def test_dataloader(self):
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            collate_fn=self.test_dataset.collate_fn,
        )

In [175]:
# Build the vocabulary and the data module. Trainer.fit / Trainer.test call
# data_module.setup() themselves, so no manual setup() call is needed here.
tokenizer = DateTokenizer(dates)
data_module = DateDataModule(dates, tokenizer=tokenizer)

# Model hyperparameters, passed by keyword so each value reaches the argument
# it is named after. The earlier positional call
# DateLSTM(input_size, hidden_size, output_size, tokenizer) bound 128 to
# `embedding_dim` and the vocabulary size to `hidden_size`; the same effective
# values are kept here under their real names.
# NOTE(review): before choosing a hidden_size other than vocab_size, confirm
# that DecoderLSTM.forward projects its output to vocab_size logits.
vocab_size = len(tokenizer.tokens)
embedding_dim = 128
hidden_size = vocab_size

model = DateLSTM(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    tokenizer=tokenizer,
)

# accelerator="auto" uses a GPU when one is available and falls back to CPU
# otherwise, instead of failing on machines without CUDA.
# fast_dev_run=True is a smoke test (one batch per loop, no logging or
# checkpointing); set it to False for a real training run.
trainer = pl.Trainer(max_epochs=10, accelerator="auto", fast_dev_run=True)

# Train the model
trainer.fit(model, data_module)

# Test the model
trainer.test(model, data_module)

You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
Running in `fast_dev_run` mode: will run the requested loop using 1 batch(es). Logging and checkpointing is suppressed.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name    | Type        | Params | Mode 
------------------------------------------------
0 | encoder | EncoderLSTM | 53.8 K | train
1 | decoder | DecoderLSTM | 55.9 K | train
------------------------------------------------
109 K     Trainable params
0         Non-trainable params
109 K     Total params
0.439     Total estimated model params size (MB)
7         Modules in train mode
0         Modules in eval mode
/home/kamil/miniconda3/envs/python/lib/python3.12/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:425: The 'train_dataloader' does not have many wo

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_steps=1` reached.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
/home/kamil/miniconda3/envs/python/lib/python3.12/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:425: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Testing: |          | 0/? [00:00<?, ?it/s]

[{'test_loss': 3.7650396823883057}]