
# **AUDIO VIDEO CLASSIFICATION** 

# *Install the required packages*

In [None]:
!pip install transformers
!pip install SpeechRecognition
!pip install pydub
!pip install moviepy
!pip install pafy
!pip install youtube_dl
!pip install youtube_transcript_api
!pip install googletrans
!pip install langdetect

# *Import the required packages*

In [None]:
### Building the deep learning model
from tensorflow.keras import models, layers, preprocessing
from tensorflow.keras.models import Model, Sequential
#from tensorflow.keras import optimizers, losses, activations, models
from tensorflow.keras.layers import Conv2D, Dense, Input, Dropout, MaxPooling2D, GlobalAveragePooling1D, GlobalAveragePooling2D

### Image classification
from tensorflow.keras import applications
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications.inception_v3 import preprocess_input

### Text Classification
from transformers import TFBertModel,  BertConfig, BertTokenizerFast
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from googletrans import Translator
import joblib

### Audio processing and speech recognition
import speech_recognition as sr 
from pydub import AudioSegment
from pydub.silence import split_on_silence
import moviepy.editor as mp
from youtube_transcript_api import YouTubeTranscriptApi
from langdetect import detect

### Text processing and other packages
import nltk 
import re
import numpy as np
import os
import cv2
import json
import pafy

### Fetch the NLTK corpora used further down in this notebook:
### 'stopwords' feeds the default stop-word set of preprocess_text and
### 'wordnet' backs the WordNetLemmatizer used in the same function.
nltk.download('stopwords')
nltk.download('wordnet')

# *Downloading videos from Youtube*



In [None]:
def download_video(url, path, name):
    """Download a YouTube video and return a few of its details.

    The stream returned by pafy's ``getbest()`` is saved as
    ``<path>/<name>.mp4``.

    Args:
        url: Address of the YouTube video, handed to ``pafy.new``.
        path: Directory in which the ``.mp4`` file is stored.
        name: File name (without extension) for the downloaded video.

    Returns:
        dict with the string values ``"title"``, ``"author"``,
        ``"duration"`` and ``"resulotion"`` (the key spelling is kept as is
        because it ends up in the stored JSON files).
    """
    source = pafy.new(url)

    ### Pick the best stream and store it under the requested name
    stream = source.getbest()
    target_file = path + '/' + name + '.mp4'
    stream.download(filepath=target_file)

    ### Collect the details of the video as plain strings
    return {
        "title": str(source.title),
        "author": str(source.author),
        "duration": str(source.duration),
        "resulotion": str(stream),
    }


# *Downloading subtitles if available or extracting audio from video and speech recognition*

In [None]:
### Function to get subtitles of the video from youtube
def video_subs(url):
    """Return the YouTube transcript of a video as one string.

    Args:
        url: Value passed straight to ``YouTubeTranscriptApi.get_transcript``
            (the caller in this notebook passes the part of the URL after
            the first ``=``).

    Returns:
        The ``'text'`` fields of all transcript entries joined with ``'. '``,
        or the sentinel string ``"Error"`` when anything goes wrong while
        fetching or assembling the transcript.
    """
    try:
        transcript = YouTubeTranscriptApi.get_transcript(url)
        return '. '.join(entry['text'] for entry in transcript)
    except Exception:
        ### Best effort: callers test for this sentinel and fall back to speech recognition
        return "Error"

