In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Load dataset
df = pd.read_csv("/content/audio_features_whisper.csv")

# Extract labels (0 = No sarcasm, 1 = Sarcasm) as float32 for the sigmoid output
y = df["Sarcasm"].to_numpy(dtype=np.float32)

# Extract the two feature groups by column-name prefix
context_cols = [col for col in df.columns if col.startswith("audio_c_feature_")]
utterance_cols = [col for col in df.columns if col.startswith("audio_u_feature_")]
X_context = df[context_cols].values
X_utterance = df[utterance_cols].values

# Train-test split FIRST: the scalers below must never see the test rows,
# otherwise test-set statistics leak into the features the model trains on.
# The split indices depend only on `y` and `random_state`, not on the feature values.
Xc_train, Xc_test, Xu_train, Xu_test, y_train, y_test = train_test_split(
    X_context, X_utterance, y, test_size=0.2, random_state=42, stratify=y
)

# Normalize the features: one scaler per feature group, fitted on the training
# rows only and then applied unchanged to the test rows.
context_scaler = StandardScaler()
utterance_scaler = StandardScaler()
Xc_train = context_scaler.fit_transform(Xc_train).astype(np.float32)
Xc_test = context_scaler.transform(Xc_test).astype(np.float32)
Xu_train = utterance_scaler.fit_transform(Xu_train).astype(np.float32)
Xu_test = utterance_scaler.transform(Xu_test).astype(np.float32)

# Dynamically set input dimensions
# NOTE(review): the model cell reuses this width for the utterance input too,
# which assumes both feature groups have the same number of columns — confirm.
input_dim = Xc_train.shape[1]

def _dense_tower(n_features):
    """Build one fully connected branch and return ``(input_tensor, output_tensor)``.

    The stack is Dense(512) -> BatchNorm -> Dense(256) -> BatchNorm ->
    Dense(128) -> Dropout(0.4), all Dense layers using ReLU.
    """
    tower_input = keras.Input(shape=(n_features,))
    hidden = tower_input
    for width in (512, 256):
        hidden = layers.Dense(width, activation="relu")(hidden)
        hidden = layers.BatchNormalization()(hidden)
    hidden = layers.Dense(128, activation="relu")(hidden)
    hidden = layers.Dropout(0.4)(hidden)
    return tower_input, hidden


# Context branch first, then utterance branch (both sized by `input_dim`)
input_context, context_branch = _dense_tower(input_dim)
input_utterance, utterance_branch = _dense_tower(input_dim)

# Merge both branches and shrink through three Dense+Dropout stages
merged = layers.Concatenate()([context_branch, utterance_branch])
for width, drop_rate in ((256, 0.3), (128, 0.3), (64, 0.2)):
    merged = layers.Dense(width, activation="relu")(merged)
    merged = layers.Dropout(drop_rate)(merged)

# Single sigmoid unit: binary classification
output = layers.Dense(1, activation="sigmoid")(merged)

# Define Model
model = keras.Model(inputs=[input_context, input_utterance], outputs=output)
model.summary()

# Compile Model with Lower Learning Rate
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

# Hold out a stratified validation slice of the TRAINING data for checkpoint
# selection. Using the test split as validation_data would pick the "best" epoch
# by peeking at the test set, making the reported test accuracy optimistic.
# The 10% size is a choice made here, not something dictated by the data.
Xc_fit, Xc_val, Xu_fit, Xu_val, y_fit, y_val = train_test_split(
    Xc_train, Xu_train, y_train, test_size=0.1, random_state=42, stratify=y_train
)

# Callback to save the best model (highest validation accuracy seen so far)
checkpoint_path = "best_model.h5"
checkpoint = keras.callbacks.ModelCheckpoint(
    checkpoint_path, monitor="val_accuracy", save_best_only=True, mode="max", verbose=1
)

# Train Model; the test split is not touched until the final evaluation below
history = model.fit(
    [Xc_fit, Xu_fit], y_fit,
    epochs=20, batch_size=32,
    validation_data=([Xc_val, Xu_val], y_val),
    callbacks=[checkpoint]
)

