In [1]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import numpy as np

import torch.nn.functional as nnf
import sys
from typing import Tuple, List, Union, Optional

import clip
import os
from tqdm import tqdm, trange

from transformers import GPT2Tokenizer, GPT2LMHeadModel, AdamW, get_linear_schedule_with_warmup

import skimage.io as io
from PIL import Image
from IPython.display import Image 
from enum import Enum

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Pick the compute backend in priority order: CUDA GPU first, then Apple MPS, then plain CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

Using cuda device


In [3]:
# Load the pretrained 'gpt2' tokenizer and language-model head, then move the model to `device`.
tokeniser = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2LMHeadModel.from_pretrained('gpt2')
model = model.to(device)



: 

In [None]:
# NOTE(review): `embedding_cat`, `labels` and `mask` are not assigned in any cell visible in this
# notebook, so this call raises NameError on a fresh kernel — confirm where they should come from.
model(inputs_embeds=embedding_cat, labels=labels, attention_mask=mask)

In [4]:
# Prompt text fed to GPT-2 (kept verbatim, including the trailing newline).
sentence = "amic Delicious eleph SukActionCode photographers interchangeable undeniably achieving\n"
# Encode the prompt as a PyTorch tensor of token ids and place it on `device`.
encoded_prompt = tokeniser.encode(sentence, return_tensors='pt')
input_ids = encoded_prompt.to(device)

In [5]:
# Display the shape of the encoded prompt tensor (the recorded output is torch.Size([1, 10])).
input_ids.shape

torch.Size([1, 10])

In [None]:
# Continue the prompt with beam-sample decoding: 5 beams, sampling enabled, no repeated bigrams,
# and at most 50 tokens in total (prompt included).
# GPT-2 ships without a padding token, so the end-of-sequence id is passed explicitly as the pad
# id, and an all-ones attention mask is supplied for the single, unpadded prompt.
# NOTE(review): do_sample=True makes the result differ between runs unless a seed is fixed.
output = model.generate(
    input_ids,
    attention_mask=torch.ones_like(input_ids),
    pad_token_id=tokeniser.eos_token_id,
    max_length=50,
    no_repeat_ngram_size=2,
    early_stopping=True,
    num_beams=5,
    do_sample=True,
)

In [None]:
# Turn the first generated sequence back into text, dropping special tokens, and print it.
generated_text = tokeniser.decode(output[0], skip_special_tokens=True)
print(generated_text)

In [None]:
def get_device(device_id: int = 0) -> torch.device:
    """Return the torch device for a CUDA index, falling back to the CPU.

    Args:
        device_id: Index of the requested CUDA device. Values outside the range of
            available devices are clamped to the nearest valid index.

    Returns:
        ``torch.device('cuda:<index>')`` when CUDA is available, else ``torch.device('cpu')``.
    """
    if not torch.cuda.is_available():
        return torch.device('cpu')
    last_index = torch.cuda.device_count() - 1
    device_id = max(0, min(last_index, device_id))
    return torch.device(f'cuda:{device_id}')


# `get_device` was referenced here without being defined in any visible cell, which raised
# NameError. `CUDA` stays an alias of the helper, so it is called as `CUDA(0)`.
CUDA = get_device

# Checkpoints are stored in a "pretrained_models" directory next to the current working directory.
current_directory = os.getcwd()
save_path = os.path.join(os.path.dirname(current_directory), "pretrained_models")
os.makedirs(save_path, exist_ok=True)
# NOTE(review): 'wieghts' looks like a typo for 'weights'. The file name is left unchanged because
# an existing checkpoint may already be stored under it — confirm before renaming.
model_path = os.path.join(save_path, 'model_wieghts.pt')

In [None]:
import torch
import torch.nn as nn
from transformers import GPT2LMHeadModel



