In [None]:
%%capture
!pip install ktrain
!python -m pip install git+https://github.com/Vakihito/pytube
!git clone https://github.com/oarriaga/face_classification

In [None]:
random_state = 0  # seed given to random, numpy, tensorflow and torch further down

vid_id = "9J25DZhivz8"  # YouTube video id, appended to the watch URL before downloading
lang = 'a.en' # automatic english
res = 480  # upper bound for the resolution of the mp4 stream that gets downloaded

# hyper-parameters for bounding boxes shape
frame_window = 10  # maximum number of recent emotion labels kept in emotion_window
emotion_offsets = (100, 200)  # passed to apply_offsets together with each detected face box
show_images = False  # when True, every processed frame is also shown with cv2_imshow
video_frequency = 1  # per-frame details are printed every `video_frequency` frames
text_pol_face_offset_y = -10  # passed to draw_text for the face label; presumably a vertical offset - TODO confirm

frame_limit = int(1e20)  # both frame loops stop once frame_counter exceeds this value
size_shape = (640//2 , 480//2)  # size given to cv2.resize before each frame is written to disk

In [None]:
from pytube import YouTube
from pytube.helpers import safe_filename

In [None]:
import cv2
from statistics import mode
import os
from keras import backend as K
from keras.models import load_model
import numpy as np
from textblob import TextBlob
import random
import tensorflow as tf

from face_classification.src.utils.datasets import get_labels
from face_classification.src.utils.inference import detect_faces
from face_classification.src.utils.inference import draw_text
from face_classification.src.utils.inference import draw_bounding_box
from face_classification.src.utils.inference import apply_offsets
from face_classification.src.utils.inference import load_detection_model

from ktrain.text.sentiment import SentimentAnalyzer
import torch

In [None]:
img_feat_size = 112
text_feat_size = 768

random.seed(random_state)
tf.random.set_seed(random_state)
np.random.seed(random_state)
torch.manual_seed(random_state)

<torch._C.Generator at 0x7824d0cd9150>

# Text sentiment classification model

---

Creating functions for classification and embedding extraction from the text.


In [None]:
import keras
import ktrain
from ktrain import text
import tensorflow as tf
import pickle
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from keras import backend as K

text_classifier = SentimentAnalyzer()

text_sent_model = text_classifier.pipeline.model
text_tokenizer = text_classifier.pipeline.tokenizer

Downloading (…)lve/main/config.json:   0%|          | 0.00/747 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

Downloading (…)olve/main/vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

Downloading (…)olve/main/merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

No CUDA runtime is found, using CUDA_HOME='/usr/local/cuda'


In [None]:
def get_embedding(input_text):
  """Return a single embedding vector for ``input_text``.

  The text is encoded with the module-level ``text_tokenizer`` (special
  tokens included), sent as a batch of size 1 through the ``roberta``
  sub-module of ``text_sent_model`` in eval mode with gradients disabled,
  and the resulting ``last_hidden_state`` is averaged over axis 1
  (presumably the token axis - TODO confirm).

  Returns:
    numpy array: the averaged hidden state of the only batch element.
  """
  token_ids = text_tokenizer.encode(input_text, add_special_tokens=True)
  batch = torch.tensor(token_ids).unsqueeze(0)  # Batch size 1

  text_sent_model.eval()
  with torch.no_grad():
    hidden_states = text_sent_model.roberta(batch).last_hidden_state

  # average over axis 1, then drop the batch axis
  pooled = np.mean(hidden_states.cpu().numpy(), axis=1)
  return pooled[0]

In [None]:
get_embedding("Have nice day !").shape

(768,)

In [None]:
def predict_text(text):
    """Classify ``text`` and convert the result into a colour and a signed polarity.

    The module-level ``text_classifier`` is asked for a prediction, which is
    expected to be a mapping ``{label: probability}``; only its first item is
    used (batch size 1).

    Polarity is ``+prob`` for ``POSITIVE``, ``-prob`` for ``NEGATIVE`` and ``0``
    for ``NEUTRAL``. Labels are compared case-insensitively; an unknown label or
    an empty prediction is treated as neutral instead of failing with an
    unbound local variable.

    Returns:
        tuple: ``(color, polarity_text)`` where ``color`` is a numpy array of
        three channel values: white by default, ``|polarity| * (255, 0, 0)``
        when the polarity is below -0.4 and ``|polarity| * (0, 255, 0)`` when it
        is above 0.4.
    """
    prediction = text_classifier.predict(text) # batch size 1
    print(f"prediction text: {prediction}")
    label, prob = next(iter(prediction.items()), (None, 0)) # batch size 1

    # sign applied to the probability for each known label
    label_sign = {"NEUTRAL": 0, "POSITIVE": 1, "NEGATIVE": -1}
    sign = label_sign.get(str(label).upper(), 0)
    polarity_text = sign * prob if sign != 0 else 0

    color = np.asarray((255, 255, 255))
    if polarity_text < -0.4:
        color = abs(polarity_text) * np.asarray((255, 0, 0))
    elif polarity_text > 0.4:
        color = abs(polarity_text) * np.asarray((0, 255, 0))

    return color, polarity_text


In [None]:
predict_text("Have nice day !")

prediction text: {'POSITIVE': 0.9795164465904236}


(array([  0.        , 249.77669388,   0.        ]), 0.9795164465904236)

In [None]:
predict_text("I am having a hard weak!")

prediction text: {'NEGATIVE': 0.710917055606842}


(array([181.28384918,   0.        ,   0.        ]), -0.710917055606842)

In [None]:
predict_text("I am who I am")

prediction text: {'NEUTRAL': 0.5452864766120911}


(array([255, 255, 255]), 0)

# Caption

Definindo algumas funções para tratamento das captions geradas pelo Pytube

In [None]:
def string_time_int(str_time):
  """Convert a timestamp string formatted as ``HH:MM:SS`` into seconds.

  The fields are read by position from the end of the string: the last two
  characters are the seconds, the two before the last colon are the minutes
  and everything before the first of those colons is the hours.
  """
  hours, minutes, seconds = str_time[:-6], str_time[-5:-3], str_time[-2:]
  return int(seconds) + 60 * int(minutes) + 3600 * int(hours)
def process_caption(video, lang='a.en'):
  '''
    Extract the captions of ``video`` for the language code ``lang``.

    The caption track is rendered with ``generate_srt_captions()`` and read as
    groups of four lines (index, timing, text, blank). The last four characters
    of each timestamp are dropped before conversion with ``string_time_int``
    (presumably the ",mmm" milliseconds suffix - TODO confirm).

    Returns a list of ``[text, start_seconds, end_seconds]`` entries; entries
    whose end time is greater than ``video.length`` are left out.

    Raises ``KeyError`` when ``video.captions`` has no track whose ``code``
    equals ``lang``.
  '''
  print(video.captions)
  if not any(cap.code == lang for cap in video.captions):
    message = "caption of lang : " + lang + " not found"
    print(message)
    # fail here with a clear message instead of on the lookup below
    raise KeyError(message)

  video_len = video.length

  captions  = video.captions[lang]
  captions_str = str(captions.generate_srt_captions())
  list_captions = []
  for line_counter, line in enumerate(captions_str.split('\n'), start=1):
      # second line of each group: "start --> end"
      if line_counter % 4 == 2:
          line_aux = line.split(" --> ")
          time_s = string_time_int(line_aux[0][:-4])
          time_e = string_time_int(line_aux[1][:-4])
      # third line of each group: the caption text
      elif line_counter % 4 == 3 and video_len >= time_e:
          list_captions.append([line, time_s, time_e])

  return list_captions

In [None]:
def my_preprocess_input(x, v2=True):
    """Scale an array of pixel values for the emotion classifier.

    ``x`` is converted to float32 and divided by 255. When ``v2`` is true the
    result is additionally shifted by -0.5 and multiplied by 2, which maps the
    inputs 0 and 255 to -1 and 1. The input array itself is not modified.
    """
    scaled = x.astype('float32') / 255.0
    if not v2:
        return scaled
    return (scaled - 0.5) * 2.0

# parameters for loading data and images
detection_model_path = './face_classification/trained_models/detection_models/haarcascade_frontalface_default.xml'
emotion_model_path = './face_classification/trained_models/emotion_models/fer2013_mini_XCEPTION.51-0.63.hdf5'
emotion_labels = get_labels('fer2013')

# Carregando os modelos de imagem

In [None]:
# loading models
face_detection = load_detection_model(detection_model_path)
emotion_classifier = load_model(emotion_model_path, compile=False)


### gets the embedding from the emotion_classifier
# layer_out -> (model, backend function); avoids rebuilding the function for every frame
_face_embedding_functors = {}

def getFaceEmbedding(gray_face,layer_out=3):
  """Return the flattened activation of an inner layer of ``emotion_classifier``.

  Args:
    gray_face: input batch given to the classifier; only the result for the
      first element of the batch is returned.
    layer_out: position of the layer counted from the end of
      ``emotion_classifier.layers`` (the default 3 selects ``layers[-3]``).

  Returns:
    The selected layer's output for the first batch element, flattened.

  The backend function that maps the model input to the layer output is built
  once per ``layer_out`` and reused; it is rebuilt if ``emotion_classifier``
  has been replaced by another model object.
  """
  cached = _face_embedding_functors.get(layer_out)
  if cached is None or cached[0] is not emotion_classifier:
    functor = K.function([emotion_classifier.input],
                         emotion_classifier.layers[-layer_out].output)
    cached = (emotion_classifier, functor)
    _face_embedding_functors[layer_out] = cached

  layer_activation = cached[1]([gray_face])[0]
  return layer_activation.flatten()

# Fazendo download do vídeo
Utilizamos o pytube para baixar o vídeo na resolução desejada. Nesse sentido, os seguintes parâmetros são importantes:
 - vid_id - é o id do vídeo do youtube que desejamos extrair
 - res - é a resolução do vídeo que desejamos extrair - creio que seja melhor manter 480, para o bom funcionamento das redes neurais
 - lang - é a língua da caption; recomenda-se utilizar 'en', mas caso queira utilizar legendas automáticas utilize 'a.en'

In [None]:

# getting input model shapes for inference
emotion_target_size = emotion_classifier.input_shape[1:3]

# starting lists for calculating modes
emotion_window = []

# downloading the video
url = "https://www.youtube.com/watch?v=" + vid_id
print("downloading the video from the ulr : ", url)
video = YouTube(url)

# pick the mp4 stream with the highest resolution that does not exceed `res`
max_res = -1
itag_max = -1

fps_my = 30  # fallback value; replaced by the fps of the selected stream
for stream in video.streams:
    if stream.resolution and stream.mime_type == "video/mp4":
        # the last character of the resolution string is dropped before int() (e.g. "480p")
        current_res = int(stream.resolution[:-1])
        current_fps = int(stream.fps)
        if current_res <= res and max_res < current_res:
            max_res = current_res
            itag_max = stream.itag
            fps_my = float(current_fps)

if max_res == -1:
    print("erro : chose another resolution")
    # without a stream the download below would fail on a None object
    raise ValueError(f"no mp4 stream with resolution <= {res} found for video {vid_id}")

font_scale_cur = max_res/144

print(str(itag_max))
video.streams.get_by_itag(str(itag_max)).download()

# the download goes to the working directory, named after the sanitized title
dirname = './'
video_path = os.path.join(dirname, safe_filename(video.title) + '.mp4')

# captions ordered by start time
captions_list = process_caption(video, lang)
captions_list.sort(key=lambda x: x[1])


time = 0
frame_counter = 0
caption_counter = 0

downloading the video from the ulr :  https://www.youtube.com/watch?v=9J25DZhivz8
18
{'a.en': <Caption lang="English (auto-generated)" code="a.en">}


In [None]:
# replace non-breaking spaces in the caption text with regular spaces
for caption_entry in captions_list:
  caption_entry[0] = caption_entry[0].replace(u'\xa0', u' ')

In [None]:
# accumulator with one list per column; the processing loop extends each list
# with the per-frame values returned by fill_dict
video_dict_all = {
    'frame' : [],
    'time' : [],
    'caption' : [],
    'caption_polarity' : [],
    'caption_emb' : [],
    'face' : [],
    'face_polarity' : [],
    'face_emb' : []

}

In [None]:
def fill_dict(frame,time,
              caption, caption_polarity, caption_emb,
              face, face_polarity, face_emb):
  """Build the column lists for one frame, with one row per face.

  Args:
    frame, time: frame index and timestamp, repeated on every row.
    caption, caption_polarity, caption_emb: lists describing the caption of
      the frame; when ``caption`` is empty they are replaced by the
      placeholders ``'-1'``, ``0`` and a zero vector of ``text_feat_size``.
    face, face_polarity, face_emb: lists with one item per face; when ``face``
      is empty they are replaced by the placeholders ``'-1'``, ``0`` and a
      zero vector of ``img_feat_size``.

  Returns:
    dict mapping each column name to a list. With two or more faces the frame,
    time and caption columns repeat their last value once per additional face.

  The caption lists passed by the caller are copied, never extended in place.
  """
  # placeholders for frames without a caption / without a face
  if len(caption) == 0:
    caption, caption_polarity, caption_emb = ['-1'], [0], [np.zeros(text_feat_size)]
  if len(face) == 0:
    face, face_polarity, face_emb = ['-1'], [0], [np.zeros(img_feat_size)]

  # number of additional rows needed so that every face gets its own row
  extra_rows = len(face) - 1

  def _repeat_last(values):
    # copy, then repeat the last value once per additional face
    values = list(values)
    if extra_rows:
      values += [values[-1]] * extra_rows
    return values

  return {
    'frame' : _repeat_last([frame]),
    'time' : _repeat_last([time]),
    'caption' : _repeat_last(caption),
    'caption_polarity' : _repeat_last(caption_polarity),
    'caption_emb' : _repeat_last(caption_emb),
    'face' : face,
    'face_polarity' : face_polarity,
    'face_emb' : face_emb
  }

In [None]:
captions_list_size = len(captions_list)

def return_text(time, counter):
  """Concatenate the text of every caption that is active at ``time``.

  Args:
    time: current timestamp, compared with the start/end values stored in
      ``captions_list``.
    counter: index of the first entry of ``captions_list`` to look at.

  Returns:
    The texts of all captions from ``counter`` onwards whose own
    ``[start, end]`` interval contains ``time``, each followed by a space.
    Scanning stops at the first caption that starts after ``time`` (the list
    is sorted by start time elsewhere in the notebook).
  """
  temp_text = ""
  for i in range(counter, len(captions_list)):
    text, start, end = captions_list[i][0], captions_list[i][1], captions_list[i][2]
    # each caption is tested against its own end time, not that of a global index
    if start <= time <= end:
      temp_text += text + " "
    elif start > time:
      return temp_text
  return temp_text

In [None]:
def ResizeWithAspectRatio(image, width=None, height=None, inter=cv2.INTER_AREA):
    """Resize ``image`` to a given width or height, scaling the other side by the same ratio.

    With neither ``width`` nor ``height`` the image is returned unchanged.
    When ``width`` is given it determines the scale (``height`` is then
    ignored); otherwise ``height`` does. ``inter`` is forwarded to
    ``cv2.resize`` as the interpolation flag.
    """
    src_h, src_w = image.shape[:2]

    if width is None and height is None:
        return image

    if width is not None:
        scale = width / float(src_w)
        target_size = (width, int(src_h * scale))
    else:
        scale = height / float(src_h)
        target_size = (int(src_w * scale), height)

    return cv2.resize(image, target_size, interpolation=inter)

# Loop de processamento
Neste loop processamos o vídeo através dos modelos unimodais
 - frame_limit - o limite de frames a serem analisados; caso queira analisar todos os frames, coloque inf.
 - size_shape - formato da imagem a ser gerada
  

In [None]:
from google.colab.patches import cv2_imshow

In [None]:

img_counter = 0
time = 0
frame_counter = 0
caption_counter = 0

In [None]:
video_capture = cv2.VideoCapture(video_path)

In [None]:
!mkdir imgs
!mkdir imgs/unimodel
!mkdir imgs/multimodel

In [None]:


# Coordenadas
# 1 - distancia da borda lateral esquerda
# 2 - distancia da borda superior
#


In [None]:
# Per-frame pipeline: read a frame, classify the caption that is active at the
# frame's timestamp, classify the first detected face, draw both results on the
# frame, store the values in video_dict_all and write the annotated frame to
# imgs/unimodel.

# Minimum probability each predicted emotion needs to be mapped to a sentiment.
# Emotions that are not listed, or predicted below their threshold, count as 'neutral'.
sentiment_rules = {
    'angry': (0.5, 'negative'),
    'happy': (0.6, 'positive'),
    'sad': (0.45, 'negative'),
    'neutral': (0.5, 'neutral'),
    'fear': (0.5, 'negative'),
    'surprise': (0.4, 'positive'),
}

try:
    while True:
        success , bgr_image = video_capture.read()

        if not success:
            print("fim !")
            break

        gray_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2GRAY)
        rgb_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
        faces = detect_faces(face_detection, gray_image)

        time = frame_counter/fps_my

        # ---- text modality ----
        caption = []
        caption_polarity = []
        caption_emb = []

        cur_text = return_text(time, caption_counter)
        if len(cur_text) > 0:
            color_caption, caption_polarity = predict_text(cur_text)
            caption = [cur_text]
            caption_polarity = [caption_polarity]
            caption_emb = [get_embedding(cur_text)]
            if frame_counter % video_frequency == 0:
                print("caption polarity:", caption_polarity)
                print("caption sentence:", cur_text)

            color_caption = color_caption.tolist()
            draw_text(np.array([10,max_res - 10,50,50]), rgb_image, cur_text,color_caption,font_scale=0.4, thickness=1)

        # skip captions that already ended; done on every frame so the index
        # cannot get stuck on a frame that has no caption text
        while caption_counter < len(captions_list) - 1 and time >= captions_list[caption_counter][2]:
            caption_counter += 1

        # ---- face modality ----
        face = []
        face_polarity = []
        face_emb = []
        if len(faces) == 0 :
            print("face not found !")

        # only the first detected face is processed
        for face_coordinates in faces[:1]:

            x1, x2, y1, y2 = apply_offsets(face_coordinates, emotion_offsets)
            # clamp so the crop does not start at a negative index
            x1 = x1 if x1 > 0 else 0
            y1 = y1 if y1 > 0 else 0

            gray_face = gray_image[y1:y2, x1:x2]

            try:
                gray_face = cv2.resize(gray_face, (emotion_target_size))
            except cv2.error as resize_error:
                print('error : on resize')
                print(f"cv2 error {resize_error}")
                continue

            gray_face = my_preprocess_input(gray_face, True)
            gray_face = np.expand_dims(gray_face, 0)
            gray_face = np.expand_dims(gray_face, -1)

            emotion_prediction = emotion_classifier.predict(gray_face)
            emotion_probability = np.max(emotion_prediction)
            emotion_label_arg = np.argmax(emotion_prediction)

            emotion_text = emotion_labels[emotion_label_arg]
            emotion_window.append(emotion_text)

            if len(emotion_window) > frame_window:
                emotion_window.pop(0)
            try:
                # the mode itself is not used below; a failure skips the face
                emotion_mode = mode(emotion_window)
            except Exception:
                continue

            ### threshold wall
            # ysnp = you shall not pass; it starts as True and nothing sets it to
            # False, so every prediction is currently let through
            ysnp = True
            sentiment = 'neutral'
            min_probability, rule_sentiment = sentiment_rules.get(emotion_text, (None, None))
            if min_probability is not None and emotion_probability >= min_probability:
                sentiment = rule_sentiment

            if ysnp:
                senti_multi = 0
                if sentiment == 'negative':
                    senti_multi = -1
                    color = emotion_probability * np.asarray((255, 0, 0))
                elif sentiment == 'positive':
                    senti_multi = 1
                    color = emotion_probability * np.asarray((0, 255, 0))
                else:
                    senti_multi = 0
                    color = emotion_probability * np.asarray((0, 0, 255))

                color = color.astype(int)
                color = color.tolist()

                draw_bounding_box(face_coordinates, rgb_image, color)
                draw_text(face_coordinates, rgb_image, sentiment,color, 0, text_pol_face_offset_y, 1, 1)
                if frame_counter % video_frequency == 0:
                    print('face polarity: ', emotion_probability * senti_multi,emotion_text)

                face.append('img' + str(img_counter))
                face_polarity.append(emotion_probability * senti_multi)
                face_emb.append(getFaceEmbedding(gray_face) )

                img_counter += 1
            elif frame_counter % video_frequency == 0:
                print("you shall not pass")

        # ---- store the values of this frame ----
        dict_temp= fill_dict(frame_counter, time,
                            caption, caption_polarity, caption_emb,
                            face, face_polarity, face_emb)
        for key in dict_temp.keys():
            video_dict_all[key] += dict_temp[key]

        frame_counter += 1

        print("fc : ",frame_counter)
        resized = cv2.resize(rgb_image, size_shape)
        bgr_image = cv2.cvtColor(resized, cv2.COLOR_RGB2BGR)
        cv2.imwrite(f"imgs/unimodel/frame_{frame_counter}.png", bgr_image)
        if show_images:
            # the face values only exist (and are only current) when a face
            # was classified in this frame
            if face:
                print('face polarity: ', emotion_probability * senti_multi,emotion_text)
                print('sentiment prob : ', emotion_probability)
            cv2_imshow(bgr_image)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        print("#" * 50)
        if(frame_counter > frame_limit):
            break
