In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id, col
import numpy as np
from sklearn.cluster import DBSCAN

In [2]:
spark = SparkSession.builder \
    .appName("DBSCANClustering") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

your 131072x1 screen size is bogus. expect trouble
24/11/30 19:15:35 WARN Utils: Your hostname, matt resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/30 19:15:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/30 19:15:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.parquet("../data/dblp-v10-processado-vetorizado-word2vec.parquet")

                                                                                

In [4]:
df.columns

['abstract',
 'title',
 'title_word2vec',
 'abstract_word2vec',
 'features',
 'norm_features']

In [5]:
df.printSchema()

root
 |-- abstract: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title_word2vec: vector (nullable = true)
 |-- abstract_word2vec: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- norm_features: vector (nullable = true)



In [6]:
# Número de partições para processar os dados em partes menores
num_partitions = 10
df_partitioned = df.repartition(num_partitions)

In [7]:
# Parâmetros do DBSCAN
eps = 0.5  # Distância máxima para considerar vizinhos
min_samples = 10  # Número mínimo de pontos para formar um cluster

In [8]:
# Lista para armazenar os resultados de cada partição
results = []

# Iterar sobre cada partição
for i in range(num_partitions):
    # Filtrar apenas os dados da partição atual
    partition_df = df_partitioned.filter(spark_partition_id() == i)
    
    # Extrair os vetores densos da partição como uma lista de arrays numpy
    features = partition_df.select(col("norm_features")).rdd.map(lambda row: row[0].toArray()).collect()
    features_array = np.array(features)
    
    # Informar o tamanho dos dados na partição
    print(f"Partição {i}: {features_array.shape[0]} vetores")
    
    # Aplicar o DBSCAN na partição
    dbscan = DBSCAN(eps=eps, min_samples=min_samples)
    cluster_labels = dbscan.fit_predict(features_array)
    
    # Informar a quantidade de clusters encontrados
    print(f"Clusters encontrados na partição {i}: {len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)} (inclui ruído)")
    
    # Criar um DataFrame Spark com os rótulos de cluster
    partition_result = spark.createDataFrame(
        [(int(cluster_id),) for cluster_id in cluster_labels],
        ["cluster_label"]
    )
    
    # Adicionar os resultados a uma lista
    results.append(partition_result)

                                                                                

Partição 0: 82754 vetores
Clusters encontrados na partição 0: 60 (inclui ruído)


                                                                                

Partição 1: 82753 vetores
Clusters encontrados na partição 1: 56 (inclui ruído)


                                                                                

Partição 2: 82755 vetores
Clusters encontrados na partição 2: 69 (inclui ruído)


                                                                                

Partição 3: 82755 vetores
Clusters encontrados na partição 3: 59 (inclui ruído)


                                                                                

Partição 4: 82757 vetores
Clusters encontrados na partição 4: 66 (inclui ruído)


                                                                                

Partição 5: 82754 vetores
Clusters encontrados na partição 5: 60 (inclui ruído)


                                                                                

Partição 6: 82754 vetores
Clusters encontrados na partição 6: 62 (inclui ruído)


                                                                                

Partição 7: 82752 vetores
Clusters encontrados na partição 7: 49 (inclui ruído)


                                                                                

Partição 8: 82748 vetores
Clusters encontrados na partição 8: 59 (inclui ruído)


                                                                                

Partição 9: 82751 vetores
Clusters encontrados na partição 9: 60 (inclui ruído)


In [9]:
# Combinar os resultados de todas as partições
result_df = results[0]
for partition_result in results[1:]:
    result_df = result_df.union(partition_result)

In [10]:
# Adicionar os rótulos de cluster ao DataFrame original
df_with_clusters = df_partitioned.withColumn("partition_id", spark_partition_id()) \
    .join(result_df.withColumn("partition_id", spark_partition_id()), on="partition_id", how="inner")

In [11]:
df_with_clusters.columns

['partition_id',
 'abstract',
 'title',
 'title_word2vec',
 'abstract_word2vec',
 'features',
 'norm_features',
 'cluster_label']

In [12]:
# Salvar o DataFrame resultante com os rótulos de cluster
# df_with_clusters.write.parquet("dbscan_clusters.parquet", mode="overwrite")