In [None]:
# Number of best filter configurations to keep (rows taken from the parsed
# log via .head(M)); 0 means "keep every configuration found".
M = 16
# Distance threshold (tau) handed to predict() / cluster_dataset() further down.
selected_tau = 2.45

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm.autonotebook import tqdm

## Importing Datasets

In [None]:
# Items to fingerprint; the "concatenated" and "label" columns are used below.
string_df = pd.read_csv("../../data/interim/string_df.csv")

# Test pairs: first CSV column is the index, exact duplicate rows are dropped
# and the remaining rows are renumbered 0..n-1.
balanced_pairs_df = (
    pd.read_csv("../../data/train_test/test_pairs.csv", index_col=0)
    .drop_duplicates()
    .reset_index(drop=True)
)

# Label combinations to evaluate; the combination itself lives in the index.
combinations_df = pd.read_csv(
    "../../data/train_test/10_combinations_df.csv", index_col=0
)

In [None]:
# Inspect the frame right after loading.
string_df

In [None]:
# Keep only the combinations whose 'length' value is strictly greater than 10.
combinations_df = combinations_df.loc[lambda frame: frame['length'] > 10]

In [None]:
# Replace every "concatenated" entry by a 1-D integer array with one element per
# item of the original value (presumably a string of digit characters — confirm).
string_df["concatenated"] = string_df["concatenated"].map(
    lambda digits: np.array(list(digits)).astype(int)
)

## Importing Best Configurations

In [None]:
import re
import pandas as pd


def parse_log_file(filename):
    """Parse a best-configuration log file into a DataFrame.

    The file is scanned line by line for the markers ``Best Filter:``,
    ``Best Threshold:``, ``Min error:`` and ``Confidence:``.  A row is emitted
    every time a ``Confidence:`` line is matched, built from the values
    collected since the previous row (a field that was not seen is ``None``).

    The file is read as UTF-8: filter descriptions contain non-ASCII tile
    glyphs, so relying on the platform default encoding is not portable.

    Args:
        filename: Path of the log file to read.

    Returns:
        pandas.DataFrame with the columns "Best Filter" (str),
        "Best Threshold" (int), "Min Error" (float) and "Confidence" (float),
        one row per matched ``Confidence:`` line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a threshold is not an integer literal or an
            error/confidence value is not a float literal.
    """
    columns = ["Best Filter", "Best Threshold", "Min Error", "Confidence"]

    # (marker that must appear in the line, pattern capturing the value,
    # converter).  Order matters: the first marker found in a line wins, and
    # the last entry (Confidence) is the one that closes a record.
    field_specs = (
        ("Best Filter", re.compile(r"Best Filter: (.+)"), str),
        ("Best Threshold", re.compile(r"Best Threshold: (.+)"), int),
        ("Min error", re.compile(r"Min error: (.+)"), float),
        ("Confidence", re.compile(r"Confidence: (.+)"), float),
    )
    closing_slot = len(field_specs) - 1

    records = []
    current = [None] * len(field_specs)

    with open(filename, "r", encoding="utf-8") as file:
        # Iterate the handle directly instead of loading every line up front.
        for line in file:
            for slot, (marker, pattern, convert) in enumerate(field_specs):
                if marker not in line:
                    continue

                match = pattern.search(line)
                if match:
                    current[slot] = convert(match.group(1).strip())
                    if slot == closing_slot:
                        # Record complete: store it and start a fresh one.
                        records.append(tuple(current))
                        current = [None] * len(field_specs)

                # A line is handled by the first marker it contains, even when
                # the detailed pattern did not match.
                break

    return pd.DataFrame(records, columns=columns)

In [None]:
# Usage example:
# Parse the best-configuration log into a DataFrame with one row per
# (filter, threshold, min error, confidence) record.
filename = "../../reports/best_config"
best_configs_df = parse_log_file(filename)

In [None]:
# Inspect the parsed configurations before they are truncated to M rows.
best_configs_df

In [None]:
# M == 0 is the "use everything" switch: M is then set to the number of
# configurations found.  Any other value keeps the result of .head(M).
if M == 0:
    M = len(best_configs_df)
else:
    best_configs_df = best_configs_df.head(M)

