In [1]:
!pip install einops xformers np

Collecting einops
  Using cached einops-0.8.1-py3-none-any.whl.metadata (13 kB)
Collecting xformers
  Downloading xformers-0.0.32.post2-cp39-abi3-win_amd64.whl.metadata (1.1 kB)
Collecting np
  Downloading np-1.0.2.tar.gz (7.4 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Using cached einops-0.8.1-py3-none-any.whl (64 kB)
Downloading xformers-0.0.32.post2-cp39-abi3-win_amd64.whl (100.2 MB)
   ---------------------------------------- 0.0/100.2 MB ? eta -:--:--
   ---------------------------------------- 0.8/100.2 MB 6.7 MB/s eta 0:00:15
   - -------------------------------------- 2.6/100.2 MB 7.5 MB/s eta 0:00:14
   - -------------------------------------- 3.7/100.2 MB 6.6 MB/s eta 0:00:15
   -- --------


[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.modules import ModuleList
from torch.nn.modules.normalization import LayerNorm
from torch import nn

import copy
import math

AttributeError: `np.float_` was removed in the NumPy 2.0 release. Use `np.float64` instead.

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
device

device(type='cuda')

In [None]:
def _get_clones(module, n):
    return ModuleList([copy.deepcopy(module) for i in range(n)])

In [None]:
class Conv1D(nn.Module):
    def __init__(self, nx, nf):
        '''
        nx: Numero de datos de entrada.
        nf: Numero de filtros. (Canales de salida).
        '''
        super().__init__()
        self.nf = nf
        #Inicializando una matriz vacia de pesos del tamaño (nx)X(nf)
        w = torch.empty(nx, nf)
        #Inicializando los pesos con una distribución normal.
        nn.init.normal_(w, std=0.02)
        #Calculando los pesos y sesgos encodeandos usando nn.Parameter
        self.weight = nn.Parameter(w)
        self.bias = nn.Parameter(torch.zeros(nf))

    def forward(self, x):
        '''x:Tensor de entrada.'''
        #El tamaño de la salida es la suna de la segunda dimensión de X y el número de filtros nf.
        size_out = x.size()[:-1] + (self.nf,)
        # Producot punto Q,K(Transpuesta) y V
        x = torch.addmm(self.bias, x.view(-1, x.size(-1)), self.weight)# x.view ayuda a calcular la transpuesta.
        x = x.view(*size_out)
        return x

In [None]:
class FeedForward(nn.Module):
    def __init__(self, dropout, d_model=768, nx=768*4):
        super().__init__()
        self.c_fc    = Conv1D(d_model, nx)
        self.c_proj  = Conv1D(nx, d_model)
        self.act     = F.gelu
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.dropout(self.c_proj(self.act(self.c_fc(x))))

In [None]:
class Attention(nn.Module):
    def __init__(self, d_model=768, n_head=12, n_ctx=1024, d_head=64, bias=True, scale=False):
        '''Función de construcción
        Params:
        d_model:Dimensión que necesita ser ingresada en el modelo.
        n_head:La cantidad de heads de atención.
        n_ctx:Buffer para guardar los registros del sesgo.
        d_head:Dimesión de salida para el head.
        bias:Un booleano para saber si incluir el sesgo.
        scale: Escalar y estabilidad númerica (sqrt(dk))
        '''
        super().__init__()
        self.n_head  = n_head
        self.d_model = d_model
        self.c_attn  = Conv1D(d_model, d_model*3)
        self.scale   = scale
        self.softmax = nn.Softmax(dim=-1)
        self.register_buffer("bias", torch.tril(torch.ones(n_ctx, n_ctx)).view(1, 1, n_ctx, n_ctx))
        self.dropout = nn.Dropout(0.1)
        self.c_proj  = Conv1D(d_model, d_model)

    def split_heads(self, x):
        """
        Diviendo en la cantidad de heads y retornando.
        return shape [`batch`, `head`, `sequence`, `features`]
        """
        new_shape = x.size()[:-1] + (self.n_head, x.size(-1)//self.n_head)
        x = x.view(*new_shape)
        return x.permute(0, 2, 1, 3)

    def _attn(self, q, k, v, attn_mask=None):
        """Función de antención principal.
        Que calcula usando la formula de producto punto de atención."""
        scores  = torch.matmul(q, k.transpose(-2, -1))# producto punto de Q*K(t)
        if self.scale: scores = scores/math.sqrt(v.size(-1))# escalandola por sqrt(dk)
        nd, ns  = scores.size(-2), scores.size(-1)
        if attn_mask is not None: scores = scores + attn_mask# agregando los valores con la mascara de atención.
        scores  = self.softmax(scores)# añadiendo los valores de softmax
        scores  = self.dropout(scores) # función de dropout 0.1
        outputs = torch.matmul(scores, v) # Multiplicación final del puntaje por V.
        return outputs

    def merge_heads(self, x):
        # Combinando todas las heads en una sola.
        x = x.permute(0, 2, 1, 3).contiguous()
        new_shape = x.size()[:-2] + (x.size(-2)*x.size(-1),)
        return x.view(*new_shape)

    def forward(self, x):
        '''Función de para calcular atención, separar las heads y combinarlas de nuevo.'''
        x        = self.c_attn(x) #new `x` shape - `[1,3,2304]`
        q, k, v  = x.split(self.d_model, dim=2)
        q, k, v  = self.split_heads(q), self.split_heads(k), self.split_heads(v)
        out      = self._attn(q, k, v)
        out      = self.merge_heads(out)
        out      = self.c_proj(out)
        return out

In [None]:
class TransformerBlock(nn.Module):
    def __init__(self, d_model=768, n_head=12, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.attn        = Attention(d_model=768, n_head=12, d_head=64, n_ctx=1024, bias=True, scale=False)
        self.feedforward = FeedForward(dropout=0.1, d_model=768, nx=768*4)
        self.ln_1        = LayerNorm(d_model)
        self.ln_2        = LayerNorm(d_model)

    def forward(self, x):
        x = x + self.attn(self.ln_1(x))
        x = x + self.feedforward(self.ln_2(x))
        return x

In [None]:
class GPT2(nn.Module):
    def __init__(self, nlayers=12, n_ctx=1024, d_model=768, vcb_sz=50257):
        '''nlayer: La cantidad de veces que queremos multiplicar el Transformer.
        n_ctx: El contexto, la cantidad total de tokens que puede ver en el pasado de las palabras.
        d_model:Dimesionos del modelo.
        vcb_sz:El tamaño del vocabulario usado en el entrenamiento.'''
        super(GPT2, self).__init__()
        self.nlayers = nlayers
        block        = TransformerBlock(d_model=768, n_head=12, dropout=0.1)
        self.h       = _get_clones(block, 12)
        self.wte     = nn.Embedding(vcb_sz, d_model)
        self.wpe     = nn.Embedding(n_ctx, d_model)
        self.drop    = nn.Dropout(0.1)
        self.ln_f    = LayerNorm(d_model)
        self.out     = nn.Linear(d_model, vcb_sz, bias=False)
        self.loss_fn = nn.CrossEntropyLoss()
        self.init_weights()

    def init_weights(self):
        '''Inicialización de los pesos.'''
        self.out.weight = self.wte.weight
        self.apply(self._init_weights)

    def _init_weights(self, module):
        '''Inicialización con la media y S.D.'''
        if isinstance(module, (nn.Linear, nn.Embedding, Conv1D)):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if isinstance(module, (nn.Linear, Conv1D)) and module.bias is not None:
                '''Data Bias zero'''
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, src, labels=None, pos_ids=None):
        '''Añadir el embedding posicional, dropping y añadiendo los inputs
           usados por la función de perdida y finalmente añadiendo la salida y la
           perdida.'''
        if pos_ids is None:
            pos_ids = torch.arange(0, src.size(-1)).unsqueeze(0)
        pos_ids = pos_ids.to(src.device)  # Asegurarse que los pos_ids están en el mismo device.
        inp = self.drop((self.wte(src) + self.wpe(pos_ids)))
        for i in range(self.nlayers): inp = self.h[i](inp)
        inp     = self.ln_f(inp)
        logits  = self.out(inp)
        outputs = (logits,) + (inp,)

        if labels is not None:
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            loss = self.loss_fn(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
            outputs = (loss,) + outputs
            return loss.mean()
        return logits

In [None]:
import torch.nn.functional as F
import time
from transformers import GPT2Tokenizer

In [None]:
model = GPT2()

In [None]:
!curl --output gpt2-pytorch_model.bin https://s3.amazonaws.com/models.huggingface.co/bert/gpt2-pytorch_model.bin

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  522M  100  522M    0     0  7761k      0  0:01:08  0:01:08 --:--:-- 13.4M


In [None]:
model_dict = model.state_dict()
state_dict = torch.load("./gpt2-pytorch_model.bin")

old_keys = []
new_keys = []
for key in state_dict.keys():
    if "mlp" in key: #El diccionario de estado para el MLP feedforward debe ser cambiado por mlp
        new_key = key.replace("mlp", "feedforward")
        new_keys.append(new_key)
        old_keys.append(key)

  state_dict = torch.load("./gpt2-pytorch_model.bin")


In [None]:
for old_key, new_key in zip(old_keys, new_keys):
    state_dict[new_key]=state_dict.pop(old_key)

In [None]:
pretrained_dict = {k: v for k, v in state_dict.items() if k in model_dict}

model_dict.update(pretrained_dict)
model.load_state_dict(model_dict)
model.eval()

GPT2(
  (h): ModuleList(
    (0-11): 12 x TransformerBlock(
      (attn): Attention(
        (c_attn): Conv1D()
        (softmax): Softmax(dim=-1)
        (dropout): Dropout(p=0.1, inplace=False)
        (c_proj): Conv1D()
      )
      (feedforward): FeedForward(
        (c_fc): Conv1D()
        (c_proj): Conv1D()
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    )
  )
  (wte): Embedding(50257, 768)
  (wpe): Embedding(1024, 768)
  (drop): Dropout(p=0.1, inplace=False)
  (ln_f): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  (out): Linear(in_features=768, out_features=50257, bias=False)
  (loss_fn): CrossEntropyLoss()
)

In [None]:
total_params = sum(p.numel() for p in model.parameters())

In [None]:
size_bytes = total_params * 4
size_mb = size_bytes / (1024 ** 2)

print(f"El tamaño total de GPT2 sin alteraciones es: {size_bytes} bytes o {size_mb:.2f} MB")

El tamaño total de GPT2 sin alteraciones es: 497759232 bytes o 474.70 MB


In [None]:
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
context = torch.tensor([tokenizer.encode("The planet earth is a beautiful")])

In [None]:
def generate(context, ntok=550):
    start_time = time.time()
    for _ in range(ntok):
        out = model(context)
        logits = out[:, -1, :]
        indices_to_remove = logits < torch.topk(logits, 10)[0][..., -1, None]
        logits[indices_to_remove] = -np.inf
        next_tok = torch.multinomial(F.softmax(logits, dim=-1), num_samples=1).squeeze(1)
        context = torch.cat([context, next_tok.unsqueeze(-1)], dim=-1)
    end_time = time.time()
    inference_time = end_time - start_time
    return context, inference_time

In [None]:
out, inference_time = generate(context, ntok=40)
decoded_output = tokenizer.decode(out[0])

In [None]:
print(f"Inference Time: {inference_time:.4f} seconds")
print(f"Generated Output: {decoded_output}")

Inference Time: 7.6916 seconds
Generated Output: The planet earth is a beautiful place that.







( ) a: a) " is a planet a " a. a planet, a a. a. place the

: a:
