In [1]:
# Class names used throughout the notebook: video ids are matched against them
# by prefix and the label encoder is fitted on them.
crime_classes = [
    'Abuse', 'Arrest', 'Arson', 'Assault', 'RoadAccidents', 'Burglary',
    'Explosion', 'Fighting', 'Robbery', 'Shooting', 'Stealing',
    'Shoplifting', 'Vandalism', 'Normal_Videos',
]

In [2]:
def preprocess_text_annotations(annotation_file):
    """Collect the text descriptions of every video in an annotation file.

    Each non-blank line is expected to have the form
    ``<video_id> <start_time> <end_time> <description ...>`` with fields
    separated by single spaces. The timestamps are validated for presence
    but otherwise ignored; only the description text is kept.

    Args:
        annotation_file: Path of the annotation text file to read.

    Returns:
        dict mapping each video id to the concatenation of all its
        descriptions in file order. Every description is preceded by one
        space (so the value starts with a space) and has any ``'##'``
        marker removed.

    Raises:
        ValueError: If a non-blank line has fewer than three fields.
    """
    video_annotations = {}
    # Explicit encoding so the result does not depend on the platform default.
    with open(annotation_file, 'r', encoding='utf-8') as f:
        for line_number, line in enumerate(f, start=1):
            stripped = line.strip()
            # Blank lines (e.g. a trailing newline at end of file) carry no
            # annotation; skip them instead of failing on a missing field.
            if not stripped:
                continue
            parts = stripped.split(' ')
            if len(parts) < 3:
                raise ValueError(
                    f"{annotation_file}: line {line_number} is malformed, expected "
                    f"'<video_id> <start> <end> <description>' but got {stripped!r}"
                )
            video_id = parts[0]
            # parts[1] and parts[2] are the start/end times, which are not used.
            description = ' '.join(parts[3:]).replace('##', '')
            video_annotations[video_id] = video_annotations.get(video_id, '') + " " + description
    return video_annotations

In [3]:
# Parse the three annotation files (train, test, val - in that order) into
# {video_id: concatenated description} dictionaries.
train_ants, test_ants, val_ants = map(
    preprocess_text_annotations,
    (
        '/kaggle/input/annotations-formatted/Formatted_annotations/formatted_UCFCrime_Train.txt',
        '/kaggle/input/annotations-formatted/Formatted_annotations/formatted_UCFCrime_Test.txt',
        '/kaggle/input/annotations-formatted/Formatted_annotations/formatted_UCFCrime_Val.txt',
    ),
)

In [4]:
# Placeholders for the per-split I3D feature dictionaries; the feature-loading
# cell re-creates and fills them.
i3d_features_train, i3d_features_val, i3d_features_test = {}, {}, {}


In [5]:
import gc 
# Force a garbage-collection pass; the cell output is the value gc.collect() returns.
gc.collect()

11

In [6]:
import os
import numpy as np

base_dir = "/kaggle/input/ucf-crime01/UCF-Crime/all_rgbs"

# video id (file name up to its first '.') -> array loaded from the .npy file
i3d_features_dict = {}

for class_name in crime_classes:
    class_dir = os.path.join(base_dir, class_name)
    # Only .npy files are loaded; any other entry in the class folder is ignored.
    npy_names = [name for name in os.listdir(class_dir) if name.endswith('.npy')]
    for file_name in npy_names:
        video_id = file_name.split('.')[0]
        i3d_features_dict[video_id] = np.load(os.path.join(class_dir, file_name))


i3d_features_train, i3d_features_val, i3d_features_test = {}, {}, {}

# Splits are checked in this order: a video is stored in the first split whose
# annotations contain its id, and is left out entirely if no split contains it.
split_targets = (
    (train_ants, i3d_features_train),
    (val_ants, i3d_features_val),
    (test_ants, i3d_features_test),
)

for video_id, features in i3d_features_dict.items():
    for split_annotations, split_features in split_targets:
        if video_id in split_annotations:
            split_features[video_id] = features
            break


print(f'Number of training samples: {len(i3d_features_train)}')
print(f'Number of validation samples: {len(i3d_features_val)}')
print(f'Number of test samples: {len(i3d_features_test)}')


Number of training samples: 1165
Number of validation samples: 379
Number of test samples: 310


