In [1]:
!pip install bertopic
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
from torchvision import datasets, transforms
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import MinMaxScaler, RobustScaler, MaxAbsScaler
from bertopic import BERTopic

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)



In [2]:
def encode_images(region_size, overlap, threshold, dataset, filter_common_values=False):

    def pad_image(image, region_size, overlap):
        """ Pads the image to ensure even division into regions """
        height, width = image.shape[1], image.shape[2]
        stride = region_size - overlap
        pad_height = (stride - (height % stride)) % stride
        pad_width = (stride - (width % stride)) % stride

        return F.pad(image, (0, pad_width, 0, pad_height), mode='constant', value=0)


    def regions(image, size, overlap):
        """ Extracts regions from the image """
        padded_image = pad_image(image, size, overlap)
        regions = []
        for i in range(0, padded_image.shape[1], size):
            for j in range(0, padded_image.shape[2], size):
                region = padded_image[0, i:i+size, j:j+size]
                regions.append(region)

        return regions


    def encode(region, threshold):
        """ Converts regions into binary encoded numbers """
        region = torch.where(region < threshold, 0, 1)
        return int(''.join(map(str, region.flatten().int().tolist())), 2)

    encoded_images = [[encode(region, threshold) for region in regions(image, region_size, overlap)]
                      for image, _ in dataset]

    if filter_common_values:
        encoded_images = filter_frequent_values(encoded_images)

    return [[x for x in img if x != 0] for img in encoded_images]  # Remove empty regions


def filter_frequent_values(encoded_images):
    """ Removes common pixel values across images to reduce redundancy """
    all_values = [val for img in encoded_images for val in img]
    value_counts = pd.Series(all_values).value_counts()

    # Define threshold for common values (e.g., if a value appears in more than 80% of images)
    common_threshold = int(len(encoded_images) * 0.8)
    common_values = set(value_counts[value_counts > common_threshold].index)

    # Remove common values
    filtered_images = [[val for val in img if val not in common_values] for img in encoded_images]
    return filtered_images


def create_groupeddf(encoded_images, dataset):
    """ Converts encoded images into a grouped document dataframe """
    text_data = [' '.join(map(str, img)) for img in encoded_images]
    df = pd.DataFrame({'Document': text_data, 'Label': dataset.targets.tolist()})

    return df.groupby('Label', as_index=False).agg({'Document': ' '.join})


def extract_ctfidf_features(groupeddf, score_threshold, scaling_factor, idf_weighting):
    """ Computes cTF-IDF features with adjustable scaling and weighting """
    ctfidf, features = BERTopic()._c_tf_idf(groupeddf, fit=True)
    ctfidf_array = ctfidf.toarray()

    if idf_weighting == "log":
        ctfidf_array = np.log1p(ctfidf_array)

    ctfidf_features = {}
    for idx, topic in enumerate(groupeddf['Label']):
        top_indices = [i for i in range(len(features)) if ctfidf_array[idx][i] >= score_threshold]
        scaled_features = []
        for i in top_indices:
            term = features[i]
            count = max(1, int(ctfidf_array[idx][i] * scaling_factor))
            scaled_features.extend([term] * count)
        ctfidf_features[topic] = scaled_features

    return ctfidf_features


def model_with_params(region_size, overlap, threshold, score_threshold, scaling_factor, remove_common_values,
                      fit_prior, idf_weighting, alpha, train_dataset, test_dataset):
    """ Trains & evaluates Naïve Bayes model with given parameters """

    encoded_train = encode_images(region_size, overlap, threshold, train_dataset, filter_common_values=remove_common_values)
    groupeddf = create_groupeddf(encoded_train, train_dataset)
    ctfidf_features = extract_ctfidf_features(groupeddf, score_threshold, scaling_factor, idf_weighting)

    X_train = [' '.join(words) for words in ctfidf_features.values()]
    y_train = list(ctfidf_features.keys())

    # Encode Testing Data
    X_test = [' '.join(map(str, img)) for img in encode_images(region_size, overlap, threshold, test_dataset, filter_common_values=remove_common_values)]
    y_test = test_dataset.targets.tolist()

    # Vectorization
    vectorizer = CountVectorizer()
    X_train_vectors = vectorizer.fit_transform(X_train)
    X_test_vectors = vectorizer.transform(X_test)

    # Naïve Bayes Training
    model = MultinomialNB(alpha=alpha, fit_prior=fit_prior)
    model.fit(X_train_vectors, y_train)
    y_pred = model.predict(X_test_vectors)

    return accuracy_score(y_test, y_pred)


def measure_data_reduction_per_image(original_dataset, encoded_images, region_size, overlap):
    """ Computes per-image data reduction and reports original vs encoded size """

    original_width, original_height = 28, 28
    original_size = original_width * original_height  # 28 * 28 = 784 pixels

    # Encoded size: derived from the number of non-zero regions per image
    encoded_sizes = [len(img) for img in encoded_images]

    # All images should have the same encoded size per test run
    avg_encoded_size = np.mean(encoded_sizes)  # Get average encoded size (usually same for all)
    encoded_width = encoded_height = int(np.sqrt(avg_encoded_size)) if avg_encoded_size > 0 else 0

    # Calculate percentage reduction
    reduction_percent = 100 * (1 - avg_encoded_size / original_size)

    # Output information
    print(f"Original Image Size: {original_width}x{original_height} ({original_size} pixels)")
    print(f"Encoded Image Size: ~{encoded_width}x{encoded_height} (~{int(avg_encoded_size)} pixels)")
    print(f"Data Reduction Per Image: {reduction_percent:.2f}%\n")

    return avg_encoded_size, reduction_percent

