In [None]:
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

In [None]:
# Create the Spark session for this notebook (getOrCreate reuses an existing one),
# and keep a handle on its SparkContext.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)
sc = spark.sparkContext

# The next cell loads the wallets as a JSON dataset with spark.read.json(path);
# `path` may be either a single JSON-lines file or a directory of such files.

In [None]:
# Dataset location. Substitute "wallets_test" (presumably a smaller sample) for quick runs;
# the previous version assigned it and then immediately overwrote it.
path = "wallets"
wallets_df = spark.read.json(path)

# Work with plain dicts (nested structs converted too) instead of Row objects.
wallets = wallets_df.rdd.map(lambda row: row.asDict(recursive=True))

In [None]:
def _as_float(value):
    """Coerce a possibly-null numeric field to float; None (or any falsy value) becomes 0.0."""
    return float(value or 0)


def _counterparty_addresses(tx, field):
    """Return the ``address`` of every entry listed under ``tx[field]`` ('inputs' or 'outputs').

    Both a missing key and a null list yield no addresses. Rows read through a
    DataFrame schema contain every field, so an array that is absent for some
    transaction shows up as ``None``. ``tx.get(field, [])`` would then return
    ``None`` and make ``flatMapValues`` fail with a TypeError.
    """
    return [entry['address'] for entry in tx.get(field) or []]


def _mean_by_key(pairs):
    """Per-key arithmetic mean of a (key, number) pair RDD."""
    return (
        pairs
        .mapValues(lambda v: (v, 1))
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        .mapValues(lambda total_count: total_count[0] / total_count[1])
    )


def _variance_by_key(pairs, means):
    """Per-key population variance of ``pairs``, given ``means = _mean_by_key(pairs)``."""
    squared_deviations = pairs.join(means).mapValues(lambda vm: (vm[0] - vm[1]) ** 2)
    return _mean_by_key(squared_deviations)


def _count_by_key(pairs):
    """Number of values per key, duplicates included."""
    return pairs.mapValues(lambda _: 1).reduceByKey(lambda a, b: a + b)


def _distinct_count_by_key(pairs):
    """Number of distinct values per key (values must be hashable)."""
    return _count_by_key(pairs.distinct())


# One (address, (incoming, outgoing)) pair per transaction; `txs` may be null.
# Cached because every transaction-based feature below is derived from it.
wallet_transactions = (
    wallets
    .flatMap(lambda w: [(w['address'], tx) for tx in w['txs'] or []])
    .mapValues(lambda tx: (tx.get('incoming'), tx.get('outgoing')))
    .cache()
)

# Per-wallet scalar fields; nulls become 0.0.
balance = wallets.map(lambda w: (w['address'], _as_float(w['balance'])))
received_value = wallets.map(lambda w: (w['address'], _as_float(w['received_value'])))
total_txs = wallets.map(lambda w: (w['address'], _as_float(w['total_txs'])))

# Incoming / outgoing side of each transaction, dropping transactions without that side.
txs_in = wallet_transactions.mapValues(lambda io: io[0]).filter(lambda kv: kv[1] is not None)
txs_out = wallet_transactions.mapValues(lambda io: io[1]).filter(lambda kv: kv[1] is not None)

# Counterparty addresses: the `inputs` of incoming txs and the `outputs` of outgoing txs.
a_in = txs_in.flatMapValues(lambda tx: _counterparty_addresses(tx, 'inputs'))
a_out = txs_out.flatMapValues(lambda tx: _counterparty_addresses(tx, 'outputs'))

# Transaction values; nulls become 0.0.
v_in = txs_in.mapValues(lambda tx: _as_float(tx['value']))
v_out = txs_out.mapValues(lambda tx: _as_float(tx['value']))

# features
avg_vin = _mean_by_key(v_in)
avg_vout = _mean_by_key(v_out)

var_vin = _variance_by_key(v_in, avg_vin)
var_vout = _variance_by_key(v_out, avg_vout)

unique_deg_in = _distinct_count_by_key(a_in)
unique_deg_out = _distinct_count_by_key(a_out)

deg_in = _count_by_key(a_in)
deg_out = _count_by_key(a_out)

# Sanity peek at one value per side; take(1) returns [] instead of raising on an empty RDD.
v_in.take(1), v_out.take(1)

In [None]:
# One (column name, pair RDD) entry per feature, in output column order. Keeping each
# name next to its RDD means the two can never drift out of sync.
feature_rdds = [
    ('avg_vin', avg_vin),
    ('avg_vout', avg_vout),
    ('var_vin', var_vin),
    ('var_vout', var_vout),
    ('unique_deg_in', unique_deg_in),
    ('unique_deg_out', unique_deg_out),
    ('deg_in', deg_in),
    ('deg_out', deg_out),
    ('balance', balance),
    ('received_value', received_value),
    ('total_txs', total_txs),
]

# Full outer joins on `addr` keep wallets that lack some feature (e.g. no outgoing
# transactions); the missing cells come out as nulls.
feature_dfs = [rdd.toDF(['addr', name]) for name, rdd in feature_rdds]
final_df = reduce(lambda left, right: left.join(right, 'addr', 'outer'), feature_dfs)

In [None]:
# Replace the nulls left by the outer joins with 0, then collect the result onto the
# driver as a pandas DataFrame and display it.
pandas_df = (
    final_df
    .fillna(0)
    .toPandas()
)
pandas_df

In [None]:
# Import only what is used; `engine` is the only name this notebook takes from `database`.
from database import engine

# Persist the feature table (replacing any previous copy) so the ML section can reload it
# without re-running Spark.
pandas_df.to_sql('wallets_meta', engine, if_exists='replace', index=False)

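### Machine Learning
Cluster the wallets by their features with k-means, and project the features onto two principal components (PCA) to visualize the clusters.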

In [None]:
import sklearn.preprocessing
import pandas as pd

# Reload the persisted features so this section can run without Spark.
pandas_df = pd.read_sql_table('wallets_meta', engine)

# Feature matrix: every column except the wallet address, one row per wallet.
data = pandas_df.drop(columns='addr').to_numpy()

# Standardize each feature column (zero mean, unit variance). The columns mix quantities
# that are likely on very different scales (e.g. balances vs. degree counts), and PCA and
# k-means are scale-sensitive. Row-wise `normalize` would instead rescale each wallet's
# vector to unit length, letting the largest-magnitude column dominate every row.
norm_data = sklearn.preprocessing.StandardScaler().fit_transform(data)

In [None]:
from sklearn.decomposition import PCA

# Fit a 2-component PCA on the preprocessed feature matrix, then project every wallet
# onto it; `grid` holds one (PC1, PC2) coordinate pair per wallet for plotting.
pca = PCA(n_components=2).fit(norm_data)
grid = pca.transform(norm_data)

In [None]:
from sklearn.cluster import KMeans

# One plot color per cluster. The number of clusters is derived from this palette, so
# adding a cluster without a color can no longer raise IndexError below.
CLUSTER_COLORS = ['r', 'g', 'b']

# Pin n_init and random_state so the clustering is reproducible across runs and across
# sklearn versions (the default n_init changed to 'auto' in 1.4).
kmeans = KMeans(n_clusters=len(CLUSTER_COLORS), n_init=10, random_state=0)
color_number = kmeans.fit_predict(norm_data)

colors = [CLUSTER_COLORS[label] for label in color_number]

# Show cluster sizes rather than printing one color per wallet.
pd.Series(colors).value_counts()

In [None]:
import matplotlib.pyplot as plt

# Wallets on the first two principal components, colored by their k-means cluster.
fig, ax = plt.subplots()
ax.scatter(grid[:, 0], grid[:, 1], c=colors)
ax.set(title='Wallet clusters (k-means) in PCA space', xlabel='PC 1', ylabel='PC 2');