In [None]:
# Ratio between the length of the first item's "concatenated" array and the
# number of retained configurations (one fingerprint entry per configuration).
n_input_values = len(string_df["concatenated"].iloc[0])
n_fingerprint_values = best_configs_df.shape[0]
compression_rate = n_input_values / n_fingerprint_values

print("Compression Rate:", compression_rate)

## Filters Parser

In [None]:
def filter_parser(input_string: str) -> list:
    """Expand a textual filter description into a flat list of -1 / 0 / 1.

    The description is split on whitespace.  A token starting with ``0[`` is
    read as ``0[<n>]`` and contributes ``n`` zeros.  In every other token each
    "🀆" contributes -1 and each "🀫" contributes 1; any other character is
    skipped.

    Args:
        input_string: Filter description, e.g. ``"0[2] 🀆🀫 0[1]"``.

    Returns:
        The expanded values in left-to-right order.

    Raises:
        ValueError: If the text after ``0[`` (minus the final character) is
            not an integer literal.
    """
    tile_values = {"🀆": -1, "🀫": 1}
    expanded = []

    for token in input_string.split():
        if not token.startswith("0["):
            # Tile run: map known tiles, ignore everything else.
            expanded.extend(
                tile_values[tile] for tile in token if tile in tile_values
            )
            continue

        # Zero run: the count sits between the leading "0[" and the last char.
        expanded += [0] * int(token[2:-1])

    return expanded

## Apply Filters

In [None]:
def apply_filter(item, filter):
    """Return the sum of the element-wise product of ``item`` and a filter.

    Args:
        item: Array-like object providing ``astype``; it is cast to ``int``.
        filter: Filter description string, expanded with ``filter_parser``.

    Returns:
        The summed product of the integer item and the expanded filter.
    """
    values = item.astype(int)
    weights = filter_parser(filter)
    return np.multiply(values, weights).sum()

In [None]:
def apply_filter_threshold(item, filter, threshold) -> int:
    """Binarize a filter response: 1 if strictly above ``threshold``, else -1."""
    response = apply_filter(item, filter)
    return 1 if response > threshold else -1

In [None]:
def apply_filter_threshold_pair(item_1, item_2, filter, threshold) -> int:
    """Return 1 when both items get the same thresholded response, else -1."""
    first_bit = apply_filter_threshold(item_1, filter, threshold)
    second_bit = apply_filter_threshold(item_2, filter, threshold)
    return 1 if first_bit == second_bit else -1

## Calculate Fingerprint

In [None]:
def hamming_distance(array1, array2, confidence):
    """Confidence-weighted Hamming distance between two sequences.

    Adds up ``confidence[i]`` for every position ``i`` where the sequences
    differ, divides by ``sum(confidence)`` and multiplies by
    ``len(confidence)``.  With identical weights this equals the plain count
    of differing positions (up to floating-point rounding).

    Args:
        array1: First sequence.
        array2: Second sequence, same length as ``array1``.
        confidence: Per-position weights, indexed like the sequences.

    Returns:
        The rescaled weighted distance.

    Raises:
        ValueError: If ``array1`` and ``array2`` differ in length.
    """
    size = len(array1)
    if size != len(array2):
        raise ValueError("Arrays must have the same length")

    # Accumulate the weight of every mismatching position, in order.
    mismatch_weight = 0
    for position in range(size):
        if array1[position] != array2[position]:
            mismatch_weight += confidence[position]

    # Normalize by the total weight, then rescale to the number of weights.
    return (mismatch_weight / sum(confidence)) * len(confidence)

In [None]:
def calculate_fingerprint(item, best_filters, best_thresholds, confidence):
    """Compute the +1/-1 fingerprint of ``item`` under a list of filters.

    For every (filter, threshold) pair the filter description is expanded with
    ``filter_parser``, multiplied element-wise with the integer item and
    summed; the entry is 1 when that response is strictly greater than the
    threshold and -1 otherwise.

    Args:
        item: Array-like object providing ``astype``; it is cast to ``int``.
        best_filters: Filter description strings.
        best_thresholds: Thresholds paired positionally with ``best_filters``
            (pairing stops at the shorter of the two).
        confidence: Not used by this function; kept so existing call sites
            keep working.

    Returns:
        list of 1 / -1 values, one per (filter, threshold) pair.
    """
    # The integer version of the item does not depend on the filter, so build
    # it once instead of once per filter.
    values = item.astype(int)

    fingerprint = []
    for best_filter, best_threshold in zip(best_filters, best_thresholds):
        response = np.sum(np.multiply(values, filter_parser(best_filter)))
        fingerprint.append(1 if response > best_threshold else -1)

    return fingerprint

