In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
%cd /content/
!cp /content/drive/MyDrive/ttech/videos.tar.gz /content/
!tar -xf  /content/videos.tar.gz 

/content


In [3]:
# install requirements
%cd /content/
import sys
if 'google.colab' in sys.modules:
    print('Running in Colab.')
    !cp /content/drive/MyDrive/ttech/inf_video.mp4 .
    !pip3 install transformers==4.15.0 timm==0.4.12 fairscale==0.4.4 -q 
    # !pip install salesforce-lavis
    !git clone https://github.com/salesforce/BLIP
    %cd BLIP

/content
Running in Colab.
fatal: destination path 'BLIP' already exists and is not an empty directory.
/content/BLIP


In [4]:
import cv2
import matplotlib.pyplot as plt
from PIL import Image
import requests
import torch
import numpy as np
from torchvision import transforms
from torchvision.transforms.functional import InterpolationMode

In [5]:
#@title decoder + generator code + extractor
import warnings
warnings.filterwarnings("ignore")

from models.vit import VisionTransformer, interpolate_pos_embed
from models.med import BertConfig, BertModel, BertLMHeadModel
from transformers import BertTokenizer

import torch
from torch import nn
import torch.nn.functional as F

import os
from urllib.parse import urlparse
from timm.models.hub import download_cached_file


def init_tokenizer():
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    tokenizer.add_special_tokens({'bos_token':'[DEC]'})
    tokenizer.add_special_tokens({'additional_special_tokens':['[ENC]']})       
    tokenizer.enc_token_id = tokenizer.additional_special_tokens_ids[0]  
    return tokenizer




def create_vit(vit, image_size, use_grad_checkpointing=False, ckpt_layer=0, drop_path_rate=0):
        
    assert vit in ['base', 'large'], "vit parameter must be base or large"
    if vit=='base':
        vision_width = 768
        visual_encoder = VisionTransformer(img_size=image_size, patch_size=16, embed_dim=vision_width, depth=12, 
                                           num_heads=12, use_grad_checkpointing=use_grad_checkpointing, ckpt_layer=ckpt_layer,
                                           drop_path_rate=0 or drop_path_rate
                                          )   
    elif vit=='large':
        vision_width = 1024
        visual_encoder = VisionTransformer(img_size=image_size, patch_size=16, embed_dim=vision_width, depth=24, 
                                           num_heads=16, use_grad_checkpointing=use_grad_checkpointing, ckpt_layer=ckpt_layer,
                                           drop_path_rate=0.1 or drop_path_rate
                                          )   
    return visual_encoder, vision_width




def is_url(url_or_filename):
    parsed = urlparse(url_or_filename)
    return parsed.scheme in ("http", "https")




def load_checkpoint(model,url_or_filename):
    if is_url(url_or_filename):
        cached_file = download_cached_file(url_or_filename, check_hash=False, progress=True)
        checkpoint = torch.load(cached_file, map_location='cpu') 
    elif os.path.isfile(url_or_filename):        
        checkpoint = torch.load(url_or_filename, map_location='cpu') 
    else:
        raise RuntimeError('checkpoint url or path is invalid')
        
    state_dict = checkpoint['model']
    
    state_dict['visual_encoder.pos_embed'] = interpolate_pos_embed(state_dict['visual_encoder.pos_embed'],model.visual_encoder) 
    if 'visual_encoder_m.pos_embed' in model.state_dict().keys():
        state_dict['visual_encoder_m.pos_embed'] = interpolate_pos_embed(state_dict['visual_encoder_m.pos_embed'],
                                                                         model.visual_encoder_m)    
    for key in model.state_dict().keys():
        if key in state_dict.keys():
            if state_dict[key].shape!=model.state_dict()[key].shape:
                del state_dict[key]
    
    msg = model.load_state_dict(state_dict,strict=False)
    print('load checkpoint from %s'%url_or_filename)  
    return model,msg




class BLIP_Base(nn.Module):
    def __init__(self,                 
                 med_config = 'configs/med_config.json',  
                 image_size = 224,
                 vit = 'base',
                 vit_grad_ckpt = False,
                 vit_ckpt_layer = 0,                 
                 ):
        """
        Args:
            med_config (str): path for the mixture of encoder-decoder model's configuration file
            image_size (int): input image size
            vit (str): model size of vision transformer
        """               
        super().__init__()
        
        self.visual_encoder, vision_width = create_vit(vit,image_size, vit_grad_ckpt, vit_ckpt_layer)
        self.tokenizer = init_tokenizer()   
        med_config = BertConfig.from_json_file(med_config)
        med_config.encoder_width = vision_width
        self.text_encoder = BertModel(config=med_config, add_pooling_layer=False)  

        
    def forward(self, image, caption, mode):
        
        assert mode in ['image', 'text', 'multimodal'], "mode parameter must be image, text, or multimodal"
        text = self.tokenizer(caption, return_tensors="pt").to(device) 
        
        if mode=='image':    
            # return image features
            image_embeds = self.visual_encoder(image)             
            return image_embeds
        
        elif mode=='text':
            # return text features
            text_output = self.text_encoder(text.input_ids, attention_mask = text.attention_mask,                      
                                            return_dict = True, mode = 'text')  
            return text_output.last_hidden_state
        
        elif mode=='multimodal':
            # return multimodel features
            image_embeds = self.visual_encoder(image)    
            image_atts = torch.ones(image_embeds.size()[:-1],dtype=torch.long).to(device)      
            
            text.input_ids[:,0] = self.tokenizer.enc_token_id
            output = self.text_encoder(text.input_ids,
                                       attention_mask = text.attention_mask,
                                       encoder_hidden_states = image_embeds,
                                       encoder_attention_mask = image_atts,      
                                       return_dict = True,
                                      )              
            return output.last_hidden_state