In [7]:
import gc
# Run the garbage collector after loading the features; the displayed number is gc.collect()'s return value.
gc.collect()

627

In [9]:
import pandas as pd 

def populate_data(annotation_dict, i3d_dict, classes=None):
    """Build one record per annotated video, pairing text, features and class.

    Args:
        annotation_dict: Mapping of video id to its annotation text.
        i3d_dict: Mapping of video id to its I3D features. Every key of
            ``annotation_dict`` must be present.
        classes: Optional iterable of class names. A video is assigned the
            first name that is a prefix of its id. Defaults to the
            notebook-level ``crime_classes`` list.

    Returns:
        list of dicts with keys ``'video_id'``, ``'i3d_features'``,
        ``'annotations'`` and ``'video_class'``, in the iteration order of
        ``annotation_dict``. ``'video_class'`` is ``None`` when no class name
        is a prefix of the video id.

    Raises:
        KeyError: If an annotated video has no entry in ``i3d_dict``.
    """
    if classes is None:
        # Fall back to the global list so existing two-argument calls keep working.
        classes = crime_classes
    data_list = []
    for video_id, annotations_text in annotation_dict.items():
        if video_id not in i3d_dict:
            # Same exception type as the plain lookup, but it says what is missing.
            raise KeyError(f"No I3D features found for annotated video {video_id!r}")
        video_class = next((c for c in classes if video_id.startswith(c)), None)
        data_list.append({
            'video_id': video_id,
            'i3d_features': i3d_dict[video_id],
            'annotations': annotations_text,
            'video_class': video_class,
        })
    return data_list



# Build the record lists for the three splits (train, val, test) ...
train_data, val_data, test_data = (
    populate_data(annotations, features)
    for annotations, features in (
        (train_ants, i3d_features_train),
        (val_ants, i3d_features_val),
        (test_ants, i3d_features_test),
    )
)

# ... and turn each list of records into a DataFrame.
train_df, val_df, test_df = map(pd.DataFrame, (train_data, val_data, test_data))

In [10]:
# Sanity check: show the distinct class labels assigned to the training videos.
train_df['video_class'].unique()

array(['Abuse', 'Arrest', 'Arson', 'Assault', 'Burglary', 'Explosion',
       'Fighting', 'RoadAccidents', 'Robbery', 'Shooting', 'Shoplifting',
       'Stealing', 'Vandalism', 'Normal_Videos'], dtype=object)

In [11]:
import gc
# Run the garbage collector again; the displayed number is gc.collect()'s return value.
gc.collect()

482

In [12]:
# Pull the label and text columns out of each split's DataFrame.
# Note: the *_ants names are rebound here from the annotation dictionaries to
# pandas Series of annotation text.
train_classes_raw, train_ants = train_df["video_class"], train_df["annotations"]
val_classes_raw, val_ants = val_df["video_class"], val_df["annotations"]
test_classes_raw, test_ants = test_df["video_class"], test_df["annotations"]

In [13]:

from sklearn import preprocessing
import tensorflow as tf

label_encoder = preprocessing.LabelEncoder()
label_encoder.fit(crime_classes)


def _one_hot(raw_labels):
    """Encode class names as integer ids, then as 14-wide one-hot rows."""
    return tf.keras.utils.to_categorical(label_encoder.transform(raw_labels), 14)


train_classes = _one_hot(train_classes_raw)
val_classes = _one_hot(val_classes_raw)
test_classes = _one_hot(test_classes_raw)

print("Train, Validation, and Test classes have been encoded.")

2024-08-09 17:36:21.722578: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-08-09 17:36:21.722726: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-08-09 17:36:21.877119: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


Train, Validation, and Test classes have been encoded.


In [14]:
# Keep the fitted encoder's class-name array for labelling per-class results later.
class_labels = label_encoder.classes_

In [15]:
import gc
# Garbage-collection pass after label encoding; the output is gc.collect()'s return value.
gc.collect()

53

In [16]:
import string
import re
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer


def remove_punctuation(text):
    """Return ``text`` with every character of ``string.punctuation`` deleted."""
    punctuation = set(string.punctuation)
    return ''.join(ch for ch in text if ch not in punctuation)

def clean_whitespace(text):
    """Trim both ends of ``text`` and collapse each whitespace run to one space."""
    return re.sub(r'\s+', ' ', text.strip())