# Load the best model before making predictions
best_model = keras.models.load_model(checkpoint_path)

# Predictions using the best model (sigmoid output thresholded at 0.5).
# The train predictions cover all of Xc_train/Xu_train, i.e. the fitted rows plus
# the validation slice, so they stay aligned with `y_train` for later reports.
y_train_pred = (best_model.predict([Xc_train, Xu_train]) > 0.5).astype(int)
y_test_pred = (best_model.predict([Xc_test, Xu_test]) > 0.5).astype(int)

# Print Accuracy
train_acc = accuracy_score(y_train, y_train_pred)
test_acc = accuracy_score(y_test, y_test_pred)

print(f"\n✅ Train Accuracy: {train_acc:.4f}")
print(f"✅ Test Accuracy: {test_acc:.4f}")

# Print Classification Reports
def format_classification_report(report):
    """Re-render the per-class rows of a text classification report.

    The report is processed line by line. A line that splits into exactly five
    whitespace-separated tokens and whose first token is numeric (digits with at
    most one dot) is rewritten as: the label left-justified to 10 characters,
    three values formatted with 4 decimals, and the last token as an integer,
    all separated by single spaces. Every other line is passed through as-is.

    Args:
        report: The report text (newline-separated).

    Returns:
        The re-rendered report, lines joined with newlines.
    """
    def _render(row):
        tokens = row.split()
        is_class_row = len(tokens) == 5 and tokens[0].replace('.', '', 1).isdigit()
        if not is_class_row:
            return row
        label, precision, recall, f1, support = tokens
        scores = " ".join(f"{float(value):.4f}" for value in (precision, recall, f1))
        return f"{label:<10} {scores} {int(support)}"

    return "\n".join(_render(row) for row in report.split("\n"))

# Build both reports up front (4-decimal precision), then print them reformatted
train_report, test_report = (
    classification_report(truth, predicted, digits=4)
    for truth, predicted in ((y_train, y_train_pred), (y_test, y_test_pred))
)

for heading, report_text in (
    ("\nTrain Set Classification Report:\n", train_report),
    ("Test Set Classification Report:\n", test_report),
):
    print(heading, format_classification_report(report_text))


In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Load dataset
df = pd.read_csv("/content/audio_features_whisper.csv")

# Extract labels (0 = No sarcasm, 1 = Sarcasm) as float32 for the sigmoid output
y = df["Sarcasm"].to_numpy(dtype=np.float32)

# Extract the two feature groups by column-name prefix
context_cols = [col for col in df.columns if col.startswith("audio_c_feature_")]
utterance_cols = [col for col in df.columns if col.startswith("audio_u_feature_")]
X_context = df[context_cols].values
X_utterance = df[utterance_cols].values

# Train-test split FIRST: the scalers below must never see the test rows,
# otherwise test-set statistics leak into the features the model trains on.
# The split indices depend only on `y` and `random_state`, not on the feature values.
Xc_train, Xc_test, Xu_train, Xu_test, y_train, y_test = train_test_split(
    X_context, X_utterance, y, test_size=0.2, random_state=42, stratify=y
)

# Normalize the features: one scaler per feature group, fitted on the training
# rows only and then applied unchanged to the test rows.
context_scaler = StandardScaler()
utterance_scaler = StandardScaler()
Xc_train = context_scaler.fit_transform(Xc_train).astype(np.float32)
Xc_test = context_scaler.transform(Xc_test).astype(np.float32)
Xu_train = utterance_scaler.fit_transform(Xu_train).astype(np.float32)
Xu_test = utterance_scaler.transform(Xu_test).astype(np.float32)

# Dynamically set input dimensions
# NOTE(review): the model cell reuses this width for the utterance input too,
# which assumes both feature groups have the same number of columns — confirm.
input_dim = Xc_train.shape[1]