def blip_feature_extractor(pretrained='',**kwargs):
    model = BLIP_Base(**kwargs)
    if pretrained:
        model,msg = load_checkpoint(model,pretrained)
        assert(len(msg.missing_keys)==0)
    return model 




class BLIP_Decoder(nn.Module):
    def __init__(self,                 
                 med_config = 'configs/med_config.json',  
                 image_size = 384,
                 vit = 'base',
                 vit_grad_ckpt = False,
                 vit_ckpt_layer = 0,
                 prompt = 'a picture of ',
                 ):
        """
        Args:
            med_config (str): path for the mixture of encoder-decoder model's configuration file
            image_size (int): input image size
            vit (str): model size of vision transformer
        """            
        super().__init__()
        
        self.visual_encoder, vision_width = create_vit(vit,image_size, vit_grad_ckpt, vit_ckpt_layer)
        self.tokenizer = init_tokenizer()   
        med_config = BertConfig.from_json_file(med_config)
        med_config.encoder_width = vision_width
        self.text_decoder = BertLMHeadModel(config=med_config)    
        
        self.prompt = prompt
        self.prompt_length = len(self.tokenizer(self.prompt).input_ids)-1

        
    def forward(self, image_embeds, caption):
        
        #image_embeds = self.visual_encoder(image) 
        image_atts = torch.ones(image_embeds.size()[:-1],dtype=torch.long).to(device)
        
        text = self.tokenizer(caption, padding='longest', truncation=True, max_length=40, return_tensors="pt").to(device) 
        
        text.input_ids[:,0] = self.tokenizer.bos_token_id
        
        decoder_targets = text.input_ids.masked_fill(text.input_ids == self.tokenizer.pad_token_id, -100)         
        decoder_targets[:,:self.prompt_length] = -100
     
        decoder_output = self.text_decoder(text.input_ids, 
                                           attention_mask = text.attention_mask, 
                                           encoder_hidden_states = image_embeds,
                                           encoder_attention_mask = image_atts,                  
                                           labels = decoder_targets,
                                           return_dict = True,   
                                          )   
        loss_lm = decoder_output.loss
        
        return loss_lm
        
    def generate(self, image_embeds, sample=False, num_beams=3, max_length=30, min_length=10, top_p=0.9, repetition_penalty=1.0):
        #image_embeds = self.visual_encoder(image)
        batch_size = image_embeds.shape[0]

        if not sample:
            image_embeds = image_embeds.repeat_interleave(num_beams,dim=0)
            
        image_atts = torch.ones(image_embeds.size()[:-1],dtype=torch.long).to(device)
        model_kwargs = {"encoder_hidden_states": image_embeds, "encoder_attention_mask":image_atts}
        
        prompt = [self.prompt] * batch_size
        input_ids = self.tokenizer(prompt, return_tensors="pt").input_ids.to(device) 
        input_ids[:,0] = self.tokenizer.bos_token_id
        input_ids = input_ids[:, :-1]

        if sample:
            #nucleus sampling
            outputs = self.text_decoder.generate(input_ids=input_ids,
                                                  max_length=max_length,
                                                  min_length=min_length,
                                                  do_sample=True,
                                                  top_p=top_p,
                                                  num_return_sequences=1,
                                                  eos_token_id=self.tokenizer.sep_token_id,
                                                  pad_token_id=self.tokenizer.pad_token_id, 
                                                  repetition_penalty=1.1,                                            
                                                  **model_kwargs)
        else:
            #beam search
            outputs = self.text_decoder.generate(input_ids=input_ids,
                                                  max_length=max_length,
                                                  min_length=min_length,
                                                  num_beams=num_beams,
                                                  eos_token_id=self.tokenizer.sep_token_id,
                                                  pad_token_id=self.tokenizer.pad_token_id,     
                                                  repetition_penalty=repetition_penalty,
                                                  **model_kwargs)            
            
        captions = []    
        for output in outputs:
            caption = self.tokenizer.decode(output, skip_special_tokens=True)    
            captions.append(caption[len(self.prompt):])
        return captions
    

def blip_decoder(pretrained='',**kwargs):
    model = BLIP_Decoder(**kwargs)
    if pretrained:
        model,msg = load_checkpoint(model,pretrained)
        assert(len(msg.missing_keys)==0)
    return model

In [6]:
#@title video QA code




from models.med import BertConfig, BertModel, BertLMHeadModel
from models.blip import create_vit, init_tokenizer, load_checkpoint

import torch
from torch import nn
import torch.nn.functional as F
from transformers import BertTokenizer
import numpy as np