In [None]:
# Collects one fingerprint (list of 1 / -1) per row of string_df, in row order.
fingerprints = []

In [None]:
# The configuration columns are identical for every row, so extract them once
# rather than on every iteration.
best_filters = best_configs_df["Best Filter"].tolist()
best_thresholds = best_configs_df["Best Threshold"].tolist()
# `confidence` is also read as a global by the weighted distance cells below.
confidence = best_configs_df["Confidence"].tolist()

# Start from an empty list so that re-running this cell does not append a
# second copy of every fingerprint (which would break the column assignment).
fingerprints = []

for item in tqdm(string_df["concatenated"], total=string_df.shape[0]):
    # One fingerprint per row, kept in row order.
    fingerprints.append(
        calculate_fingerprint(item, best_filters, best_thresholds, confidence)
    )

In [None]:
# Attach the fingerprints as the "fprint" column (one list per row).
string_df = string_df.assign(fprint=fingerprints)

In [None]:
# Labels whose rows are dropped from string_df before clustering.
labels_to_remove = [
    'iPhone11_F',
    'iPhone11_M',
    'iPhoneXR_A',
    'iPhoneXR_L',
    'iPhone7_F',
    'iPhone12_M',
    'iPhone11_B',
    'iPhone11_C',
    'iPhone12Pro_C',
    'S21Ultra_M',
    'OppoFindX3Neo_A',
]
string_df = string_df.loc[~string_df['label'].isin(labels_to_remove)]

In [None]:
# Renumber the remaining rows 0..n-1; the old index is discarded.
string_df = string_df.reset_index(drop=True)

In [None]:
# Inspect the frame after label removal and index reset.
string_df


## Clustering w/ $\tau$

In [None]:
# Value of the first row in the third column (position 2).
# NOTE(review): presumably this is the "fprint" column — confirm the column order.
string_df.iloc[0, 2]

In [None]:
# Weighted distance between rows 0 and 1 minus tau: a negative result means the
# distance is below the threshold.  Uses the global `confidence` list.
# NOTE(review): column position 2 is presumably "fprint" — confirm.
hamming_distance(string_df.iloc[0, 2], string_df.iloc[1, 2], confidence) - selected_tau

In [None]:
def predict(string_1, string_2, tau):
    """Return 1 when the weighted distance is strictly below ``tau``, else -1.

    The weights are taken from the module-level ``confidence`` list.
    """
    distance = hamming_distance(string_1, string_2, confidence)
    return 1 if distance < tau else -1

In [None]:
# Spot check: are rows 0 and 100 predicted to match (1) or not (-1)?
# NOTE(review): column position 2 is presumably "fprint" — confirm.
predict(string_df.iloc[0, 2], string_df.iloc[100, 2], selected_tau)

## Clustering

In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import homogeneity_score, completeness_score, v_measure_score

In [None]:
def hamming_distance(str1, str2, confidence=None):
    """Hamming distance between two equal-length sequences.

    This definition replaces the weighted three-argument ``hamming_distance``
    defined earlier in the notebook.  The optional ``confidence`` parameter
    keeps three-argument calls working after this cell has run.

    Args:
        str1: First sequence.
        str2: Second sequence, same length as ``str1``.
        confidence: Optional per-position weights.  When omitted, the plain
            number of differing positions is returned.  When given, the
            weights of the differing positions are summed, divided by
            ``sum(confidence)`` and multiplied by ``len(confidence)``.

    Returns:
        int count of differing positions, or the rescaled weighted distance.

    Raises:
        ValueError: If the sequences (or the weights, when given) differ in
            length.
    """
    if len(str1) != len(str2):
        raise ValueError("Strings must be of the same length")

    if confidence is None:
        return sum(c1 != c2 for c1, c2 in zip(str1, str2))

    if len(confidence) != len(str1):
        raise ValueError("Confidence must have one weight per position")

    mismatch_weight = 0
    for c1, c2, weight in zip(str1, str2, confidence):
        if c1 != c2:
            mismatch_weight += weight
    return (mismatch_weight / sum(confidence)) * len(confidence)


