In [None]:
!pip install tensorflow matplotlib numpy scikit-learn seaborn nltk opencv-python

In [None]:
# Use Autoencoder to implement anomaly detection. Build the model by using:
# a. Import required libraries
# b. Upload / access the dataset
# c. Encoder converts it into latent representation
# d. Decoder networks convert it back to the original input
# e. Compile the models with Optimizer, Loss, and Evaluation Metrics

In [None]:
# a. Import required libraries (credit card fraud dataset)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import warnings
warnings.filterwarnings('ignore')

# Seed NumPy's and TensorFlow's global random generators with one shared value
# so repeated runs start from the same random state.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

# b. Upload / access the dataset
# Load the dataset from the CSV file in the working directory
df = pd.read_csv('creditcard.csv')

# Explore the dataset
print("Dataset shape:", df.shape)
print("\nDataset info:")
# DataFrame.info() writes its report itself and returns None, so it must not be
# wrapped in print() (that would emit a stray "None" line after the report).
df.info()
print("\nClass distribution:")
print(df['Class'].value_counts())
# The mean of a boolean mask is the fraction of rows with Class == 1; unlike
# value_counts()[1] it does not raise KeyError when no fraud rows are present.
print("\nPercentage of fraud cases: {:.4f}%".format((df['Class'] == 1).mean() * 100))

# Data preprocessing
# Separate features and target; 'Time' is dropped and not used as a model input
features = df.drop(['Class', 'Time'], axis=1)
labels = df['Class']

# Split the data into normal and fraudulent transactions
normal_data = features[labels == 0]
fraud_data = features[labels == 1]

print(f"Normal transactions: {len(normal_data)}")
print(f"Fraudulent transactions: {len(fraud_data)}")

# Split normal data into train and test BEFORE scaling, so the held-out normal
# rows never contribute to the scaler's mean/variance (avoids train/test leakage).
normal_train, normal_test = train_test_split(normal_data, test_size=0.2, random_state=42)

# Scale the features: fit on the training rows only, then apply the same
# transformation to every other subset.
scaler = StandardScaler()
X_train = scaler.fit_transform(normal_train)
X_test_normal = scaler.transform(normal_test)
fraud_data_scaled = scaler.transform(fraud_data)
# Kept so that any code relying on the full scaled normal set still finds it
normal_data_scaled = scaler.transform(normal_data)

# Use all fraud data for testing
X_test_fraud = fraud_data_scaled

# Combine test data; labels are 0 for normal rows and 1 for fraud rows,
# in the same order as the concatenated samples.
X_test = np.concatenate([X_test_normal, X_test_fraud])
y_test = np.concatenate([np.zeros(len(X_test_normal)), np.ones(len(X_test_fraud))])

print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")

# Autoencoder architecture
input_dim = X_train.shape[1]  # one input unit per feature column
encoding_dim = 14  # Size of our encoded representations

# c. Encoder converts it into latent representation
input_layer = Input(shape=(input_dim,))

# Encoder: 2*encoding_dim units -> dropout -> encoding_dim units.
# Layers are created in the same order as they are wired.
enc_hidden = Dense(2 * encoding_dim, activation='relu')(input_layer)
enc_hidden = Dropout(0.1)(enc_hidden)
encoder = Dense(encoding_dim, activation='relu')(enc_hidden)

# Bottleneck: the narrowest layer, half of encoding_dim
latent_representation = Dense(encoding_dim // 2, activation='relu', name='bottleneck')(encoder)

# d. Decoder networks convert it back to the original input
# Decoder mirrors the encoder and ends in a linear layer of input_dim units
dec_hidden = Dense(encoding_dim, activation='relu')(latent_representation)
dec_hidden = Dropout(0.1)(dec_hidden)
decoder = Dense(2 * encoding_dim, activation='relu')(dec_hidden)
output_layer = Dense(input_dim, activation='linear')(decoder)

# Full autoencoder (input -> reconstruction) and encoder-only model (input -> bottleneck);
# both are built on the same layer graph.
autoencoder = Model(inputs=input_layer, outputs=output_layer, name='autoencoder')
encoder_model = Model(inputs=input_layer, outputs=latent_representation, name='encoder')

# e. Compile the models with Optimizer, Loss, and Evaluation Metrics
# MSE is the reconstruction loss; MAE is tracked as an additional metric.
autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])