class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with separate query and key/value inputs.

    Args:
        dim_self: Feature size of the query input ``x`` and of the output.
        dim_ref: Feature size of the key/value input ``y``.
        num_heads: Number of attention heads; must divide ``dim_self`` evenly.
        bias: Whether the query and key/value projections use a bias term.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide ``dim_self``.
    """

    def __init__(self, dim_self, dim_ref, num_heads, bias=True, dropout=0.):
        super().__init__()
        # Fail early: an uneven split would otherwise surface as a reshape error in forward().
        if num_heads <= 0 or dim_self % num_heads != 0:
            raise ValueError(
                f"dim_self ({dim_self}) must be divisible by a positive num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        head_dim = dim_self // num_heads
        self.scale = head_dim ** -0.5
        self.to_queries = nn.Linear(dim_self, dim_self, bias=bias)
        # Keys and values come out of one projection and are split apart in forward().
        self.to_keys_values = nn.Linear(dim_ref, dim_self * 2, bias=bias)
        self.project = nn.Linear(dim_self, dim_self)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, y=None, mask=None):
        """Attend from ``x`` to ``y`` (or to ``x`` itself when ``y`` is None).

        Args:
            x: Query input of shape (b, n, c).
            y: Key/value input of shape (b, m, d); defaults to ``x``.
            mask: Optional boolean mask. Positions that are True are filled with -inf
                before the softmax. A 2-D mask is broadcast over the query axis, and
                every mask is broadcast over the head axis.

        Returns:
            Tuple ``(out, attention)``: ``out`` has shape (b, n, c) and ``attention`` has
            shape (b, n, m, num_heads). ``attention`` is the softmax output before dropout.
        """
        y = y if y is not None else x
        b, n, c = x.shape
        _, m, _ = y.shape
        head_dim = c // self.num_heads
        queries = self.to_queries(x).reshape(b, n, self.num_heads, head_dim)
        keys_values = self.to_keys_values(y).reshape(b, m, 2, self.num_heads, head_dim)
        keys, values = keys_values[:, :, 0], keys_values[:, :, 1]
        attention = torch.einsum('bnhd,bmhd->bnmh', queries, keys) * self.scale
        if mask is not None:
            if mask.dim() == 2:
                mask = mask.unsqueeze(1)
            attention = attention.masked_fill(mask.unsqueeze(3), float("-inf"))
        # Normalise over the key axis (dim 2).
        attention = attention.softmax(dim=2)
        # The dropout module was constructed but never used; it is applied to the attention
        # weights here. With the default p=0 (or in eval mode) this is the identity.
        weights = self.dropout(attention)
        out = torch.einsum('bnmh,bmhd->bnhd', weights, values).reshape(b, n, c)
        out = self.project(out)
        return out, attention


class MlpTransformer(nn.Module):
    """Two-layer feed-forward block used as the MLP inside ``TransformerLayer``.

    NOTE(review): ``TransformerLayer`` referenced ``MlpTransformer`` but no definition is
    visible anywhere in this notebook, so constructing a layer raised NameError. This is a
    minimal implementation — confirm it matches the one the notebook was written against
    (attribute names matter when loading saved weights).

    Args:
        in_dim: Input feature size.
        h_dim: Hidden feature size.
        out_d: Output feature size; defaults to ``in_dim``.
        act: Activation given as a module class (e.g. ``nn.ReLU``), a module instance,
            or a plain function.
        dropout: Dropout probability applied after the activation and after the output layer.
    """

    def __init__(self, in_dim, h_dim, out_d=None, act=nn.ReLU, dropout=0.):
        super().__init__()
        out_d = out_d if out_d is not None else in_dim
        self.fc1 = nn.Linear(in_dim, h_dim)
        # A class such as nn.ReLU must be instantiated; instances and functions are used as-is.
        self.act = act() if isinstance(act, type) else act
        self.fc2 = nn.Linear(h_dim, out_d)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.dropout(self.act(self.fc1(x)))
        return self.dropout(self.fc2(x))


class TransformerLayer(nn.Module):
    """Pre-norm transformer block: attention and an MLP, each wrapped in a residual connection.

    Args:
        dim_self: Feature size of the layer input and output.
        dim_ref: Feature size of the key/value input handed to the attention module.
        num_heads: Number of attention heads.
        mlp_ratio: Hidden size of the MLP as a multiple of ``dim_self``.
        bias: Whether the attention projections use a bias term.
        dropout: Dropout probability forwarded to the attention and MLP modules.
        act: Activation forwarded to the MLP.
        norm_layer: Constructor for the two normalisation layers.
    """

    def __init__(self, dim_self, dim_ref, num_heads, mlp_ratio=4., bias=False, dropout=0., act=nn.ReLU,
                 norm_layer=nn.LayerNorm):
        super().__init__()
        self.norm1 = norm_layer(dim_self)
        self.attn = MultiHeadAttention(dim_self, dim_ref, num_heads, bias=bias, dropout=dropout)
        self.norm2 = norm_layer(dim_self)
        self.mlp = MlpTransformer(dim_self, int(dim_self * mlp_ratio), act=act, dropout=dropout)

    def forward_with_attention(self, x, y=None, mask=None):
        """Run the layer and also return the attention weights produced by ``self.attn``."""
        # Only the query input is normalised; ``y`` is handed to the attention module unchanged.
        x_, attention = self.attn(self.norm1(x), y, mask)
        x = x + x_
        x = x + self.mlp(self.norm2(x))
        return x, attention

    def forward(self, x, y=None, mask=None):
        """Run the layer and return only the transformed features."""
        x = x + self.attn(self.norm1(x), y, mask)[0]
        x = x + self.mlp(self.norm2(x))
        return x


class Transformer(nn.Module):
    """A stack of ``TransformerLayer`` blocks.

    With ``enc_dec`` disabled every layer is called with the same ``(x, y, mask)``.
    With ``enc_dec`` enabled the stack holds ``2 * num_layers`` layers that alternate:
    even-indexed layers attend to ``y`` (without the mask) and odd-indexed layers
    attend to ``x`` itself (with the mask).
    """

    def __init__(self, dim_self: int, num_heads: int, num_layers: int, dim_ref: int = None,
                 mlp_ratio: float = 2., act=nn.ReLU, norm_layer=nn.LayerNorm, enc_dec=False):
        super().__init__()
        if dim_ref is None:
            dim_ref = dim_self
        self.enc_dec = enc_dec
        total_layers = num_layers * 2 if enc_dec else num_layers
        blocks = []
        for idx in range(total_layers):
            # Only the odd-indexed layers of an enc_dec stack use dim_self as reference size.
            attends_to_itself = enc_dec and idx % 2 == 1
            ref_dim = dim_self if attends_to_itself else dim_ref
            blocks.append(
                TransformerLayer(dim_self, ref_dim, num_heads, mlp_ratio, act=act, norm_layer=norm_layer)
            )
        self.layers = nn.ModuleList(blocks)

    def forward_with_attention(self, x, y=None, mask=None):
        """Run every layer with ``(x, y, mask)`` and collect each layer's attention weights."""
        collected = []
        for block in self.layers:
            x, weights = block.forward_with_attention(x, y, mask)
            collected.append(weights)
        return x, collected

    def forward(self, x, y=None, mask=None):
        """Run the stack and return the transformed ``x``."""
        if not self.enc_dec:
            for block in self.layers:
                x = block(x, y, mask)
            return x
        for idx, block in enumerate(self.layers):
            if idx % 2 == 0:
                x = block(x, y)
            else:
                x = block(x, x, mask)
        return x


