In [7]:
from source import dfs, feature_names

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col
from pyspark.ml.functions import vector_to_array

from utils import summary_plots, write_spark_file

In [8]:
dfs.show()
dfs.groupBy('buyer').count().filter("count != 1").show() # all rows are not redundant

+--------------------+-------------------+-----------+--------------------+---------+
|               buyer|          avg_spent|number_txns|        avg_duration|frequency|
+--------------------+-------------------+-----------+--------------------+---------+
|\x000000000002e33...|  621.0099000000009|          1|              2868.0|        1|
|\x00000000009a418...| 62084.025710500006|         14|  106767.85714285714|       11|
|\x0000396ed2931b1...| 212.14824999999996|          1|            103785.0|        1|
|\x0000a574f2030ff...| -7.472300000000001|          1|            262761.0|        1|
|\x000199cb7dfae39...|  513.9166833333334|          3|   361579.3333333333|        3|
|\x00031a9829a11ef...|            288.924|          1|         1.3035459E7|        1|
|\x00063ddb30be7bc...| -835.7995999999999|          1|            353473.0|        1|
|\x0006c3a51d493fd...|0.19244249999999985|          1|               768.0|        1|
|\x0007796d3b5bae6...|-13.598100000000002|          1|

In [9]:
# try type conversion
# dfs = dfs.withColumn("number_txns_double", col("number_txns").cast(DoubleType()))
# dfs = dfs.withColumn("num_nftcontract_double", col("num_nftcontract").cast(DoubleType()))
# dfs = dfs.withColumn("num_currency_double", col("num_currency").cast(DoubleType())) 
# dfs = dfs.withColumn("frequency_double", col("frequency").cast(DoubleType()))

In [10]:
assemble=VectorAssembler(inputCols=feature_names, outputCol='features')
assembled_data=assemble.transform(dfs)

#Standardisation
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
scaler_model = scaler.fit(assembled_data)
data = scaler_model.transform(assembled_data)

In [11]:
# vector to array
# data = data.withColumn("sfv", (vector_to_array('scaled_features', "float32"))) #sfv sparse feature vector

In [12]:
# model fit
config = {
    "model": "kmeans",
    "k": 3,
    "feature_names": ['avg_spent','number_txns','avg_duration','frequency'] #  'num_nftcontract', #  'num_currency',
}
# config = {
#     "model": "bisecting_kmeans",
#     "k": 3,
#     "feature_names": ['avg_spent','number_txns','avg_duration','frequency'] #  'num_nftcontract', #  'num_currency',
# }
print(config)
if config['model'] == "kmeans":
    Model = KMeans(featuresCol='scaled_features', k=config["k"],maxIter=20, initMode='k-means||', seed = 1)
elif config['model'] == 'bisecting_kmeans':
    Model = BisectingKMeans(featuresCol='scaled_features', k=config["k"],maxIter=20, seed = 1)
else:
    raise Exception("model not recognised")


model = Model.fit(data)
output = model.transform(data)
output.show(2)

{'model': 'bisecting_kmeans', 'k': 3, 'feature_names': ['avg_spent', 'number_txns', 'avg_duration', 'frequency']}


[Stage 25:>                                                         (0 + 8) / 8]

23/05/02 11:06:17 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


                                                                                

+--------------------+------------------+-----------+------------------+---------+--------------------+--------------------+----------+
|               buyer|         avg_spent|number_txns|      avg_duration|frequency|            features|     scaled_features|prediction|
+--------------------+------------------+-----------+------------------+---------+--------------------+--------------------+----------+
|\x000000000002e33...| 621.0099000000009|          1|            2868.0|        1|[621.009900000000...|[0.04122621968291...|         0|
|\x00000000009a418...|62084.025710500006|         14|106767.85714285714|       11|[62084.0257105000...|[4.12149578088962...|         2|
+--------------------+------------------+-----------+------------------+---------+--------------------+--------------------+----------+
only showing top 2 rows



In [13]:
write_spark_file(dfs,config)

23/05/02 11:06:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [14]:
output.select('prediction').distinct().show()
output.cube('prediction').count().show() # count frequency of clusters. Somehow null shows the total

+----------+
|prediction|
+----------+
|         1|
|         2|
|         0|
+----------+

+----------+------+
|prediction| count|
+----------+------+
|         1|  9797|
|         0|160271|
|      null|186035|
|         2| 15967|
+----------+------+



In [15]:
model.clusterCenters()


[array([0.02274972, 0.27233212, 0.34282263, 0.68825451]),
 array([0.08143914, 0.20243029, 3.85946892, 0.55015124]),
 array([0.0328984 , 2.06304754, 0.32986566, 3.46589938])]

In [16]:
out_dfp = output.toPandas()


                                                                                

In [17]:
g = summary_plots(out_dfp, config)

No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.


<Figure size 1000x1000 with 0 Axes>

In [18]:
output.show()

+--------------------+-------------------+-----------+--------------------+---------+--------------------+--------------------+----------+
|               buyer|          avg_spent|number_txns|        avg_duration|frequency|            features|     scaled_features|prediction|
+--------------------+-------------------+-----------+--------------------+---------+--------------------+--------------------+----------+
|\x000000000002e33...|  621.0099000000009|          1|              2868.0|        1|[621.009900000000...|[0.04122621968291...|         0|
|\x00000000009a418...| 62084.025710500006|         14|  106767.85714285714|       11|[62084.0257105000...|[4.12149578088962...|         2|
|\x0000396ed2931b1...| 212.14824999999996|          1|            103785.0|        1|[212.148249999999...|[0.01408362468914...|         0|
|\x0000a574f2030ff...| -7.472300000000001|          1|            262761.0|        1|[-7.4723000000000...|[-4.9605438067352...|         0|
|\x000199cb7dfae39...|  513