<a href="https://colab.research.google.com/github/CarlHad333/Non_Verbal_Behaviour/blob/main/IEMOCAP_Text_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION (/kaggle/input) IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'iemocapfullrelease:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F1834494%2F2993857%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240329%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240329T141520Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D32145445c90bc8cf7d3d5db8e54dc090ea4d05aa0167c444f72e3a572b86652ded02da81a0578ec2400a9cc98c83760b8af36c13f1671c54631d40ce7634c0e9040d059221fda7ed2c0bc71e7c849072f1be7c4390202b63f757a9b48e0e84b80cf4d8f9b60629a8f47182abbb1ce22d1443ae0b50ea5648f5c7ffeb5b20057c6e417682d3642fc83ee45eac50924f4491d703ebd019f18e5be04734d3b304dfb3aecf68f02270e8c9bbf6aa1332d5db651ab9de95f6fc54cef3c112b93763ba462573d10c694665ddeb39ec5ff9be0afa7cf0a17acad7a7420a5d8547c2680080374315f636e4023236c3bad449d444720be62ff413ec240f8514abfce633fb'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

# Download every "<directory>:<percent-encoded URL>" entry of DATA_SOURCE_MAPPING
# into KAGGLE_INPUT_PATH/<directory> and unpack it there.
for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    # Split on the first colon only so the directory name is separated from
    # the URL even if the encoded URL ever contains a literal ':'.
    directory, download_url_encoded = data_source_mapping.split(':', 1)
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            # The header may be absent; without a total the progress bar
            # simply stays empty instead of raising a TypeError.
            total_bytes = int(total_length) if total_length else 0
            dl = 0
            data = fileres.read(CHUNK_SIZE)
            while len(data) > 0:
                dl += len(data)
                tfile.write(data)
                done = min(50, int(50 * dl / total_bytes)) if total_bytes else 0
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
                data = fileres.read(CHUNK_SIZE)
            # Push buffered bytes to disk: the tar branch reopens the file by
            # name and would otherwise see a truncated archive.
            tfile.flush()
            # NOTE(review): extractall() trusts the member paths stored in the
            # archive; only use this with sources you trust.
            if filename.endswith('.zip'):
                with ZipFile(tfile) as zfile:
                    zfile.extractall(destination_path)
            else:
                # Bind the archive to its own name so the `tarfile` module is
                # not overwritten for later iterations.
                with tarfile.open(tfile.name) as tar_archive:
                    tar_archive.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError:
        # Must precede OSError: HTTPError is one of its subclasses.
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


In [None]:
import os
import pandas
import numpy as np
!pip install --upgrade numpy


In [None]:
root_path = '/kaggle/input/iemocap-csv'

df = pandas.read_csv(os.path.join(root_path, 'iemocap.csv'))
sessions = [1, 2, 3, 4, 5]
df = df[df['session'].isin(sessions)]

# Remove unwanted emotions and empty values.
# read_csv turns empty fields into NaN, which the '' entry below never
# matches, so missing labels are dropped explicitly.
# 'exc' is discarded here rather than merged into 'hap'.
unwanted_emotions = ['xxx', '', 'oth', 'dis', 'sur', 'fea', 'exc', 'fru']
df = df.dropna(subset=['emotion'])
# .copy() makes the filtered frame independent, so the column assignment
# below does not write into a slice of the original frame.
df = df[~df['emotion'].isin(unwanted_emotions)].copy()

# Calculate annotator difference
# (presumably the number of annotators who did not back the final label;
# confirm against the CSV schema).
df['annotator_difference'] = df['n_annotators'] - df['agreement']

# Filter by annotator difference: keep rows where it is at most 1
df = df[df['annotator_difference'] <= 1]

# Class counts after the session/emotion/agreement filters, before the
# per-class cap applied below.
emotions_count_before = df['emotion'].value_counts()
print("Emotions count before filtering:")
print(emotions_count_before)

# Cap every emotion at its first `max_rows_per_emotion` rows (file order)
max_rows_per_emotion = 650
df = df.groupby('emotion').head(max_rows_per_emotion)

# Count the occurrences of each emotion after the cap
emotions_count_after = df['emotion'].value_counts()
print("\nEmotions count after filtering:")
print(emotions_count_after)

# Display the filtered DataFrame
display(df)

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows, then cut that hold-out in half, giving an
# 80/10/10 train/validation/test partition. Both calls use the same seed.
df_train, df_remaining = train_test_split(df, test_size=0.2, random_state=42)
df_valid, df_test = train_test_split(df_remaining, test_size=0.5, random_state=42)