class TransformerMapper(nn.Module):
    """Map a CLIP embedding to a sequence of prefix embeddings with a transformer.

    Args:
        dim_clip: Size of the incoming CLIP embedding.
        dim_embedding: Size of each produced prefix embedding.
        prefix_length: Number of learned prefix vectors, which is also the number of
            output positions.
        clip_length: Number of positions the CLIP embedding is expanded into.
        num_layers: Number of transformer layers.
        num_heads: Number of attention heads per layer. This was hard-coded to 8; the
            default keeps that behaviour.
    """

    def __init__(self, dim_clip: int, dim_embedding: int, prefix_length: int, clip_length: int,
                 num_layers: int = 8, num_heads: int = 8):
        super().__init__()
        self.clip_length = clip_length
        self.transformer = Transformer(dim_embedding, num_heads, num_layers)
        # A single linear layer produces all clip_length positions at once.
        self.linear = nn.Linear(dim_clip, clip_length * dim_embedding)
        self.prefix_const = nn.Parameter(torch.randn(prefix_length, dim_embedding), requires_grad=True)

    def forward(self, x):
        """Return the transformed learned-prefix positions, shape (batch, prefix_length, dim_embedding)."""
        batch_size = x.shape[0]
        x = self.linear(x).view(batch_size, self.clip_length, -1)
        # Share the learned prefix across the batch without copying it.
        prefix = self.prefix_const.unsqueeze(0).expand(batch_size, *self.prefix_const.shape)
        prefix = torch.cat((x, prefix), dim=1)
        # Drop the first clip_length positions; only the learned-prefix positions are returned.
        out = self.transformer(prefix)[:, self.clip_length:]
        return out


