# TOC

## Load imports

### Non pyspark

In [248]:
import time
import numpy as np
import pandas as pd
from fastparquet import ParquetFile, write
import matplotlib.pyplot as plt
import timeit
import dask.dataframe as dd
import holoviews as hv
import datashader as ds
import gc
from os.path import join, dirname
import pyarrow 
import pyarrow.parquet as pq
# PLOT USING HOLOVIEWS DASK AND DATASHADER
import hvplot.pandas
import hvplot.dask
import hvplot as hv
from bokeh.models import HoverTool
from pdb import set_trace
from datetime import datetime, timedelta
import csv
import dateutil.relativedelta

%matplotlib inline

### Pyspark

In [249]:
#UTILS 
import findspark
findspark.init('/usr/local/spark/spark-2.3.2-bin-hadoop2.7')

import pyspark
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as f
from pyspark.sql.types import DateType, StringType, IntegerType
from pyspark.sql import *
from pyspark.sql.window import Window

spark = SparkSession \
    .builder \
    .appName("Poolminers") \
    .getOrCreate()

sc=SparkContext.getOrCreate(spark)

## UTILS

In [250]:
t0 = time.time()
# EXPLODE THE TRANSACTION_LIST COLUMN IN AIONV4.BLOCK
def explode_block(df1,col):
    # explode the list the first time
    df1 = df1.withColumn(col,explode(split(f.col(col),'\],\[') ))
    # extract the transaction_hash
    df1 = df1.withColumn(col,regexp_replace('transaction_list', '(\[|\]|")', ''))
    df1 = df1.withColumn(col, df1[col].substr(0, 64))
    return df1

# munge block dataframe
def hex_to_int(x):
    return int(x,16)

def munge_block(df1):
    df1 = explode_block(df1,'transaction_list')
    udf_hex_to_int = udf(hex_to_int,IntegerType())
    df1 = df1.withColumn('difficulty',udf_hex_to_int('difficulty'))
    return df1


# MAKE LIST OF TIER 1 MINERS
def make_tier1_list(df,threshold_tx_paid_out=10,threshold_blocks_mined_per_day=2.5):
    # find all miners in period and make list
    miner_list = [i.miner_address for i in df.select('miner_address').distinct().collect()]
    # Count transactions paid out per day: group transactions by date and miner
    # tier 1 = percentage mined per day > threshold || transactions paid out > threshold per day# make unique list of tier 1
    df_temp = df.groupby('from_addr','block_timestamp').agg({'to_addr':'count'})
    df_temp = df_temp.dropna()
    # find daily mean
    df_temp = df_temp.groupby('from_addr').agg({'count(to_addr)':'mean'})
    df_temp = df_temp.filter(df_temp['avg(count(to_addr))']>=threshold_tx_paid_out)
    # make list of tier 1 using tx paid out
    list_a = [i.from_addr for i in df_temp.select('from_addr').distinct().collect()]
    # check against miner list to ensure that only miners are included
    list_a = list(set(miner_list) & set(list_a))
    df_temp.unpersist()
    
    # Get percentage blocks mined per day: group by miner address, day and count
    df_temp = df.groupby('miner_address','block_timestamp')\
        .agg({'block_number':'count'})\
        .withColumn('percent',100*(col('count(block_number)')/
                                   sum(col('count(block_number)')).over(Window.partitionBy())))
    df_temp = df_temp.groupby('miner_address').agg({'percent':'mean'})
    df_temp = df_temp.filter(df_temp['avg(percent)']>=threshold_blocks_mined_per_day)
    list_b = [i.miner_address for i in df_temp.select('miner_address').distinct().collect()]
    df_temp.unpersist()
    print(list_a)
    print(list_b)
    #merge lists, drop duplicates
    tier1_miner_list = list(set(list_a+list_b))
    del list_a,list_b

    #check this list again miner_address
    gc.collect()
    return tier1_miner_list

# dateformat = 'yyyy-mm-dd 00:00:00'
# CHANGE INDIVIDUAL DATE TO TIMESTAMP
def date_to_timestamp(date):
    return datetime.strptime(date, "%Y-%m-%d %H:%M:%S").timestamp()