finally:
    # release the capture even if the loop is interrupted or raises
    video_capture.release()
    cv2.destroyAllWindows()

[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
fc :  6766
##################################################
prediction text: {'POSITIVE': 0.9237704277038574}
caption polarity: [0.9237704277038574]
caption sentence:  i go i want to say thank you to my new    subscribers thanks and ask you guys are 
face polarity:  0.0 neutral
fc :  6767
##################################################
prediction text: {'POSITIVE': 0.9237704277038574}
caption polarity: [0.9237704277038574]
caption sentence:  i go i want to say thank you to my new    subscribers thanks and ask you guys are 
face polarity:  0.0 neutral
fc :  6768
##################################################
prediction text: {'POSITIVE': 0.9237704277038574}
caption polarity: [0.9237704277038574]
caption sentence:  i go i want to say thank you to my new    subscribers thanks and ask you guys are 
face polarity:  0.0 sad
fc :  6769
##################################################
prediction text: {'POSITIV

# Correlação entre caption e text
 - time  - mostra o tempo em segundos de início da análise
 - frame - mostra o frame sobre análise
 - caption - mostra a legenda mostrada
 - caption_polarity - mostra a polaridade da caption
 - caption_emb - mostra a embedding gerada pelo RoBERTa
 - face - mostra um contador referente à imagem
 - face_polarity - mostra a polaridade da face
 - face_emb - mostra a embedding gerada pelo modelo de faces

In [None]:
import pandas as pd

video_df = pd.DataFrame(video_dict_all)

video_df.head()

Unnamed: 0,frame,time,caption,caption_polarity,caption_emb,face,face_polarity,face_emb
0,0,0.0,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,1,0.033333,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,2,0.066667,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,3,0.1,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,4,0.133333,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


# Gerando concordância entre modelos
Neste passo preparamos os dados para o treino do modelo

In [None]:
def concord_bool(a,b):
  """Return True when the polarities ``a`` and ``b`` do not have opposite signs.

  The two values agree when their product is greater than or equal to zero,
  which includes the case where either of them is zero.
  """
  return bool(a * b >= 0)

def concord_val(a,b):
  """Combine two polarities into a label: 'negative', 'positive' or 'neutral'.

  Values with an absolute value below 0.3 are treated as zero first. If the
  remaining values have opposite signs the result is 'neutral'; otherwise it is
  'negative' when either value is negative, 'positive' when either value is
  positive and 'neutral' when both are zero.
  """
  pa = 0 if abs(a) < 0.3 else a
  pb = 0 if abs(b) < 0.3 else b

  # opposite signs: the two values disagree
  if not (pa * pb >= 0):
    return 'neutral'
  if pa < 0 or pb < 0:
    return 'negative'
  if pa > 0 or pb > 0:
    return 'positive'
  return 'neutral'

def return_dict():
  """Return a new dictionary with an empty list for every dataset column.

  The columns are the ones stored for each frame plus 'intent'.
  """
  columns = ('frame', 'time', 'caption', 'caption_polarity', 'caption_emb',
             'face', 'face_polarity', 'face_emb', 'intent')
  return {column: [] for column in columns}

In [None]:
def get_concord_embedding():
  """Split the rows of ``video_df`` by agreement between face and caption polarity.

  Rows for which ``concord_bool(face_polarity, caption_polarity)`` is true go
  to the first dictionary, the remaining rows to the second one. Both
  dictionaries have the layout of ``return_dict()``; their 'intent' column is
  filled with ``concord_val`` of the two polarities.

  Side effect: ``video_df['consent']`` is set to 1 for rows that agree and to
  0 for the others.

  Only the columns defined by ``return_dict()`` are copied, so the function
  can be run again after 'consent' (or any other column) was added to
  ``video_df``.

  Returns:
    tuple: ``(df_train, df_test)`` as dictionaries of lists.
  """
  df_train = return_dict()
  df_test = return_dict()
  # every column of the template except the label computed here
  feature_keys = [key for key in df_train if key != 'intent']

  concord = []
  for _, row in video_df.iterrows():
    agrees = concord_bool(row['face_polarity'],row['caption_polarity'])
    target_dict = df_train if agrees else df_test
    for key in feature_keys:
      target_dict[key].append(row[key])
    target_dict['intent'].append(concord_val(row['face_polarity'],row['caption_polarity']))
    concord.append(1 if agrees else 0)

  video_df['consent'] = concord
  return df_train, df_test


In [None]:
(df_train, df_test) =  get_concord_embedding()
df_train = pd.DataFrame(df_train)
df_test = pd.DataFrame(df_test)

In [None]:
import tensorflow as tf
# training accuracy above which acc_callback stops the training
ACCURACY_THRESHOLD = 0.95
class acc_callback(tf.keras.callbacks.Callback):
    """Keras callback that stops training once 'accuracy' exceeds ACCURACY_THRESHOLD."""

    def on_epoch_end(self, epoch, logs=None):
        """Check the 'accuracy' entry of ``logs`` after each epoch.

        Nothing happens when ``logs`` is missing or has no 'accuracy' entry,
        which is the case when the model was compiled without that metric.
        """
        accuracy = (logs or {}).get('accuracy')
        if accuracy is not None and accuracy > ACCURACY_THRESHOLD:
            self.model.stop_training = True

# Definindo um modelo
Para esta aplicação definimos um modelo de atenção em relação à modalidade

In [None]:
from tensorflow.keras.layers import Dot,Activation,Dense, Input, concatenate, multiply, average, subtract, add, Dropout, Lambda, Flatten
from tensorflow.keras.models import Model
import tensorflow as tf


def multimodal_text_image(num_classes,size_1=img_feat_size,size_2=text_feat_size,operator='att',verbose=0):
  """Build and compile the two-input fusion classifier.

  Both inputs are projected to ``size_1`` units with a ReLU dense layer and
  L2-normalised. One scalar per modality is computed with ``Dense(1)``, the
  two scalars go through a softmax and the resulting weights are used to
  combine the two normalised projections. Dropout (0.5) and a softmax dense
  layer with ``num_classes`` units follow.

  Args:
    num_classes: number of units of the output layer.
    size_1: size of the first input (face embedding); also the fusion size.
    size_2: size of the second input (text embedding).
    operator: fusion operator; only 'att' is implemented.
    verbose: currently unused; the model summary is always printed.

  Returns:
    tuple: ``(model, fusion_layer)`` where ``model`` is compiled with the adam
    optimizer and categorical cross-entropy, and ``fusion_layer`` is the tensor
    that feeds the output layer.

  Raises:
    ValueError: if ``operator`` is not 'att' (other values used to fail later
      with a NameError because no fusion tensor was ever created for them).
  """
  if operator != 'att':
    raise ValueError(f"unsupported operator {operator!r}: only 'att' is implemented")

  def _as_shape(size):
    # Input expects a shape tuple; keep accepting a plain integer size
    return tuple(size) if isinstance(size, (tuple, list)) else (size,)

  # fusion_dim = X_1.shape[1]+X_2.shape[1]
  fusion_dim = size_1

  inp1 = Input(shape=_as_shape(size_1))
  inp2 = Input(shape=_as_shape(size_2))

  l1 = Dense(fusion_dim, activation='relu')(inp1)
  l2 = Dense(fusion_dim, activation='relu')(inp2)

  # Step 1. Normalised projections and one scalar score per modality
  visual_embd = Lambda(lambda x: tf.keras.backend.l2_normalize(x, axis=-1))(l1) # (bs, ndim)
  average_seq = Lambda(lambda x: tf.keras.backend.l2_normalize(x, axis=-1))(l2) # (bs, ndim)
  scalar_visual = Dense(1)(visual_embd) # (bs, 1)
  scalar_text = Dense(1)(average_seq) # (bs, 1)
  scalars = concatenate([scalar_visual, scalar_text], name='concat')  # (bs, 2)

  # Step 2. Normalize weights - softmax
  alphas = Activation('softmax')(scalars) # (bs, 2)

  # Step 3. Weighted average
  visual_embd_2 = Lambda( lambda x: tf.keras.backend.expand_dims(x) ) (visual_embd) # (bs, ndim, 1)
  average_seq_2 = Lambda( lambda x: tf.keras.backend.expand_dims(x) )(average_seq) # (bs, ndim, 1)
  features = concatenate([visual_embd_2, average_seq_2], name='concat_feats') # (bs, ndim, 2)
  w = Dot(axes=[-1, -1])([alphas, features]) # (bs, ndim)

  w = Dropout(0.5)(w)
  # fusion_layer = Dense(fusion_dim, activation='relu')(w)
  fusion_layer = w

  output = Dense(num_classes,activation='softmax')(fusion_layer)

  model = Model(inputs=[inp1, inp2], outputs=output)

  model.compile(optimizer='adam', loss='categorical_crossentropy')
  model.summary()

  return model, fusion_layer



# Treinando o modelo

In [None]:
import keras
from tqdm.notebook import tqdm

# column of the training data that holds the class label
target = 'intent'

# names of fusion operators; this list is not used in this cell
merging_layers = ['autoencoder', 'att_labels','att','concatenate','add','subtract','average','multiply']

num_classes = len(df_train[target].unique())
operator = "att"
# building the model
my_model, _ = multimodal_text_image(num_classes,operator=operator)




# merging_layers = ['att', 'att_labels'] # test
results = []



(df_data_train, df_data_test) = (df_train, df_test)

total_rows = len(df_data_train[target])

# image modality
X_1 = np.array(df_data_train['face_emb'].to_list())
# text modality
X_2 = np.array(df_data_train['caption_emb'].to_list())

# one indicator column per label found in the training data
Y = np.array(pd.get_dummies(df_data_train[target]))

num_classes = len(df_data_train[target].unique())

# image and text modalities of the test rows
X_1_test = np.array(df_data_test['face_emb'].to_list())
X_2_test = np.array(df_data_test['caption_emb'].to_list())

# NOTE(review): the callback is created but not passed to fit() below
acc_call = acc_callback()


history = my_model.fit([X_1,X_2], Y,
                  epochs=50,
                  batch_size=16,
                  shuffle=True,verbose=True)


Model: "model_7320"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 112)]        0           []                               
                                                                                                  
 input_2 (InputLayer)           [(None, 768)]        0           []                               
                                                                                                  
 dense (Dense)                  (None, 112)          12656       ['input_1[0][0]']                
                                                                                                  
 dense_1 (Dense)                (None, 112)          86128       ['input_2[0][0]']                
                                                                                         