class BLIP_VQA(nn.Module):
    def __init__(self,                 
                 med_config = 'configs/med_config.json',  
                 image_size = 480,
                 vit = 'base',
                 vit_grad_ckpt = False,
                 vit_ckpt_layer = 0,                   
                 ):
        """
        Args:
            med_config (str): path for the mixture of encoder-decoder model's configuration file
            image_size (int): input image size
            vit (str): model size of vision transformer
        """               
        super().__init__()
        
        self.visual_encoder, vision_width = create_vit(vit, image_size, vit_grad_ckpt, vit_ckpt_layer, drop_path_rate=0.1)
        self.tokenizer = init_tokenizer()  
        
        encoder_config = BertConfig.from_json_file(med_config)
        encoder_config.encoder_width = vision_width
        self.text_encoder = BertModel(config=encoder_config, add_pooling_layer=False) 
        
        decoder_config = BertConfig.from_json_file(med_config)        
        self.text_decoder = BertLMHeadModel(config=decoder_config)          


    def forward(self, image_embeds, question, image_size=480, answer=None, n=None, weights=None, train=True, inference='rank', k_test=128):
        batch_size = image_embeds.shape[0]
        #image_embeds = self.visual_encoder(image) 
        image_atts = torch.ones(image_embeds.size()[:-1],dtype=torch.long).to(device)
        
        question = self.tokenizer(question, padding='longest', truncation=True, max_length=35, 
                                  return_tensors="pt").to(device) 
        question.input_ids[:,0] = self.tokenizer.enc_token_id
        
        if train:               
            '''
            n: number of answers for each question
            weights: weight for each answer
            '''                     
            answer = self.tokenizer(answer, padding='longest', return_tensors="pt").to(device) 
            answer.input_ids[:,0] = self.tokenizer.bos_token_id
            answer_targets = answer.input_ids.masked_fill(answer.input_ids == self.tokenizer.pad_token_id, -100)      

            question_output = self.text_encoder(question.input_ids, 
                                                attention_mask = question.attention_mask, 
                                                encoder_hidden_states = image_embeds,
                                                encoder_attention_mask = image_atts,                             
                                                return_dict = True)    

            question_states = []                
            question_atts = []  
            for b, n in enumerate(n):
                question_states += [question_output.last_hidden_state[b]]*n
                question_atts += [question.attention_mask[b]]*n                
            question_states = torch.stack(question_states,0)    
            question_atts = torch.stack(question_atts,0)     

            answer_output = self.text_decoder(answer.input_ids, 
                                              attention_mask = answer.attention_mask, 
                                              encoder_hidden_states = question_states,
                                              encoder_attention_mask = question_atts,                  
                                              labels = answer_targets,
                                              return_dict = True,   
                                              reduction = 'none',
                                             )      
            
            loss = weights * answer_output.loss
            loss = loss.sum()/batch_size

            return loss
            

        else: 
            question_output = self.text_encoder(question.input_ids, 
                                                attention_mask = question.attention_mask, 
                                                encoder_hidden_states = image_embeds,
                                                encoder_attention_mask = image_atts,                                    
                                                return_dict = True) 
            
            if inference=='generate':
                num_beams = 4
                question_states = question_output.last_hidden_state.repeat_interleave(num_beams,dim=0)
                question_atts = torch.ones(question_states.size()[:-1],dtype=torch.long).to(question_states.device)
                model_kwargs = {"encoder_hidden_states": question_states, "encoder_attention_mask":question_atts}
                
                bos_ids = torch.full((batch_size,1),fill_value=self.tokenizer.bos_token_id,device=device)
                
                outputs = self.text_decoder.generate(input_ids=bos_ids,
                                                     max_length=150,
                                                     min_length=1,
                                                     num_beams=num_beams,
                                                     eos_token_id=self.tokenizer.sep_token_id,
                                                     pad_token_id=self.tokenizer.pad_token_id, 
                                                     **model_kwargs)
                
                answers = []    
                for output in outputs:
                    answer = self.tokenizer.decode(output, skip_special_tokens=True)    
                    answers.append(answer)
                return answers
            
            elif inference=='rank':
                max_ids = self.rank_answer(question_output.last_hidden_state, question.attention_mask, 
                                           answer.input_ids, answer.attention_mask, k_test) 
                return max_ids
 
                
                
    def rank_answer(self, question_states, question_atts, answer_ids, answer_atts, k):
        
        num_ques = question_states.size(0)
        start_ids = answer_ids[0,0].repeat(num_ques,1) # bos token
        
        start_output = self.text_decoder(start_ids, 
                                         encoder_hidden_states = question_states,
                                         encoder_attention_mask = question_atts,                                      
                                         return_dict = True,
                                         reduction = 'none')              
        logits = start_output.logits[:,0,:] # first token's logit
        
        # topk_probs: top-k probability 
        # topk_ids: [num_question, k]        
        answer_first_token = answer_ids[:,1]
        prob_first_token = F.softmax(logits,dim=1).index_select(dim=1, index=answer_first_token) 
        topk_probs, topk_ids = prob_first_token.topk(k,dim=1) 
        
        # answer input: [num_question*k, answer_len]                 
        input_ids = []
        input_atts = []
        for b, topk_id in enumerate(topk_ids):
            input_ids.append(answer_ids.index_select(dim=0, index=topk_id))
            input_atts.append(answer_atts.index_select(dim=0, index=topk_id))
        input_ids = torch.cat(input_ids,dim=0)  
        input_atts = torch.cat(input_atts,dim=0)  

        targets_ids = input_ids.masked_fill(input_ids == self.tokenizer.pad_token_id, -100)

        # repeat encoder's output for top-k answers
        question_states = tile(question_states, 0, k)
        question_atts = tile(question_atts, 0, k)
        
        output = self.text_decoder(input_ids, 
                                   attention_mask = input_atts, 
                                   encoder_hidden_states = question_states,
                                   encoder_attention_mask = question_atts,     
                                   labels = targets_ids,
                                   return_dict = True, 
                                   reduction = 'none')   
        
        log_probs_sum = -output.loss
        log_probs_sum = log_probs_sum.view(num_ques,k)

        max_topk_ids = log_probs_sum.argmax(dim=1) 
        max_ids = topk_ids[max_topk_ids>=0,max_topk_ids]

        return max_ids
    
    
def blip_vqa(pretrained='',**kwargs):
    model = BLIP_VQA(**kwargs)
    if pretrained:
        model,msg = load_checkpoint(model,pretrained)
#         assert(len(msg.missing_keys)==0)
    return model  


def tile(x, dim, n_tile):
    init_dim = x.size(dim)
    repeat_idx = [1] * x.dim()
    repeat_idx[dim] = n_tile
    x = x.repeat(*(repeat_idx))
    order_index = torch.LongTensor(np.concatenate([init_dim * np.arange(n_tile) + i for i in range(init_dim)]))
    return torch.index_select(x, dim, order_index.to(x.device))    
        