# Report the size of each partition
for split_name, split_frame in (("df_train", df_train),
                                ("df_valid", df_valid),
                                ("df_test", df_test)):
    print(f"Shape of {split_name}:", split_frame.shape)

In [None]:
# Define a function to split the wav_path into main_path and file_name and convert main_path to the desired format
def split_and_convert_path(wav_path):
    """Map a '/'-separated wav path to its transcription file and wav file name.

    The second component of the path with its last three components removed
    is used as the session folder, the second-to-last component as the
    dialog name and the last component as the wav file name.

    Returns:
        A ``(main_path, file_name)`` tuple where ``main_path`` is
        ``IEMOCAP_full_release/<session>/dialog/transcriptions/<dialog>.txt``.

    Raises:
        IndexError: if the path has too few components.
    """
    segments = wav_path.split('/')
    session_dir = segments[:-3][1]  # e.g. 'Session1'
    dialog_name = segments[-2]      # e.g. 'Ses01F_script02_1'
    wav_name = segments[-1]
    transcript_path = (
        f'IEMOCAP_full_release/{session_dir}/dialog/transcriptions/{dialog_name}.txt'
    )
    return transcript_path, wav_name

# Add 'main_path' (transcription file) and 'file_name' (wav file) columns to
# every split, both derived from the split's 'wav_path' column.
for split_frame in (df_train, df_valid, df_test):
    transcript_paths, wav_names = zip(*split_frame['wav_path'].apply(split_and_convert_path))
    split_frame['main_path'] = transcript_paths
    split_frame['file_name'] = wav_names

# Show the new columns next to the label for the training split
display(df_train[['main_path', 'file_name', 'emotion']])


In [None]:
import re

# One transcript line: "<utterance id> [<bracketed field>]: <spoken text>".
_TRANSCRIPT_LINE_RE = re.compile(r'(.*?)\s\[(.*?)\]:\s(.*)')


def get_transcript(row, root_dir='/kaggle/input/iemocapfullrelease'):
    """Return the text spoken in the utterance described by ``row``.

    Args:
        row: Mapping (e.g. a DataFrame row) with ``'main_path'``, the
            transcription file path relative to ``root_dir``, and
            ``'file_name'``, the utterance's wav file name.
        root_dir: Directory that ``'main_path'`` is resolved against.

    Returns:
        The text of every transcript line whose id equals the wav file name
        without its extension, joined by single spaces and stripped; an empty
        string when no line matches.

    Raises:
        OSError: if the transcription file cannot be opened.
    """
    # Compare against the exact utterance id. A substring test would also
    # accept lines whose id is empty or a short fragment such as 'F'.
    utterance_id = os.path.splitext(row['file_name'])[0]
    file_path = os.path.join(root_dir, row['main_path'])

    spoken = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            match = _TRANSCRIPT_LINE_RE.match(line)
            if match and match.group(1) == utterance_id:
                spoken.append(match.group(3))

    return ' '.join(spoken).strip()

# Attach the transcript of each utterance as a 'text' column, split by split.
for split_frame in (df_train, df_test, df_valid):
    split_frame['text'] = split_frame.apply(get_transcript, axis=1)


In [None]:
# Show the training split's transcript path, wav name, label and extracted text.
display(df_train[['main_path', 'file_name', 'emotion', 'text']])

In [None]:
from datasets import Dataset, DatasetDict, Features, Value, ClassLabel

# Label vocabulary; a name's position in this list is its integer class id.
class_names = ['ang', 'hap', 'neu', 'sad']

# Schema handed to every Dataset: an integer-coded label plus the raw text.
# (Presumably only the columns named here end up in the datasets; confirm.)
features = Features({
    'emotion': ClassLabel(names=class_names),
    'text': Value('string'),
})

# Replace each emotion name by its class id. Raises ValueError for a label
# that is not in class_names, so this cannot be run twice on the same frames.
for split_frame in (df_train, df_valid, df_test):
    split_frame['emotion'] = split_frame['emotion'].apply(
        lambda name: class_names.index(name)
    )

# One Dataset per split
train_dataset = Dataset.from_pandas(df_train, features=features)
valid_dataset = Dataset.from_pandas(df_valid, features=features)
test_dataset = Dataset.from_pandas(df_test, features=features)

