In [1]:
import numpy as np
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
import time
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
from tqdm import tqdm

## Initialization and Setup Steps

In [2]:
# Start (or attach to) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("KMeansSession")
    .getOrCreate()
)

# Both input files are read as header-less CSVs with inferred column types.
data_path = "kddcup.data"
csv_options = {"header": False, "inferSchema": True}
raw_data = spark.read.csv(data_path, **csv_options)
raw_data_mini = spark.read.csv(data_path + "_10_percent", **csv_options)

In [3]:
# Build a KMeans-capable dataframe from an already prepared dataframe.
def assemble_vector(dataframe, columns):
    """Return `dataframe` with `columns` combined into one "features" vector column."""
    return VectorAssembler(inputCols=columns, outputCol="features").transform(dataframe)

In [4]:
# Tasks 1 and 2 only use non-string columns, because KMeans cannot handle strings.
def is_numeric_column(column):
    """Tell whether a (name, dtype) pair describes a non-string column.

    `column` is indexed positionally: element 1 is the dtype name, and every
    dtype other than "string" counts as numeric.
    """
    dtype_name = column[1]
    return dtype_name != "string"

In [5]:
# Split the column names of raw_data into numeric and non-numeric ones.
numeric_columns = []
non_numeric_columns = []

for column in raw_data.dtypes:
    # Each entry is a (name, dtype) pair; route the name to the matching list.
    target = numeric_columns if is_numeric_column(column) else non_numeric_columns
    target.append(column[0])

print(numeric_columns)

# Tasks 1 and 2 select numeric_columns explicitly instead of dropping
# non_numeric_columns from the dataframe.

In [7]:
# Row count per value of column '_c41', most frequent first, for the
# "_10_percent" file ...
label_cnt_mini = (
    raw_data_mini
    .groupBy('_c41')
    .count()
    .sort('count', ascending=False)
)

# ... and the same statistic for the full file.
label_cnt_big = (
    raw_data
    .groupBy('_c41')
    .count()
    .sort('count', ascending=False)
)


In [8]:
# Display the per-label row counts of the "_10_percent" file.
label_cnt_mini.show()

In [9]:
# Display the per-label row counts of the full file.
label_cnt_big.show()

In [10]:
# From here on, raw_data refers to the smaller "_10_percent" dataset.
# NOTE(review): numeric_columns was derived from the schema of the full file
# before this reassignment - presumably both files infer the same string /
# non-string split; confirm. Re-running earlier cells after this one will see
# the reassigned raw_data.
raw_data = raw_data_mini

## Task 1: Inaccurate Labels

In [12]:
# Task 1 input: the numeric columns assembled, unscaled, into a "features" vector.
inaccurate_label_data = assemble_vector(raw_data, numeric_columns)
# Show the first row to inspect the assembled vector.
inaccurate_label_data.head(1)

In [13]:
# Task 1 sweeps k over range(k_from, k_to); k_to itself is excluded.
k_from, k_to = 2, 75

In [14]:
# Training cost of one KMeans model per candidate k, in sweep order.
squared_score = []

start_time = time.time()
for i in range(k_from, k_to):
    # A fixed seed keeps the initialisation identical between runs.
    model = KMeans(k=i, seed=1).fit(inaccurate_label_data)
    score = model.summary.trainingCost
    squared_score.append(score)
    print('Objective Function for k =',i,'is',score)

duration = time.time() - start_time
print(f"Execution: {duration} seconds.")

In [41]:
print(squared_score)

# Objective value against the number of clusters for the unscaled numeric data.
fig, ax = plt.subplots()
ax.plot(range(k_from, k_to), squared_score)
ax.set_xlabel('k')
ax.set_ylabel('Objective Function Score')
ax.set_title('Numerical Columns Score')
plt.show()

## Task 2: Feature Normalization