def remove_special_characters(text):
    """Delete every character that is neither a word character nor whitespace."""
    not_word_or_space = re.compile(r'[^\w\s]')
    return not_word_or_space.sub('', text)

import functools


@functools.lru_cache(maxsize=None)
def _english_stop_words():
    """Load NLTK's English stop-word list once and reuse it on later calls."""
    return frozenset(stopwords.words('english'))


def remove_stop_words(text, stop_words=None):
    """Remove stop words from whitespace-separated ``text``.

    Args:
        text: Input string; it is split on whitespace.
        stop_words: Optional container of words to drop. When omitted, the
            NLTK English stop-word list is used (loaded once and cached, so
            applying this function to many rows does not rebuild the list
            for each row).

    Returns:
        The remaining words joined by single spaces. Matching is exact and
        case-sensitive.
    """
    if stop_words is None:
        stop_words = _english_stop_words()
    return ' '.join(word for word in text.split() if word not in stop_words)

def stem_text(text):
    """Apply a Porter stemmer to each whitespace-separated word of ``text``.

    Returns the stemmed words joined by single spaces.
    """
    stem = PorterStemmer().stem
    return ' '.join(map(stem, text.split()))

In [17]:
def _normalise(text):
    """Lower-case, strip punctuation and special characters, then tidy whitespace."""
    return clean_whitespace(remove_special_characters(remove_punctuation(text.lower())))


# Per annotation: normalise, then drop stop words, then stem.
train_ants = train_ants.apply(_normalise).apply(remove_stop_words).apply(stem_text)
test_ants = test_ants.apply(_normalise).apply(remove_stop_words).apply(stem_text)
val_ants = val_ants.apply(_normalise).apply(remove_stop_words).apply(stem_text)

In [18]:
# Longest annotation, in whitespace-separated tokens, over all three splits.
max_sequence_length = max(
    split.apply(lambda x: len(x.split())).max()
    for split in (train_ants, test_ants, val_ants)
)

In [19]:
# Display the longest annotation length (in tokens) found across the splits.
max_sequence_length

11362

In [20]:
# Stack the cleaned annotation Series (train, then test, then val) into one Series.
all_messages = pd.concat(objs=[train_ants, test_ants, val_ants], axis=0)

In [21]:
# Total number of annotation texts across the three splits.
len(all_messages)

1854

In [22]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Upper bound on the vocabulary size the tokenizer will emit ids for.
MAX_NUM_WORDS = 12000
# Derived from the length computed on the cleaned annotations rather than
# hard-coded, so the padding length stays correct if the data or the text
# cleaning changes. int() turns the numpy integer into a plain Python int.
MAX_SEQUENCE_LENGTH = int(max_sequence_length)

tokenizer = Tokenizer(num_words=MAX_NUM_WORDS)
# NOTE(review): the vocabulary is fitted on train + test + val text together,
# so validation/test vocabulary leaks into preprocessing - confirm this is
# intended before reporting results.
tokenizer.fit_on_texts(all_messages)


# Convert each cleaned annotation into a list of integer word ids.
train_sequences = tokenizer.texts_to_sequences(train_ants)
val_sequences = tokenizer.texts_to_sequences(val_ants)
test_sequences = tokenizer.texts_to_sequences(test_ants)


# Bring every sequence to exactly MAX_SEQUENCE_LENGTH ids using
# pad_sequences' default padding/truncation settings.
train_data_padded = pad_sequences(train_sequences, maxlen=MAX_SEQUENCE_LENGTH)
val_data_padded = pad_sequences(val_sequences, maxlen=MAX_SEQUENCE_LENGTH)
test_data_padded = pad_sequences(test_sequences, maxlen=MAX_SEQUENCE_LENGTH)

In [23]:
import numpy as np

# token -> 50-dimensional float32 vector, read from the GloVe text file
glove_dict = {}

with open('/kaggle/input/glove01/glove.6B.50d.txt', "r", encoding="utf8") as glove_file:
    for line in glove_file:
        fields = line.split()
        token = fields[0]
        vector = np.array(fields[1:], dtype=np.float32)
        # Rows that do not carry exactly 50 values are skipped.
        if vector.shape[0] != 50:
            continue
        glove_dict[token] = vector