print("Autoencoder architecture:")
autoencoder.summary()

# Training callbacks, both monitoring validation loss:
#  - early_stopping: stop after 10 epochs without improvement and restore the best weights
#  - reduce_lr: halve the learning rate after 5 epochs without improvement, floor at 1e-4
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=0.0001, verbose=1)

# Fit the autoencoder to reproduce its own input: X_train (normal transactions
# only) is both the input and the target; 10% of it is held out for validation.
history = autoencoder.fit(
    X_train, X_train,
    validation_split=0.1,
    batch_size=256,
    epochs=100,
    shuffle=True,
    callbacks=[early_stopping, reduce_lr],
    verbose=1,
)

# Reconstruct the training and test sets with the trained autoencoder
train_predictions = autoencoder.predict(X_train)
test_predictions = autoencoder.predict(X_test)

# Per-sample reconstruction error: squared residuals averaged over the feature axis
train_mse = np.square(X_train - train_predictions).mean(axis=1)
test_mse = np.square(X_test - test_predictions).mean(axis=1)

# Plot training history: one panel per tracked quantity (loss, MAE), each
# showing the training curve and its validation counterpart.
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

for ax, metric_key, metric_name in zip(axes, ('loss', 'mae'), ('Loss', 'MAE')):
    ax.plot(history.history[metric_key], label=f'Training {metric_name}')
    ax.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_name}')
    ax.set_title(f'Model {metric_name}')
    ax.set_ylabel(metric_name)
    ax.set_xlabel('Epoch')
    ax.legend()

fig.tight_layout()
plt.show()

# Analyze reconstruction errors
plt.figure(figsize=(15, 5))

# Left panel: distribution of reconstruction errors on the training data
plt.subplot(1, 2, 1)
plt.hist(train_mse, bins=50, alpha=0.7, label='Normal (Train)', color='blue')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.title('Reconstruction Error - Training Data')
plt.legend()

# Right panel: test errors split by true label (0 = normal, 1 = fraud)
plt.subplot(1, 2, 2)
normal_test_mse = test_mse[y_test == 0]
fraud_test_mse = test_mse[y_test == 1]

# density=True normalises each histogram to unit area. With raw counts the two
# classes are drawn on the same axis even though their sample sizes can differ
# greatly, which makes the smaller class practically invisible.
plt.hist(normal_test_mse, bins=50, alpha=0.7, label='Normal (Test)', color='green', density=True)
plt.hist(fraud_test_mse, bins=50, alpha=0.7, label='Fraud (Test)', color='red', density=True)
plt.xlabel('Reconstruction Error')
plt.ylabel('Density')
plt.title('Reconstruction Error - Test Data')
plt.legend()

plt.tight_layout()
plt.show()

# Decision threshold: the 95th percentile of the training reconstruction errors.
# This is a fixed heuristic cut-off; it is not tuned against labelled data.
threshold = np.percentile(train_mse, 95)
print(f"Optimal threshold (95th percentile): {threshold:.4f}")

# A test sample is predicted anomalous (1) when its error exceeds the threshold
y_pred = (test_mse > threshold).astype(int)

# Evaluation metrics
banner = "=" * 50
print("\n" + banner)
print("ANOMALY DETECTION RESULTS")
print(banner)

# Share of true-fraud samples flagged, and share of true-normal samples flagged
fraud_detection_rate = y_pred[y_test == 1].mean() * 100
false_positive_rate = y_pred[y_test == 0].mean() * 100

print(f"Threshold: {threshold:.4f}")
print(f"Fraud detection rate: {fraud_detection_rate:.2f}%")
print(f"False positive rate: {false_positive_rate:.2f}%")

# Classification report
print("\nClassification Report:")
report = classification_report(y_test, y_pred, target_names=['Normal', 'Fraud'])
print(report)

# Confusion matrix of true vs. predicted labels, drawn as an annotated heatmap
cm = confusion_matrix(y_test, y_pred)
class_names = ['Normal', 'Fraud']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_title('Confusion Matrix')
ax.set_ylabel('True Label')
ax.set_xlabel('Predicted Label')
plt.show()