In [7]:
def read_video(path, transform=None, frames_num=1):
    frames = []
    cap = cv2.VideoCapture(path)
    
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # print(f"{length=} {fps=}")
    N = length//(frames_num)
    # N=5

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711))
        ])
    
    current_frame = 1
    for i in range(length):
        ret, frame = cap.read(current_frame)
        
        if ret and i==current_frame and len(frames)<frames_num:
           
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = cv2.resize(frame, Config['IMG_SIZE'], interpolation = cv2.INTER_CUBIC)
            frame = transform(frame).unsqueeze(0).to(device)
            frames.append(frame)
            current_frame += N
        
    cap.release()
    return frames

In [8]:
def get_frames_texts(frames, model_decoder, model_vqa):
    with torch.no_grad():
        texts = []
        for frame in frames:
            caption = model_decoder.generate(frame, sample=False, num_beams=3, max_length=50, min_length=20) 
            texts.append(caption)
        return texts

In [9]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [10]:
Config = dict(
    MAX_FRAMES = 512,
    IMG_SIZE = (384, 384),
)

caption = ''

In [11]:
import os

def absoluteFilePaths(directory):
    for dirpath,_,filenames in os.walk(directory):
        for f in filenames:
            yield os.path.abspath(os.path.join(dirpath, f))

In [12]:
def collect_frames(dir_path:str):
  frames_coll = dict()
  for path in absoluteFilePaths(dir_path):
    video_name = path.split('/')[-1].split('.')[0]
    frames = read_video(f'{path}', frames_num=Config['MAX_FRAMES'])
    frames_coll[video_name] = frames
  return frames_coll

In [13]:
frames = collect_frames('/content/video_scenes')

In [14]:
test_scene = frames['inf_video-Scene-005']

In [15]:
model_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base_capfilt_large.pth'
    
model_decoder = blip_decoder(pretrained=model_url, image_size=Config['IMG_SIZE'][0], vit='base')
model_decoder.eval()
model_decoder = model_decoder.to(device)

reshape position embedding from 196 to 576
load checkpoint from https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base_capfilt_large.pth


In [16]:
model_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base.pth'
    
feature_extractor = blip_feature_extractor(pretrained=model_url, image_size=Config['IMG_SIZE'][0], vit='base')
feature_extractor.eval()
feature_extractor = feature_extractor.to(device)

reshape position embedding from 196 to 576
load checkpoint from https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base.pth


In [17]:
model_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base_vqa_capfilt_large.pth'
    
model_vqa = blip_vqa(pretrained=model_url, image_size=Config['IMG_SIZE'][0], vit='base')
model_vqa.eval()
model_vqa = model_vqa.to(device)

reshape position embedding from 900 to 576
load checkpoint from https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base_vqa_capfilt_large.pth


In [18]:
!nvidia-smi --query-gpu=utilization.memory --format=csv

utilization.memory [%]
1 %


In [19]:
def video_description(frames, model_decoder, feature_extractor):

    with torch.no_grad():
        flag = True
        for frame in frames:
            if flag:
              
                frames_embs = feature_extractor(frame, caption, mode='image')
                flag = False
            else:
                frames_embs = torch.cat((frames_embs, feature_extractor(frame, caption, mode='image')), dim=1)
        print(frames_embs.size())
        with torch.no_grad():
            text = model_decoder.generate(frames_embs, num_beams=7, max_length=300, min_length=15, top_p=0.9)

        return text

In [20]:
def videoQA(frames, model_vqa, feature_extractor, question):

    with torch.no_grad():
        flag = True
        for frame in frames:
            if flag:
                frames_embs = feature_extractor(frame, caption, mode='image')
                flag = False
            else:
                frames_embs = torch.cat((frames_embs, feature_extractor(frame, caption, mode='image')), dim=1)
        
        with torch.no_grad():
            answer = model_vqa(frames_embs, question, train=False, inference='generate')

        return answer

In [21]:
test_scene = frames['inf_video-Scene-008']

In [22]:
question1  = 'How much people are on the photo? Answer with one number'
question2  = 'Is there any humans on the picture? Where are they located?'
question3 = "What actions are perfomed on a video?"

question4 = "Where are the main objects on a video are located?"
question5 = "What humans are doing on a video?"
question6 = "What is shown on the picture?"
question3  = 'What is the main event on a video?'

questions = [
    'What is the main event on a video?',
    "What is shown on the picture?",
    "What humans are doing on a video?",
    "Where are the main objects on a video are located?",
    "What actions are perfomed on a video?", 
    'Is there any humans on the picture? Where are they located?',
    "What are the main objects on a video?"
    "How does scene changes throughout the video?"
    'How much humans are on the photo?',
    "How much non-human objects are on the photo?"
    "What are the main non-human objects are on the photo?",
    'How much people are on the photo? Answer with one number'

]

In [23]:
answer1 = videoQA(test_scene, model_vqa, feature_extractor, question1)

In [24]:
answer1

['2']

In [25]:
text = video_description(test_scene, model_decoder, feature_extractor)

torch.Size([1, 577, 768])


In [26]:
text

['an airport with cars parked on the side of the road']

In [27]:
import torch
import torchvision

dummy_input = (torch.randn(1,3, 384, 384, device="cuda"), " ", 'image')


# Providing input and output names sets the display names for values
# within the model's graph. Setting these does not change the semantics
# of the graph; it is only for readability.
#
# The inputs to the network consist of the flat list of inputs (i.e.
# the values you would pass to the forward() method) followed by the
# flat list of parameters. You can partially specify names, i.e. provide
# a list here shorter than the number of inputs to the model, and we will
# only set that subset of names, starting from the beginning.
input_names = [ 'decoder_inputs' ]
output_names = [ "output1" ]

