In [2]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler 
import numpy as np
import pandas as pd
import time
import csv


# Execução para o dataset randômico com 2 dimensões
# Gerando dataset com seed para manter reproducibilidade
np.random.seed(0)
X = np.random.rand(50000, 2)
# Converte o numpy array em pandas DataFrame
df = pd.DataFrame(X)
# Criando um DataFrame Spark à partir do pandas DataFrame
df_spark = spark.createDataFrame(df)
# Transforma as colunas de features para o formato Vector
# Necessário para execução do K-means do Spark
vecAssembler = VectorAssembler(inputCols=df_spark.columns, outputCol='features')
dataset = vecAssembler.transform(df_spark)

# Dicionário para armezenar tempo das execuções
timestamps = {}
# Executando loop para variação dos K clusters
for i in range(2, 101):
    # start_time recebe inicio da execução de cada iteração
    start_time = time.time()
    K = i
    # Define parâmetros para execução do K-means
    kmeans = KMeans(k=K, seed=1, maxIter=5)
    # Roda o treinamento para o conjunto de datos
    model = kmeans.fit(dataset)
    # centers recebe os centroides gerados
    centers = model.clusterCenters()
    # Armazena tempo de execução da iteração
    timestamps[i] = time.time() - start_time
    
# Gravando em um CSV os tempos de execução para cada valor de K
with open('/home/aiquis/timestamps_k100_randomdata_spark.csv', 'w') as csv_file:
    writer = csv.writer(csv_file)
    for key, value in timestamps.items():
        writer.writerow([key, value])

In [None]:
from sklearn.preprocessing import MinMaxScaler

def read_file(file_path, file_name, num_rows):
    dataset = pd.read_csv(file_path+file_name, sep=',', nrows=num_rows)
    return dataset

def preprocess_features(dataset):
    scaler = MinMaxScaler(copy=False)
    dataset = scaler.fit_transform(dataset)
    return dataset


# Execução para o dataset do Censo americano com 68 dimensões
file_path = '/home/aiquis/datasets/census1990-mld/'
file_name = 'USCensus1990.data.txt'
# Alterar essa variável para determinar quantidade de observações
# do dataset a serem carregadas
num_rows = 50000
# Lê o arquivo de dados, dropa coluna com Id e executa
# um pré-processamento simples
X = read_file(file_path, file_name, num_rows)
X.drop('caseid', axis=1, inplace=True)
X = preprocess_features(X)

# Converte o numpy array em pandas DataFrame
df = pd.DataFrame(X)
# Criando um DataFrame Spark à partir do pandas DataFrame
df_spark = spark.createDataFrame(df)
# Transforma as colunas de features para o formato Vector
# Necessário para execução do K-means do Spark
vecAssembler = VectorAssembler(inputCols=df_spark.columns, outputCol='features')
dataset = vecAssembler.transform(df_spark)

# Dicionário para armezenar tempo das execuções
timestamps = {}
# Executando loop para variação dos K clusters
for i in range(2, 101):
    # start_time recebe inicio da execução de cada iteração
    start_time = time.time()
    K = i
    # Define parâmetros para execução do K-means
    kmeans = KMeans(k=K, seed=1, maxIter=5)
    # Roda o treinamento para o conjunto de datos
    model = kmeans.fit(dataset)
    # centers recebe os centroides gerados
    centers = model.clusterCenters()
    # Armazena tempo de execução da iteração
    timestamps[i] = time.time() - start_time
    
# Gravando em um CSV os tempos de execução para cada valor de K
with open('/home/aiquis/timestamps_k100_censusdata_spark.csv', 'w') as csv_file:
    writer = csv.writer(csv_file)
    for key, value in timestamps.items():
        writer.writerow([key, value])