# ROC curve: the raw reconstruction error is used as the anomaly score, so the
# curve is independent of the fixed threshold chosen above.
fpr, tpr, thresholds = roc_curve(y_test, test_mse)
roc_auc = auc(fpr, tpr)

fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.4f})')
# Diagonal reference line from (0, 0) to (1, 1)
ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic (ROC) Curve')
ax.legend(loc="lower right")
plt.show()

print(f"ROC AUC Score: {roc_auc:.4f}")

# Analyze some examples
separator = "=" * 50
print("\n" + separator)
print("EXAMPLE ANALYSIS")
print(separator)

# Take the first three test samples of each true class
normal_examples = X_test[y_test == 0][:3]
fraud_examples = X_test[y_test == 1][:3]

# Reconstruct them with the trained autoencoder
normal_reconstructions = autoencoder.predict(normal_examples)
fraud_reconstructions = autoencoder.predict(fraud_examples)

# Per-sample mean squared reconstruction error
normal_errors = np.square(normal_examples - normal_reconstructions).mean(axis=1)
fraud_errors = np.square(fraud_examples - fraud_reconstructions).mean(axis=1)

print("Normal transactions reconstruction errors:", normal_errors)
print("Fraud transactions reconstruction errors:", fraud_errors)
print("Threshold:", threshold)

# Per-feature reconstruction error: mean absolute residual over all test
# samples, one value per feature column, ranked from largest to smallest.
feature_errors = np.abs(X_test - test_predictions).mean(axis=0)
feature_importance = pd.DataFrame(
    {'Feature': features.columns, 'Reconstruction_Error': feature_errors}
)
feature_importance = feature_importance.sort_values('Reconstruction_Error', ascending=False)

print("\nTop 10 features contributing to reconstruction error:")
print(feature_importance.head(10))

In [None]:
# a. Import required libraries (ECG dataset)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import warnings
warnings.filterwarnings('ignore')

# Seed NumPy's and TensorFlow's global random generators with one shared value
# so repeated runs start from the same random state.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

# b. Upload / access the dataset
# Load the ECG dataset; header=None makes pandas read the first CSV row as data
df = pd.read_csv('ecg_autoencoder_dataset.csv', header=None)

# Explore the dataset
print("Dataset shape:", df.shape)
print("\nDataset info:")
# DataFrame.info() writes its report itself and returns None, so it must not be
# wrapped in print() (that would emit a stray "None" line after the report).
df.info()
print("\nFirst few rows of data:")
print(df.head())

# The last column is treated as the target (0: normal, 1: anomaly);
# every other column is used as an ECG signal feature.
# NOTE(review): confirm this label encoding against the dataset source. If the
# file encodes normal beats as 1, the normal/anomaly split that follows would
# be silently inverted.
features = df.iloc[:, :-1]  # All columns except last
labels = df.iloc[:, -1]     # Last column is the target

print(f"\nFeatures shape: {features.shape}")
print(f"Labels shape: {labels.shape}")

# Check class distribution
print("\nClass distribution:")
print(labels.value_counts())
# The mean of a boolean mask is the fraction of rows labelled 1; unlike
# value_counts()[1] it does not raise KeyError when that class is absent.
print("\nPercentage of anomaly cases: {:.4f}%".format((labels == 1).mean() * 100))

# Split the data into normal and anomalous ECG signals
normal_data = features[labels == 0]
anomaly_data = features[labels == 1]

print(f"Normal ECG signals: {len(normal_data)}")
print(f"Anomalous ECG signals: {len(anomaly_data)}")

# Split normal data into train and test BEFORE scaling, so the held-out normal
# rows never contribute to the scaler's mean/variance (avoids train/test leakage).
normal_train, normal_test = train_test_split(normal_data, test_size=0.2, random_state=42)

# Scale the features: fit on the training rows only, then apply the same
# transformation to every other subset.
scaler = StandardScaler()
X_train = scaler.fit_transform(normal_train)
X_test_normal = scaler.transform(normal_test)
anomaly_data_scaled = scaler.transform(anomaly_data)
# Kept so that any code relying on the full scaled normal set still finds it
normal_data_scaled = scaler.transform(normal_data)

# Use all anomaly data for testing
X_test_anomaly = anomaly_data_scaled