def _conv_tower(n_features):
    """Build one 1-D convolutional branch and return ``(input_tensor, output_tensor)``.

    The flat feature vector is reshaped to ``(n_features, 1)`` and passed through
    Conv1D(128, k=5) -> BatchNorm -> Conv1D(64, k=3) -> MaxPool(2) -> Flatten.
    """
    tower_input = keras.Input(shape=(n_features,))
    feature_map = layers.Reshape((n_features, 1))(tower_input)
    feature_map = layers.Conv1D(filters=128, kernel_size=5, activation="relu")(feature_map)
    feature_map = layers.BatchNormalization()(feature_map)
    feature_map = layers.Conv1D(filters=64, kernel_size=3, activation="relu")(feature_map)
    feature_map = layers.MaxPooling1D(pool_size=2)(feature_map)
    return tower_input, layers.Flatten()(feature_map)


# Context branch first, then utterance branch (both sized by `input_dim`)
input_context, context_branch = _conv_tower(input_dim)
input_utterance, utterance_branch = _conv_tower(input_dim)

# Merge both branches; dropout after each Dense stage to limit overfitting
merged = layers.Concatenate()([context_branch, utterance_branch])
for width, drop_rate in ((128, 0.3), (64, 0.2)):
    merged = layers.Dense(width, activation="relu")(merged)
    merged = layers.Dropout(drop_rate)(merged)

# Single sigmoid unit: binary classification
output = layers.Dense(1, activation="sigmoid")(merged)

# Define Model
model = keras.Model(inputs=[input_context, input_utterance], outputs=output)
model.summary()

# Compile with a low learning rate for Adam
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

# Hold out a stratified validation slice of the TRAINING data for checkpoint
# selection. Using the test split as validation_data would pick the "best" epoch
# by peeking at the test set, making the reported test accuracy optimistic.
# The 10% size is a choice made here, not something dictated by the data.
Xc_fit, Xc_val, Xu_fit, Xu_val, y_fit, y_val = train_test_split(
    Xc_train, Xu_train, y_train, test_size=0.1, random_state=42, stratify=y_train
)

# Callback to save the best model (highest validation accuracy seen so far)
checkpoint_path = "best_model.h5"
checkpoint = keras.callbacks.ModelCheckpoint(
    checkpoint_path, monitor="val_accuracy", save_best_only=True, mode="max", verbose=1
)

# Train Model; the test split is not touched until the final evaluation below
history = model.fit(
    [Xc_fit, Xu_fit], y_fit,
    epochs=20, batch_size=32,
    validation_data=([Xc_val, Xu_val], y_val),
    callbacks=[checkpoint]
)

# Load the best model before making predictions
best_model = keras.models.load_model(checkpoint_path)

# Predictions using the best model (sigmoid output thresholded at 0.5).
# The train predictions cover all of Xc_train/Xu_train, i.e. the fitted rows plus
# the validation slice, so they stay aligned with `y_train` for later reports.
y_train_pred = (best_model.predict([Xc_train, Xu_train]) > 0.5).astype(int)
y_test_pred = (best_model.predict([Xc_test, Xu_test]) > 0.5).astype(int)

# Print Accuracy
train_acc = accuracy_score(y_train, y_train_pred)
test_acc = accuracy_score(y_test, y_test_pred)

print(f"\n✅ Train Accuracy: {train_acc:.4f}")
print(f"✅ Test Accuracy: {test_acc:.4f}")

# Find the best epoch (1-based) and its validation accuracy; this now refers to
# the held-out validation slice, not the test set.
val_accuracy_per_epoch = history.history["val_accuracy"]
best_epoch = np.argmax(val_accuracy_per_epoch) + 1
best_val_acc = max(val_accuracy_per_epoch)

print(f"🏆 Best Validation Accuracy: {best_val_acc:.4f} at Epoch {best_epoch}")

# Print Classification Reports
def format_classification_report(report):
    """Re-render the per-class rows of a text classification report.

    The report is processed line by line. A line that splits into exactly five
    whitespace-separated tokens and whose first token is numeric (digits with at
    most one dot) is rewritten as: the label left-justified to 10 characters,
    three values formatted with 4 decimals, and the last token as an integer,
    all separated by single spaces. Every other line is passed through as-is.

    Args:
        report: The report text (newline-separated).

    Returns:
        The re-rendered report, lines joined with newlines.
    """
    def _render(row):
        tokens = row.split()
        is_class_row = len(tokens) == 5 and tokens[0].replace('.', '', 1).isdigit()
        if not is_class_row:
            return row
        label, precision, recall, f1, support = tokens
        scores = " ".join(f"{float(value):.4f}" for value in (precision, recall, f1))
        return f"{label:<10} {scores} {int(support)}"

    return "\n".join(_render(row) for row in report.split("\n"))

