In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=0da1ffdacae123e3ca83971d065646d234102375c1e0825b98fc1e64f489b105
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
import time

# build (or reuse) a local Spark session that runs on all available cores
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("k-prototype-global")
    .getOrCreate()
)

# SparkContext handle, used further down to broadcast the centres
sc = spark.sparkContext

In [23]:
# load the retail transactions and discard every row that has a missing value
df = pd.read_csv("Retail_Transaction_Dataset.csv").dropna()
# number of rows that remain
df.shape[0]

100000

In [24]:
# features used for clustering: numerical columns first, categorical columns after
labels = [
    "Quantity",
    "Price",
    "PaymentMethod",
    "ProductID",
]

# position in `labels` at which the categorical columns begin
categorical_labels_start_index = 2

# the categorical column names
labels[categorical_labels_start_index:]

['PaymentMethod', 'ProductID']

In [25]:
# keep only the clustering features, in the order given by `labels`
df = df.loc[:, labels]
df.head(n=5)

Unnamed: 0,Quantity,Price,PaymentMethod,ProductID
0,7,80.079844,Cash,C
1,4,75.195229,Cash,C
2,8,31.528816,Cash,A
3,5,98.880218,PayPal,D
4,7,93.188512,Cash,A


In [26]:
# convert the pandas frame into a Spark DataFrame; `df` is rebound here, so the
# guard keeps the cell safe to re-run (converting a Spark DataFrame again fails)
if isinstance(df, pd.DataFrame):
  df = spark.createDataFrame(df)
df.show(5)

+--------+-----------+-------------+---------+
|Quantity|      Price|PaymentMethod|ProductID|
+--------+-----------+-------------+---------+
|       7|80.07984415|         Cash|        C|
|       4|75.19522942|         Cash|        C|
|       8|31.52881648|         Cash|        A|
|       5|98.88021828|       PayPal|        D|
|       7|93.18851246|         Cash|        A|
+--------+-----------+-------------+---------+
only showing top 5 rows



In [27]:
# print the column names and types of the Spark DataFrame
df.printSchema()