In [None]:
y_pred = np.array([])
if len(X_1_test) > 0:
  probs = my_model.predict([X_1_test,X_2_test])
  y_pred = np.argmax(probs,axis=1)
  print("size, test case : ", len(X_1_test))
else:
  print("\033[93mnot a single contradiction between the unmodal models\033[0m")

size, test case :  292


In [None]:
# Map each output index of the classifier to its label. The training targets
# were one-hot encoded with pd.get_dummies, whose columns follow the sorted
# order of the labels present in the data, so the mapping is derived from those
# labels instead of assuming that all three classes occur.
dict_y_pred = dict(enumerate(sorted(df_train[target].unique())))

In [None]:
y_pred_value = [dict_y_pred[pred] for pred in y_pred]


In [None]:
index = video_df[video_df['consent'] == 0].index


In [None]:
y_pred_value_size = len(y_pred_value)

tuple_idx_val = []

for i in range(y_pred_value_size):
  tuple_idx_val.append((index[i], y_pred_value[i]))

In [None]:
video_consent_df = video_df[video_df['consent'] == 1]

for idx, row in video_consent_df.iterrows():
  tuple_idx_val.append((idx, concord_val(row['caption_polarity'],row['face_polarity']) ))

In [None]:
tuple_idx_val.sort(key=lambda x:x[0])
final_pred = [i[1] for i in tuple_idx_val]