# Build both reports up front (4-decimal precision), then print them reformatted
train_report, test_report = (
    classification_report(truth, predicted, digits=4)
    for truth, predicted in ((y_train, y_train_pred), (y_test, y_test_pred))
)

for heading, report_text in (
    ("\nTrain Set Classification Report:\n", train_report),
    ("Test Set Classification Report:\n", test_report),
):
    print(heading, format_classification_report(report_text))


In [None]:
"""
Each input branch gets its own CNN.
"""

from tensorflow.keras.callbacks import ModelCheckpoint
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.metrics import accuracy_score

# Load CSV Files
# NOTE(review): the original comments call these the "LB" and "MMS" datasets, but
# the second file is named audio_features_whisper.csv — confirm which it is.
df1 = pd.read_csv("/content/audio_features_langb.csv")
df2 = pd.read_csv("/content/audio_features_whisper.csv")

# Both files must describe the same samples in the same row order, because the
# labels are taken from df1 only and the features are fed to the model row by row.
if len(df1) != len(df2):
    raise ValueError(
        f"Feature files have different row counts: {len(df1)} vs {len(df2)}"
    )

# Extract Labels
y = df1["Sarcasm"].to_numpy(dtype=np.float32)
if "Sarcasm" in df2.columns and not np.array_equal(
    y, df2["Sarcasm"].to_numpy(dtype=np.float32)
):
    raise ValueError("The 'Sarcasm' labels of the two feature files do not match row by row")

# Extract context/utterance features from both datasets as float32 arrays
Xc1 = df1[[col for col in df1.columns if col.startswith("audio_c_feature_")]].to_numpy(dtype=np.float32)
Xu1 = df1[[col for col in df1.columns if col.startswith("audio_u_feature_")]].to_numpy(dtype=np.float32)
Xc2 = df2[[col for col in df2.columns if col.startswith("audio_c_feature_")]].to_numpy(dtype=np.float32)
Xu2 = df2[[col for col in df2.columns if col.startswith("audio_u_feature_")]].to_numpy(dtype=np.float32)

# Train / temp split (70% / 30%). All four feature arrays and the labels are split
# in ONE call so that every row keeps the same position in every array. Splitting
# the second feature set separately (and unstratified) shuffles it differently and
# pairs its rows with the wrong samples and labels.
(
    Xc1_train, Xc1_temp,
    Xu1_train, Xu1_temp,
    Xc2_train, Xc2_temp,
    Xu2_train, Xu2_temp,
    y_train, y_temp,
) = train_test_split(
    Xc1, Xu1, Xc2, Xu2, y, test_size=0.3, random_state=42, stratify=y
)

# Further split the temp set into validation (10% of all rows) and test (20%)
(
    Xc1_val, Xc1_test,
    Xu1_val, Xu1_test,
    Xc2_val, Xc2_test,
    Xu2_val, Xu2_test,
    y_val, y_test,
) = train_test_split(
    Xc1_temp, Xu1_temp, Xc2_temp, Xu2_temp, y_temp,
    test_size=2/3, random_state=42, stratify=y_temp
)

# Per-dataset input widths, taken from the context feature matrices
input_dim_lb = Xc1.shape[1]
input_dim_mms = Xc2.shape[1]