# Bundle the splits under the usual split names
emotions = DatasetDict({
    'train': train_dataset,
    'validation': valid_dataset,
    'test': test_dataset,
})

In [None]:
# Peek at the first five examples of the training split.
emotions['train'][:5]

# ------------------------------------------------------------

In [None]:
# Echo the filtered DataFrame that the train/valid/test splits were drawn from.
df

In [None]:
from transformers import AutoTokenizer

# Sample sentence used to demonstrate the tokeniser.
text = 'Tokenisation of text is a core task of NLP.'

# Load parameters of the tokeniser
# (checkpoint name; the same value is assigned again before the model is loaded)
model_ckpt = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

# Show tokeniser information
tokenizer

In [None]:
# Encode the sample sentence, map the ids back to tokens, and rebuild a string
# from those tokens; each stage is printed under its own heading.
print('Encoded text')
encoded_text = tokenizer(text)
print(encoded_text, '\n')

print('Tokens')
token_ids = encoded_text.input_ids
tokens = tokenizer.convert_ids_to_tokens(token_ids)
print(tokens, '\n')

print('Convert tokens to string')
rebuilt_text = tokenizer.convert_tokens_to_string(tokens)
print(rebuilt_text, '\n')

In [None]:
# Reset the output format of every split in the DatasetDict
# (presumably back to plain Python objects; see the datasets docs).
emotions.reset_format()

In [None]:
# Peek at the first five training examples again, after the format reset.
emotions['train'][:5]

In [None]:
# Tokenisation function
def tokenise(batch):
    """Tokenise the ``"text"`` entries of ``batch`` with the notebook's ``tokenizer``.

    Padding and truncation are both enabled. Returns whatever the tokenizer
    returns for that list of texts.
    """
    texts = batch["text"]
    return tokenizer(texts, padding=True, truncation=True)

In [None]:
# Tokenise every split. With batched=True and batch_size=None each split is
# presumably handed to `tokenise` as one batch (confirm in the datasets docs),
# in which case padding=True pads to the longest text of that split.
emotions_encoded = emotions.map(tokenise, batched=True, batch_size=None)
# List the columns now present in the training split
print(emotions_encoded["train"].column_names)

In [None]:
#emotions_encoded['train'][:1]

In [None]:
# Show the 'text' field of the first training example (a length-1 slice).
emotions_encoded['train'][:1]['text']

In [None]:
# Repeat the tokeniser demo on real data. `text` is rebound here to the
# 'text' field of a length-1 slice of the training split, so the tokenizer is
# given a batch rather than a single string.
text = emotions_encoded['train'][:1]['text']
print('Encoded text')
encoded_text = tokenizer(text)
print(encoded_text, '\n')

print('Tokens')
# input_ids is iterated per example, hence one token list per id sequence
tokens = [tokenizer.convert_ids_to_tokens(ids) for ids in encoded_text.input_ids]
print(tokens, '\n')

print('Convert tokens to string')
# Each element of `tokens` is the token list of one example
print([tokenizer.convert_tokens_to_string(token) for token in tokens], '\n')

In [None]:
import warnings; warnings.filterwarnings('ignore')
from transformers import AutoModel
import torch

# Checkpoint to load; same name as the one used for the tokenizer.
model_ckpt = "distilbert-base-uncased"
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Load the pretrained model and move it to the chosen device.
model = AutoModel.from_pretrained(model_ckpt).to(device)

In [None]:
def extract_hidden_states(batch):
    """Embed the ``"text"`` entries of ``batch`` with the notebook's ``model``.

    The texts are tokenised (padded, truncated, as PyTorch tensors), moved to
    ``device`` and run through ``model`` without gradient tracking.

    Returns:
        ``{"hidden_state": array}`` holding, for every text, the last hidden
        state at sequence position 0 (the [CLS] token) as a NumPy array.
    """
    tokenised = tokenizer(batch["text"], padding=True, truncation=True, return_tensors="pt")

    # The model's inputs must live on the same device as the model
    model_inputs = {name: tensor.to(device) for name, tensor in tokenised.items()}

    # Inference only: no gradients needed
    with torch.no_grad():
        outputs = model(**model_inputs)

    # Keep only the vector at position 0 of each sequence
    cls_vectors = outputs.last_hidden_state[:, 0]
    return {"hidden_state": cls_vectors.cpu().numpy()}