# Combine test data; labels are 0 for normal rows and 1 for anomalous rows,
# in the same order as the concatenated samples.
X_test = np.concatenate([X_test_normal, X_test_anomaly])
y_test = np.concatenate([np.zeros(len(X_test_normal)), np.ones(len(X_test_anomaly))])

print(f"\nTraining data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")

# Autoencoder architecture
input_dim = X_train.shape[1]  # one input unit per signal column
encoding_dim = 32  # Size of our encoded representations

print(f"\nInput dimension: {input_dim}")
print(f"Encoding dimension: {encoding_dim}")

# c. Encoder converts it into latent representation
input_layer = Input(shape=(input_dim,))

# Encoder: 128 -> 64 -> encoding_dim units with light dropout in between.
# Layers are created in the same order as they are wired.
enc_hidden = Dense(128, activation='relu')(input_layer)
enc_hidden = Dropout(0.1)(enc_hidden)
enc_hidden = Dense(64, activation='relu')(enc_hidden)
enc_hidden = Dropout(0.1)(enc_hidden)
encoder = Dense(encoding_dim, activation='relu')(enc_hidden)

# Bottleneck: the narrowest layer, half of encoding_dim
latent_representation = Dense(encoding_dim // 2, activation='relu', name='bottleneck')(encoder)

# d. Decoder networks convert it back to the original input
# Decoder mirrors the encoder and ends in a linear layer of input_dim units
dec_hidden = Dense(encoding_dim, activation='relu')(latent_representation)
dec_hidden = Dropout(0.1)(dec_hidden)
dec_hidden = Dense(64, activation='relu')(dec_hidden)
dec_hidden = Dropout(0.1)(dec_hidden)
decoder = Dense(128, activation='relu')(dec_hidden)
output_layer = Dense(input_dim, activation='linear')(decoder)

# Full autoencoder (input -> reconstruction) and encoder-only model (input -> bottleneck);
# both are built on the same layer graph.
autoencoder = Model(inputs=input_layer, outputs=output_layer, name='autoencoder')
encoder_model = Model(inputs=input_layer, outputs=latent_representation, name='encoder')

# e. Compile the models with Optimizer, Loss, and Evaluation Metrics
# MSE is the reconstruction loss; MAE is tracked as an additional metric.
autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])

print("\nAutoencoder architecture:")
autoencoder.summary()

# Training callbacks, both monitoring validation loss:
#  - early_stopping: stop after 15 epochs without improvement and restore the best weights
#  - reduce_lr: halve the learning rate after 8 epochs without improvement, floor at 1e-4
early_stopping = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True, verbose=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=8, min_lr=0.0001, verbose=1)

print("\nTraining autoencoder on normal ECG signals only...")

# Fit the autoencoder to reproduce its own input: X_train (normal signals only)
# is both the input and the target; 10% of it is held out for validation.
history = autoencoder.fit(
    X_train, X_train,
    validation_split=0.1,
    batch_size=128,
    epochs=100,
    shuffle=True,
    callbacks=[early_stopping, reduce_lr],
    verbose=1,
)

# Reconstruct the training and test sets with the trained autoencoder
# (verbose=0 silences the prediction progress output)
print("\nMaking predictions...")
train_predictions = autoencoder.predict(X_train, verbose=0)
test_predictions = autoencoder.predict(X_test, verbose=0)

# Per-sample reconstruction error: squared residuals averaged over the feature axis
train_mse = np.square(X_train - train_predictions).mean(axis=1)
test_mse = np.square(X_test - test_predictions).mean(axis=1)

# Plot training history: one panel per tracked quantity (loss, MAE), each
# showing the training curve and its validation counterpart.
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

panel_specs = (
    ('loss', 'Loss', 'Loss (MSE)'),
    ('mae', 'MAE', 'MAE'),
)
for ax, (metric_key, metric_name, y_label) in zip(axes, panel_specs):
    ax.plot(history.history[metric_key], label=f'Training {metric_name}', linewidth=2)
    ax.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_name}', linewidth=2)
    ax.set_title(f'Autoencoder Training {metric_name}', fontweight='bold')
    ax.set_ylabel(y_label)
    ax.set_xlabel('Epoch')
    ax.legend()
    ax.grid(True, alpha=0.3)