# CNN Model for feature extraction
def build_cnn_branch(input_dim):
    """Create one 1-D CNN branch over a flat feature vector.

    The input of width ``input_dim`` is reshaped to ``(input_dim, 1)`` and passed
    through two Conv1D(kernel_size=3, ReLU) + MaxPooling1D(2) stages with 126 and
    64 filters, then flattened.

    Returns:
        ``(input_tensor, flattened_output_tensor)``.
    """
    branch_input = keras.Input(shape=(input_dim,))
    feature_map = layers.Reshape((input_dim, 1))(branch_input)
    # NOTE(review): 126 filters looks like it may be a typo for 128 — confirm.
    for n_filters in (126, 64):
        feature_map = layers.Conv1D(filters=n_filters, kernel_size=3, activation="relu")(feature_map)
        feature_map = layers.MaxPooling1D(pool_size=2)(feature_map)
    return branch_input, layers.Flatten()(feature_map)

# One CNN branch per input, created in the order: LB context, LB utterance,
# MMS context, MMS utterance (widths come from input_dim_lb / input_dim_mms).
(
    (input_c1, context_cnn1),
    (input_u1, utterance_cnn1),
    (input_c2, context_cnn2),
    (input_u2, utterance_cnn2),
) = (
    build_cnn_branch(width)
    for width in (input_dim_lb, input_dim_lb, input_dim_mms, input_dim_mms)
)

# First fusion: context + utterance within each dataset
fused_1, fused_2 = (
    layers.Concatenate()([context_part, utterance_part])
    for context_part, utterance_part in (
        (context_cnn1, utterance_cnn1),
        (context_cnn2, utterance_cnn2),
    )
)

# Apply another CNN on fused representations
def build_fused_cnn(input_tensor):
    """Apply a small 1-D CNN to a flat tensor and return the flattened result.

    The tensor is reshaped to ``(-1, 1)`` and passed through two
    Conv1D(kernel_size=3, ReLU) + MaxPooling1D(2) stages with 64 and 32 filters.
    """
    feature_map = layers.Reshape((-1, 1))(input_tensor)
    for n_filters in (64, 32):
        feature_map = layers.Conv1D(filters=n_filters, kernel_size=3, activation="relu")(feature_map)
        feature_map = layers.MaxPooling1D(pool_size=2)(feature_map)
    return layers.Flatten()(feature_map)

def build_fused_fcn(input_tensor):
    """Pass a tensor through three swish Dense layers (128 -> 64 -> 32 units)."""
    hidden = input_tensor
    for width in (128, 64, 32):
        hidden = layers.Dense(width, activation="swish")(hidden)
    return hidden

# Per-dataset fully connected stack on each fused representation
# (the names say "cnn" but build_fused_fcn is what is applied here)
cnn1, cnn2 = (build_fused_fcn(fused) for fused in (fused_1, fused_2))

# Final fusion (cnn1 + cnn2)
final_fusion = layers.Concatenate()([cnn1, cnn2])

# Fully connected layers followed by a single sigmoid unit
fc = final_fusion
for width in (64, 32):
    fc = layers.Dense(width, activation="swish")(fc)
output = layers.Dense(1, activation="sigmoid")(fc)

# Define and compile model
model = keras.Model(
    inputs=[input_c1, input_u1, input_c2, input_u2],
    outputs=output,
)
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
model.summary()

# Model checkpoint: keep only the weights of the epoch with the best val_accuracy
best_weights_path = "/kaggle/working/lb+mms_cnn.weights.h5"
checkpoint_callback = ModelCheckpoint(
    best_weights_path,
    monitor="val_accuracy",
    mode="max",
    save_best_only=True,
    save_weights_only=True,
    verbose=1
)

# The model takes its four inputs in this fixed order for every split
train_inputs = [Xc1_train, Xu1_train, Xc2_train, Xu2_train]
val_inputs = [Xc1_val, Xu1_val, Xc2_val, Xu2_val]
test_inputs = [Xc1_test, Xu1_test, Xc2_test, Xu2_test]

# Train Model
model.fit(
    train_inputs, y_train,
    epochs=25, batch_size=32,
    validation_data=(val_inputs, y_val),
    callbacks=[checkpoint_callback]
)

# Load best weights
model.load_weights(best_weights_path)
print("Loaded Best Model Weights.")

# Predictions: sigmoid output thresholded at 0.5, computed train -> val -> test
y_train_pred, y_val_pred, y_test_pred = (
    (model.predict(split_inputs) > 0.5).astype(int)
    for split_inputs in (train_inputs, val_inputs, test_inputs)
)