print("Dictionary Size: ", len(glove_dict))

Dictionary Size:  400000


In [24]:
# One row per tokenizer word id plus one extra row for index 0, which no word
# in tokenizer.word_index is mapped to.
vocab_len = len(tokenizer.word_index) + 1

# Rows default to zeros; words without a GloVe vector keep an all-zero row.
embedding_matrix = np.zeros((vocab_len, 50))

# `word_id` (not `id`) so the builtin id() is not shadowed for the rest of the
# notebook. The former bare `except: pass` was removed: dict.get() cannot raise
# here, and silently swallowing errors would hide a real problem such as a
# vector of the wrong size.
for word, word_id in tokenizer.word_index.items():
    embedding_vector = glove_dict.get(word)
    if embedding_vector is not None:
        embedding_matrix[word_id] = embedding_vector

print("Size of Embedding matrix :", embedding_matrix.shape)

Size of Embedding matrix : (2512, 50)


In [25]:
# Stack each split's per-video feature arrays into a single numpy array.
train_i3d_features, val_i3d_features, test_i3d_features = (
    np.array(frame['i3d_features'].tolist())
    for frame in (train_df, val_df, test_df)
)

In [26]:
# Shape of the stacked training feature array.
train_i3d_features.shape

(1165, 32, 1024)

In [27]:
# Shape of the first padded training token sequence.
train_data_padded[0].shape

(11362,)

In [28]:
import gc
# Garbage-collection pass before building the model; the output is gc.collect()'s return value.
gc.collect()

0

In [29]:
import tensorflow as tf
from tensorflow.keras import layers, models, Input

# Two-input classifier: a dense branch over the I3D features and an
# embedding + LSTM branch over the tokenised annotations, concatenated and
# classified into NB_CLASSES categories.
I3D_FEATURES_SHAPE = (32, 1024)
TEXT_INPUT_SHAPE = (MAX_SEQUENCE_LENGTH,)
NB_CLASSES = 14

# --- Video branch: flatten the feature array and pass it through two dense layers.
i3d_input = Input(shape=I3D_FEATURES_SHAPE, name='I3D-Input')
x = layers.Flatten()(i3d_input)
x = layers.Dense(512, activation='relu')(x)
x = layers.Dropout(0.5)(x)
x = layers.Dense(256, activation='relu')(x)


# --- Text branch: embeddings initialised from embedding_matrix and fine-tuned
# during training (trainable=True). The deprecated `input_length` argument is
# dropped; the sequence length is already fixed by the Input shape.
text_input = Input(shape=TEXT_INPUT_SHAPE, name='Text-Input')
embedding_layer = layers.Embedding(input_dim=vocab_len,
                                   output_dim=50,
                                   weights=[embedding_matrix],
                                   trainable=True)(text_input)
# LSTM without return_sequences yields one vector per sample, so the Flatten
# that used to follow it was a no-op and has been removed.
lstm_out = layers.LSTM(256)(embedding_layer)


# --- Fusion and classification head.
combined = layers.concatenate([x, lstm_out])


dense_out = layers.Dense(512, activation='relu')(combined)
dense_out = layers.Dropout(0.5)(dense_out)
output = layers.Dense(NB_CLASSES, activation='softmax', name='Output-Layer')(dense_out)
# Input order matters: callers must pass [text, i3d] in this order.
model = models.Model(inputs=[text_input, i3d_input], outputs=output)


model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])


model.summary()



In [None]:

# Training hyper-parameters.
BATCH_SIZE, EPOCHS, VERBOSE = 128, 10, 1

print("\nTraining Progress:\n------------------------------------")

# Inputs are given in the order the model was built with: text first, then I3D.
training_inputs = [train_data_padded, train_i3d_features]
validation_set = ([val_data_padded, val_i3d_features], val_classes)

history = model.fit(
    x=training_inputs,
    y=train_classes,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    verbose=VERBOSE,
    validation_data=validation_set,
)




