In [None]:
# True: start a local Spark session here. False: assumes a `spark` session is
# already provided by the environment (e.g. a managed cluster notebook) — confirm.
local = True
if local:
    from pyspark.sql import SparkSession

    # Local master using all cores; spark-avro is added so Avro exports can be read.
    builder = (
        SparkSession.builder
        .master('local[*]')
        .config('spark.jars.packages', 'org.apache.spark:spark-avro_2.12:3.1.1')
    )
    spark = builder.getOrCreate()

spark

In [None]:
# Root for every input/output path in this notebook; it always ends with "/",
# so paths are built as f"{path_prefix}<name>".
path_prefix = 'work/data/' if local else 'gs://exported-data-ucu-2/'

# Convert the exported Avro wallet files to Parquet (columnar, faster to scan).
# mode("overwrite") keeps the cell re-runnable instead of failing once the
# output directory already exists.
spark.read \
    .format("avro").load(f"{path_prefix}wallets/*") \
    .write.mode("overwrite").parquet(f"{path_prefix}wallets_p/")

In [None]:
# Convert the exported Avro "sent" query results to Parquet.
# mode("overwrite") keeps the cell re-runnable when the output already exists.
spark.read \
    .format("avro").load(f"{path_prefix}sent_q/*") \
    .write.mode("overwrite").parquet(f"{path_prefix}sent_q_p/")

In [None]:
# Convert the exported Avro "received" query results to Parquet.
# mode("overwrite") keeps the cell re-runnable when the output already exists.
spark.read \
    .format("avro").load(f"{path_prefix}received_q/*") \
    .write.mode("overwrite").parquet(f"{path_prefix}received_q_p/")

In [None]:
# Load wallets data from the Parquet copy written by the Avro -> Parquet step.
# Built from path_prefix so local runs read local data rather than the GCS bucket.
wallets_raw = spark.read.format("parquet").load(f"{path_prefix}wallets_p/*")

In [None]:
# Wallet columns assembled into the model's feature vector.
# The order here defines the position of each feature inside that vector.
features = [
    # balance and transaction counts
    'balance',
    'sent_trx_number',
    'received_trx_number',
    # sent_* aggregates
    'sent_total', 'sent_min', 'sent_avg', 'sent_max',
    # received_* aggregates
    'received_total', 'received_min', 'received_avg', 'received_max',
    # inputs / outputs aggregates
    'min_inputs', 'avg_inputs', 'max_inputs',
    'min_outputs', 'avg_outputs', 'max_outputs',
    # day-based timing columns
    'age_days',
    'sent_range_days', 'received_range_days',
    'sent_inactive_days', 'received_inactive_days',
    # frequency columns
    'send_freq', 'received_freq',
    # flag column
    'has_coinbase',
]

In [None]:
#################
## 0) Vectorization
#################

from pyspark.ml.feature import VectorAssembler

# Pack the feature columns into one vector column named "features" and keep
# only the wallet address next to it.
assembler = VectorAssembler(inputCols=features, outputCol="features")

wallets_vector = (
    assembler
    .transform(wallets_raw)
    .select('address', 'features')
)

In [None]:
#################
## 1) SCALING / NORMALIZATION
#################

from pyspark.ml.feature import StandardScaler

# Standardize the assembled vector: subtract the mean and divide by the std.
scaler = (
    StandardScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
    .setWithMean(True)
    .setWithStd(True)
)

# Fitting computes the per-feature mean / std summary statistics.
scalerModel = scaler.fit(wallets_vector)

In [None]:
# Persist the fitted scaler; overwrite() keeps the cell re-runnable when the path exists.
scalerModel.write().overwrite().save(f"{path_prefix}models/scaler")

In [None]:
# Apply the fitted scaler. It was configured with withMean and withStd, so every
# feature is centered to zero mean and scaled to unit standard deviation.
wallets_scaled_full = scalerModel.transform(wallets_vector)
wallets_scaled = wallets_scaled_full.select('address', 'scaledFeatures')

In [None]:
#################
## 2) RUN PCA ON ALL 25 FEATURES, FIND OUT HOW MANY DIMENSIONS WE NEED
#################
from pyspark.ml.feature import PCA

# Fraction of total variance the retained principal components must explain.
EXPLAINED_VARIANCE_TARGET = 0.9

if local:
    import numpy as np

    # Full-rank PCA (k = number of features) to inspect the explained-variance spectrum.
    pca = PCA(k=len(features), inputCol="scaledFeatures", outputCol="pcaFeatures")
    pca_model = pca.fit(wallets_scaled)
    cumulative_variance = np.cumsum(pca_model.explainedVariance.toArray())
    print(cumulative_variance)

    # Smallest k with cumulative_variance[k - 1] >= target. side='left' gives the
    # 0-based index of the first component reaching the target, and +1 turns that
    # index into a component count. (side='right' without +1 picks one component
    # too few whenever the target falls strictly between two cumulative values.)
    # The result is clamped to the feature count in case floating-point rounding
    # leaves the total just below the target, and cast to a plain int for Spark's param.
    first_reaching = np.searchsorted(cumulative_variance, EXPLAINED_VARIANCE_TARGET, side='left')
    i = int(min(first_reaching + 1, len(features)))