# Classification Reports and Accuracy
for report_heading, accuracy_heading, truth, predicted in (
    ("Train Set Classification Report:\n", "Train Accuracy:", y_train, y_train_pred),
    ("\nValidation Set Classification Report:\n", "Validation Accuracy:", y_val, y_val_pred),
    ("\nTest Set Classification Report:\n", "Test Accuracy:", y_test, y_test_pred),
):
    print(report_heading, classification_report(truth, predicted, digits=4))
    print(accuracy_heading, accuracy_score(truth, predicted))

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.metrics import accuracy_score
# Load CSV Files
# NOTE(review): the original comments call these the "LB" and "MMS" datasets, but
# the second file is named audio_features_whisper.csv — confirm which it is.
df1 = pd.read_csv("/content/audio_features_langb.csv")
df2 = pd.read_csv("/content/audio_features_whisper.csv")

# Both files must describe the same samples in the same row order, because the
# labels are taken from df1 only and the features are fed to the model row by row.
if len(df1) != len(df2):
    raise ValueError(
        f"Feature files have different row counts: {len(df1)} vs {len(df2)}"
    )

# Extract Labels
y = df1["Sarcasm"].to_numpy(dtype=np.float32)
if "Sarcasm" in df2.columns and not np.array_equal(
    y, df2["Sarcasm"].to_numpy(dtype=np.float32)
):
    raise ValueError("The 'Sarcasm' labels of the two feature files do not match row by row")

# Extract context/utterance features from both datasets as float32 arrays
Xc1 = df1[[col for col in df1.columns if col.startswith("audio_c_feature_")]].to_numpy(dtype=np.float32)
Xu1 = df1[[col for col in df1.columns if col.startswith("audio_u_feature_")]].to_numpy(dtype=np.float32)
Xc2 = df2[[col for col in df2.columns if col.startswith("audio_c_feature_")]].to_numpy(dtype=np.float32)
Xu2 = df2[[col for col in df2.columns if col.startswith("audio_u_feature_")]].to_numpy(dtype=np.float32)

# Train / temp split (70% / 30%). All four feature arrays and the labels are split
# in ONE call so that every row keeps the same position in every array. Splitting
# the second feature set separately (and unstratified) shuffles it differently and
# pairs its rows with the wrong samples and labels.
(
    Xc1_train, Xc1_temp,
    Xu1_train, Xu1_temp,
    Xc2_train, Xc2_temp,
    Xu2_train, Xu2_temp,
    y_train, y_temp,
) = train_test_split(
    Xc1, Xu1, Xc2, Xu2, y, test_size=0.3, random_state=42, stratify=y
)

# Further split the temp set into validation (10% of all rows) and test (20%)
(
    Xc1_val, Xc1_test,
    Xu1_val, Xu1_test,
    Xc2_val, Xc2_test,
    Xu2_val, Xu2_test,
    y_val, y_test,
) = train_test_split(
    Xc1_temp, Xu1_temp, Xc2_temp, Xu2_temp, y_temp,
    test_size=2/3, random_state=42, stratify=y_temp
)

# Per-dataset input widths, taken from the context feature matrices
input_dim_lb = Xc1.shape[1]
input_dim_mms = Xc2.shape[1]

# Fully connected network branch
def build_fcn_branch(input_dim):
    """Create one fully connected branch over a flat feature vector.

    The input of width ``input_dim`` is passed through three swish Dense layers
    (512 -> 256 -> 128 units).

    Returns:
        ``(input_tensor, output_tensor)``.
    """
    branch_input = keras.Input(shape=(input_dim,))
    hidden = branch_input
    for width in (512, 256, 128):
        hidden = layers.Dense(width, activation="swish")(hidden)
    return branch_input, hidden

# One FCN branch per input, created in the order: LB context, LB utterance,
# MMS context, MMS utterance (widths come from input_dim_lb / input_dim_mms).
(
    (input_c1, context_fcn1),
    (input_u1, utterance_fcn1),
    (input_c2, context_fcn2),
    (input_u2, utterance_fcn2),
) = (
    build_fcn_branch(width)
    for width in (input_dim_lb, input_dim_lb, input_dim_mms, input_dim_mms)
)

