# EDA ADDRESS NODES

## IMPORTS

In [1]:
import findspark
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.offline import init_notebook_mode, plot
import seaborn as sns
from ks_crypto.lib.spark_conn import create_yarn_connection
from ks_crypto.lib import constants as C
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel

pd.set_option('display.max_rows', 500)

init_notebook_mode(connected=True)
findspark.init()
spark = create_yarn_connection()

import graphframes as gf


pandas.util.testing is deprecated. Use the functions in the public API at pandas.testing instead.



In [2]:
spark.conf.set("spark.sql.broadcastTimeout",  "1000000")
spark.sparkContext.setCheckpointDir('/temp/')

In [3]:
DANON_FULL_TABLENAME = "kschool-crypto:ks_crypto_dataset.danon_transactions"
TRANSACTIONS_FLATTEN_FULL_TABLENAME = "kschool-crypto.ks_crypto_dataset.transactions_flatten"
F_MIN = '2016-01-01'
F_MAX = '2017-10-01'

PERIOD_DIC = {
    1: ['2016-01-01 02:50:56', '2016-01-01 08:50:56'],
    2: ['2016-01-13 23:40:57', '2016-01-14 05:40:57'],
    3: ['2016-01-27 04:01:32', '2016-01-27 10:01:32'],
    4: ['2016-02-07 17:04:34', '2016-02-07 23:04:34'],
    5: ['2016-02-20 05:43:02', '2016-02-20 11:43:02'],
    6: ['2016-03-05 14:13:59', '2016-03-05 20:13:59'],
    7: ['2016-03-19 03:05:12', '2016-03-19 09:05:12'],
    8: ['2016-04-01 21:14:12', '2016-04-02 03:14:12'],
    9: ['2016-04-14 22:53:05', '2016-04-15 04:53:05'],
    10: ['2016-04-29 01:55:24', '2016-04-29 07:55:24'],
    11: ['2016-05-11 23:10:38', '2016-05-12 05:10:38'],
    12: ['2016-05-25 15:34:08', '2016-05-25 21:34:08'],
    13: ['2016-06-08 18:08:38', '2016-06-09 00:08:38'],
    14: ['2016-06-21 22:20:59', '2016-06-22 04:20:59'],
    15: ['2016-07-05 17:19:48', '2016-07-05 23:19:48'],
    16: ['2016-07-19 17:27:38', '2016-07-19 23:27:38'],
    17: ['2016-08-03 10:01:52', '2016-08-03 16:01:52'],
    18: ['2016-08-16 13:52:56', '2016-08-16 19:52:56'],
    19: ['2016-08-30 06:26:15', '2016-08-30 12:26:15'],
    20: ['2016-09-12 19:15:16', '2016-09-13 01:15:16'],
    21: ['2016-09-25 21:50:08', '2016-09-26 03:50:08'],
    22: ['2016-10-09 00:36:09', '2016-10-09 06:36:09'],
    23: ['2016-10-23 10:58:25', '2016-10-23 16:58:25'],
    24: ['2016-11-06 07:34:13', '2016-11-06 13:34:13'],
    25: ['2016-11-18 23:59:04', '2016-11-19 05:59:04'],
    26: ['2016-12-02 19:42:18', '2016-12-03 01:42:18'],
    27: ['2016-12-15 20:26:20', '2016-12-16 02:26:20'],
    28: ['2016-12-29 09:37:43', '2016-12-29 15:37:43'],
    29: ['2017-01-11 12:49:44', '2017-01-11 18:49:44'],
    30: ['2017-01-23 15:14:17', '2017-01-23 21:14:17'],
    31: ['2017-02-05 13:26:08', '2017-02-05 19:26:08'],
    32: ['2017-02-19 01:06:55', '2017-02-19 07:06:55'],
    33: ['2017-03-04 11:05:28', '2017-03-04 17:05:28'],
    34: ['2017-03-18 00:21:41', '2017-03-18 06:21:41'],
    35: ['2017-03-31 07:38:56', '2017-03-31 13:38:56'],
    36: ['2017-04-13 20:45:41', '2017-04-14 02:45:41'],
    37: ['2017-04-27 20:05:46', '2017-04-28 02:05:46'],
    38: ['2017-05-10 20:00:50', '2017-05-11 02:00:50'],
    39: ['2017-05-23 22:00:57', '2017-05-24 04:00:57'],
    40: ['2017-06-05 05:37:21', '2017-06-05 11:37:21'],
    41: ['2017-06-18 16:13:50', '2017-06-18 22:13:50'],
    42: ['2017-07-02 13:09:19', '2017-07-02 19:09:19'],
    43: ['2017-07-15 03:48:27', '2017-07-15 09:48:27'],
    44: ['2017-07-28 01:01:49', '2017-07-28 07:01:49'],
    45: ['2017-08-10 05:38:54', '2017-08-10 11:38:54'],
    46: ['2017-08-25 00:54:59', '2017-08-25 06:54:59'],
    47: ['2017-09-07 03:20:23', '2017-09-07 09:20:23'],
    48: ['2017-09-19 00:54:47', '2017-09-19 06:54:47']}

