<a href="https://colab.research.google.com/github/Gabriel-R-A/Butina-Algorithm---Molecule-Cluster/blob/main/Butina_numba_%2B_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from numba import jit
from joblib import Parallel, delayed


# Generate random data: a 2-D matrix whose entries are drawn uniformly
# from {0, 1} (the upper bound passed to randint is exclusive).
n_rows, n_cols = 80000, 166
table = np.random.randint(0, 2, size=(n_rows, n_cols))

# Optimized (Numba-compiled) pairwise similarity computation
@jit(nopython=True)
def calc_similarity_numba(data):
    """Return the pairwise Jaccard/Tanimoto similarity between rows of *data*.

    For every pair of rows the similarity is ``overlap / union`` where
    ``overlap = sum(row_i * row_j)`` and
    ``union = sum(row_i) + sum(row_j) - overlap``.

    Parameters
    ----------
    data : 2-D numeric array
        One row per item. The formula is the Jaccard index when the entries
        are 0/1 indicators (as produced by the caller in this file).

    Returns
    -------
    numpy.ndarray
        Symmetric ``(n, n)`` ``float32`` matrix, ``n = data.shape[0]``.
        A pair whose union is empty (both rows sum to zero) is assigned a
        similarity of 1.0, consistent with the diagonal of identical rows.
    """
    n = data.shape[0]
    similarity = np.zeros((n, n), dtype=np.float32)

    # Row totals do not depend on the partner row, so compute them once
    # instead of re-summing both rows for every (i, j) pair.
    row_sums = data.sum(axis=1)

    for i in range(n):
        # Only the upper triangle is computed; the result is mirrored below.
        for j in range(i, n):
            overlap_count = np.sum(data[i] * data[j])
            union_count = row_sums[i] + row_sums[j] - overlap_count
            if union_count == 0:
                # Both rows are empty: dividing would be 0/0, which raises
                # ZeroDivisionError under Numba's default error model.
                value = 1.0
            else:
                value = overlap_count / union_count
            similarity[i, j] = value
            similarity[j, i] = value
    return similarity

# Fan the similarity computation out over all available cores with joblib,
# one task per 500-row slice of `table`.
# NOTE(review): each task only receives its own slice, so only rows that fall
# in the same slice are compared with each other. Stacking the per-slice
# (500 x 500) results yields a (len(table) x 500) array, not a full
# len(table) x len(table) similarity matrix -- confirm this is intended.
chunk_starts = range(0, len(table), 500)
chunk_tasks = (
    delayed(calc_similarity_numba)(table[start:start + 500])
    for start in chunk_starts
)
chunk_results = Parallel(n_jobs=-1)(chunk_tasks)
similarity_matrix = np.array(chunk_results)
similarity_matrix = np.vstack(similarity_matrix)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors, VectorUDT  # Add VectorUDT import
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# Names this cell needs that the cell's import lines do not provide.
from pyspark.ml.feature import MinHashLSH
from pyspark.sql.functions import monotonically_increasing_id

# Start (or reuse) the Spark session
spark = SparkSession.builder \
    .appName("Similarity Calculation") \
    .getOrCreate()

# Schema for the rows: a feature vector plus a numeric label
schema = StructType([
    StructField("features", VectorUDT(), nullable=False),
    StructField("label", DoubleType(), nullable=False)
])

# Convert the numpy matrix into a Spark DataFrame with the schema above.
# Each row becomes (dense feature vector, label).
# NOTE(review): the label is simply the first feature value of the row; the
# original author marked this as an assumption -- confirm.
data = [(Vectors.dense(row), float(row[0])) for row in table]
df = spark.createDataFrame(data, schema)


# Add a unique identifier column (ids are increasing, not consecutive)
df = df.withColumn("id", monotonically_increasing_id())

# Create and fit the LSH model.
# The downstream join labels its distance column "JaccardDistance" and the
# features are 0/1 vectors, so the Jaccard-distance LSH family (MinHashLSH)
# is used. BucketedRandomProjectionLSH hashes for Euclidean distance, which
# made the reported distances Euclidean despite the column name.
# MinHashLSH treats every non-zero entry as set membership and requires each
# vector to have at least one non-zero entry.
# The variable keeps its original name `brp` so later references still work.
brp = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10)
model = brp.fit(df)

# Helper that runs the approximate similarity self-join on one chunk of rows
def calculate_similarity(chunk):
    """Self-join one chunk of rows with the fitted LSH model.

    Parameters
    ----------
    chunk
        Rows accepted by ``spark.createDataFrame`` under the module-level
        ``schema``.

    Returns
    -------
    DataFrame
        Result of ``model.approxSimilarityJoin`` of the chunk with itself,
        using a distance threshold of 0.5. The distance column is named
        "JaccardDistance"; the metric actually reported is whichever one the
        module-level ``model`` implements.
    """
    # Materialize the chunk as a Spark DataFrame, then join it against itself.
    chunk_frame = spark.createDataFrame(chunk, schema)
    return model.approxSimilarityJoin(
        chunk_frame, chunk_frame, 0.5, distCol="JaccardDistance"
    )

# Split the (features, label) rows into fixed-size chunks. `chunks` was
# referenced without ever being defined in this cell; 500 rows per chunk
# mirrors the slice size used for the joblib computation.
# NOTE(review): confirm 500 is the intended chunk size for the Spark join.
chunk_size = 500
chunks = [data[start:start + chunk_size] for start in range(0, len(data), chunk_size)]

try:
    # Build the similarity join for each chunk, one chunk after another
    similarities = [calculate_similarity(chunk) for chunk in chunks]

    # Show the results
    for result in similarities:
        result.show()
finally:
    # Stop the Spark session even if a join or show() call fails
    spark.stop()


+--------------------+--------------------+---------------+
|            datasetA|            datasetB|JaccardDistance|
+--------------------+--------------------+---------------+
|{[1.0,0.0,0.0,0.0...|{1.0, [1.0,0.0,0....|            0.0|
|{[1.0,0.0,0.0,1.0...|{1.0, [1.0,0.0,0....|            0.0|
|{[0.0,0.0,0.0,1.0...|{0.0, [0.0,0.0,0....|            0.0|
|{[0.0,1.0,1.0,1.0...|{0.0, [0.0,1.0,1....|            0.0|
|{[1.0,1.0,1.0,1.0...|{1.0, [1.0,1.0,1....|            0.0|
|{[0.0,1.0,0.0,1.0...|{0.0, [0.0,1.0,0....|            0.0|
|{[0.0,1.0,0.0,0.0...|{0.0, [0.0,1.0,0....|            0.0|
|{[0.0,1.0,1.0,1.0...|{0.0, [0.0,1.0,1....|            0.0|
|{[0.0,0.0,1.0,0.0...|{0.0, [0.0,0.0,1....|            0.0|
|{[1.0,1.0,1.0,0.0...|{1.0, [1.0,1.0,1....|            0.0|
|{[0.0,0.0,0.0,1.0...|{0.0, [0.0,0.0,0....|            0.0|
|{[0.0,1.0,1.0,0.0...|{0.0, [0.0,1.0,1....|            0.0|
|{[0.0,1.0,1.0,0.0...|{0.0, [0.0,1.0,1....|            0.0|
|{[0.0,1.0,1.0,1.0...|{0.0, [0.0,1.0,1..