### Speech recognition
def audio_to_text(path, name):
    """Transcribe the audio track of ``<path>/<name>.mp4``.

    The audio is written to ``<path>/<name>.wav``, split on silent parts and
    every chunk is sent to ``Recognizer.recognize_google``. Chunks are stored
    in a temporary directory that is removed automatically; the previous
    ``os.remove(path + '/chunks')`` pointed at the wrong location and cannot
    delete a directory, so the function used to raise before returning.

    Args:
        path: Directory that contains the video; the ``.wav`` file is written
            there as well.
        name: File name of the video without extension.

    Returns:
        The recognized text, every chunk capitalized and terminated with
        ``". "``. An empty string when the video has no audio track or no
        chunk could be recognized.

    Raises:
        Whatever the recognizer raises other than ``sr.UnknownValueError``
        (unintelligible chunks are skipped, other failures propagate).
    """
    import tempfile  ### only needed here, so imported locally

    recognizer = sr.Recognizer()
    wav_file = path + '/' + name + '.wav'

    ### Extract audio from video and store as '.wav'
    clip = mp.VideoFileClip(path + '/' + name + '.mp4')
    try:
        if clip.audio is None:
            ### No audio track: nothing to transcribe
            return ""
        clip.audio.write_audiofile(wav_file)
    finally:
        ### Always release the resources held by the clip
        clip.close()

    ### Split the audio along the silent parts
    audio_file = AudioSegment.from_wav(wav_file)
    chunks = split_on_silence(
        audio_file,
        min_silence_len=500,
        silence_thresh=audio_file.dBFS - 13,
        keep_silence=500,
    )

    whole_text = ""

    ### The chunk files are only needed while recognizing, keep them in a throw-away directory
    with tempfile.TemporaryDirectory(prefix="chunks_") as chunk_dir:
        for i, audio_chunk in enumerate(chunks, start=1):
            chunk_filename = os.path.join(chunk_dir, f"chunk{i}.wav")
            audio_chunk.export(chunk_filename, format="wav")

            ### Read the chunk back in the format the recognizer expects
            with sr.AudioFile(chunk_filename) as source:
                audio_listened = recognizer.record(source)

            ### Try converting it to text, skip chunks that cannot be understood
            try:
                text = recognizer.recognize_google(audio_listened)
            except sr.UnknownValueError:
                continue
            whole_text += f"{text.capitalize()}. "

    return whole_text


# *Generate the initial data of the video(Audio and video details)*

In [None]:
### Data of the video consisting of audio text and other details if available
def _extract_video_id(url):
    """Return the video id contained in a YouTube URL.

    Handles ``...watch?v=<id>&...`` and ``youtu.be/<id>`` addresses; for any
    other input it falls back to "everything after the first '='".
    """
    from urllib.parse import urlparse, parse_qs

    parsed = urlparse(url)

    ### Regular watch URL: the id is the 'v' query parameter
    ids = parse_qs(parsed.query).get('v')
    if ids:
        return ids[0]

    ### Short link: the id is the path
    if parsed.netloc.lower().endswith('youtu.be'):
        short_id = parsed.path.strip('/')
        if short_id:
            return short_id

    ### Unknown layout: keep the historic behaviour
    return url[url.find('=') + 1:]


def generate_min_data(path, url, name):
    """Download a video and store its details and spoken text as JSON.

    The video is downloaded with ``download_video``; its text comes from the
    YouTube subtitles when ``video_subs`` finds them and from
    ``audio_to_text`` otherwise. The result is written to
    ``<path>/<name>.txt``.

    Args:
        path: Directory for the video and the JSON file.
        url: Address of the YouTube video.
        name: File name (without extension) used for the video and the JSON.

    The stored object holds the details returned by ``download_video`` plus
    ``"audio"`` (the text) and, when the text is not empty, ``"language"``
    (result of ``langdetect.detect``).
    """
    ### Download the video
    data = download_video(url, path, name)

    ### Get the subtitles if available, otherwise use speech recognition.
    ### Only the bare video id is passed on; extra query parameters such as
    ### '&t=10s' would make the subtitle lookup fail.
    subs = video_subs(_extract_video_id(url))
    audio_text = audio_to_text(path, name) if subs == "Error" else subs

    data["audio"] = audio_text
    if audio_text != "":
        data["language"] = detect(audio_text)

    ### Write the data in a file
    with open(path + '/' + name + '.txt', 'w') as json_file:
        json.dump(data, json_file)


# *Convert Videos to frames and store under the name of the video*