class MappingType(Enum):
    """Kinds of prefix-mapping network named in ``ClipCaptionModel``'s signature.

    NOTE(review): ``MappingType`` was used as an annotation and default value but is not
    defined in any visible cell, so the class definition below raised NameError. The member
    values here are placeholders — confirm against the intended definition.
    """
    MLP = 'mlp'
    Transformer = 'transformer'


class ClipCaptionModel(nn.Module):
    """GPT-2 language model conditioned on a prefix produced from a CLIP embedding.

    Args:
        prefix_length: Number of prefix embeddings placed before the caption tokens.
        clip_length: Number of positions the CLIP embedding is expanded into by the mapper.
            Required, even though the signature defaults it to None.
        prefix_size: Size of the incoming CLIP embedding.
        num_layers: Number of transformer layers in the mapper.
        mapping_type: Accepted for interface compatibility. It is currently not consulted;
            a ``TransformerMapper`` is always built.

    Raises:
        TypeError: If ``clip_length`` is None.
    """

    def __init__(self, prefix_length: int, clip_length: int = None, prefix_size: int = 512,
                 num_layers: int = 8, mapping_type: MappingType = MappingType.MLP):
        super(ClipCaptionModel, self).__init__()
        # Previously a None clip_length only failed inside the mapper (None * int), after
        # GPT-2 had already been loaded; fail early with the same exception type instead.
        if clip_length is None:
            raise TypeError("clip_length must be an int; the transformer mapper cannot be built without it")
        self.prefix_length = prefix_length
        self.gpt = GPT2LMHeadModel.from_pretrained('gpt2')
        # Width of GPT-2's token-embedding table, i.e. the size each prefix vector must have.
        self.gpt_embedding_size = self.gpt.transformer.wte.weight.shape[1]
        self.clip_project = TransformerMapper(prefix_size, self.gpt_embedding_size, prefix_length,
                                              clip_length, num_layers)

    def get_dummy_token(self, batch_size: int, device: torch.device) -> torch.Tensor:
        """Return an int64 zero tensor of shape (batch_size, prefix_length) on ``device``."""
        return torch.zeros(batch_size, self.prefix_length, dtype=torch.int64, device=device)

    def forward(self, tokens: torch.Tensor, prefix: torch.Tensor, mask: torch.Tensor = None,
                labels: torch.Tensor = None):
        """Run GPT-2 on the mapped prefix followed by the embedded caption tokens.

        Args:
            tokens: Caption token ids; embedded with GPT-2's token-embedding table.
            prefix: CLIP embedding(s) passed through ``self.clip_project``.
            mask: Attention mask handed to GPT-2 unchanged.
            labels: Only tested for presence. When not None, the labels given to GPT-2
                are rebuilt as zero placeholders for the prefix followed by ``tokens``.

        Returns:
            Whatever ``self.gpt`` returns for the concatenated embeddings.
        """
        embedding_text = self.gpt.transformer.wte(tokens)
        prefix_projections = self.clip_project(prefix).view(-1, self.prefix_length, self.gpt_embedding_size)
        embedding_cat = torch.cat((prefix_projections, embedding_text), dim=1)
        if labels is not None:
            # NOTE(review): the prefix positions are labelled with token id 0, not an ignore
            # index, so they take part in any loss GPT-2 computes — confirm this is intended.
            dummy_token = self.get_dummy_token(tokens.shape[0], tokens.device)
            labels = torch.cat((dummy_token, tokens), dim=1)
        out = self.gpt(inputs_embeds=embedding_cat, labels=labels, attention_mask=mask)
        return out


class ClipCaptionPrefix(ClipCaptionModel):
    """ClipCaptionModel variant that exposes only the mapper's parameters and keeps GPT-2 in eval mode."""

    def parameters(self, recurse: bool = True):
        """Return the parameters of ``self.clip_project`` only; ``recurse`` is not consulted."""
        mapper = self.clip_project
        return mapper.parameters()

    def train(self, mode: bool = True):
        """Switch the module to ``mode`` and then put ``self.gpt`` back into eval mode."""
        super().train(mode)
        self.gpt.eval()
        return self