# CREATE TIMESTAMP COLUMN IN DATETYPE FORMAT GIVEN A SPARK DATAFRAME
def timestamp_to_date(df,col):
    return df.withColumn('block_timestamp', f.from_unixtime('block_timestamp').cast(DateType()))

# TRUNCATE SPARK DATAFRAME GIVEN STRING DATES
# dateformat = 'yyyy-mm-dd 00:00:00'
def truncate_dataframe(df1,startdate,enddate):
    # get a month of data prior to startdate
    
    startdate = date_to_timestamp(startdate)# get a month of data prior to startdate
    startdate1 = startdate - ( 30 * 24 * 60 * 60)
    enddate = date_to_timestamp(enddate)
    if startdate > enddate:
        startdate = enddate
    df1 = df1.filter((f.col('block_timestamp') >= startdate1) & 
              (f.col('block_timestamp') <= enddate))
    df1 = timestamp_to_date(df1,'block_timestamp')
    return df1

# UDF FUNCTIONS TO INCLUDE EXTERNAL 
class MyUDFs:
    #DICTIONARY WHEN MATCHING POOLNAME WITH MINER ADDRESS    
    def populate(self):
        self.df_poolinfo = pd.read_csv('../data/poolinfo.csv')
        self.dict_poolinfo = dict(zip(self.df_poolinfo.address,self.df_poolinfo.poolname))
        self.pool_keys = list(self.dict_poolinfo.keys())
        
    def get_poolname_label(self):
        def ab(miner_address,pool_tier):
            if miner_address in self.pool_keys:
                return self.dict_poolinfo[miner_address]
            else:
                if pool_tier == 1:
                    return miner_address[0:10]
                else:
                    return 'tier 2'
        return udf(ab,StringType())
    
    def get_poolname_label_list(self,lst):
        output = list()
        for miner_address in lst:
            if miner_address in self.pool_keys:
                output.append(self.dict_poolinfo[miner_address])
        
        return output
                




## Load data

In [251]:
# LOAD FROM DATABASE
df_tx = spark.read.parquet('../data/transaction.parquet')  
df_block = spark.read.parquet('../data/block.parquet')


##  Make data warehouse for period (start_date, enddate)

#### ENTER INPUT DATES

In [252]:
startdate = '2018-09-01 00:00:00'
enddate = '2018-09-07 00:00:00'
analysis_period = startdate[0:10]+' to '+enddate[0:10]+': '

In [253]:
# truncate dataframes
df_block_1 = truncate_dataframe(df_block,startdate, enddate).drop('__index_level_0__')
df_block_1 = munge_block(df_block_1)

df_tx_1 = truncate_dataframe(df_tx,startdate,enddate).drop('block_timestamp')
#
df_tx_1 = df_tx_1.drop('__index_level_0__')
df = df_block_1.join(df_tx_1, df_block_1.transaction_list == 
                                    df_tx_1.transaction_hash,how='left').drop('transaction_list')
df_tx.unpersist()
df_tx_1.unpersist()
df_block.unpersist()
df_block_1.unpersist()
gc.collect()

187

### Identify tier 1 addresses

In [254]:
threshold_tx_paid_out = 10
threshold_blocks_mined_per_day = 2.5 # Percentage
tier1_miner_list = make_tier1_list(df,threshold_tx_paid_out,threshold_blocks_mined_per_day)
# ADD A COLUMN CALLED POOL_TIER: 1 , 2
pool_tier_udf = f.udf(lambda miner_address: 1 if 
                      miner_address in tier1_miner_list else 2, IntegerType())
df = df.withColumn('pool_tier',pool_tier_udf(df.miner_address))

