In [3]:
import pandas as pd
import string
import numpy as np
import json

from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.models import Sequential
import tensorflow.keras.utils as ku

import tensorflow as tf
tf.random.set_seed(2)
from numpy.random import seed
seed(1)


In [4]:
# Load all the datasets
df1 = pd.read_csv('USvideos.csv')
df2 = pd.read_csv('CAvideos.csv')
df3 = pd.read_csv('GBvideos.csv')

# Load the datasets containing the category names
data1 = json.load(open('US_category_id.json'))
data2 = json.load(open('CA_category_id.json'))
data3 = json.load(open('GB_category_id.json'))


In [5]:
def category_extractor(data):
    i_d = [data['items'][i]['id'] for i in range(len(data['items']))]
    title = [data['items'][i]['snippet']["title"] for i in range(len(data['items']))]
    i_d = list(map(int, i_d))
    category = zip(i_d, title)
    category = dict(category)
    return category

# Create a new category column by mapping the category names to their id
df1['category_title'] = df1['category_id'].map(category_extractor(data1))
df2['category_title'] = df2['category_id'].map(category_extractor(data2))
df3['category_title'] = df3['category_id'].map(category_extractor(data3))


In [6]:
# Join the dataframes
df = pd.concat([df1, df2, df3], ignore_index=True)

In [7]:
# Drop rows based on duplicate videos
df = df.drop_duplicates('video_id')

# Collect only titles of entertainment videos
# Feel free to use any category of video that you want
entertainment = df[df['category_title'] == 'Entertainment']['title']
entertainment = entertainment.tolist()

In [8]:
# Remove punctuations and convert text to lowercase
def clean_text(text):
    text = ''.join(e for e in text if e not in string.punctuation).lower()
    text = text.encode('utf8').decode('ascii', 'ignore')
    return text

corpus = [clean_text(e) for e in entertainment]

In [9]:
# Use a limited number of words to reduce memory usage
tokenizer = Tokenizer(num_words=10000)

In [10]:
def get_sequence_of_tokens(corpus):
    # Get tokens
    tokenizer.fit_on_texts(corpus)
    total_words = len(tokenizer.word_index) + 1

    # Convert to sequence of tokens
    input_sequences = []
    for line in corpus:
        token_list = tokenizer.texts_to_sequences([line])[0]
        for i in range(1, len(token_list)):
            n_gram_sequence = token_list[:i+1]
            input_sequences.append(n_gram_sequence)

    return input_sequences, total_words

inp_sequences, total_words = get_sequence_of_tokens(corpus)

In [11]:
def generate_padded_sequences(input_sequences):
    max_sequence_len = min(max([len(x) for x in input_sequences]), 20)  # Cap sequence length at 20
    input_sequences = np.array(pad_sequences(input_sequences, maxlen=max_sequence_len, padding='pre'))
    predictors, label = input_sequences[:, :-1], input_sequences[:, -1]
    label = ku.to_categorical(label, num_classes=total_words)
    return predictors, label, max_sequence_len

predictors, label, max_sequence_len = generate_padded_sequences(inp_sequences)


In [12]:
def create_model(max_sequence_len, total_words):
    input_len = max_sequence_len - 1
    model = Sequential()

    # Add Input Embedding Layer with a smaller dimension
    model.add(Embedding(total_words, 50))  # Smaller embedding dimension

    # Add Hidden Layer 1 — LSTM Layer with fewer units
    model.add(LSTM(50))
    model.add(Dropout(0.1))

    # Add Output Layer
    model.add(Dense(total_words, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam')

    return model

In [13]:
# Define DataGenerator class to load data in batches
from tensorflow.keras.utils import Sequence

class DataGenerator(Sequence):
    def __init__(self, predictors, labels, batch_size=32):
        self.predictors = predictors
        self.labels = labels
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.predictors) / self.batch_size))

    def __getitem__(self, idx):
        start = idx * self.batch_size
        end = (idx + 1) * self.batch_size
        batch_x = self.predictors[start:end]
        batch_y = self.labels[start:end]
        return batch_x, batch_y

In [14]:
# Create and train the model
model = create_model(max_sequence_len, total_words)


In [15]:
# Use a batch generator to save memory
batch_size = 64  # Adjust based on memory availability
data_gen = DataGenerator(predictors, label, batch_size=batch_size)
model.fit(data_gen, epochs=20, verbose=1)

Epoch 1/20
[1m   2/1041[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m56s[0m 54ms/step - loss: 9.5406

  self._warn_if_super_not_called()


[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m60s[0m 55ms/step - loss: 8.1421
Epoch 2/20
[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m80s[0m 53ms/step - loss: 7.3015
Epoch 3/20
[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 49ms/step - loss: 6.8845
Epoch 4/20
[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m52s[0m 50ms/step - loss: 6.5352
Epoch 5/20
[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m83s[0m 50ms/step - loss: 6.1669
Epoch 6/20
[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m80s[0m 49ms/step - loss: 5.8152
Epoch 7/20
[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 49ms/step - loss: 5.5295
Epoch 8/20
[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m52s[0m 50ms/step - loss: 5.2489
Epoch 9/20
[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 49ms/step - loss: 5.0448
Epoch 10/20
[1m1041/1041[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[

<keras.src.callbacks.history.History at 0x79316c4939a0>

In [16]:
def generate_text(seed_text, next_words, model, max_sequence_len):
    for _ in range(next_words):
        token_list = tokenizer.texts_to_sequences([seed_text])[0]
        token_list = pad_sequences([token_list], maxlen=max_sequence_len-1, padding='pre')
        predicted = model.predict(token_list, verbose=0)
        predicted = np.argmax(predicted, axis=-1)[0]

        output_word = ""
        for word, index in tokenizer.word_index.items():
            if index == predicted:
                output_word = word
                break
        seed_text += " " + output_word
    return seed_text.title()

In [17]:
# Generate text
print(generate_text("spiderman", 5, model, max_sequence_len))


Spiderman Of The Singer 2 Trailer
