In [None]:
!pip install numpy 
!pip install pandas
!pip install matplotlib 
!pip install scikit-learn 
!pip install tensorflow 
!pip install seaborn

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import seaborn as sns
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras import regularizers, constraints, initializers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from scipy.stats import wasserstein_distance

In [None]:
# --- Load the red-wine quality dataset straight from GitHub ---
github_url = 'https://raw.githubusercontent.com/aniruddhachoudhury/Red-Wine-Quality/master/winequality-red.csv'
wine_data = pd.read_csv(github_url)

# --- Report how many missing values each column contains ---
missing_values = wine_data.isna().sum()
print("Missing values in each column:\n", missing_values)

# --- Standardize every column (the 'quality' target included) ---
# All columns are treated as features, so the GAN later models quality too.
features = wine_data.columns
scaler = StandardScaler()
scaler.fit(wine_data[features])

# Work on a copy so the raw frame stays available for later comparison plots.
wine_data_scaled = wine_data.copy()
wine_data_scaled[features] = scaler.transform(wine_data[features])

# --- Peek at the standardized data ---
print(wine_data_scaled.head())

In [None]:
def build_generator():
    """Build the generator network.

    The network maps a latent vector of length ``len(features)`` to a
    synthetic sample of the same length. It stacks three hidden stages
    (512 -> 1024 -> 2048 units), each made of Dense (He-normal init), ReLU and
    BatchNormalization(momentum=0.8), followed by a linear output layer.

    Relies on the notebook-level ``features`` for the input/output width.
    """
    n_features = len(features)
    hidden_widths = (512, 1024, 2048)

    net = models.Sequential()
    for position, width in enumerate(hidden_widths):
        # Only the very first layer declares the input dimensionality.
        first_layer_kwargs = {'input_dim': n_features} if position == 0 else {}
        net.add(layers.Dense(width, kernel_initializer='he_normal', **first_layer_kwargs))
        net.add(layers.ReLU())
        net.add(layers.BatchNormalization(momentum=0.8))

    # Linear output: one unbounded value per feature.
    net.add(layers.Dense(n_features, activation='linear'))
    return net

def build_discriminator():
    """Build the discriminator network.

    The network takes a sample of length ``len(features)`` and outputs a
    single sigmoid score. It stacks three hidden stages (1024 -> 1024 -> 512
    units), each made of Dense (He-normal init), LeakyReLU(0.2) and
    Dropout(0.4).

    Relies on the notebook-level ``features`` for the input width.
    """
    n_features = len(features)
    hidden_widths = (1024, 1024, 512)

    net = models.Sequential()
    for position, width in enumerate(hidden_widths):
        # Only the very first layer declares the input dimensionality.
        first_layer_kwargs = {'input_dim': n_features} if position == 0 else {}
        net.add(layers.Dense(width, kernel_initializer='he_normal', **first_layer_kwargs))
        net.add(layers.LeakyReLU(alpha=0.2))
        net.add(layers.Dropout(0.4))

    # Single sigmoid unit: the real-vs-fake score.
    net.add(layers.Dense(1, activation='sigmoid'))
    return net

In [None]:
# Every column of the wine table is modelled; this repeats the assignment made
# in the data-loading cell so the cell can be re-run on its own.
features = wine_data.columns 
# Instantiate the two networks; both builders size themselves from `features`.
generator = build_generator()
discriminator = build_discriminator()

# Define file paths where the weights will be saved
# (train_gan writes to these two paths directly via save_weights).
generator_checkpoint_path =  'generator_weights.h5'
discriminator_checkpoint_path = 'discriminator_weights.h5'

# Create ModelCheckpoint callbacks
# NOTE(review): these callback objects are not passed to any training call in
# the visible notebook (training uses train_on_batch), so they appear to be
# unused — confirm before relying on "save_best_only" behaviour.
generator_checkpoint = ModelCheckpoint(generator_checkpoint_path, save_weights_only=True, save_best_only=True, monitor='g_loss', mode='min')
discriminator_checkpoint = ModelCheckpoint(discriminator_checkpoint_path, save_weights_only=True, save_best_only=True, monitor='d_loss', mode='min')

# Order matters below: the discriminator is compiled on its own first, then
# marked non-trainable *before* the stacked model is compiled. This is the
# usual Keras GAN arrangement, intended so that training `gan` updates only the
# generator while `discriminator.train_on_batch` still updates the discriminator.
discriminator.compile(loss='binary_crossentropy', optimizer='adam',metrics=['accuracy'])
# Stacked model: latent noise -> generator -> discriminator score.
gan = models.Sequential([generator, discriminator])
discriminator.trainable = False
gan.compile(loss='binary_crossentropy', optimizer='adam')