In [4]:
def build_period_block_timestamp_dic(input_df, old_num_hours, new_num_hours):

    w_ord = Window.orderBy(C.BLOCK_TIMESTAMP)
    w_id = Window.partitionBy(C.BLOCK_TIMESTAMP)

    diff_hours_lag_fun = (
        (F.col(C.BLOCK_TIMESTAMP).cast('long') -
         F.col(f'lag_{C.BLOCK_TIMESTAMP}').cast('long')) / 3600).cast('int')
    is_more_hours_lag_cond = \
        (diff_hours_lag_fun > old_num_hours) | (F.col(f'lag_{C.BLOCK_TIMESTAMP}').isNull())

    final_cols_dic = {
        'period': F.row_number().over(w_ord),
        'f_min_period': F.col(C.BLOCK_TIMESTAMP).cast('string'),
        'f_max_period': (F.col(C.BLOCK_TIMESTAMP) + F.expr(f'INTERVAL {new_num_hours} HOURS')).cast('string')
    }

    final_cols_list = [v.alias(k) for k, v in final_cols_dic.items()]

    period_df = \
        input_df \
        .filter(F.col('is_deanonymized') == 1)\
        .withColumn(f'lag_{C.BLOCK_TIMESTAMP}', F.lag(C.BLOCK_TIMESTAMP).over(w_ord)) \
        .withColumn('is_first_in_period', F.when(is_more_hours_lag_cond, 1).otherwise(0))\
        .withColumn('is_first_in_period', F.max('is_first_in_period').over(w_id)) \
        .dropDuplicates([C.BLOCK_TIMESTAMP, 'is_first_in_period'])\
        .filter(F.col('is_first_in_period') == 1)\
        .select(*final_cols_list)\
        .toPandas()

    dic = {int(period_df['period'][i]): [period_df['f_min_period'][i], period_df['f_max_period'][i]]
           for i in period_df.index.values.tolist()}

    return dic

def build_period_fun_from_dic(dic, colname):
    stacked_cond = None
    for k, v in dic.items():
        cond = (F.col(colname) >= v[0]) & (F.col(colname) <= v[1])
        stacked_cond = stacked_cond.when(cond, k) if stacked_cond is not None else F.when(cond, k)
        
    return stacked_cond

## 1. READ DATA

In [5]:
transactions_df = \
    spark.read.format('bigquery') \
    .option('table', TRANSACTIONS_FLATTEN_FULL_TABLENAME) \
    .option("filter", f"block_timestamp_month >= '{F_MIN}' AND block_timestamp_month < '{F_MAX}'")\
    .load()\
    .select(C.INPUT_ADDRESS_ID,
            C.OUTPUT_ADDRESS_ID,
            C.BLOCK_TIMESTAMP,
            'is_deanonymized',
            'class')

# transactions_df.count() -> 85738587
# transactions_df.rdd.getNumPartitions() -> 843

In [6]:
transactions_df.count()

1217273420

In [7]:
transactions_df.rdd.getNumPartitions()

843

In [18]:
transactions_df.select(F.sum('is_deanonymized')).show()

+--------------------+
|sum(is_deanonymized)|
+--------------------+
|             3287946|
+--------------------+



## 2. FILTER PERIODS

#### Check periods

In [None]:
ts = \
    transactions_df\
    .filter(F.col('is_deanonymized') == 1)\
    .groupBy('block_timestamp')\
    .count()\
    .orderBy('block_timestamp')\
    .toPandas()