video_df['final_pred'] = final_pred

# Resultados do modelo multimodal
 - consent - consentimento entre modelo de faces e de texto - 1 - verdadeiro, 0 - falso
 - final_pred - previsão final

In [None]:
video_df.head()

Unnamed: 0,frame,time,caption,caption_polarity,caption_emb,face,face_polarity,face_emb,consent,final_pred
0,0,0.0,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1,neutral
1,1,0.033333,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1,neutral
2,2,0.066667,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1,neutral
3,3,0.1,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1,neutral
4,4,0.133333,-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",-1,0.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1,neutral


In [None]:
def draw_plus_sign(img, center=(10,10) ):
  """Draw a filled plus sign with color (0, 255, 0) on ``img`` around ``center``.

  The sign is made of a 10x4 horizontal bar and a 4x10 vertical bar, both
  drawn with ``cv2.rectangle``. Returns the image returned by the last call.
  """
  cx, cy = center[0], center[1]
  sign_color = (0,255,0)

  # horizontal bar first, vertical bar on top of it
  with_bar = cv2.rectangle(img, (cx - 5, cy - 2), (cx + 5, cy + 2), sign_color, -1)
  return cv2.rectangle(with_bar, (cx - 2, cy - 5), (cx + 2, cy + 5), sign_color, -1)