In [None]:
def train_gan(gan, dataset, epochs=1000, batch_size=64, verbose=True):
    """Alternate discriminator and generator updates for ``epochs`` steps.

    Each "epoch" is a single training step on one random mini-batch drawn with
    replacement from ``dataset``; it is not a full pass over the data.

    Besides ``gan``, the function relies on the notebook-level names
    ``generator``, ``discriminator``, ``features``,
    ``generator_checkpoint_path`` and ``discriminator_checkpoint_path``.

    Args:
        gan: Compiled stacked model (generator followed by discriminator) used
            for the generator update.
        dataset: 2-D NumPy array of training rows with ``len(features)`` columns.
        epochs: Number of training steps to run.
        batch_size: Number of real rows and of generated rows per step.
        verbose: When true, print losses and discriminator accuracy every
            100 steps.

    Returns:
        None. Every 100 steps the generator and discriminator weights are
        written to ``generator_checkpoint_path`` and
        ``discriminator_checkpoint_path``.
    """
    n_features = len(features)

    # Discriminator targets: real rows use the smoothed label 0.9, generated
    # rows use 0.0. The earlier expression `np.zeros(batch_size) * 0.1` was a
    # no-op multiplication that also produced zeros, so the numeric targets are
    # unchanged. Only the real side has ever been smoothed.
    d_targets = np.concatenate([np.full(batch_size, 0.9), np.zeros(batch_size)])
    # Generator targets: it is rewarded when the discriminator outputs "real".
    g_targets = np.ones(batch_size)

    for epoch in range(epochs):
        # --- Discriminator step ---
        batch_rows = np.random.randint(0, dataset.shape[0], size=batch_size)
        real_data = np.reshape(dataset[batch_rows], (-1, n_features))
        latent = np.random.normal(0, 1, size=(batch_size, n_features))
        # verbose=0: otherwise predict() prints a progress bar on every step,
        # flooding the cell output over thousands of iterations.
        fake_data = generator.predict(latent, verbose=0)

        x = np.concatenate([real_data, fake_data])
        discriminator.trainable = True
        d_loss = discriminator.train_on_batch(x, d_targets)

        # --- Generator step (through the stacked model) ---
        noise = np.random.normal(0, 1, size=(batch_size, n_features))
        discriminator.trainable = False
        g_loss = gan.train_on_batch(noise, g_targets)

        if epoch % 100 == 0:
            if verbose:
                print("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100 * d_loss[1], g_loss))
            # Periodic snapshot; adjust the frequency of saving as needed.
            generator.save_weights(generator_checkpoint_path)
            discriminator.save_weights(discriminator_checkpoint_path)

In [None]:
# Training matrix: the standardized columns as a plain NumPy array
dataset = wine_data_scaled.loc[:, features].to_numpy()

# Run adversarial training for 10,000 steps
train_gan(gan, dataset=dataset, epochs=10000)

In [None]:
def generate_data(generator, n_samples=1000):
    """Draw ``n_samples`` synthetic rows from ``generator``.

    Latent vectors are sampled from a standard normal distribution with one
    component per entry of the notebook-level ``features``, then passed through
    ``generator.predict``.

    Returns:
        Whatever ``generator.predict`` returns for the sampled latent batch.
    """
    latent_shape = (n_samples, len(features))
    latent_batch = np.random.normal(0, 1, size=latent_shape)
    synthetic_rows = generator.predict(latent_batch)
    return synthetic_rows

def evaluate_model(original_data, generated_data, n_clusters=10):
    """Compare how original and generated rows spread over K-Means clusters.

    Both inputs are standardized with statistics fitted on ``original_data``,
    so they must be expressed in the same units/space as each other.

    Args:
        original_data: 2-D array of reference rows.
        generated_data: 2-D array of synthetic rows with the same columns.
        n_clusters: Number of K-Means clusters fitted on the original rows.

    Returns:
        Tuple ``(distance, diversity_score)`` where ``distance`` is the
        Wasserstein distance between the two cluster-membership histograms and
        ``diversity_score`` is the mean of the full centroid-to-centroid
        distance matrix (self-distances of 0 included).
    """
    scaler = StandardScaler()
    original_data = scaler.fit_transform(original_data)
    generated_data = scaler.transform(generated_data)

    # Fit K-Means on the original data only; generated rows are merely assigned
    # to the nearest of those clusters.
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(original_data)

    # Predict clusters for both original and generated data
    original_labels = kmeans.predict(original_data)
    generated_labels = kmeans.predict(generated_data)

    # Fraction of rows falling in each cluster, for both datasets
    original_cluster_distribution = np.bincount(original_labels, minlength=n_clusters) / len(original_labels)
    generated_cluster_distribution = np.bincount(generated_labels, minlength=n_clusters) / len(generated_labels)

    # The histograms are *weights* over the cluster ids, not observed values.
    # Passing them as values would compare only the sorted proportions, so a
    # generator that filled entirely different clusters with the same
    # proportions would score 0.
    # NOTE(review): cluster ids are nominal, so the |i - j| ground distance is
    # arbitrary; the result is still 0 only when both histograms are identical.
    cluster_ids = np.arange(n_clusters)
    distance = wasserstein_distance(
        cluster_ids,
        cluster_ids,
        u_weights=original_cluster_distribution,
        v_weights=generated_cluster_distribution,
    )

    # Spread of the centroids fitted on the original data (does not depend on
    # the generated rows).
    centroids = kmeans.cluster_centers_
    centroid_distances = np.linalg.norm(centroids[:, np.newaxis] - centroids, axis=2)
    diversity_score = np.mean(centroid_distances)

    # Print evaluation results
    print(f'Wasserstein distance between cluster distributions: {distance}')
    print(f'Average distance between cluster centroids (diversity score): {diversity_score}')

    return distance, diversity_score

# Sample synthetic rows. The generator was trained on `wine_data_scaled`, so
# its output lives in standardized (z-score) space.
new_data = generate_data(generator, n_samples=1000)
print(new_data)

generated_df = pd.DataFrame(new_data, columns=features)
# Raw-unit copy of the real data, used by the plotting cells further down.
original_df = pd.DataFrame(wine_data, columns=features)

# Snap the generated quality to whole numbers.
# NOTE(review): this rounds a standardized value, not a raw quality score.
if 'quality' in generated_df.columns:
    generated_df['quality'] = np.round(generated_df['quality']).astype(int)

# Evaluate in ONE common space: compare the generated rows against the
# standardized real data. Using the raw frame here would make evaluate_model
# re-scale already-standardized samples with raw-unit means/stds, pushing
# every generated row far away from all real clusters.
original_data = wine_data_scaled[features].values.astype(np.float32)
generated_data = generated_df.values.astype(np.float32)
distance, diversity_score = evaluate_model(original_data, generated_data)
print(f'Wasserstein distance between cluster distributions: {distance}, Average distance between cluster centroids (diversity score): {diversity_score}')

In [None]:
# Function to find outliers with an increased multiplier for less sensitivity
def find_outliers(df, feature, multiplier=5):
    """Flag rows of ``df[feature]`` lying outside the IQR fences.

    A value is an outlier when it is below ``Q1 - multiplier * IQR`` or above
    ``Q3 + multiplier * IQR``. A larger ``multiplier`` flags fewer rows.

    Returns:
        Boolean Series aligned with ``df`` (True marks an outlier).
    """
    column = df[feature]
    lower_quartile = column.quantile(0.25)
    upper_quartile = column.quantile(0.75)
    spread = upper_quartile - lower_quartile

    below_fence = column < (lower_quartile - multiplier * spread)
    above_fence = column > (upper_quartile + multiplier * spread)
    return below_fence | above_fence

# Customize the figure size to make the plot larger
fig_size = 40

# Define colors for normal points and outliers
normal_color = "#771cd6"
outlier_color = "#e37d3d"


def _format_pairplot_axis(ax, i, j):
    """Apply the shared tick, label and title styling to grid cell (row i, column j)."""
    ax.xaxis.set_major_locator(ticker.AutoLocator())
    ax.yaxis.set_major_locator(ticker.AutoLocator())
    ax.tick_params(axis='x', labelrotation=90, labelsize=10)
    ax.tick_params(axis='y', labelsize=10)
    # Keep y labels only in the first column and x labels only in the last row.
    if j > 0:
        ax.set_ylabel('')
    if i < len(features) - 1:
        ax.set_xlabel('')
    # Column headers on the top row, row headers on the first column.
    if i == 0:
        ax.set_title(features[j], fontsize=16)
    if j == 0:
        ax.set_ylabel(features[i], fontsize=16)


# Plot for original_df: KDE on the diagonal, scatter plots elsewhere
plt.figure(figsize=(fig_size, fig_size))
for i in range(len(features)):
    for j in range(len(features)):
        ax = plt.subplot(len(features), len(features), i * len(features) + j + 1)
        if i == j:
            sns.kdeplot(original_df[features[i]], fill=True, color=normal_color)
        else:
            sns.scatterplot(x=features[j], y=features[i], data=original_df, color=normal_color, alpha=0.6, edgecolor=None, s=30)
        _format_pairplot_axis(ax, i, j)
plt.suptitle('Pairplot of the Original Data', y=1.02, size=20)
plt.tight_layout()
plt.show()

# Define two sets to store the index labels of the outlier samples
all_outliers_indices = set()
quality_outliers_indices = set()

# An outlier mask depends only on its feature, so compute each one once
# instead of once per subplot.
outlier_masks = {name: find_outliers(generated_df, name, multiplier=5) for name in features}

# Plot for generated_df: same layout, with outliers overdrawn in a second color
plt.figure(figsize=(fig_size, fig_size))
for i in range(len(features)):
    for j in range(len(features)):
        ax = plt.subplot(len(features), len(features), i * len(features) + j + 1)
        if i == j:
            sns.kdeplot(generated_df[features[i]], fill=True, color=normal_color)
        else:
            sns.scatterplot(x=features[j], y=features[i], data=generated_df, color=normal_color, alpha=0.6, edgecolor=None, s=30)
            outliers_x = outlier_masks[features[j]]
            outliers_y = outlier_masks[features[i]]
            # A point is highlighted when it is an outlier on either axis.
            pair_outliers = outliers_x | outliers_y
            sns.scatterplot(x=features[j], y=features[i], data=generated_df[pair_outliers], color=outlier_color, alpha=0.6, edgecolor=None, s=30)

            # Update the set of all outlier index labels
            all_outliers_indices.update(generated_df[pair_outliers].index)

            # Update the quality outlier set only if the outlier has quality >= 6
            # NOTE(review): generated 'quality' comes from a generator trained on
            # standardized data, so a threshold of 6 may rarely match — confirm.
            if 'quality' in features:
                quality_outliers_indices.update(
                    generated_df[pair_outliers & (generated_df['quality'] >= 6)].index
                )

        _format_pairplot_axis(ax, i, j)
# Finish the figure the same way as the original-data pairplot.
plt.suptitle('Pairplot of the Generated Data', y=1.02, size=20)
plt.tight_layout()
plt.show()

In [None]:
# Extract outlier samples from the generated DataFrame using the collected indexes.
# The sets hold index *labels* (gathered via `.index`), so select with `.loc`
# rather than positional `.iloc`; sorting gives a deterministic row order.
all_outliers_df = generated_df.loc[sorted(all_outliers_indices)]
quality_outliers_df = generated_df.loc[sorted(quality_outliers_indices)]

# Show samples with all outliers and outliers with QUALITY >= 6
print("All Outliers:")
print(all_outliers_df)
print("\nQuality Outliers (Quality >= 6):")
print(quality_outliers_df)

In [None]:
# Correlation heatmaps: one figure for the real data, one for the generated data
heatmap_inputs = (
    (original_df, 'Heatmap of Correlation - Original Data'),
    (generated_df, 'Heatmap of Correlation - Generated Data'),
)
for frame, heading in heatmap_inputs:
    plt.figure(figsize=(10, 8))
    sns.heatmap(frame.corr(), annot=True, fmt=".2f")
    plt.title(heading)
    plt.show()

In [None]:
# Bucket the generated quality values into three ranges. The assignment is
# unconditional, so the column is simply rebuilt whenever the cell is re-run.
# NOTE(review): the labels ('-2-0', '0-2', '2-4') do not match the bin edges
# (-5, 0, 2, 6); they are kept because the histogram cell filters on them.
generated_df['Quality_Range'] = pd.cut(generated_df['quality'], bins=[-5, 0, 2, 6], labels=['-2-0', '0-2', '2-4'])

num_cols = generated_df.select_dtypes(include=[np.number]).columns.tolist()
# Two plots per row; round up so no empty trailing row is allocated.
n_rows = (len(num_cols) + 1) // 2
plt.figure(figsize=(14, len(num_cols) * 3))
for idx, column in enumerate(num_cols):
    plt.subplot(n_rows, 2, idx + 1)
    # `hue` mirrors `x` so the palette is applied without seaborn's
    # "palette without hue" deprecation; legend=False avoids a redundant legend.
    sns.boxplot(x="Quality_Range", y=column, hue="Quality_Range", data=generated_df, palette="twilight_shifted", legend=False)
    plt.title(f"{column} Distribution")
# Lay the grid out once, after every subplot exists.
plt.tight_layout()
plt.show()

In [None]:
color_palette = ['blue', 'orange', 'green']
# Histogram for each numerical column, overlaid per quality range
# Two plots per row; round up so no empty trailing row is allocated.
n_rows = (len(num_cols) + 1) // 2
plt.figure(figsize=(14, len(num_cols) * 3))
for idx, column in enumerate(num_cols):
    plt.subplot(n_rows, 2, idx + 1)
    # Group data by 'Quality_Range' before plotting
    for range_idx, range_label in enumerate(['-2-0', '0-2', '2-4']):
        subset = generated_df[generated_df['Quality_Range'] == range_label]
        sns.histplot(subset[column], bins=30, kde=True, color=color_palette[range_idx], label=range_label)
    plt.legend(title='Quality Range')
    plt.title(f"{column} Distribution by Quality Range")
# Lay the grid out once, after every subplot exists, instead of on each pass.
plt.tight_layout()
plt.show()

In [None]:
def plot_count(generated_df: pd.DataFrame, col: str, title_name: str='Target Variable Distribution') -> None:
    """Show the value counts of ``generated_df[col]`` as a donut chart and a bar chart.

    The left panel draws two concentric pies (outer ring with category labels,
    inner ring with percentages) with a white centre; the right panel draws a
    horizontal bar chart annotated with the raw counts.

    Args:
        generated_df: Frame containing the column to summarize.
        col: Name of the column whose categories are counted.
        title_name: Figure-level title.

    Raises:
        ValueError: If the column yields no categories to plot.

    Side effects:
        Sets ``plt.rcParams['axes.facecolor']`` to white and displays the figure.
    """
    s1 = generated_df[col].value_counts()
    N = len(s1)
    if N == 0:
        # Fail early with a clear message; otherwise the pie call raises an
        # opaque length-mismatch ValueError about `explode`.
        raise ValueError(f"Column {col!r} has no values to plot")

    # Set background color
    plt.rcParams['axes.facecolor'] = 'white'

    f, ax = plt.subplots(1, 2, figsize=(14, 8))
    plt.subplots_adjust(wspace=0.2)

    outer_sizes = s1
    inner_sizes = s1/N

    outer_colors = ['#525250', '#C68C2A', '#4A291E', '#269FAC', '#FF9074'][:N]
    inner_colors = ['#4E8278', '#DFBF87', '#9C1027', '#98BDB1', '#FECA8B'][:N]

    # Outer ring: category labels; the last wedge is pulled out slightly more.
    ax[0].pie(
        outer_sizes, colors=outer_colors, 
        labels=s1.index.tolist(), 
        startangle=90, frame=True, radius=1.3, 
        explode=([0.05]*(N-1) + [0.1]),
        wedgeprops={'linewidth' : 1, 'edgecolor' : 'white'}, 
        textprops={'fontsize': 12, 'weight': 'bold'}
    )

    textprops = {
        'size': 17, 
        'weight': 'bold', 
        'color': 'black'
    }

    # Inner ring: percentage annotations.
    ax[0].pie(
        inner_sizes, colors=inner_colors,
        radius=1, startangle=90,
        autopct='%1.f%%', explode=([0.1]*(N-1) + [0.2]),
        pctdistance=0.8, textprops=textprops
    )

    # White disc in the middle turns the stacked pies into a donut.
    center_circle = plt.Circle((0,0), 0.68, color='black', fc='white', linewidth=0)
    ax[0].add_artist(center_circle)

    # Horizontal bars: counts run along x, categories along y.
    x = s1
    y = s1.index.tolist()
    sns.barplot(
        x=x, y=y, ax=ax[1],
        palette='pastel', orient='horizontal'
    )

    ax[1].spines['top'].set_visible(False)
    ax[1].spines['right'].set_visible(False)
    ax[1].tick_params(
        axis='x',         
        which='both',      
        bottom=False,      
        labelbottom=False
    )

    # Write each raw count at the end of its bar.
    for i, v in enumerate(s1):
        ax[1].text(v, i, " " + str(v), color='black', fontweight='bold', fontsize=15)

    plt.setp(ax[1].get_yticklabels(), fontweight="bold")
    plt.setp(ax[1].get_xticklabels(), fontweight="bold")
    # The bars are horizontal, so the x axis carries the counts and the y axis
    # carries the categories of `col` (these two labels used to be swapped).
    ax[1].set_xlabel('count', fontweight="bold", color='black')
    ax[1].set_ylabel(col, fontweight="bold", color='black')

    f.suptitle(title_name, fontsize=18, fontweight='bold')
    plt.tight_layout()
    plt.show()

# Donut + bar summary of how the generated rows spread over 'Quality_Range'
plot_count(generated_df, col='Quality_Range', title_name='Target Variable Distribution')