In [None]:
def grid_search(train_dataset, test_dataset):
    """ Runs grid search with specified hyperparameter combinations """

    param_grid = {
        'region_size': [5, 6, 7],
        'overlap': [1, 2, 3],
        'threshold': [-0.95],
        'score_threshold': [0.00015, 0.0002, 0.00025],
        'scaling_factor': [5000, 10000, 15000],
        'remove_common_values': [True, False],
        'fit_prior': [True, False],
        'idf_weighting': ["log", "linear"],
        'alpha': [1.5]
    }

    results = []
    print("Testing Hyperparameter Combinations:\n")

    for region_size in param_grid['region_size']:
        for overlap in param_grid['overlap']:
            for threshold in param_grid['threshold']:
                for score_threshold in param_grid['score_threshold']:
                    for scaling_factor in param_grid['scaling_factor']:
                        for remove_common_values in param_grid['remove_common_values']:
                            for fit_prior in param_grid['fit_prior']:
                                for idf_weighting in param_grid['idf_weighting']:
                                    for alpha in param_grid['alpha']:

                                        # Track start time
                                        start_time = time.time()

                                        print(f"**Testing:** region_size={region_size}, overlap={overlap}, threshold={threshold}, score_threshold={score_threshold:.5f}, scaling_factor={scaling_factor}, remove_common_values={remove_common_values}, fit_prior={fit_prior}, idf_weighting={idf_weighting}, alpha={alpha}...")

                                        encoded_train = encode_images(region_size, overlap, threshold, train_dataset)
                                        encoded_test = encode_images(region_size, overlap, threshold, test_dataset)

                                        # Measure and report data reduction
                                        avg_encoded_size, reduction_percent = measure_data_reduction_per_image(train_dataset, encoded_train, region_size, overlap)

                                        accuracy = model_with_params(
                                            region_size=region_size,
                                            overlap=overlap,
                                            threshold=threshold,
                                            score_threshold=score_threshold,
                                            scaling_factor=scaling_factor,
                                            remove_common_values=remove_common_values,
                                            fit_prior=fit_prior,
                                            idf_weighting=idf_weighting,
                                            alpha=alpha,
                                            train_dataset=train_dataset,
                                            test_dataset=test_dataset
                                        )

                                        # End timer
                                        elapsed_time = time.time() - start_time

                                        results.append((region_size, overlap, threshold, score_threshold, scaling_factor, remove_common_values, fit_prior, idf_weighting, alpha, accuracy, avg_encoded_size, reduction_percent, elapsed_time))
                                        print(f"**Accuracy:** {accuracy:.4f} | **Avg Data Reduction Per Image:** {avg_encoded_size:.2f}% | **Time:** {elapsed_time:.2f} sec\n")

    # Extract best parameters based on accuracy
    best_params = max(results, key=lambda x: x[9])  # x[10] is the accuracy score
    best_score = best_params[9]

    # Print all tested results
    print("\n**Hyperparameter Testing Results:**")
    for res in results:
        print(f"region_size={res[0]}, overlap={res[1]}, threshold={res[2]}, score_threshold={res[3]:.5f}, scaling_factor={res[4]}, remove_common_values={res[5]}, fit_prior={res[6]}, idf_weighting={res[7]}, alpha={res[8]} --> Accuracy: {res[9]:.4f} | Avg Encoded Size: {res[10]:.2f}% | Time: {res[12]:.2f} sec")

    # Print the best parameter combination
    print("\n**Best Parameters Found:**")
    print(f"Region Size: {best_params[0]}, Overlap: {best_params[1]}, Threshold: {best_params[2]}, Score Threshold: {best_params[3]:.5f}, Scaling Factor: {best_params[4]}, Remove Common Values: {best_params[5]}, Fit Prior: {best_params[6]}, IDF Weighting: {best_params[7]}, Alpha: {best_params[8]}")
    print(f"Best Accuracy: {best_score:.4f} | Avg Encoded Size: {best_params[10]:.2f}%")

    return best_params

# Run optimized grid search
best_params = grid_search(train_dataset, test_dataset)

Testing Hyperparameter Combinations:

**Testing:** region_size=6, overlap=1, threshold=-0.95, score_threshold=0.00015, scaling_factor=5000, remove_common_values=True, fit_prior=True, idf_weighting=log, alpha=1.5...
Original Image Size: 28x28 (784 pixels)
Encoded Image Size: ~4x4 (~18 pixels)
Data Reduction Per Image: 97.61%

**Accuracy:** 0.7200 | **Avg Data Reduction Per Image:** 18.71% | **Time:** 222.20 sec

**Testing:** region_size=6, overlap=1, threshold=-0.95, score_threshold=0.00015, scaling_factor=5000, remove_common_values=True, fit_prior=True, idf_weighting=linear, alpha=1.5...
Original Image Size: 28x28 (784 pixels)
Encoded Image Size: ~4x4 (~18 pixels)
Data Reduction Per Image: 97.61%

**Accuracy:** 0.7197 | **Avg Data Reduction Per Image:** 18.71% | **Time:** 222.76 sec

**Testing:** region_size=6, overlap=1, threshold=-0.95, score_threshold=0.00015, scaling_factor=5000, remove_common_values=True, fit_prior=False, idf_weighting=log, alpha=1.5...
Original Image Size: 28x28 