torch.onnx.export(feature_extractor, dummy_input, "feature_extractor.onnx", verbose=True, input_names=input_names, output_names=output_names)

In [None]:
/content/drive/MyDrive/ttech

In [32]:
!pip install openvino-dev[torch,onnx]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting openvino-dev[onnx,torch]
  Downloading openvino_dev-2022.3.0-9052-py3-none-any.whl (5.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.8/5.8 MB[0m [31m44.2 MB/s[0m eta [36m0:00:00[0m
Collecting networkx<=2.8.8
  Downloading networkx-2.8.8-py3-none-any.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m85.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting texttable>=1.6.3
  Downloading texttable-1.6.7-py2.py3-none-any.whl (10 kB)
Collecting openvino-telemetry>=2022.1.0
  Downloading openvino_telemetry-2022.3.0-py3-none-any.whl (20 kB)
Collecting addict>=2.4.0
  Downloading addict-2.4.0-py3-none-any.whl (3.8 kB)
Collecting jstyleson>=0.0.2
  Downloading jstyleson-0.0.2.tar.gz (2.0 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pandas~=1.3.5
  Downloading pandas-1.3.5-cp39-cp39-manylinux_2

In [35]:
from pathlib import Path
import openvino
from openvino.tools import mo
from openvino.runtime import Core, serialize

VISION_MODEL_OV = Path("blip_vision_model.xml")
ov_vision_model = mo.convert_model('/content/BLIP/feature_extractor.onnx', compress_to_fp16=True)
    # save model on disk for next usages
serialize(ov_vision_model, str(VISION_MODEL_OV))


In [36]:
core = Core()

# load models on device
ov_vision_model = core.compile_model(VISION_MODEL_OV)

In [34]:
!cp /content/BLIP/feature_extractor.onnx /

/content


In [2]:
!pip install scenedetect[opencv] --upgrade

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting scenedetect[opencv]
  Downloading scenedetect-0.6.1-py3-none-any.whl (115 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/115.1 KB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.1/115.1 KB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: scenedetect
Successfully installed scenedetect-0.6.1


In [3]:
from scenedetect.scene_manager import DEFAULT_MIN_WIDTH
DEFAULT_MIN_WIDTH = 10**4

In [4]:
from scenedetect import detect, ContentDetector,  split_video_ffmpeg
from scenedetect import open_video, SceneManager, split_video_ffmpeg
from scenedetect.detectors import ContentDetector
from scenedetect.video_splitter import split_video_ffmpeg

def split_video_into_scenes(video_path, threshold=30.0):
    # Open our video, create a scene manager, and add a detector.
    video = open_video(video_path)
    scene_manager = SceneManager()
    scene_manager.add_detector(
        ContentDetector(threshold=threshold))
    scene_manager.auto_downscale = False
    scene_manager.downscale = 1
    scene_manager.detect_scenes(video, show_progress=True)
    scene_list = scene_manager.get_scene_list()
    # split_video_ffmpeg(video_path, scene_list, show_progress=True)
    return scene_list

# scene_list = detect('/content/inf_video.mp4', ContentDetector(kernel_size=15, threshold=75))

In [None]:
!cp /content/video_scenes_30.tar.gz /content/drive/MyDrive/ttech/

In [None]:
!mkdir /content/video_scenes_30
%cd /content/video_scenes_30


/content/video_scenes_30


In [5]:
scenes = split_video_into_scenes('/content/drive/MyDrive/ttech/Zombieland.2009.BDRip.1080p.mkv')

Detected: 0 | Progress:   0%|          | 0/126168 [00:00<?, ?frames/s]INFO:pyscenedetect:Detecting scenes...
Detected: 1580 | Progress: 100%|██████████| 126168/126168 [1:17:41<00:00, 27.06frames/s]


In [6]:
scenes_timecodes = []
for scene in scenes:
  time1, time2 = scene
  scenes_timecodes.append([time1.get_seconds(), time2.get_seconds()])
  

In [7]:
import json 
with open('/content/timecodes_scenes.json', 'x') as file:
  json.dump({'timecodes':scenes_timecodes},file,  ensure_ascii=True)

In [8]:
!mv /content/timecodes_scenes.json /content/drive/MyDrive/ttech/timecodes_scenes.json

In [None]:
!tar -czvf video_scenes_30.tar.gz /content/video_scenes_30/

In [None]:
answer1

['talking to each other']

In [None]:
print(f'1 ans: {answer1[0]}\n2 ans: {answer2[0]}')

1 ans: talk on phone
2 ans: man


In [None]:
import sys
import time
from PIL import Image
from transformers import BlipProcessor, BlipForQuestionAnswering

# sys.path.append("../utils")
# from notebook_utils import download_file

# # get model and processor
# processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
# model = BlipForQuestionAnswering.from_pretrained("Salesforce/blip-vqa-base")

# # setup test input: download and read image, prepare question
# img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg'
# download_file(img_url, "demo.jpg")
# raw_image = Image.open("demo.jpg").convert('RGB')
# question = "how many dogs are in the picture?"
# # preprocess input data
# inputs = processor(raw_image, question, return_tensors="pt")

# start = time.perf_counter()
# # perform generation
# out = model.generate(**inputs)
# end = time.perf_counter() - start

# # postprocess result
# answer = processor.decode(out[0], skip_special_tokens=True)

In [1]:
!pip install --upgrade pip
!pip install git+https://github.com/huggingface/transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
[0mLooking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/huggingface/transformers
  Cloning https://github.com/huggingface/transformers to /tmp/pip-req-build-d2v1uxqn
  Running command git clone --filter=blob:none --quiet https://github.com/huggingface/transformers /tmp/pip-req-build-d2v1uxqn
  Resolved https://github.com/huggingface/transformers to commit b29fd6971d9cd6ba2a824628effe243f543b8f61
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[0m

In [6]:
!pip install notebook_utils

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting notebook_utils
  Downloading notebook_utils-0.2.0-py3-none-any.whl (3.9 kB)
Collecting deprecated
  Downloading Deprecated-1.2.13-py2.py3-none-any.whl (9.6 kB)
Installing collected packages: deprecated, notebook_utils
Successfully installed deprecated-1.2.13 notebook_utils-0.2.0
[0m

In [2]:
import transformers

In [8]:
import sys
import time
from PIL import Image
from transformers import BlipProcessor, BlipForQuestionAnswering

sys.path.append("../utils")


# get model and processor
processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
model = BlipForQuestionAnswering.from_pretrained("Salesforce/blip-vqa-base")

# setup test input: download and read image, prepare question
img_url = 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/demo.jpg'
# download_file(img_url, "demo.jpeg")
raw_image = Image.open("/content/demo.jpeg").convert('RGB')
question = "how many dogs are in the picture?"
# preprocess input data
inputs = processor(raw_image, question, return_tensors="pt")

start = time.perf_counter()
# perform generation
out = model.generate(**inputs)
end = time.perf_counter() - start

# postprocess result
answer = processor.decode(out[0], skip_special_tokens=True)



In [9]:
import torch
from pathlib import Path
from openvino.tools import mo
import numpy as np
from openvino.runtime import Core, serialize

VISION_MODEL_OV = Path("blip_vision_model.xml")
VISION_MODEL_ONNX = VISION_MODEL_OV.with_suffix(".onnx")
vision_model = model.vision_model
vision_model.eval()

# check that model works and save it outputs for reusage as text encoder input
with torch.no_grad():
    vision_outputs = vision_model(inputs["pixel_values"])

# if openvino model does not exist, convert it to onnx and then to IR
if not VISION_MODEL_OV.exists():

    # export pytorch model to ONNX
    if not VISION_MODEL_ONNX.exists():
        with torch.no_grad():
            torch.onnx.export(vision_model, inputs["pixel_values"], VISION_MODEL_ONNX, input_names=["pixel_values"])
    # convert ONNX model to IR using Model Optimizer Python API, use compress_to_fp16=True for compressing model weights to FP16 precision
    ov_vision_model = mo.convert_model(VISION_MODEL_ONNX, compress_to_fp16=True)
    # save model on disk for next usages
    serialize(ov_vision_model, str(VISION_MODEL_OV))
    print(f"Vision model successfuly converted and saved to {VISION_MODEL_OV}")
else:
    print(f"Vision model will be loaded from {VISION_MODEL_OV}")

Vision model successfuly converted and saved to blip_vision_model.xml


In [10]:
TEXT_ENCODER_OV = Path("blip_text_encoder.xml")
TEXT_ENCODER_ONNX = TEXT_ENCODER_OV.with_suffix(".onnx")

text_encoder = model.text_encoder
text_encoder.eval()

# if openvino model does not exist, convert it to onnx and then to IR
if not TEXT_ENCODER_OV.exists():
    if not TEXT_ENCODER_ONNX.exists():
        # prepare example inputs for ONNX export
        image_embeds = vision_outputs[0]
        image_attention_mask = torch.ones(image_embeds.size()[:-1], dtype=torch.long)
        input_dict = {"input_ids": inputs["input_ids"], "attention_mask": inputs["attention_mask"], "encoder_hidden_states": image_embeds, "encoder_attention_mask": image_attention_mask}
        # specify variable length axes
        dynamic_axes = {"input_ids": {1: "seq_len"}, "attention_mask": {1: "seq_len"}}
        # export PyTorch model to ONNX
        with torch.no_grad():
            torch.onnx.export(text_encoder, input_dict, TEXT_ENCODER_ONNX, input_names=list(input_dict), dynamic_axes=dynamic_axes)
    # convert ONNX model to IR using Model Optimizer Python API, use compress_to_fp16=True for compressing model weights to FP16 precision
    ov_text_encoder = mo.convert_model(TEXT_ENCODER_ONNX, compress_to_fp16=True)
    # save model on disk for next usages
    serialize(ov_text_encoder, str(TEXT_ENCODER_OV))
    print(f"Text encoder successfuly converted and saved to {TEXT_ENCODER_OV}")
else:
    print(f"Text encoder will be loaded from {TEXT_ENCODER_OV}")

  if is_decoder:
  if is_decoder:


Text encoder successfuly converted and saved to blip_text_encoder.xml


In [11]:
text_decoder = model.text_decoder
text_decoder.eval()

TEXT_DECODER_OV = Path("blip_text_decoder.xml")
TEXT_DECODER_ONNX = TEXT_DECODER_OV.with_suffix(".onnx")

# prepare example inputs for ONNX export
input_ids = torch.tensor([[30522]])  # begin of sequence token id
attention_mask = torch.tensor([[1]])  # attention mask for input_ids
encoder_hidden_states = torch.rand((1, 10, 768))  # encoder last hidden state from text_encoder
encoder_attention_mask = torch.ones((1, 10), dtype=torch.long)  # attention mask for encoder hidden states

input_dict = {"input_ids": input_ids, "attention_mask": attention_mask, "encoder_hidden_states": encoder_hidden_states, "encoder_attention_mask": encoder_attention_mask}
# specify variable length axes
dynamic_axes = {"input_ids": {1: "seq_len"}, "attention_mask": {1: "seq_len"}, "encoder_hidden_states": {1: "enc_seq_len"}, "encoder_attention_mask": {1: "enc_seq_len"}}

# specify output names, logits is main output of model
output_names = ["logits"]

# past key values outputs are output for caching model hidden state
past_key_values_outs = []
text_decoder_outs = text_decoder(**input_dict)
for idx, _ in enumerate(text_decoder_outs["past_key_values"]):
    past_key_values_outs.extend([f"out_past_key_value.{idx}.key", f"out_past_key_value.{idx}.value"])

# if openvino model does not exist, convert it to onnx and then to IR
if not TEXT_DECODER_OV.exists():
    # export PyTorch model to ONNX
    if not TEXT_DECODER_ONNX.exists():
        with torch.no_grad():
            torch.onnx.export(text_decoder, input_dict, TEXT_DECODER_ONNX, input_names=list(input_dict), output_names=output_names + past_key_values_outs, dynamic_axes=dynamic_axes)
    # convert ONNX model to IR using Model Optimizer Python API, use compress_to_fp16=True for compressing model weights to FP16 precision
    ov_text_decoder = mo.convert_model(TEXT_DECODER_ONNX, compress_to_fp16=True)
    # save model on disk for next usages
    serialize(ov_text_decoder, str(TEXT_DECODER_OV))
    print(f"Text decoder successfuly converted and saved to {TEXT_DECODER_OV}")
else:
    print(f"Text decoder will be loaded from {TEXT_DECODER_OV}")

  if causal_mask.shape[1] < attention_mask.shape[1]:
  if return_logits:


Text decoder successfuly converted and saved to blip_text_decoder.xml


In [12]:
# extend input dictionary with hidden states from previous step
input_dict_with_past = {**input_dict, "past_key_values": text_decoder_outs["past_key_values"]}

# provide names for past_key_value inputs in ONNX model
past_inputs = [k.replace("out_", "in_") for k in past_key_values_outs]

# extend input names list and dynamic axes with new inputs
input_names_with_past = list(input_dict) + past_inputs
dynamic_axes_with_past = {**dynamic_axes}
for k in past_inputs:
    dynamic_axes_with_past[k] = {2: "prev_seq_len"}

TEXT_DECODER_WITH_PAST_OV = Path("blip_text_decoder_with_past.xml")
TEXT_DECODER_WITH_PAST_ONNX = TEXT_DECODER_WITH_PAST_OV.with_suffix(".onnx")

# if openvino model does not exist, convert it to onnx and then to IR
if not TEXT_DECODER_WITH_PAST_OV.exists():
    # export PyTorch model to ONNX
    if not TEXT_DECODER_WITH_PAST_ONNX.exists():
        with torch.no_grad():
            torch.onnx.export(text_decoder, input_dict_with_past, TEXT_DECODER_WITH_PAST_ONNX, input_names=input_names_with_past, output_names=output_names + past_key_values_outs, dynamic_axes=dynamic_axes_with_past)
    # convert ONNX model to IR using Model Optimizer Python API, use compress_to_fp16=True for compressing model weights to FP16 precision
    ov_text_decoder = mo.convert_model(TEXT_DECODER_WITH_PAST_ONNX, compress_to_fp16=True)
    # save model on disk for next usages
    serialize(ov_text_decoder, str(TEXT_DECODER_WITH_PAST_OV))
    print(f"Text decoder with past successfuly converted and saved to {TEXT_DECODER_WITH_PAST_OV}")
else:
    print(f"Text decoder with past will be loaded from {TEXT_DECODER_WITH_PAST_OV}")

Text decoder with past successfuly converted and saved to blip_text_decoder_with_past.xml


In [13]:
# create OpenVINO Core object instance
core = Core()

# load models on device
ov_vision_model = core.compile_model(VISION_MODEL_OV)
ov_text_encoder = core.compile_model(TEXT_ENCODER_OV)
ov_text_decoder = core.compile_model(TEXT_DECODER_OV)
ov_text_decoder_with_past = core.compile_model(TEXT_DECODER_WITH_PAST_OV)

In [14]:
from typing import List, Tuple, Dict
from transformers.modeling_outputs import CausalLMOutputWithCrossAttentions


def prepare_past_inputs(past_key_values:List[Tuple[torch.Tensor, torch.Tensor]]):
    """
    Helper function for rearrange input hidden states inputs to OpenVINO model expected format
    Parameters:
      past_key_values (List[Tuple[torch.Tensor, torch.Tensor]]): list of pairs key, value attention hidden states obtained as model outputs from previous step
    Returns:
      inputs (Dict[str, torch.Tensor]): dictionary with inputs for model
    """
    inputs = {}
    for idx, (key, value) in enumerate(past_key_values):
        inputs[f"in_past_key_value.{idx}.key"] = key
        inputs[f"in_past_key_value.{idx}.value"] = value
    return inputs


def postprocess_text_decoder_outputs(output:Dict):
    """
    Helper function for rearranging model outputs and wrapping to CausalLMOutputWithCrossAttentions
    Parameters:
      output (Dict): dictionary with model output
    Returns
      wrapped_outputs (CausalLMOutputWithCrossAttentions): outputs wrapped to CausalLMOutputWithCrossAttentions format
    """
    outs = {k.any_name: v for k, v in output.items()}
    logits = torch.from_numpy(outs["logits"])
    past_kv = []
    for i in range(0, len(past_key_values_outs), 2):
        key = past_key_values_outs[i]
        value = key.replace(".key", ".value")
        past_kv.append((torch.from_numpy(outs[key]), torch.from_numpy(outs[value])))
    return CausalLMOutputWithCrossAttentions(
        loss=None,
        logits=logits,
        past_key_values=past_kv,
        hidden_states=None,
        attentions=None,
        cross_attentions=None
    )


def text_decoder_forward(input_ids:torch.Tensor, attention_mask:torch.Tensor, past_key_values:List[Tuple[torch.Tensor, torch.Tensor]], encoder_hidden_states:torch.Tensor, encoder_attention_mask:torch.Tensor, **kwargs):
    """
    Inference function for text_decoder in one generation step
    Parameters:
      input_ids (torch.Tensor): input token ids
      attention_mask (torch.Tensor): attention mask for input token ids
      past_key_values (List[Tuple[torch.Tensor, torch.Tensor]]): list of cached decoder hidden states from previous step
      encoder_hidden_states (torch.Tensor): encoder (vision or text) hidden states
      encoder_attention_mask (torch.Tensor): attnetion mask for encoder hidden states
    Returns
      model outputs (CausalLMOutputWithCrossAttentions): model prediction wrapped to CausalLMOutputWithCrossAttentions class including predicted logits and hidden states for caching
    """
    input_dict = {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "encoder_hidden_states": encoder_hidden_states,
        "encoder_attention_mask": encoder_attention_mask
    }
    if past_key_values is None:
        outputs = ov_text_decoder(input_dict)
    else:
        input_dict.update(prepare_past_inputs(past_key_values))
        outputs = ov_text_decoder_with_past(input_dict)
    return postprocess_text_decoder_outputs(outputs)


text_decoder.forward = text_decoder_forward


class OVBlipModel:
    """
    Model class for inference BLIP model with OpenVINO
    """
    def __init__(self, config, decoder_start_token_id:int, vision_model, text_encoder, text_decoder):
        """
        Initialization class parameters
        """
        self.vision_model = vision_model
        self.vision_model_out = vision_model.output(0)
        self.text_encoder = text_encoder
        self.text_encoder_out = text_encoder.output(0)
        self.text_decoder = text_decoder
        self.config = config
        self.decoder_start_token_id = decoder_start_token_id
        self.decoder_input_ids = config.text_config.bos_token_id

    def generate_answer(self, pixel_values:torch.Tensor, input_ids:torch.Tensor, attention_mask:torch.Tensor, **generate_kwargs):
        """
        Visual Question Answering prediction
        Parameters:
          pixel_values (torch.Tensor): preprocessed image pixel values
          input_ids (torch.Tensor): question token ids after tokenization
          attention_mask (torch.Tensor): attention mask for question tokens
        Retruns:
          generation output (torch.Tensor): tensor which represents sequence of generated answer token ids
        """
        image_embed = self.vision_model(pixel_values.detach().numpy())[self.vision_model_out]
        image_attention_mask = np.ones(image_embed.shape[:-1], dtype=int)
        if isinstance(input_ids, list):
            input_ids = torch.LongTensor(input_ids)
        question_embeds = self.text_encoder([input_ids.detach().numpy(), attention_mask.detach().numpy(), image_embed, image_attention_mask])[self.text_encoder_out]
        question_attention_mask = np.ones(question_embeds.shape[:-1], dtype=int)

        bos_ids = np.full((question_embeds.shape[0], 1), fill_value=self.decoder_start_token_id)

        outputs = self.text_decoder.generate(
            input_ids=torch.from_numpy(bos_ids),
            eos_token_id=self.config.text_config.sep_token_id,
            pad_token_id=self.config.text_config.pad_token_id,
            encoder_hidden_states=torch.from_numpy(question_embeds),
            encoder_attention_mask=torch.from_numpy(question_attention_mask),
            **generate_kwargs,
        )
        return outputs

    def generate_caption(self, pixel_values:torch.Tensor, input_ids:torch.Tensor = None, attention_mask:torch.Tensor = None, **generate_kwargs):
        """
        Image Captioning prediction
        Parameters:
          pixel_values (torch.Tensor): preprocessed image pixel values
          input_ids (torch.Tensor, *optional*, None): pregenerated caption token ids after tokenization, if provided caption generation continue provided text
          attention_mask (torch.Tensor): attention mask for caption tokens, used only if input_ids provided
        Retruns:
          generation output (torch.Tensor): tensor which represents sequence of generated caption token ids
        """
        batch_size = pixel_values.shape[0]

        image_embeds = self.vision_model(pixel_values.detach().numpy())[self.vision_model_out]

        image_attention_mask = torch.ones(image_embeds.shape[:-1], dtype=torch.long)

        if isinstance(input_ids, list):
            input_ids = torch.LongTensor(input_ids)
        elif input_ids is None:
            input_ids = (
                torch.LongTensor([[self.config.text_config.bos_token_id, self.config.text_config.eos_token_id]])
                .repeat(batch_size, 1)
            )
        input_ids[:, 0] = self.config.text_config.bos_token_id
        attention_mask = attention_mask[:, :-1] if attention_mask is not None else None

        outputs = self.text_decoder.generate(
            input_ids=input_ids[:, :-1],
            eos_token_id=self.config.text_config.sep_token_id,
            pad_token_id=self.config.text_config.pad_token_id,
            attention_mask=attention_mask,
            encoder_hidden_states=torch.from_numpy(image_embeds),
            encoder_attention_mask=image_attention_mask,
            **generate_kwargs,
        )

        return outputs

In [23]:
!mv /content/blip_text_encoder.onnx /content/drive/MyDrive/ttech/

In [17]:
ov_model = OVBlipModel(model.config, model.decoder_start_token_id, ov_vision_model, ov_text_encoder, text_decoder)
out = ov_model.generate_answer(**inputs, max_length=20)

In [None]:
out = ov_model.generate_caption(inputs["pixel_values"], max_length=20)
caption = processor.decode(out[0], skip_special_tokens=True)
fig = visualize_results(raw_image, caption)

In [None]:
start = time.perf_counter()
out = ov_model.generate_answer(**inputs, max_length=20)
end = time.perf_counter() - start
answer = processor.decode(out[0], skip_special_tokens=True)
fig = visualize_results(raw_image, answer, question)