root
 |-- Quantity: long (nullable = true)
 |-- Price: double (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- ProductID: string (nullable = true)



## Normalising the data

In [28]:
# min max normalisation for the numerical data
numerical_columns = df.columns[:categorical_labels_start_index]

# fetch every minimum and maximum in a single Spark job instead of two jobs per column
bounds = df.agg(
    *[F.min(name).alias(name + "_min") for name in numerical_columns],
    *[F.max(name).alias(name + "_max") for name in numerical_columns],
).first()

for col in numerical_columns:
  minimum = bounds[col + "_min"]
  maximum = bounds[col + "_max"]

  if maximum == minimum:
    # a constant column has zero range; dividing by it would fill the column
    # with nulls, so map it to 0.0 instead
    df = df.withColumn(col + '_norm', F.lit(0.0))
  else:
    df = df.withColumn(col + '_norm', (df[col] - minimum) / (maximum - minimum))

In [29]:
# preview the first rows, now including the added *_norm columns
df.show(10)

+--------+-----------+-------------+---------+-------------+-------------------+
|Quantity|      Price|PaymentMethod|ProductID|Quantity_norm|         Price_norm|
+--------+-----------+-------------+---------+-------------+-------------------+
|       7|80.07984415|         Cash|        C|         0.75| 0.7786700737595417|
|       4|75.19522942|         Cash|        C|        0.375| 0.7243958855114194|
|       8|31.52881648|         Cash|        A|        0.875|0.23920734004228444|
|       5|98.88021828|       PayPal|        D|          0.5| 0.9875657801905688|
|       7|93.18851246|         Cash|        A|         0.75| 0.9243237989488644|
|       3|54.09315249|         Cash|        D|         0.25|0.48992537779848405|
|       7|13.12193739|       PayPal|        D|         0.75| 0.0346838564079694|
|       8|56.02516419|   Debit Card|        A|        0.875| 0.5113924478745256|
|       5|23.85798105|  Credit Card|        B|          0.5|0.15397474972329578|
|       4| 63.3427768|   Deb

In [31]:
# the raw numerical columns are no longer needed once their *_norm versions exist
df_norm = df.drop("Quantity", "Price")
df_norm.show(n=5)

+-------------+---------+-------------+-------------------+
|PaymentMethod|ProductID|Quantity_norm|         Price_norm|
+-------------+---------+-------------+-------------------+
|         Cash|        C|         0.75| 0.7786700737595417|
|         Cash|        C|        0.375| 0.7243958855114194|
|         Cash|        A|        0.875|0.23920734004228444|
|       PayPal|        D|          0.5| 0.9875657801905688|
|         Cash|        A|         0.75| 0.9243237989488644|
+-------------+---------+-------------+-------------------+
only showing top 5 rows



## K-prototypes clustering

In [32]:
# redistribute the rows over 100 partitions and keep the result in memory
rdd = df_norm.rdd.repartition(numPartitions=100)
rdd.persist()

MapPartitionsRDD[76] at coalesce at NativeMethodAccessorImpl.java:0

In [33]:
def to_numpy_numerical(row, index):
  """Return the values of `row` from position `index` onward as a NumPy array.

  row   -- object exposing asDict(); its values are read in dict order
  index -- number of leading values to skip
  """
  values = [*row.asDict().values()]
  tail = values[index:]
  return np.array(tail)

In [34]:
def to_numpy_categorical(row, index):
  """Return the first `index` values of `row` as a NumPy array.

  row   -- object exposing asDict(); its values are read in dict order
  index -- number of leading values to keep
  """
  values = [*row.asDict().values()]
  head = values[:index]
  return np.array(head)

## Initializing centroids

In [35]:
# number of categorical columns; used as the split position when slicing a row
index = len(labels) - categorical_labels_start_index
# numerical part of every row, as NumPy arrays
numerical_rdd = rdd.map(lambda record: to_numpy_numerical(record, index))

# number of clusters and the seed used when sampling the starting centres
k = 3
seed = 42
# draw k starting centres without replacement
numerical_centres = numerical_rdd.takeSample(withReplacement=False, num=k, seed=seed)

In [36]:
# inspect the sampled numerical starting centres
numerical_centres

[array([0.125     , 0.81932049]),
 array([0.625     , 0.05905246]),
 array([0.5       , 0.38920014])]

In [37]:
# categorical part of every row, as NumPy arrays
categorical_rdd = rdd.map(lambda record: to_numpy_categorical(record, index))
# draw k starting centres without replacement, seeded with `seed`
categorical_centres = categorical_rdd.takeSample(withReplacement=False, num=k, seed=seed)

In [38]:
# inspect the sampled categorical starting centres
categorical_centres

[array(['Cash', 'D'], dtype='<U4'),
 array(['Cash', 'B'], dtype='<U4'),
 array(['PayPal', 'D'], dtype='<U6')]

In [39]:
# ship the current centres to the executors as broadcast variables;
# the distance functions read them through `.value`
numerical_centres_bc = sc.broadcast(value=numerical_centres)
categorical_centres_bc = sc.broadcast(value=categorical_centres)

## Distance measures

In [40]:
# distance helpers used to find the nearest centroid
def euclidean_distance(vect):
  """Euclidean distance from `vect` to each broadcast numerical centre.

  Returns an array holding one distance per centre.
  """
  offsets = numerical_centres_bc.value - vect
  squared_lengths = np.sum(np.square(offsets), axis=1)
  return np.sqrt(squared_lengths)

def hamming_distance(vect, gamma=0.2):
  """Weighted mismatch count between `vect` and each broadcast categorical centre.

  vect  -- array of a point's categorical values
  gamma -- weight applied to every mismatching attribute; defaults to the
           0.2 that used to be hard-coded, so existing callers are unaffected
  Returns an array holding one weighted distance per centre.
  """
  mismatches = np.sum(vect != categorical_centres_bc.value, axis=1)
  return gamma * mismatches


In [41]:
# find the distance to the centroid for numerical data
# take(10) returns the same first rows without collecting the whole RDD to the driver
numerical_rdd.map(lambda vect: euclidean_distance(vect)).take(10)


[array([1.06476842, 0.4052192 , 0.53027045]),
 array([0.170156  , 1.07587277, 0.74002971]),
 array([0.67976008, 0.50857   , 0.29834675]),
 array([0.55788538, 0.51280875, 0.221337  ]),
 array([1.01813982, 0.44506683, 0.50811367]),
 array([0.13868487, 1.03130047, 0.70020517]),
 array([0.2601828 , 0.86907895, 0.5175202 ]),
 array([0.8811789 , 0.75570532, 0.59686186]),
 array([0.7622144 , 0.25321445, 0.31572728]),
 array([0.74549287, 0.37585447, 0.39422597])]

In [42]:
# find the distance to the centroid for categorical data
# take(10) returns the same first rows without collecting the whole RDD to the driver
categorical_rdd.map(lambda vect: hamming_distance(vect)).take(10)

[array([0.4, 0.4, 0.4]),
 array([0.4, 0.2, 0.4]),
 array([0.4, 0.4, 0.2]),
 array([0.4, 0.4, 0.4]),
 array([0.4, 0.4, 0.4]),
 array([0.2, 0.4, 0. ]),
 array([0.2, 0.4, 0.2]),
 array([0. , 0.2, 0.2]),
 array([0. , 0.2, 0.2]),
 array([0. , 0.2, 0.2])]

In [43]:
def get_total_distance(numerical_vect, categorical_vect):
  """Combined distance from one point to every centre.

  Adds the Euclidean distance of the numerical part to the weighted
  mismatch distance of the categorical part, centre by centre.
  """
  return euclidean_distance(numerical_vect) + hamming_distance(categorical_vect)

In [44]:
# combined distance from every point to every centre
total_distance_rdd = numerical_rdd.zip(categorical_rdd).map(lambda x: get_total_distance(x[0], x[1]))

# find the centroid which has the closest distance
closest_centroid_rdd = total_distance_rdd.map(lambda x: np.argmin(x))
# take(10) previews the first assignments without collecting every row to the driver
closest_centroid_rdd.take(10)

[1, 0, 2, 2, 1, 0, 0, 2, 1, 1]

In [45]:
# pair every point's assigned centroid id with its categorical values;
# zip already yields (id, values) tuples, so no further map is needed
centroid_categorical_value = closest_centroid_rdd.zip(categorical_rdd)


In [46]:
# closest_centroid_rdd.zip(categorical_rdd).map((lambda x: (x[0],x[1]))).collect()[:10]

In [47]:
# preview (assigned centroid id, categorical values) pairs;
# take(10) avoids collecting the whole RDD to the driver
closest_centroid_rdd.zip(categorical_rdd).take(10)

[(1, array(['Credit Card', 'A'], dtype='<U11')),
 (0, array(['Credit Card', 'B'], dtype='<U11')),
 (2, array(['PayPal', 'A'], dtype='<U6')),
 (2, array(['Debit Card', 'C'], dtype='<U10')),
 (1, array(['Credit Card', 'C'], dtype='<U11')),
 (0, array(['PayPal', 'D'], dtype='<U6')),
 (0, array(['Debit Card', 'D'], dtype='<U10')),
 (2, array(['Cash', 'D'], dtype='<U4')),
 (1, array(['Cash', 'D'], dtype='<U4')),
 (1, array(['Cash', 'D'], dtype='<U4'))]

## Find the new centroid for the categorical part

In [48]:
# pair every point's assigned centroid id with its categorical values
centroid_categorical_value = closest_centroid_rdd.zip(categorical_rdd).map(lambda pair: (pair[0], pair[1]))

# per centroid id, the list of categorical vectors assigned to it
grouped_rdd = centroid_categorical_value.groupByKey().mapValues(lambda members: list(members))

from collections import Counter
import numpy as np

def find_mode(values):
    """Return the most frequent element of `values` in a one-element NumPy array.

    When several elements share the highest count, the one that occurs
    first in `values` is returned.
    """
    tally = Counter(values)
    most_frequent = max(tally, key=lambda item: tally[item])
    return np.array([most_frequent])

# keep the centroid id next to each cluster's per-column modes: the order in which
# the grouped clusters are collected is not guaranteed to follow the centroid index
modes = grouped_rdd.map(lambda x: (x[0], tuple(find_mode(column) for column in zip(*x[1]))))
# (centroid id, modes) pairs sorted by centroid id, so position i belongs to centroid i
result = sorted(modes.collect(), key=lambda pair: pair[0])
# flatten each cluster's one-element mode arrays into a single 1-D centre; unlike
# squeeze() this keeps the shape right for a single cluster or a single column
numpy_arrays = [np.array(cluster_modes).reshape(-1) for _, cluster_modes in result]
numpy_arrays

# categorical_centres_bc = sc.broadcast(numpy_arrays)
# categorical_centres_bc.value

[array(['Cash', 'D'], dtype='<U6'),
 array(['Cash', 'B'], dtype='<U6'),
 array(['PayPal', 'D'], dtype='<U6')]

## Find the new centroid for the numerical part

In [49]:
# pair every point's assigned centroid id with its numerical vector
centroid_numerical_value = closest_centroid_rdd.zip(numerical_rdd).map((lambda x: (x[0],x[1])))

# per centroid id: (number of points, element-wise sum of their vectors)
new_centres = centroid_numerical_value.map(lambda x: ((x[0]),(1,x[1])) ).reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
# sort by centroid id before dropping it: the collection order is not guaranteed
# to follow the centroid index, and position i must be the mean of centroid i
new_c = [total / count for _, (count, total) in sorted(new_centres.collect(), key=lambda pair: pair[0])]
new_c
# numerical_centres_bc = sc.broadcast(new_c)
# numerical_centres_bc.value

[array([0.24929481, 0.75196773]),
 array([0.65454088, 0.23217833]),
 array([0.56850927, 0.49209791])]

## Training

In [50]:
max_iter = 10

start = time.time()
for j in range(max_iter):
  # centres that this iteration's assignment step is based on
  previous_numerical = numerical_centres_bc.value
  previous_categorical = categorical_centres_bc.value

  # find the points which are closest
  total_distance_rdd = numerical_rdd.zip(categorical_rdd).map(lambda x: get_total_distance(x[0], x[1]))

  # find the centroid which has the closest distance; cached because both centre
  # updates below read it and would otherwise recompute every distance twice
  closest_centroid_rdd = total_distance_rdd.map(lambda x: int(np.argmin(x))).cache()

  # find the new centroid for categorical data (per-column mode of each cluster);
  # results stay keyed by centroid id because collection order is not guaranteed
  centroid_categorical_value = closest_centroid_rdd.zip(categorical_rdd)

  grouped_rdd = centroid_categorical_value.groupByKey().mapValues(list)

  # find_mode returns a one-element array, so [0] extracts the mode itself;
  # this replaces squeeze(), which mis-shapes a single cluster or single column
  modes = grouped_rdd.mapValues(lambda rows: np.array([find_mode(column)[0] for column in zip(*rows)]))
  modes_by_centroid = modes.collectAsMap()

  # find the new centroid for numerical data (mean of each cluster), keyed the same way
  centroid_numerical_value = closest_centroid_rdd.zip(numerical_rdd)

  new_centres = centroid_numerical_value.mapValues(lambda vect: (1, vect)).reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
  means_by_centroid = new_centres.mapValues(lambda x: x[1]/x[0]).collectAsMap()

  # both aggregations are done; release the cached assignments
  closest_centroid_rdd.unpersist()

  # rebuild both centre lists in centroid-id order; a cluster that received no
  # points keeps its previous centre so that index i always means centroid i
  numpy_arrays = [modes_by_centroid.get(i, previous_categorical[i]) for i in range(k)]
  new_c = [means_by_centroid.get(i, previous_numerical[i]) for i in range(k)]

  # compute the difference between new clusters and previous
  temp_dist = 0
  for i in range(k):
    temp_dist += np.sqrt( np.sum( (new_c[i] - previous_numerical[i])**2 ) ) + np.sum( (previous_categorical[i] != numpy_arrays[i]) ) * 0.2

  print("iteration", j+1, "centroid difference", temp_dist)

  if temp_dist < 0.001:
    print("centroids remain the same, breaking")
    break
  else:
    categorical_centres_bc = sc.broadcast(numpy_arrays)
    numerical_centres_bc = sc.broadcast(new_c)

end = time.time()

iteration 1 centroid difference 0.4406167635439131
iteration 2 centroid difference 0.14338694856101353
iteration 3 centroid difference 0.15671259782411234
iteration 4 centroid difference 0.17325447619527568
iteration 5 centroid difference 0.07183622452907937
iteration 6 centroid difference 0.020392285628703606
iteration 7 centroid difference 0.015092032344825663
iteration 8 centroid difference 0.012920157133919514
iteration 9 centroid difference 0.010976647578382796
iteration 10 centroid difference 0.01168553657590111


In [51]:
# wall-clock duration of the training loop, in seconds
elapsed_seconds = end - start
print(elapsed_seconds)

1895.8336703777313