In [None]:
import torch
from torch import nn
from transformers import GPT2Tokenizer

# Load the CLIP model and GPT-2 tokenizer.
# CLIP is placed on `device` (selected in the device cell) instead of a hard-coded "cuda",
# so the notebook also runs on machines without a CUDA GPU.
clip_model, preprocess = clip.load("RN50x4", device=device)
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Define the Transformer model
class VideoTransformer(nn.Module):
    """Transformer encoder mapping per-frame CLIP and time features to vocabulary logits.

    Args:
        clip_dim: Size of the CLIP feature vector for a frame.
        time_dim: Size of the time feature vector for a frame.
        hidden_dim: Model width of the transformer encoder.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of encoder layers.
        vocab_size: Size of the output vocabulary. When None, it is read from the
            notebook-level ``tokenizer`` (the previous, implicit behaviour).
    """

    def __init__(self, clip_dim, time_dim, hidden_dim, nhead, num_layers,
                 vocab_size: Optional[int] = None):
        super().__init__()
        self.clip_dim = clip_dim
        self.time_dim = time_dim
        self.hidden_dim = hidden_dim
        self.nhead = nhead
        self.num_layers = num_layers
        # An explicit vocab_size removes the hidden dependency on the global tokenizer.
        if vocab_size is None:
            vocab_size = tokenizer.vocab_size
        self.vocab_size = vocab_size

        # Project the concatenated CLIP and time features to the encoder width.
        self.input_linear = nn.Linear(clip_dim + time_dim, hidden_dim)

        # NOTE(review): the encoder layer uses PyTorch's defaults, which treat dim 0 as the
        # sequence axis (batch_first=False) — confirm callers lay their inputs out that way.
        encoder_layer = nn.TransformerEncoderLayer(hidden_dim, nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)

        # Project every encoded position to vocabulary logits.
        self.output_linear = nn.Linear(hidden_dim, vocab_size)

    def forward(self, clip_features, time_features):
        """Return logits of shape (..., vocab_size) for the given features.

        ``clip_features`` and ``time_features`` are concatenated along the last axis, so
        they must agree in all leading dimensions.
        """
        x = torch.cat((clip_features, time_features), dim=-1)
        x = self.input_linear(x)
        x = self.transformer_encoder(x)
        return self.output_linear(x)

# Instantiate the Transformer model on the same device as CLIP and switch it to inference mode.
# NOTE(review): this rebinds `model`, which an earlier cell uses for the GPT-2 language model.
clip_device = next(clip_model.parameters()).device
model = VideoTransformer(clip_model.visual.output_dim, 1, 512, 8, 6).to(clip_device)
model.eval()  # inference only: disables the dropout inside the encoder layers

# Load a video and extract frames
# NOTE(review): `load_video_frames` and `FRAME_RATE` are not defined in any visible cell;
# both must be provided before this cell can run.
video_frames = load_video_frames("my_video.mp4")

# One decoded token string per frame, in frame order.
output_texts = []

# Process each frame using CLIP and the Transformer model; no gradients are needed.
with torch.no_grad():
    for i, frame in enumerate(video_frames):
        # CLIP's encoder expects a batch on its own device: add a batch axis and move the frame.
        frame = preprocess(frame).unsqueeze(0).to(clip_device)

        # Compute the CLIP features, cast to float32 to match the transformer's weights
        # (CLIP may return half precision on GPU).
        clip_features = clip_model.encode_image(frame).float()

        # Compute the time feature for the frame (frame number divided by frame rate)
        time_feature = torch.tensor([[i / FRAME_RATE]], device=clip_device)

        # Pass the CLIP features and time feature through the Transformer model
        output_token_logits = model(clip_features.unsqueeze(0), time_feature.unsqueeze(0))

        # Compute the most likely output token
        output_token = torch.argmax(output_token_logits, dim=-1).item()

        # Decode the output token using GPT-2's decode function and keep it
        output_text = tokenizer.decode(output_token)
        output_texts.append(output_text)