def predict(string_1, string_2, tau):
    """Return 1 when the unweighted Hamming distance is strictly below ``tau``, else -1."""
    return 1 if hamming_distance(string_1, string_2) < tau else -1

In [None]:
def cluster_dataset(string_df: pd.DataFrame, tau: float) -> np.ndarray:
    """Greedy single-pass clustering of the fingerprints in ``string_df``.

    Rows are visited in order.  Every row that has no cluster yet opens a new
    cluster, and each later row ``j`` for which
    ``predict(fprint_i, fprint_j, tau) == 1`` is assigned to that cluster.

    Args:
        string_df: Frame with an 'fprint' column holding one fingerprint per
            row.  The frame's index may be anything (e.g. a filtered subset);
            rows are addressed by position.
        tau: Threshold forwarded to ``predict``.

    Returns:
        Integer array of length ``len(string_df)``; entry ``k`` is the cluster
        id of the k-th row.
    """
    # Work on a plain list so rows are addressed by position.  Indexing the
    # column as string_df['fprint'][i] is a *label* lookup and raises KeyError
    # (or reads the wrong row) when the index is not exactly 0..n-1.
    fprints = string_df['fprint'].tolist()
    n = len(fprints)
    clusters = np.full(n, -1)  # -1 marks a row that has no cluster yet
    next_cluster_id = 0

    for i in range(n):
        if clusters[i] != -1:
            continue  # already absorbed by an earlier cluster

        clusters[i] = next_cluster_id
        for j in range(i + 1, n):
            # NOTE(review): a later row that already belongs to a cluster is
            # reassigned here when it also matches row i — confirm intended.
            if predict(fprints[i], fprints[j], tau) == 1:
                clusters[j] = next_cluster_id
        next_cluster_id += 1

    return clusters


In [None]:
# clusters = cluster_dataset(string_df, selected_tau)

# # Ground truth labels
# true_labels = string_df['label'].to_numpy()

# # Calculate metrics
# homogeneity = homogeneity_score(true_labels, clusters)
# completeness = completeness_score(true_labels, clusters)
# v_measure = v_measure_score(true_labels, clusters)

# print(f"Homogeneity: {homogeneity}")
# print(f"Completeness: {completeness}")
# print(f"V-Measure: {v_measure}")

In [None]:
# string_df['cluster'] = clusters

In [None]:
# Inspect the frame that the per-combination clustering below will use.
string_df

In [None]:
import ast

In [None]:
def str_to_list(s):
    """Evaluate ``s`` as a Python literal, returning ``s`` itself on failure.

    Args:
        s: Text such as ``"['a', 'b']"``.

    Returns:
        The object produced by ``ast.literal_eval`` (a list for list literals,
        but any literal is accepted), or ``s`` unchanged when evaluation
        raises ``ValueError`` or ``SyntaxError``.
    """
    try:
        parsed = ast.literal_eval(s)
    except (ValueError, SyntaxError):
        # Not a literal: hand the input back untouched.
        parsed = s
    return parsed

In [None]:
# Accumulator for the per-combination results.
results = []

In [None]:
# Cluster the fingerprints of every label combination separately.
for index, row in combinations_df.iterrows():
    # The combination itself is stored in the index of combinations_df.
    combination = index
    length = row['length']

    print('Combination:', combination)
    # Parse the index value as a Python literal; the raw value comes back
    # unchanged when it is not one.
    combination_list = str_to_list(combination)
    
    # Rows whose label belongs to this combination.  The subset keeps the row
    # labels of string_df (no reset_index here), so it is not indexed 0..n-1.
    subset_df = string_df[string_df['label'].isin(combination_list)]

    print(subset_df)

    clusters = cluster_dataset(subset_df, selected_tau)

    