In [None]:
### Function to convert video into frames
def video_to_frames(video_path, frame_path, name, folder_name, frame_rate):
    """Store every ``frame_rate``-th frame of a video as a ``.jpg`` image.

    Reads ``<video_path>/<name>.mp4`` and writes the selected frames to
    ``<frame_path>/<folder_name>/<name>_<counter>.jpg``. The counter starts
    at ``frame_rate``, so the first frame of the video is always stored.

    Args:
        video_path: Directory that contains the video.
        frame_path: Root directory for the extracted frames.
        name: File name of the video without extension; also the prefix of
            the image files.
        folder_name: Sub-directory of ``frame_path`` for this video. It is
            created, including missing parents, when it does not exist.
        frame_rate: Distance (in frames) between two stored frames.

    Returns:
        None. ``"Done"`` is printed once the video has been processed.
    """
    out_dir = frame_path + '/' + folder_name
    ### makedirs also creates missing parents and tolerates an existing directory
    os.makedirs(out_dir, exist_ok=True)

    vid = cv2.VideoCapture(video_path + '/' + name + ".mp4")
    frame_num = frame_rate
    try:
        while True:
            success, frame = vid.read()
            ### read() reports failure once no frame is left
            if not success:
                break
            if frame_num % frame_rate == 0:
                cv2.imwrite(out_dir + '/' + name + '_' + str(frame_num) + ".jpg", frame)
            frame_num += 1
    finally:
        ### Free the video handle even if writing a frame fails
        vid.release()
    print("Done")
    

# *Image data generator*

In [None]:
def generate_data(frame_rate, path, row = 360, column = 360):
    """Create a directory iterator over the images below ``path``.

    Args:
        frame_rate: Used as the batch size of the iterator.
        path: Directory handed to ``flow_from_directory``.
        row: Target height of the images.
        column: Target width of the images.

    Returns:
        The iterator returned by ``ImageDataGenerator.flow_from_directory``;
        images are passed through the InceptionV3 ``preprocess_input``.
    """
    image_size = (row, column)
    loader = ImageDataGenerator(preprocessing_function=preprocess_input)
    return loader.flow_from_directory(path, target_size=image_size, batch_size=frame_rate)

# *Fine-tune the InceptionV3 model for image classification*

In [None]:
def image_model(row, column, n_class):
    """Build the image classifier: frozen InceptionV3 plus a softmax head.

    Args:
        row: Height of the input images.
        column: Width of the input images.
        n_class: Number of output classes.

    Returns:
        A compiled ``Sequential`` model (categorical cross-entropy, Adam,
        accuracy metric).
    """
    input_shape = (row, column, 3)

    ### Inception model with ImageNet weights, used as a fixed feature extractor
    backbone = applications.InceptionV3(weights='imagenet', include_top=False, input_shape=input_shape)
    backbone.trainable = False

    ### Stack the classification head on top of the backbone
    head = (
        GlobalAveragePooling2D(),
        Dropout(0.5),
        Dense(n_class, activation='softmax'),
    )
    classifier = Sequential()
    classifier.add(backbone)
    for layer in head:
        classifier.add(layer)

    classifier.compile(loss='categorical_crossentropy', optimizer='Adam', metrics=['accuracy'])
    return classifier

# *Preprocess the textual data*

In [None]:
### Pre process the textual data
def preprocess_text(text, stem = True, lem = True, stop_words = set(nltk.corpus.stopwords.words('english'))):
    """Normalize a piece of text for the text classifiers.

    Steps: lower-case and strip, remove everything that is neither a word
    character nor whitespace, split on whitespace, drop stop words, then
    optionally stem (Porter) and lemmatize (WordNet) every token.

    Args:
        text: Input text; converted with ``str()`` first.
        stem: Apply the Porter stemmer when true.
        lem: Apply the WordNet lemmatizer when true (after stemming).
        stop_words: Tokens to remove; defaults to NLTK's English stop words,
            which are loaded once when the function is defined.

    Returns:
        The remaining tokens joined with single spaces.
    """
    ### Convert to lowercase, remove characters and punctuation and strip
    cleaned = re.sub(r'[^\w\s]', '', str(text).lower().strip())

    ### Tokenize and remove stop words
    tokens = [word for word in cleaned.split() if word not in stop_words]

    ### Stemming
    if stem:
        stemmer = nltk.stem.porter.PorterStemmer()
        tokens = [stemmer.stem(word) for word in tokens]

    ### Lemmatization
    if lem:
        lemmatizer = nltk.stem.wordnet.WordNetLemmatizer()
        tokens = [lemmatizer.lemmatize(word) for word in tokens]

    ### Join back to text
    return ' '.join(tokens)