Training Progress:
------------------------------------
Epoch 1/10
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m534s[0m 53s/step - accuracy: 0.3718 - loss: 2.1869 - val_accuracy: 0.5726 - val_loss: 1.8024
Epoch 2/10
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m524s[0m 53s/step - accuracy: 0.4892 - loss: 1.8671 - val_accuracy: 0.5620 - val_loss: 1.5289
Epoch 3/10
[1m 6/10[0m [32m━━━━━━━━━━━━[0m[37m━━━━━━━━[0m [1m3:21[0m 50s/step - accuracy: 0.5166 - loss: 1.6266

In [None]:
print("\nEvaluation against Test Dataset:\n------------------------------------")
# Score the trained model on the held-out test split (text input first, then I3D).
model.evaluate(x=[test_data_padded, test_i3d_features], y=test_classes)

In [None]:
import gc
# Garbage-collection pass after evaluation; the output is gc.collect()'s return value.
gc.collect()

In [None]:
import matplotlib.pyplot as plt

# One figure per metric: training curve, validation curve, labels, then save and show.
# Tuple layout: (train key, val key, train label, val label, title, y-label, output path)
curve_specs = (
    ('accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy',
     'Training and Validation Accuracy', 'Accuracy', "/kaggle/working/rgb_lstm_accuracy.png"),
    ('loss', 'val_loss', 'Training Loss', 'Validation Loss',
     'Training and Validation Loss', 'Loss', "/kaggle/working/rgb_lstm_loss.png"),
)

for train_key, val_key, train_label, val_label, title, y_label, out_path in curve_specs:
    plt.plot(history.history[train_key], label=train_label)
    plt.plot(history.history[val_key], label=val_label)
    plt.title(title)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
    plt.savefig(out_path)
    plt.show()

In [None]:
from sklearn.metrics import roc_curve, auc, roc_auc_score

# Per-class ROC curves and AUC scores on the test split, plus macro/micro
# averages, saved as a single figure.
NB_CLASSES = 14

# Predicted class probabilities for the test split (text input first, then I3D).
y_prob = model.predict([test_data_padded, test_i3d_features])

print("y_test_bin shape:", test_classes.shape)
print("y_prob shape:", y_prob.shape)        


# Per-class results, keyed by class index.
fpr = dict()
tpr = dict()
roc_auc = dict()

class_labels = label_encoder.classes_


# One-vs-rest ROC per class: column i of the one-hot labels against column i
# of the predicted probabilities.
# NOTE(review): a class with no positive samples in the test split would
# presumably give an undefined AUC here - confirm every class is represented.
for i in range(NB_CLASSES):
    fpr[i], tpr[i], _ = roc_curve(test_classes[:, i], y_prob[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])
    print(f'Class {class_labels[i]} - AUC: {roc_auc[i]:.2f}') 
# The figure is created only now, so all curves below are drawn on it.
plt.figure(figsize=(12, 8))
for i in range(NB_CLASSES):
    plt.plot(fpr[i], tpr[i], label=f'ROC curve (class {class_labels[i]}) (AUC = {roc_auc[i]:.2f})')

# Dashed diagonal as a reference line.
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve for each class')


# Macro average: unweighted mean of the per-class AUCs.
macro_auc = np.mean(list(roc_auc.values()))
# Micro average: flatten labels and scores so every (sample, class) pair is
# treated as one binary decision, then compute a single AUC.
# NOTE(review): with flattened 1-D binary inputs the `average` argument
# presumably has no effect - confirm against the scikit-learn documentation.
y_prob_flat = y_prob.ravel()
test_classes_flat = test_classes.ravel()
micro_auc = roc_auc_score(test_classes_flat, y_prob_flat, average='micro')


plt.text(0.6, 0.1, f'Macro-average AUC: {macro_auc:.3f}\nMicro-average AUC: {micro_auc:.3f}', fontsize=12, ha='center')
plt.legend(loc="lower right")


plt.savefig("/kaggle/working/rgb_lstm_ROC.png")
plt.show()

In [None]:
# List the contents of /kaggle/working, where the figures above were saved.
os.listdir('/kaggle/working')

In [None]:
from IPython.display import FileLink
# Display a link to the saved ROC figure.
FileLink(r'/kaggle/working/rgb_lstm_ROC.png')

In [None]:
from IPython.display import FileLink
# Display a link to the saved accuracy figure.
FileLink(r'/kaggle/working/rgb_lstm_accuracy.png')


In [None]:
from IPython.display import FileLink
# Display a link to the saved loss figure.
FileLink(r'/kaggle/working/rgb_lstm_loss.png')