In [16]:
def scale_dataframe(input_dataframe, start_columns):
    """Scale every column in `start_columns` independently with StandardScaler.

    Each column is first wrapped into its own vector column ("<col>_vec") and
    then scaled into "<col>_vec_scaled". The original columns and the
    intermediate "_vec" columns are dropped from the result.

    Returns:
        A pair (scaled dataframe, list of the "<col>_vec_scaled" column names).
    """
    vec_names = [name + "_vec" for name in start_columns]
    scaled_names = [name + "_scaled" for name in vec_names]

    # One assembler and one scaler per column, assemblers first.
    stages = [
        VectorAssembler(inputCols=[source], outputCol=target)
        for source, target in zip(start_columns, vec_names)
    ]
    stages += [
        StandardScaler(inputCol=source, outputCol=target)
        for source, target in zip(vec_names, scaled_names)
    ]

    fitted = Pipeline(stages=stages).fit(input_dataframe)
    result = fitted.transform(input_dataframe).drop(*start_columns, *vec_names)
    return result, scaled_names

In [17]:
# Task 2 input: scale each numeric column on its own, then assemble the scaled
# columns into a single "features" vector.
scaled_data, scaled_col = scale_dataframe(raw_data, numeric_columns)
scaled_data = assemble_vector(scaled_data, scaled_col)
scaled_data.show()

In [18]:
# Task 2 sweeps k over range(k_from_task_2, k_to_task_2); the upper bound is excluded.
k_from_task_2, k_to_task_2 = 2, 75

In [19]:
# Training cost of one KMeans model per candidate k on the scaled data.
squared_score_task_2 = []

start_time = time.time()
for i in range(k_from_task_2, k_to_task_2):
    # A fixed seed keeps the initialisation identical between runs.
    model = KMeans(k=i, seed=1).fit(scaled_data)
    score = model.summary.trainingCost
    squared_score_task_2.append(score)
    print('Objective Function for k =',i,'is',score)

duration = time.time() - start_time
print(f"Execution: {duration} seconds.")

In [42]:
print(squared_score_task_2)

# Objective value against the number of clusters for the scaled numeric data.
fig, ax = plt.subplots()
ax.plot(range(k_from_task_2, k_to_task_2), squared_score_task_2)
ax.set_xlabel('k')
ax.set_ylabel('Objective Function Score')
ax.set_title('Normalized Numerical Columns Score')
plt.show()

## Task 3: Categorical Variables

In [21]:
def one_code_encode(dataframe, column):
    """One-hot encode the column `column` of `dataframe`.

    The column is string-indexed, one-hot encoded without dropping the last
    category, and the encoded vector is stored in "<column>_protocol". The
    intermediate "<column>_indexed" and "<column>_encoded" columns are removed.

    Returns:
        A pair (transformed dataframe, name of the "<column>_protocol" column).
    """
    indexed_name = column + "_indexed"
    encoded_name = column + '_encoded'
    output_name = column + "_protocol"

    stages = [
        StringIndexer(inputCol=column, outputCol=indexed_name),
        OneHotEncoder(dropLast=False, inputCol=indexed_name, outputCol=encoded_name),
        VectorAssembler(inputCols=[encoded_name], outputCol=output_name),
    ]

    encoded = Pipeline(stages=stages).fit(dataframe).transform(dataframe)
    return encoded.drop(indexed_name, encoded_name), output_name

In [22]:
# The protocols are stored in column '_c1', so that column is one-hot encoded.
# `column` receives the name of the new encoded column ('_c1_protocol').
transformed, column = one_code_encode(raw_data, column='_c1')


In [23]:
# Task 3 feature list: every numeric column plus the one-hot encoded column.
columns = [*numeric_columns, column]
print(columns)

In [24]:
# Scale every Task 3 feature column and assemble the scaled columns into "features".
# NOTE: this cell overwrites its own inputs (`transformed`, `columns`), so it
# cannot be re-run on its own - scale_dataframe drops the original columns, and
# a second run would look for columns that no longer exist. Re-run the two
# preceding cells first.
transformed, columns = scale_dataframe(transformed, columns)
transformed = assemble_vector(transformed, columns)
transformed.show()

In [25]:
# Task 3 sweeps k over range(k_from_task_3, k_to_task_3); the upper bound is excluded.
k_from_task_3, k_to_task_3 = 2, 75