# *Configure and setup the BERT model*

In [None]:
def build_bert_model():
    """Build a text classifier on top of a frozen ``bert-base-uncased``.

    The BERT main layer is not trainable; its second output is passed
    through dropout, five ReLU dense layers (32 to 512 units) and a softmax
    layer with 3 units. Inputs are ``int32`` token ids of length 256.

    Returns:
        Tuple ``(model, tokenizer)``: the compiled Keras model (categorical
        cross-entropy, Adam, accuracy metric) and the matching
        ``BertTokenizerFast``.
    """
    ### Name of the BERT model to use and max length of tokens
    model_name = 'bert-base-uncased'
    max_length = 256

    ### Load transformers config and set output_hidden_states to False
    config = BertConfig.from_pretrained(model_name)
    config.output_hidden_states = False

    ### Load BERT tokenizer
    tokenizer = BertTokenizerFast.from_pretrained(pretrained_model_name_or_path=model_name, config=config)

    ### Load the Transformers BERT model and freeze its main layer
    bert = TFBertModel.from_pretrained(model_name, config=config).layers[0]
    bert.trainable = False

    ### Input layer (named so that the builtin input() is not shadowed)
    token_ids = Input(shape=(max_length,), dtype='int32')

    ### Use the second output of the BERT main layer as the sentence features
    hidden = Dropout(config.hidden_dropout_prob)(bert(token_ids)[1])
    for width in (32, 64, 128, 256, 512):
        hidden = Dense(units=width, activation='relu')(hidden)
    output = Dense(units=3, activation='softmax')(hidden)

    model = Model(inputs=token_ids, outputs=output)
    model.compile(loss='categorical_crossentropy', optimizer='Adam', metrics=['accuracy'])
    return model, tokenizer

# *BERT tokenizer*

In [None]:
### Tokenize the text for the BERT model
def bert_tokenizer(data, tokenizer, max_length = 256):
    """Turn text into fixed-length token ids for the BERT model.

    Every sequence is truncated or padded to exactly ``max_length`` tokens.
    The previous ``padding=True`` only padded to the longest sequence of the
    batch, which does not match a model whose input layer has a fixed length
    (256 in ``build_bert_model``).

    Args:
        data: Text or list of texts passed to the tokenizer as ``text``.
        tokenizer: Callable Hugging Face tokenizer.
        max_length: Number of tokens per sequence after padding/truncation.

    Returns:
        Whatever the tokenizer returns; it is asked for TensorFlow tensors
        without token type ids and without attention mask.
    """
    return tokenizer(
        text=data,
        add_special_tokens=True,
        max_length=max_length,
        truncation=True,
        ### Pad up to max_length, not just to the longest sequence in the batch
        padding='max_length',
        return_tensors='tf',
        return_token_type_ids=False,
        return_attention_mask=False,
        verbose=True,
    )

# *Train the model (Image classification)*

In [None]:
def train_image_model():
    """Train the image classifier on the frame data and save it.

    Training and validation images are read from the ``train`` and
    ``validation`` folders below ``/content/drive/My Drive/data`` in batches
    of 50. The model is trained for 2 epochs and written to ``video.h5`` in
    the current working directory.
    """
    path = "/content/drive/My Drive/data"
    x_train = generate_data(50, path + "/train")
    x_val = generate_data(50, path + "/validation")

    ### The local must not be called 'image_model': that would hide the
    ### builder function inside this scope and raise UnboundLocalError.
    model = image_model(360, 360, 3)

    ### fit() accepts generators directly; fit_generator() is deprecated
    model.fit(
        x_train,
        epochs = 2,
        validation_data = x_val,
        validation_steps = 1
    )

    ### Save entire model to HDF5
    model.save("video.h5")