# Extract last hidden states (faster w/ GPU)
# Runs extract_hidden_states over every split in batches; no batch_size is
# given, so the library default applies.
emotions_hidden = emotions_encoded.map(extract_hidden_states, batched=True)
# List the columns of the training split, now including "hidden_state"
print(emotions_hidden["train"].column_names)

In [None]:
# Echo the training split that now carries the hidden-state column.
emotions_hidden['train']

In [None]:
# Turn the hidden states (features) and emotion ids (targets) of the train
# and validation splits into NumPy arrays for the scikit-learn models.
train_split = emotions_hidden["train"]
valid_split = emotions_hidden["validation"]

X_train = np.array(train_split["hidden_state"])
X_valid = np.array(valid_split["hidden_state"])
y_train = np.array(train_split["emotion"])
y_valid = np.array(valid_split["emotion"])

print(f'Training Dataset: {X_train.shape}')
print(f'Validation Dataset {X_valid.shape}')

In [None]:
import warnings; warnings.filterwarnings('ignore')
from sklearn.preprocessing import MinMaxScaler
from sklearn.manifold import TSNE

# Rescale every hidden-state dimension with MinMaxScaler's default range
X_scaled = MinMaxScaler().fit_transform(X_train)

# Project the scaled features down to 2D. The estimator gets its own name so
# the DistilBERT `model` used by extract_hidden_states is not overwritten, and
# a fixed seed so the plot is reproducible like the other estimators here.
tsne = TSNE(n_components=2, random_state=42)
embedding_2d = tsne.fit_transform(X_scaled)

# Create a df of 2D embeddings, labelled with the training class ids
df_embedding = pd.DataFrame(embedding_2d, columns=["X", "Y"])
df_embedding["emotion"] = y_train

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns; sns.set(style='whitegrid')

# One scatter panel per emotion class on a 2x3 grid
fig, axes = plt.subplots(2, 3, figsize=(15,10))
axes = axes.flatten()
labels = emotions["train"].features["emotion"].names

for i, label in enumerate(labels):
    # Rows of class id `i`. Boolean selection yields an empty panel, rather
    # than a KeyError, when a class has no points.
    df_embedding_sub = df_embedding[df_embedding["emotion"] == i]

    axes[i].scatter(df_embedding_sub["X"],
                    df_embedding_sub["Y"],
                    lw=1,ec='k',alpha=0.2)

    axes[i].set_title(f'{label}')

# Hide the grid cells that have no class to show
for unused_ax in axes[len(labels):]:
    unused_ax.set_visible(False)

plt.tight_layout()
plt.show()

In [None]:
from sklearn.dummy import DummyClassifier

# Baseline: always predict the most frequent training class
dummy_clf = DummyClassifier(strategy="most_frequent")
dummy_clf.fit(X_train, y_train)
baseline_accuracy = dummy_clf.score(X_valid, y_valid)
print(f'accuracy: {baseline_accuracy}')

In [None]:
from sklearn.linear_model import LogisticRegression as LR

# Logistic regression on the hidden states; `max_iter` is set to a generous
# 2000 so the solver has room to converge.
lr_clf = LR(max_iter=2000, random_state=42)
lr_clf.fit(X_train, y_train)

# Validation predictions and accuracy
y_preds = lr_clf.predict(X_valid)
lr_accuracy = lr_clf.score(X_valid, y_valid)
print(f'accuracy: {lr_accuracy}')

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from xgboost import XGBClassifier


# Random forest on the hidden states, scored on the validation split
rf_clf = RandomForestClassifier(n_estimators=116, random_state=42)
rf_clf.fit(X_train, y_train)
rf_predictions = rf_clf.predict(X_valid)

rf_accuracy = accuracy_score(y_valid, rf_predictions)
rf_report = classification_report(y_valid, rf_predictions)

print("Random Forest Accuracy:", rf_accuracy)
print("Random Forest Classification Report:")
print(rf_report)

In [None]:
# XGBoost on the hidden states, scored on the validation split.
# Relies on the imports made in the random-forest cell.
xgb_clf = XGBClassifier(n_estimators=250, random_state=42)
xgb_clf.fit(X_train, y_train)
xgb_predictions = xgb_clf.predict(X_valid)

xgb_accuracy = accuracy_score(y_valid, xgb_predictions)
xgb_report = classification_report(y_valid, xgb_predictions)

print("XGBoost Accuracy:", xgb_accuracy)
print("XGBoost Classification Report:")
print(xgb_report)