px.line(ts, x='block_timestamp', y='count')

In [None]:
w_ord = Window.orderBy('block_timestamp')
w_id = Window.partitionBy('block_timestamp')

diff_hours_lag_fun = (F.col('block_timestamp').cast('long') -  F.col('lag_block_timestamp').cast('long')) / 3600
is_more_hours_lag_cond = (diff_hours_lag_fun > 3) | (F.col('lag_block_timestamp').isNull())

first_transactions_df = \
    transactions_df\
    .filter(F.col('is_deanonymized') == 1)\
    .select('block_timestamp')\
    .withColumn('lag_block_timestamp', F.lag('block_timestamp').over(w_ord))\
    .withColumn('is_first_in_period',  F.when(is_more_hours_lag_cond, 1).otherwise(0))\
    .withColumn('is_first_in_period',  F.max('is_first_in_period').over(w_id))

In [None]:
first_transactions_df\
    .filter(F.col('is_first_in_period') == 1)\
    .drop_duplicates(['block_timestamp', 'is_first_in_period'])\
    .count()

In [None]:
first_transactions_df \
    .filter(F.col('is_first_in_period') == 1)\
    .drop_duplicates(['block_timestamp', 'is_first_in_period'])\
    .groupBy(F.to_date('block_timestamp'))\
    .count()\
    .orderBy(F.desc('count'))\
    .show(2)

In [None]:
first_transactions_df\
    .filter(F.col('is_first_in_period') == 1)\
    .drop_duplicates(['block_timestamp', 'is_first_in_period'])\
    .withColumn('lag_block_timestamp', F.lag('block_timestamp').over(w_ord))\
    .withColumn('diff_lag', diff_hours_lag_fun)\
    .orderBy('block_timestamp')\
    .show(50)

#### Create period var

In [6]:
period_dic = build_period_block_timestamp_dic(transactions_df, 3, 6)
period_fun = build_period_fun_from_dic(period_dic, C.BLOCK_TIMESTAMP)


In [None]:
period_transactions_df = \
    transactions_df\
    .withColumn('period', period_fun)\
    .persist()

period_transactions_df.count()


1217273420

In [None]:
# period_transactions_df.groupBy('period').count().show(50)

In [None]:
filtered_df = \
    period_transactions_df.filter(F.col('period').isNotNull())

In [None]:
filtered_df.count()

In [None]:
filtered_df.select(F.sum('is_deanonymized')).show()

In [None]:
transactions_df.select(F.sum('is_deanonymized')).show()

In [10]:
period_dic