# *Prepare the data for text classification*

In [None]:
def text_prepare_build(categories, path = "/content/drive/My Drive/data", vectorizer_path = "tfidf_vectorizer.sav"):
    """Build the TF-IDF training and validation data for text classification.

    Every ``.txt`` file in ``path`` is read as JSON. Its ``"audio"`` text is
    split into sentences on ``'.'``; non-English text is translated first,
    then every sentence is passed through ``preprocess_text``. The label of a
    sentence is the part of the file name before the first ``'_'``.

    Args:
        categories: List of class names; the label of a sentence is the index
            of its class name in this list.
        path: Directory that holds the JSON ``.txt`` files.
        vectorizer_path: File the fitted ``TfidfVectorizer`` is written to
            with ``joblib.dump`` so the same vocabulary can be reused for
            new data. Pass a falsy value to skip saving.

    Returns:
        Tuple ``(tfidf_train, y_train, tfidf_val, y_val)``; 10 % of the
        sentences are held out for validation.

    Raises:
        ValueError: When a file name prefix is not part of ``categories``.
    """
    translator = Translator()
    data = []
    classes = []

    ### sorted() only makes the processing order independent of the file system
    for file_name in sorted(os.listdir(path)):
        if not file_name.endswith('.txt'):
            continue
        with open(path + '/' + file_name, 'r') as f:
            obj = json.load(f)

        ### Check if there is any audio
        if obj["audio"] == "":
            continue
        sentences = obj["audio"].split('.')

        ### Translate everything that is not English
        ### NOTE(review): the source language is fixed to Hindi ('hi') as before,
        ### whatever obj["language"] says - confirm this matches the data set.
        if obj["language"] != "en":
            sentences = [
                translator.translate(sentence, src = 'hi').text
                for sentence in sentences
                if len(sentence) > 1
            ]

        ### Translated text is preprocessed as well (the result used to be thrown away)
        text = [preprocess_text(sentence) for sentence in sentences]
        ### Sentences that are empty after preprocessing carry no information
        text = [sentence for sentence in text if sentence]
        if not text:
            continue

        label = categories.index(file_name[:file_name.find('_')])
        data.extend(text)
        classes.extend([label] * len(text))

    x_train, x_val, y_train, y_val = train_test_split(data, classes, test_size = 0.1)

    ### Use TF-IDF vectorization; the vocabulary is learned on the training part only
    tfidf_vectorizer = TfidfVectorizer(max_df=0.7)
    tfidf_train = tfidf_vectorizer.fit_transform(x_train)
    tfidf_val = tfidf_vectorizer.transform(x_val)

    ### Keep the fitted vectorizer: new text must be transformed with the same vocabulary
    if vectorizer_path:
        joblib.dump(tfidf_vectorizer, vectorizer_path)

    return tfidf_train, y_train, tfidf_val, y_val


# Function to transform the label data into one against many

In [None]:
def create_one_label(y_train, y_val, class_id):
    """Turn multi-class labels into binary labels for one class.

    Args:
        y_train: Training labels.
        y_val: Validation labels.
        class_id: The class that is mapped to 1; every other label becomes 0.

    Returns:
        Tuple ``(new_y_train, new_y_val)`` of lists with 0/1 entries, in the
        same order as the inputs.
    """
    def _one_vs_rest(labels):
        return [0 if label != class_id else 1 for label in labels]

    return _one_vs_rest(y_train), _one_vs_rest(y_val)

# SVM train models for all the classes

In [None]:

def svm_models(categories):
    """Train one binary RBF-SVM per category and store each on disk.

    The TF-IDF data comes from ``text_prepare_build``. For every category the
    labels are reduced to "this class vs. the rest" with ``create_one_label``
    and the fitted classifier is written to ``<category>.sav`` with joblib.

    Args:
        categories: List of class names; the position in the list is the
            class id.
    """
    ### The data preparation function is called text_prepare_build
    ### (the name 'text_prepare' used before does not exist).
    x_train, y_train, x_val, y_val = text_prepare_build(categories)

    for class_id, category in enumerate(categories):
        ### Only the training labels are needed for fitting
        yc_train, _ = create_one_label(y_train, y_val, class_id)

        svm_classifier = SVC(kernel='rbf')
        svm_classifier.fit(x_train, yc_train)

        joblib.dump(svm_classifier, category + '.sav')

### NOTE(review): `categories` is not assigned at module level in any visible
### cell (test() only defines a local list of that name) - define it before
### running this cell, otherwise the call raises NameError on a fresh kernel.
svm_models(categories)

# Test new data

In [None]:
def test():
    """Classify a new YouTube video by its text and by its frames.

    Asks for the video URL and a name, downloads the video with
    ``generate_min_data`` into ``/content/drive/My Drive/data/test`` and
    prints two results:

    * text classification - the sentences of the stored text are scored by
      the per-category SVMs (``<category>.sav``); the category with the
      highest mean decision value wins.
    * image classification - every 60th frame is classified by the saved
      image model; the class predicted for most frames wins.
    """
    url = input("Enter the url of YouTube video")
    ### Same spelling of the Drive folder as everywhere else in the notebook ('My Drive')
    path = "/content/drive/My Drive/data/test"
    name = input("Enter name to be saved as")
    categories = ['Technology', 'Healthcare', 'Entertainment']

    os.makedirs(path, exist_ok=True)
    generate_min_data(path, url, name)

    ### ---------------- Text classification ----------------
    ### generate_min_data stores the details as <path>/<name>.txt
    with open(path + '/' + name + '.txt', 'r') as f:
        obj = json.load(f)

    text = []
    ### Check if there is any audio
    if obj["audio"] != "":
        sentences = obj["audio"].split('.')

        ### Translate everything that is not English (source language fixed to Hindi as in training)
        if obj["language"] != "en":
            translator = Translator()
            sentences = [
                translator.translate(sentence, src = 'hi').text
                for sentence in sentences
                if len(sentence) > 1
            ]
        text = [preprocess_text(sentence) for sentence in sentences]
        text = [sentence for sentence in text if sentence]

    ### The SVMs can only score vectors built with the vocabulary learned during
    ### training. Fitting a new TfidfVectorizer on the test sentences produces a
    ### different feature space, so the fitted vectorizer has to be loaded from
    ### disk (dump it with joblib after training).
    vectorizer_file = 'tfidf_vectorizer.sav'
    if text and os.path.exists(vectorizer_file):
        x_text = joblib.load(vectorizer_file).transform(text)
        scores = []
        for category in categories:
            classifier = joblib.load(category + '.sav')
            ### Mean signed distance to the "this class vs. rest" boundary over all sentences
            scores.append(float(np.mean(classifier.decision_function(x_text))))
        print('text classification:', categories[int(np.argmax(scores))])
    elif text:
        print('text classification skipped:', vectorizer_file, 'not found')

    ### ---------------- Image classification ----------------
    frame_rate = 60
    frame_path = path + '/frame'
    ### flow_from_directory only finds images inside sub-directories of the
    ### directory it is given, so the frames go to <frame_path>/<name>/<name>
    ### and the generator is pointed at <frame_path>/<name>.
    frame_folder = name + '/' + name
    os.makedirs(frame_path + '/' + frame_folder, exist_ok=True)
    video_to_frames(path, frame_path, name, frame_folder, frame_rate)
    x_test = generate_data(frame_rate, frame_path + '/' + name)

    model = models.load_model('/content/drive/My Drive/data/video.h5')
    ### predict() returns one row of class probabilities per frame
    probabilities = model.predict(x_test)
    frame_classes = np.argmax(probabilities, axis=1)
    res = np.bincount(frame_classes, minlength=len(categories))

    ### NOTE(review): this assumes the class indices of the saved model are in
    ### the same order as `categories` - confirm against the training folders.
    print('Image classification:', categories[int(np.argmax(res))])