In [26]:
# Training cost of one KMeans model per candidate k on the data that includes
# the one-hot encoded column.
squared_score_task_3 = []

start_time = time.time()
for i in range(k_from_task_3, k_to_task_3):
    # A fixed seed keeps the initialisation identical between runs.
    model = KMeans(k=i, seed=1).fit(transformed)
    score = model.summary.trainingCost
    squared_score_task_3.append(score)
    print('Objective Function for k =',i,'is',score)

duration = time.time() - start_time
print(f"Execution: {duration} seconds.")

In [44]:
print(squared_score_task_3)

# Objective value against the number of clusters with the categorical feature included.
fig, ax = plt.subplots()
ax.plot(range(k_from_task_3, k_to_task_3), squared_score_task_3)
ax.set_xlabel('k')
ax.set_ylabel('Objective Function Score')
ax.set_title('Categorical Features Score')
plt.show()

## Entropy-Based Quality Measure

In [28]:
# Input for the entropy experiment: one-hot encode '_c1', scale every feature
# column, and assemble the scaled columns into "features".
modified_data, col = one_code_encode(raw_data, column='_c1')
feature_cols = numeric_columns + [col]
# The analogous encoding of '_c2' and '_c3' is currently disabled.
modified_scaled_data, scaled_col = scale_dataframe(modified_data, feature_cols)
transformed_modified = assemble_vector(modified_scaled_data, scaled_col)
transformed_modified.show()

In [29]:
# Sweep k over range(k_from_task_4, k_to_task_4), keeping both the training
# cost and the cluster assignments of every fitted model.
k_from_task_4, k_to_task_4 = 45, 70
squared_score_task_4 = []
predictions = []

start_time = time.time()
for i in tqdm(range(k_from_task_4, k_to_task_4)):
    model = KMeans(k=i, seed=1).fit(transformed_modified)
    # model.transform adds the cluster assignment used later by the entropy measure.
    predictions.append(model.transform(transformed_modified))
    score = model.summary.trainingCost
    squared_score_task_4.append(score)

duration = time.time() - start_time
print(f"Execution: {duration} seconds.")

In [30]:
def entropy_score(dataframe):
    """Size-weighted average label entropy (in bits) of a clustering.

    Args:
        dataframe: dataframe with a 'prediction' column (cluster id) and a
            '_c41' column (label).

    Returns:
        The sum over clusters of (cluster size / total rows) * H(cluster),
        where H is the base-2 entropy of the label distribution inside the
        cluster. Returns 0.0 when `dataframe` has no rows.
    """
    # A single aggregation is enough: cluster sizes and the total row count are
    # both sums of these per-(cluster, label) counts.
    label_counts = (
        dataframe
        .groupBy('prediction', '_c41')
        .count()
        .toDF('prediction', 'label', 'count')
        .toPandas()
    )

    # Derive the total from the dataframe that was passed in, rather than
    # counting an unrelated global dataframe once per cluster.
    total_objects = label_counts['count'].sum()
    if total_objects == 0:
        return 0.0

    total_entropy = 0.0
    # pandas groupby iterates the clusters in ascending 'prediction' order.
    for _, cluster in label_counts.groupby('prediction'):
        counts = cluster['count'].to_numpy(dtype=np.float64)
        cluster_size = counts.sum()
        label_share = counts / cluster_size
        cluster_sum = np.sum(label_share * np.log2(label_share))
        total_entropy -= cluster_sum * cluster_size / total_objects

    return total_entropy

In [31]:
#total_entropy

In [32]:
# One entropy score per stored prediction dataframe, in the same order as `predictions`.
entropy_list = [entropy_score(i) for i in tqdm(predictions)]

In [46]:
print(entropy_list)

# entropy_list holds one score per k in range(k_from_task_4, k_to_task_4), so
# the x-axis must use that same range. A hard-coded range(2, 70) has a
# different length than entropy_list and makes plt.plot raise a ValueError.
plt.plot(range(k_from_task_4, k_to_task_4), entropy_list)
plt.xlabel('k')
plt.ylabel('Entropy Score')
plt.title('Categorical and Normalized Entropy Plot')
plt.show()