{1: ['2016-01-01 02:50:56', '2016-01-01 08:50:56'],
 2: ['2016-01-13 23:40:57', '2016-01-14 05:40:57'],
 3: ['2016-01-27 04:01:32', '2016-01-27 10:01:32'],
 4: ['2016-02-07 17:04:34', '2016-02-07 23:04:34'],
 5: ['2016-02-20 05:43:02', '2016-02-20 11:43:02'],
 6: ['2016-03-05 14:13:59', '2016-03-05 20:13:59'],
 7: ['2016-03-19 03:05:12', '2016-03-19 09:05:12'],
 8: ['2016-04-01 21:14:12', '2016-04-02 03:14:12'],
 9: ['2016-04-14 22:53:05', '2016-04-15 04:53:05'],
 10: ['2016-04-29 01:55:24', '2016-04-29 07:55:24'],
 11: ['2016-05-11 23:10:38', '2016-05-12 05:10:38'],
 12: ['2016-05-25 15:34:08', '2016-05-25 21:34:08'],
 13: ['2016-06-08 18:08:38', '2016-06-09 00:08:38'],
 14: ['2016-06-21 22:20:59', '2016-06-22 04:20:59'],
 15: ['2016-07-05 17:19:48', '2016-07-05 23:19:48'],
 16: ['2016-07-19 17:27:38', '2016-07-19 23:27:38'],
 17: ['2016-08-03 10:01:52', '2016-08-03 16:01:52'],
 18: ['2016-08-16 13:52:56', '2016-08-16 19:52:56'],
 19: ['2016-08-30 06:26:15', '2016-08-30 12:26:15'],
 2

## 3. GET CONNECTED NODES

In [8]:
period_fun = build_period_fun_from_dic(PERIOD_DIC, C.BLOCK_TIMESTAMP)

filtered_df = \
    transactions_df\
    .withColumn('period', period_fun)\
    .filter(F.col('period').isNotNull())

In [9]:
vertex_df = \
    filtered_df\
    .select(F.col(C.INPUT_ADDRESS_ID).alias('id'), 'is_deanonymized')\
    .unionByName(transactions_df\
                 .select(F.col(C.OUTPUT_ADDRESS_ID).alias('id'), 'is_deanonymized'))\
    .dropDuplicates(['id'])\
    .repartition(256)\
    .persist(StorageLevel.MEMORY_ONLY)

print(f'vertex_count: {vertex_df.count()}')

vertex_count: 196913505


In [23]:
edges_df = \
    filtered_df\
    .select(F.col(C.INPUT_ADDRESS_ID).alias(C.SRC), F.col(C.OUTPUT_ADDRESS_ID).alias(C.DST), 'is_deanonymized')\
    .repartition(256)\
    .persist(StorageLevel.MEMORY_ONLY)

print(f'edges_count: {edges_df.count()}')

edges_count: 23427554


In [11]:
g = gf.GraphFrame(vertex_df, edges_df)

In [25]:
result=g.connectedComponents()

In [26]:
result = result.persist()
result.count()

196913505

In [27]:
result.select(F.countDistinct('component')).show()

+-------------------------+
|count(DISTINCT component)|
+-------------------------+
|                189977291|
+-------------------------+



In [28]:
w = Window.partitionBy('component')
result\
    .withColumn('decomp', F.max('is_deanonymized').over(w))\
    .groupBy('decomp')\
    .agg(F.count(F.lit(1)).alias('count'),
         F.countDistinct('component'))\
    .show()

+------+---------+----------------+
|decomp|    count|count(component)|
+------+---------+----------------+
|     0|190955295|       189977290|
|     1|  5958210|               1|
+------+---------+----------------+



In [32]:
result\
    .withColumn('is_valid_component', F.max('is_deanonymized').over(w))\
    .filter(F.col('is_valid_component') == 1)\
    .show()

+--------------------+---------------+---------+------------------+
|                  id|is_deanonymized|component|is_valid_component|
+--------------------+---------------+---------+------------------+
|115iVZFSPFEW2Lt9M...|              0|        6|                 1|
|12AikdxgTuBeAgZaG...|              0|        6|                 1|
|12Fa8H4XJiGYzJZVQ...|              0|        6|                 1|
|12TNcbwrCtL8F3Rog...|              0|        6|                 1|
|12Uu5M2uUzYhbw8W8...|              0|        6|                 1|
|12pS546ammzoApDEb...|              0|        6|                 1|
|13KdyfMwriWoua4Qm...|              1|        6|                 1|
|13LkLVxU6xnZoUMWB...|              1|        6|                 1|
|13Y6BYhGJFeDruP3m...|              0|        6|                 1|
|13YBn5Bu8jmw8bDh6...|              0|        6|                 1|
|13sUpDwHoTjUYXuGZ...|              0|        6|                 1|
|13trL8dPUVXFSE8qf...|              0|        6|

In [31]:
result\
    .withColumn('is_valid_component', F.max('is_deanonymized').over(w))\
    .filter(F.col('is_valid_component') == 1)\
    .select(F.sum('is_deanonymized')).show()

+--------------------+
|sum(is_deanonymized)|
+--------------------+
|              367248|
+--------------------+



In [33]:
1

1

In [21]:
transactions_df.select(F.sum('is_deanonymized')).show()

+--------------------+
|sum(is_deanonymized)|
+--------------------+
|             3287946|
+--------------------+



In [20]:
filtered_df.select(F.sum('is_deanonymized')).show()

+--------------------+
|sum(is_deanonymized)|
+--------------------+
|             3287946|
+--------------------+



In [19]:
vertex_df.select(F.sum('is_deanonymized')).show()

+--------------------+
|sum(is_deanonymized)|
+--------------------+
|              367248|
+--------------------+



In [24]:
edges_df.select(F.sum('is_deanonymized')).show()

+--------------------+
|sum(is_deanonymized)|
+--------------------+
|             3287946|
+--------------------+