['a099688bb19051b38c846580600812d095f89cfff7abc17ab6c4af63e408f2f1', 'a0afea235b391cc31bf4a26d6bb5513639c718941a47418bd6d4cf4f957e84f9', 'a0e342ff441c781e46cc9047cc60d8d7dd389bc33cd9ca6a560942d99adc4abb', 'a0d1565b5b1056942421b2473d25cc2ad2e14d1ed358a908f9a2475fff26d616', 'a0ed62bc7f308fa712792515d840f3a086fe36d9959207a468e2ca70ec503b89', 'a08fbc834b450044e750b90b2a43fec15ba03fd0c20fd5a3463f9f394235db5d', 'a07a07e8965418ed2355ed5b062dd9ec29a099578d7330c206666abcb0b9aab9', 'a05801fb2de5b76568a9c13f86766ebe3e6c438272e9fc6f5fabfcbea93048bf', 'a01e968221584946dac1e6842bf985de7ff49c8b9913220ed793d64a150b2864', 'a07981da70ce919e1db5f051c3c386eb526e6ce8b9e2bfd56e3f3d754b0a17f3', 'a04c126f9d2eba02c926210189c7bccf7ad86105dbca1c4d910cf2b692ac7dd5', 'a04372c5aaab61f0bc79500ba49043918851ab37887792977eff06319e976108', 'a08091ab0325e384ac45e560d2f85e4b741363aa98881d52d54233a02b33fcaa', 'a0e5150f600d56372b434465b204baec77b8ea44a8a30c7eabaeb9fe5844dc88', 'a023fae1b60e6cd37a3e5ca11331549ba9e8029618fb63

### Label pools in dataframe

In [255]:
myUDF  = MyUDFs()
myUDF.populate() 
df = df.withColumn('poolname',myUDF.get_poolname_label()(df["miner_address"],df["pool_tier"]))               
    

## Analysis & plots

### Bar graphs of Tier blocks mined over period 

In [256]:
def plot_miners(df,startdate):
    # only plot requested period
    startdate = datetime.strptime(startdate, "%Y-%m-%d %H:%M:%S")
    df_temp = df.filter(f.col('block_timestamp')>= startdate)
    
    df_temp = df_temp.groupby('poolname').agg({'block_number':'count'})\
        .withColumn('percent',100*(col('count(block_number)')/
                                   sum(col('count(block_number)')).over(Window.partitionBy())))
    # convert small group to pandas for plotting
    df_temp = df_temp.toPandas().sort_values(by=['percent'],ascending=False)
    # Leave out the datashade option to get the tooltip to work
    
    bar = df_temp.hvplot.bar('poolname', ['count(block_number)'], rot=90,
                             subplots=True, shared_axes=False,
                             width=800,height=400,
                             title=analysis_period +'Miners, blockcount')
    bar_perc = df_temp.hvplot.bar('poolname', ['percent'], rot=90,
                                  subplots=True, shared_axes=False,
                                  width=800,height=400,
                                  title=analysis_period +'Miners by %')

    
    hover = HoverTool(tooltips=[
        ("blocks mined", "$count(block_number)"),
        ("percentage", "$percent")
    ])

    #plot.options(tools=[hover])
    # display plot
    hv.show(bar)
    hv.show(bar_perc)
    del df_temp
    gc.collect()


### Difficulty

In [257]:
def plot_difficulty(df,startdate):
    # only plot requested period
    startdate = datetime.strptime(startdate, "%Y-%m-%d %H:%M:%S")
    df_temp = df.filter(f.col('block_timestamp')>= startdate)

    df_temp = df_temp.select('block_timestamp','difficulty')
    #convert from string to int
    df_temp = df_temp.toPandas().sort_values(by=['block_timestamp'])
    line = df_temp.hvplot.line(x='block_timestamp',y='difficulty',rot=90,
                               width=800,height=400,
                               title=analysis_period +'Difficulty')
    hv.show(line)
    del df_temp
    gc.collect()
  

### Daily Activity Miners

In [258]:
def plot_active_miners(df,startdate):
    # only plot requested period
    startdate = datetime.strptime(startdate, "%Y-%m-%d %H:%M:%S")
    df_temp = df.filter(f.col('block_timestamp')>= startdate)
    df_temp = df_temp.groupby('poolname','block_timestamp').agg({'block_number':'count'})
    df_temp = df_temp.toPandas().sort_values(by=['block_timestamp'])
    lines = df_temp.hvplot.line(x='block_timestamp',y='count(block_number)',rot=90,
                                by='poolname',width=800,height=600,
                                title=analysis_period+'pools blocks mined daily')
    hv.show(lines)
    del df_temp
    gc.collect()


### Retention

In [259]:
# filter data from previous month, excluding timespan under observation
startdate = datetime.strptime(startdate, "%Y-%m-%d %H:%M:%S")
# get tier 1 miner list for previous period
df_prev = df.filter(f.col('block_timestamp')<startdate)
tier1_miner_list_prev = make_tier1_list(df_prev,threshold_tx_paid_out,
                                         threshold_blocks_mined_per_day)

# get tier 1 miner list for period under observation
tier1_miner_list_period = tier1_miner_list
#reuse the myUDF class defined in utils
myUDF = MyUDFs()
myUDF.populate()

# POOLS RETAINED = INTERSECTION OF TWO T1 MINER LISTS
retained = list(set(tier1_miner_list_prev) & set(tier1_miner_list_period))
# POOLS DROPPED = IN LIST PREVIOUS BUT NOT IN THE NEW LIST
dropped = np.setdiff1d(tier1_miner_list_prev,tier1_miner_list_period)
# NEW POOLS = IN PERIOD UNDER OBSERVATION LIST BUT NOT IN PREVIOUS  MONTH LIST
new = np.setdiff1d(tier1_miner_list_period,tier1_miner_list_prev)

print("T1 POOLS RETAINED: ".format(myUDF.get_poolname_label_list(retained)))
print("T1 POOLS DROPPED: ".format(myUDF.get_poolname_label_list(dropped)))
print("NEW T1 POOLS: ".format(myUDF.get_poolname_label_list(new)))
print("\n---------------------------------------------")
print("ALL MINERS RETAINED:".format(retained))
print("ALL MINERS DROPPED:".format(dropped))
print("ALL NEW MINERS:".format())

df_prev.unpersist()
gc.collect()

['a0afea235b391cc31bf4a26d6bb5513639c718941a47418bd6d4cf4f957e84f9', 'a0e342ff441c781e46cc9047cc60d8d7dd389bc33cd9ca6a560942d99adc4abb', 'a0d1565b5b1056942421b2473d25cc2ad2e14d1ed358a908f9a2475fff26d616', 'a0ed62bc7f308fa712792515d840f3a086fe36d9959207a468e2ca70ec503b89', 'a08fbc834b450044e750b90b2a43fec15ba03fd0c20fd5a3463f9f394235db5d', 'a07a07e8965418ed2355ed5b062dd9ec29a099578d7330c206666abcb0b9aab9', 'a05801fb2de5b76568a9c13f86766ebe3e6c438272e9fc6f5fabfcbea93048bf', 'a01e968221584946dac1e6842bf985de7ff49c8b9913220ed793d64a150b2864', 'a07981da70ce919e1db5f051c3c386eb526e6ce8b9e2bfd56e3f3d754b0a17f3', 'a04c126f9d2eba02c926210189c7bccf7ad86105dbca1c4d910cf2b692ac7dd5', 'a04372c5aaab61f0bc79500ba49043918851ab37887792977eff06319e976108', 'a08091ab0325e384ac45e560d2f85e4b741363aa98881d52d54233a02b33fcaa', 'a0e5150f600d56372b434465b204baec77b8ea44a8a30c7eabaeb9fe5844dc88', 'a023fae1b60e6cd37a3e5ca11331549ba9e8029618fb635ed29ad377b9d911f8', 'a0c5da302ca0aa2d6ab5dc7cc7fe024f1b7f1902e7a8eb

BdbQuit: 

### DISPLAY PLOTS/DATA

In [None]:
plot_miners(df,startdate)
plot_difficulty(df,startdate)
plot_active_miners(df,startdate)

In [None]:
t1 = time.time()
total = t1 - t0
print('time elaped = {} mins'.format(total/60))