fig.tight_layout()
plt.show()

# Reconstruction-error distributions, each normalised to unit area (density=True)
fig, (train_ax, test_ax) = plt.subplots(1, 2, figsize=(15, 5))

# Left panel: errors on the training data
train_ax.hist(train_mse, bins=50, alpha=0.7, label='Normal (Train)', color='blue', density=True)
train_ax.set_xlabel('Reconstruction Error (MSE)')
train_ax.set_ylabel('Density')
train_ax.set_title('Reconstruction Error - Training Data (Normal ECG)', fontweight='bold')
train_ax.legend()
train_ax.grid(True, alpha=0.3)

# Right panel: test errors split by true label (0 = normal, 1 = anomaly)
normal_test_mse = test_mse[y_test == 0]
anomaly_test_mse = test_mse[y_test == 1]

test_ax.hist(normal_test_mse, bins=50, alpha=0.7, label='Normal (Test)', color='green', density=True)
test_ax.hist(anomaly_test_mse, bins=50, alpha=0.7, label='Anomaly (Test)', color='red', density=True)
test_ax.set_xlabel('Reconstruction Error (MSE)')
test_ax.set_ylabel('Density')
test_ax.set_title('Reconstruction Error - Test Data', fontweight='bold')
test_ax.legend()
test_ax.grid(True, alpha=0.3)

fig.tight_layout()
plt.show()

# Decision threshold: the 95th percentile of the training reconstruction errors.
# This is a fixed heuristic cut-off; it is not tuned against labelled data.
threshold = np.percentile(train_mse, 95)
print(f"\nOptimal threshold (95th percentile): {threshold:.4f}")

# A test sample is predicted anomalous (1) when its error exceeds the threshold
y_pred = (test_mse > threshold).astype(int)

# Evaluation metrics
banner = "=" * 60
print("\n" + banner)
print("ECG ANOMALY DETECTION RESULTS")
print(banner)

# Share of true-anomaly samples flagged, and share of true-normal samples flagged
anomaly_detection_rate = y_pred[y_test == 1].mean() * 100
false_positive_rate = y_pred[y_test == 0].mean() * 100

print(f"Threshold: {threshold:.4f}")
print(f"Anomaly detection rate (Recall): {anomaly_detection_rate:.2f}%")
print(f"False positive rate: {false_positive_rate:.2f}%")

# Classification report
print("\nClassification Report:")
report = classification_report(y_test, y_pred, target_names=['Normal ECG', 'Anomalous ECG'])
print(report)

# Confusion matrix of true vs. predicted labels, drawn as an annotated heatmap
cm = confusion_matrix(y_test, y_pred)
class_names = ['Normal', 'Anomaly']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_title('ECG Anomaly Detection - Confusion Matrix', fontweight='bold')
ax.set_ylabel('True Label')
ax.set_xlabel('Predicted Label')
plt.show()

# ROC curve: the raw reconstruction error is used as the anomaly score, so the
# curve is independent of the fixed threshold chosen above.
fpr, tpr, thresholds = roc_curve(y_test, test_mse)
roc_auc = auc(fpr, tpr)

fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.4f})')
# Diagonal reference line from (0, 0) to (1, 1)
ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random Classifier')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ECG Anomaly Detection - ROC Curve', fontweight='bold')
ax.legend(loc="lower right")
ax.grid(True, alpha=0.3)
plt.show()

print(f"ROC AUC Score: {roc_auc:.4f}")

# Analyze some examples
separator = "=" * 60
print("\n" + separator)
print("EXAMPLE ANALYSIS")
print(separator)

# Take the first three test samples of each true class
normal_examples = X_test[y_test == 0][:3]
anomaly_examples = X_test[y_test == 1][:3]

# Reconstruct them with the trained autoencoder (progress output silenced)
normal_reconstructions = autoencoder.predict(normal_examples, verbose=0)
anomaly_reconstructions = autoencoder.predict(anomaly_examples, verbose=0)

# Per-sample mean squared reconstruction error
normal_errors = np.square(normal_examples - normal_reconstructions).mean(axis=1)
anomaly_errors = np.square(anomaly_examples - anomaly_reconstructions).mean(axis=1)