else:
    # NOTE(review): hard-coded number of components for non-local runs;
    # confirm it still meets EXPLAINED_VARIANCE_TARGET on the full data.
    i = 8

In [None]:
#################
## 3) REDUCTION OF FEATURES, KEEP AS MANY AS NEEDED
#################

# Project the scaled vectors onto the first `i` principal components.
pca = PCA(inputCol="scaledFeatures", outputCol="pcaFeatures", k=i)
pca_model = pca.fit(wallets_scaled)

wallets_pcs = (
    pca_model
    .transform(wallets_scaled)
    .select('address', 'pcaFeatures')
)

In [None]:
# Persist the fitted PCA model; overwrite() keeps the cell re-runnable when the path exists.
pca_model.write().overwrite().save(f"{path_prefix}models/pca")

In [None]:
#################
## 4) CLUSTERING
#################

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# k-means with 5 clusters on the PCA projection; fixed seed for reproducibility.
kmeans = KMeans(featuresCol='pcaFeatures', k=5, seed=42)
clustering_model = kmeans.fit(wallets_pcs)

# Assign every wallet address its cluster id (column "prediction").
wallets_clustered = (
    clustering_model
    .transform(wallets_pcs)
    .select('address', 'prediction')
)

In [None]:
# Persist the fitted k-means model. path_prefix already ends with "/", so no extra
# separator is added; overwrite() keeps the cell re-runnable when the path exists.
clustering_model.write().overwrite().save(f"{path_prefix}models/kmeans")

In [None]:
#################
## 5) QUERY SENT AMOUNTS OF BTC FROM EACH WALLET PER HOUR
#################
# Read the Parquet copy written by the "sent" conversion step (path_prefix already
# ends with "/"). The generic "sum" column is renamed so it does not clash with
# the received totals after the join.
sent = spark.read.format("parquet") \
    .load(f"{path_prefix}sent_q_p/*") \
    .withColumnRenamed("sum", "sum_sent")

In [None]:
#################
## 6) QUERY RECEIVED AMOUNTS OF BTC TO EACH WALLET PER HOUR
#################
# Read the Parquet copy written by the "received" conversion step (path_prefix
# already ends with "/"). The generic "sum" column is renamed so it does not
# clash with the sent totals after the join.
received = spark.read.format("parquet") \
    .load(f"{path_prefix}received_q_p/*") \
    .withColumnRenamed("sum", "sum_received")


In [None]:
#################
## 7) JOIN AND GROUP ALL DATA
#################

from pyspark.sql.functions import asc

# Full outer join on the (date_hour, address) key. Rows where a wallet only sent,
# or only received, in a given hour are kept on both sides.
cond = ['date_hour', 'address']
sent_received = sent.join(received, on=cond, how='outer')

In [None]:
# Attach each address's cluster id. The left join keeps transactions whose address
# has no row in wallets_clustered; their `prediction` is null.
all_data = sent_received.join(wallets_clustered, on='address', how='left')

In [None]:
# One row per hour. pivot("prediction") spreads every cluster id (plus "null" for
# addresses without a cluster) into "<cluster>_sum(sum_sent)" /
# "<cluster>_sum(sum_received)" columns.
# Only date_hour is grouped on: also grouping by the pivot column would emit one
# sparse row per (hour, cluster) instead of one row per hour.
clustered_transactions = (
    all_data
    .groupby('date_hour')
    .pivot('prediction')
    .sum('sum_sent', 'sum_received')
    .sort(asc('date_hour'))
)

In [None]:
#################
## 8) OUTPUT CALCULATION RESULTS
#################

import re

# Pivoted aggregate columns are named like "3_sum(sum_sent)" or "null_sum(sum_received)".
# Rename them to "3_sum_sent" / "null_sum_received" for every cluster id, so this keeps
# working if the number of k-means clusters changes. Other columns are left as-is.
renamed_columns = [
    re.sub(r"_sum\((sum_sent|sum_received)\)$", r"_\1", column)
    for column in clustered_transactions.columns
]

# path_prefix already ends with "/". mode("overwrite") keeps the cell re-runnable.
clustered_transactions \
    .toDF(*renamed_columns) \
    .write.mode("overwrite") \
    .parquet(f"{path_prefix}out/clustered_transactions_p")

In [None]:
# Reload the pivoted per-cluster output written in the previous step
# (path_prefix already ends with "/").
clustered_transactions_up = spark.read.format("parquet") \
    .load(f"{path_prefix}out/clustered_transactions_p/*")

In [None]:
from pyspark.sql import functions as F

# Per-cluster sent/received columns ("<cluster>_sum_sent" / "<cluster>_sum_received").
# Only these are aggregated, so no meaningless sum of `prediction` (or of any other
# numeric column) is produced, and any number of clusters is supported.
value_columns = [
    column for column in clustered_transactions_up.columns
    if column.endswith("_sum_sent") or column.endswith("_sum_received")
]
assert value_columns, "no per-cluster *_sum_sent / *_sum_received columns found"

# Collapse to one row per hour, keeping the original column names.
# mode("overwrite") keeps the cell re-runnable.
clustered_transactions_up \
    .groupby("date_hour") \
    .agg(*[F.sum(column).alias(column) for column in value_columns]) \
    .write.mode("overwrite") \
    .parquet(f"{path_prefix}out_p_g")