def draw_minus_sign(img, center=(10,10)):
  """Draw a filled minus sign with color (255, 0, 0) on ``img`` around ``center``.

  The sign is a single 10x4 bar drawn with ``cv2.rectangle``; the image
  returned by that call is returned.
  """
  cx, cy = center[0], center[1]
  return cv2.rectangle(img, (cx - 5, cy - 2), (cx + 5, cy + 2), (255,0,0), -1)

In [None]:
time_arr =video_df['time'].to_list()
final_pred_arr = video_df['final_pred'].to_list()
size_total = len(time_arr)

In [None]:
tuple_time_pred = [(time_arr[i], final_pred_arr[i]) for i in range(size_total)]


# Mostrando o resultado do modelo multimodal na janela
 - sinal positivo significa que a seção apresentou uma polaridade positiva
 - sinal negativo significa que a seção apresentou uma polaridade negativa

In [None]:
### re inicializando as variáveis.

video_capture = cv2.VideoCapture(video_path)
time = 0
frame_counter = 0
caption_counter = 0
counter_tuple_total = 0
sentiment_polarity = ''

In [None]:
# Second pass over the video: draw the caption that is active at each frame and
# the sign of the final prediction, then write the frame to imgs/multimodel.
try:
    while True:
        success , bgr_image = video_capture.read()

        if not success:
            break
        rgb_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

        time = frame_counter/fps_my

        # move to the latest caption that has already started; done on every
        # frame so that a gap between two captions cannot stall the index
        while caption_counter < len(captions_list) - 1 and time >= captions_list[caption_counter + 1][1]:
            caption_counter += 1

        if captions_list:
            caption_text, caption_start, caption_end = captions_list[caption_counter][:3]
            if caption_start <= time <= caption_end:
                color_caption = (255,255,255)
                # draw the caption of this frame (not the text left over from the first loop)
                draw_text(np.array([10,max_res - 30,50,50]), rgb_image, caption_text,color_caption,font_scale=0.4, thickness=1)

        # move to the latest prediction whose timestamp has been reached before
        # reading it, so the sign shown belongs to the current frame
        while counter_tuple_total < len(tuple_time_pred) - 1 and time >= tuple_time_pred[counter_tuple_total + 1][0]:
            counter_tuple_total += 1

        if time >= tuple_time_pred[counter_tuple_total][0]:
            sentiment_polarity = tuple_time_pred[counter_tuple_total][1]
            if sentiment_polarity == 'positive':
                rgb_image = draw_plus_sign(rgb_image)
            elif sentiment_polarity == 'negative':
                rgb_image = draw_minus_sign(rgb_image)

        frame_counter += 1

        print("fc : ",frame_counter)
        print(sentiment_polarity)
        resized = cv2.resize(rgb_image, size_shape)
        bgr_image = cv2.cvtColor(resized, cv2.COLOR_RGB2BGR)
        if show_images:
            cv2_imshow(bgr_image)
        cv2.imwrite(f"imgs/multimodel/frame_{frame_counter}.png", bgr_image)

        if cv2.waitKey(33) & 0xFF == ord('q'):
            break
        print("#" * 50)
        if(frame_counter > frame_limit):
            break
finally:
    # release the capture even if the loop is interrupted or raises
    video_capture.release()
    cv2.destroyAllWindows()

[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
positive
##################################################
fc :  5823
positive
##################################################
fc :  5824
positive
##################################################
fc :  5825
positive
##################################################
fc :  5826
positive
##################################################
fc :  5827
positive
##################################################
fc :  5828
positive
##################################################
fc :  5829
positive
##################################################
fc :  5830
positive
##################################################
fc :  5831
positive
##################################################
fc :  5832
positive
##################################################
fc :  5833
positive
##################################################
fc :  5834
positive
##################################################
fc :  5835
posi

In [None]:
import os
os.system(f"zip imgs_{vid_id}.zip -r imgs")

0

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import shutil
shutil.move(f"imgs_{vid_id}.zip",f"/content/drive/MyDrive/webmidia/videos/imgs_{vid_id}.zip")

'/content/drive/MyDrive/webmidia/videos/imgs_9J25DZhivz8.zip'