# First fusion: context + utterance within each dataset
fused_1, fused_2 = (
    layers.Concatenate()([context_part, utterance_part])
    for context_part, utterance_part in (
        (context_fcn1, utterance_fcn1),
        (context_fcn2, utterance_fcn2),
    )
)

# Additional FCN model after fusion
def build_fcn_model(input_tensor):
    """Pass a tensor through three swish Dense layers (512 -> 256 -> 128 units)."""
    hidden = input_tensor
    for width in (512, 256, 128):
        hidden = layers.Dense(width, activation="swish")(hidden)
    return hidden

# Per-dataset fully connected stack on each fused representation
fcn1, fcn2 = (build_fcn_model(fused) for fused in (fused_1, fused_2))

# Final fusion
final_fusion = layers.Concatenate()([fcn1, fcn2])

# Fully connected layers followed by a single sigmoid unit
fc = final_fusion
for width in (64, 32, 8):
    fc = layers.Dense(width, activation="swish")(fc)
output = layers.Dense(1, activation="sigmoid")(fc)

# Define and compile model
model = keras.Model(
    inputs=[input_c1, input_u1, input_c2, input_u2],
    outputs=output,
)
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
model.summary()

# Model checkpoint: keep only the weights of the epoch with the best val_accuracy
best_weights_path = "/kaggle/working/lb+mms_fcn.weights.h5"
checkpoint_callback = ModelCheckpoint(
    best_weights_path,
    monitor="val_accuracy",
    mode="max",
    save_best_only=True,
    save_weights_only=True,
    verbose=1
)

# The model takes its four inputs in this fixed order for every split
train_inputs = [Xc1_train, Xu1_train, Xc2_train, Xu2_train]
val_inputs = [Xc1_val, Xu1_val, Xc2_val, Xu2_val]
test_inputs = [Xc1_test, Xu1_test, Xc2_test, Xu2_test]

# Train Model
model.fit(
    train_inputs, y_train,
    epochs=25, batch_size=32,
    validation_data=(val_inputs, y_val),
    callbacks=[checkpoint_callback]
)

# Load best weights
model.load_weights(best_weights_path)
print("Loaded Best Model Weights.")

# Predictions: sigmoid output thresholded at 0.5, computed train -> val -> test
y_train_pred, y_val_pred, y_test_pred = (
    (model.predict(split_inputs) > 0.5).astype(int)
    for split_inputs in (train_inputs, val_inputs, test_inputs)
)

# Classification Reports and Accuracy
for report_heading, accuracy_heading, truth, predicted in (
    ("Train Set Classification Report:\n", "Train Accuracy:", y_train, y_train_pred),
    ("\nValidation Set Classification Report:\n", "Validation Accuracy:", y_val, y_val_pred),
    ("\nTest Set Classification Report:\n", "Test Accuracy:", y_test, y_test_pred),
):
    print(report_heading, classification_report(truth, predicted, digits=4))
    print(accuracy_heading, accuracy_score(truth, predicted))

In [None]:
import os
from sfm_extractor.models.wav2vec2_extractor import Wav2Vec2Extractor

# Set parameters: input folders, output CSV paths and the compute device
input_dir_utterance = "aditya/audio_utter"
input_dir_context = "aditya/audio_cont"
output_file_utterance = "aditya/utterance_features.csv"
output_file_context = "aditya/context_features.csv"
device = "cuda"  # Change to "cpu" if no GPU is available

# Initialize extractor
extractor = Wav2Vec2Extractor(device=device)

# Run the extractor over each folder in turn: utterance files first, then context
extraction_jobs = (
    ("Extracting utterance features...", input_dir_utterance, output_file_utterance),
    ("Extracting context features...", input_dir_context, output_file_context),
)
for banner, source_dir, target_csv in extraction_jobs:
    print(banner)
    extractor.extract_folder(source_dir, target_csv)

print("Feature extraction completed!")