print("Normal ECG reconstruction errors:", normal_errors)
print("Anomalous ECG reconstruction errors:", anomaly_errors)
print("Threshold:", threshold)

# Visualize some ECG signals and their reconstructions on a 2x3 grid:
# top row = three normal examples, bottom row = three anomalous examples.
plt.figure(figsize=(15, 10))

# (title prefix, line format of the original signal, signals, reconstructions, errors)
panel_rows = (
    ('Normal ECG', 'b-', normal_examples, normal_reconstructions, normal_errors),
    ('Anomalous ECG', 'g-', anomaly_examples, anomaly_reconstructions, anomaly_errors),
)

for row_idx, (row_title, signal_fmt, signals, reconstructions, errors) in enumerate(panel_rows):
    for col_idx in range(3):
        # Subplot positions are 1-based and numbered row by row
        plt.subplot(2, 3, row_idx * 3 + col_idx + 1)
        plt.plot(signals[col_idx], signal_fmt, alpha=0.7, label='Original')
        plt.plot(reconstructions[col_idx], 'r--', alpha=0.7, label='Reconstructed')
        plt.title(f'{row_title}\nReconstruction Error: {errors[col_idx]:.4f}', fontweight='bold')
        plt.legend()
        plt.grid(True, alpha=0.3)

plt.suptitle('ECG Signal Reconstruction - Original vs Autoencoder Output', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

# Per-time-point reconstruction error: mean absolute residual over all test
# samples, one value per signal column. Time points are numbered from 1.
time_point_errors = np.abs(X_test - test_predictions).mean(axis=0)
time_points = range(1, len(time_point_errors) + 1)

fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(time_points, time_point_errors, 'b-', alpha=0.7)
ax.set_xlabel('Time Point')
ax.set_ylabel('Average Reconstruction Error')
ax.set_title('Reconstruction Error Across ECG Time Points', fontweight='bold')
ax.grid(True, alpha=0.3)

# Mark the ten time points with the largest error; argsort is ascending, so
# the last ten indices are the largest (idx is 0-based, the axis is 1-based).
top_10_indices = np.argsort(time_point_errors)[-10:]
for idx in top_10_indices:
    ax.axvline(x=idx+1, color='red', alpha=0.3, linestyle='--')

fig.tight_layout()
plt.show()

# List the same ten time points from highest to lowest error
print("\nTop 10 time points with highest reconstruction errors:")
for rank, idx in enumerate(top_10_indices[::-1], start=1):
    print(f"{rank}. Time Point {idx+1}: Error = {time_point_errors[idx]:.4f}")

# Final summary of the ECG run: data size, detection rates at the chosen
# threshold, ROC AUC and the main model settings.
# The emoji and arrow characters were stored as mojibake (UTF-8 bytes decoded
# with the wrong charset); they are restored to the intended characters here.
print("\n" + "="*70)
print("ECG ANOMALY DETECTION SUMMARY")
print("="*70)
print(f"📊 Dataset: ECG signals with {input_dim} time points")
print(f"🎯 Anomaly Detection Rate: {np.mean(y_pred[y_test == 1]) * 100:.2f}%")
print(f"🚫 False Positive Rate: {np.mean(y_pred[y_test == 0]) * 100:.2f}%")
print(f"📈 ROC AUC Score: {roc_auc:.4f}")
print(f"🔧 Autoencoder Architecture: {input_dim} → 128 → 64 → {encoding_dim} → {encoding_dim//2} → ...")
print("⚙️ Optimizer: Adam (lr=0.001)")
print(f"📏 Threshold: {threshold:.4f} (95th percentile of normal data)")
print(f"💾 Training samples: {X_train.shape[0]:,}")
print(f"🧪 Test samples: {X_test.shape[0]:,}")
print("="*70)

# Save the trained autoencoder to disk.
# NOTE(review): '.h5' is the legacy HDF5 format; newer Keras releases recommend
# the native '.keras' format. The filename is kept unchanged here because other
# code may load it by this name. Confirm before switching.
autoencoder.save('ecg_anomaly_detector.h5')
# The check mark was stored as mojibake; restored to the intended character.
print("\n✓ Model saved as 'ecg_anomaly_detector.h5'")