In [30]:
import findspark
findspark.init()

import json
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_utc_timestamp, from_unixtime, to_date
from pyspark import SparkContext
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.types import IntegerType,StructField,StructType, LongType
from pyspark.sql.functions import col, when

from pyspark.sql.functions import monotonically_increasing_id, coalesce, date_trunc
import snap
from os import path
import glob

In [2]:
# Build (or reuse) the notebook-wide SparkSession with explicit memory and core limits.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "100g")
    .config("spark.executor.memory", "100g")
    .config("spark.cores.max", "10")
    .getOrCreate()
)

In [3]:
sc = spark.sparkContext

In [20]:
#spark.read.load("/media/ssd2/bitcoinRemy/to_2011/co_spending_id/").show()
spark.read.load("/media/ssd2/bitcoinRemy/to_2011/co_spending_id").show()

+------+------+
|src_id|dst_id|
+------+------+
|  2432|  2433|
| 10963| 10963|
|  2494|  2991|
|  2991|  2771|
|  2771|  3387|
|  3387|  2991|
|  2991|  2991|
|  2991|  3444|
|     1|  2549|
|  2549|  8501|
|  8501|  2883|
|  2883| 11019|
|  4249|  9750|
|  9750|  8913|
|  9085|  3505|
|  3505|  4873|
|  6667|  5804|
|  5804|  4573|
|  7085|  7085|
|  7085|  7085|
+------+------+
only showing top 20 rows



In [4]:
from bitcoin_utils import *

In [56]:
def get_files_from_dir(a_dir,prefix="",suffix=""):
    """
    List the paths found under ``a_dir`` whose name starts with ``prefix`` and ends with ``suffix``.

    :param a_dir: directory to search recursively; if it is an existing regular file,
                  it is returned as the only element (prefix/suffix are ignored)
    :param prefix: required start of the entry name
    :param suffix: required end of the entry name
    :return: sorted list of matching paths
    """
    if not path.isfile(a_dir):
        # '**' together with recursive=True also matches entries directly inside a_dir.
        pattern = "".join([a_dir, '/**/', prefix, '*', suffix])
        matches = glob.glob(pattern, recursive=True)
        matches.sort()
        return matches
    return [a_dir]


def replace_column(df_original,df_dict,output_file,key_original,key_dict,renaming_new_col=False,replace=False):
    """
    Left-join ``df_original`` with the lookup table ``df_dict`` and save the result.

    Every row of ``df_original`` is kept. Rows whose ``key_original`` value equals the
    ``key_dict`` value of a ``df_dict`` row receive the other columns of ``df_dict``;
    rows without a match receive nulls in those columns.

    :param df_original: DataFrame, or a path that is loaded with ``spark.read.load``
    :param df_dict: lookup DataFrame, or a path that is loaded with ``spark.read.load``
    :param output_file: destination passed to ``DataFrame.write.save``
    :param key_original: name of the join column in ``df_original``
    :param key_dict: name of the join column in ``df_dict`` (always dropped from the result)
    :param renaming_new_col: optional ``(old_name, new_name)`` pair applied to the joined
                             frame; any falsy value (``False``, ``None``) means "no rename"
    :param replace: when True the ``key_original`` column is dropped from the result,
                    otherwise it is kept under its original name
    :return: None, the joined frame is only written to ``output_file``
    """
    # Relies on the notebook-level `spark` session when paths are given.
    if isinstance(df_original,str):
        df_original = spark.read.load(df_original)
    if isinstance(df_dict,str):
        df_dict = spark.read.load(df_dict)

    # Give both join keys temporary, distinct names so the join condition is unambiguous
    # even when key_original == key_dict.
    df_original=df_original.withColumnRenamed(key_original,"key1")
    df_dict=df_dict.withColumnRenamed(key_dict,"key2")
    joined = df_original.join(df_dict,df_original["key1"] ==  df_dict["key2"],how='left')

    if replace:
        joined = joined.drop("key1")
    else:
        joined = joined.withColumnRenamed("key1",key_original)
    joined=joined.drop("key2")

    # Truthiness test instead of `!= False`, so that None also disables the rename
    # rather than failing on `None[0]`.
    if renaming_new_col:
        joined=joined.withColumnRenamed(renaming_new_col[0],renaming_new_col[1])
    joined.write.save(output_file)

def _extract_in(x):
    to_return=[]
    inputs=json.loads(x.inputs)
    if len(inputs)>1:
        for i in range(len(inputs)-1):
            ads1=inputs[i][2]
            ads2=inputs[i+1][2]
            if len(ads1)==1:
                ads1=ads1[0]
            else:
                ads1=str(ads1)
            if len(ads2)==1:
                ads2=ads2[0]
            else:
                ads2=str(ads2)
            to_return.append((ads1,ads2))
        return(to_return)
    return([])
def _create_co_spending(current_dir,year="",month=""):
    """
    Build the co-spending edge list from the stored transactions.

    Transactions are read from the dataset at ``/media/ssd2/bitcoinRemy/whole_dataset/``
    and each one is expanded by ``_extract_in`` into pairs of consecutive inputs.
    When ``year`` is given, only transactions up to and including that (year, month)
    are used; an empty ``month`` keeps the whole of ``year``.

    :param current_dir: not used by this function; kept so the signature matches its caller
    :param year: last year to include, or "" for no date filter
    :param month: last month of ``year`` to include, or "" for the full year
    :return: DataFrame with columns ``src`` and ``dst``
    """
    print("computing co_spending")
    lines = spark.read.load("/media/ssd2/bitcoinRemy/whole_dataset/")
    if year!="":
        if month=="":
            up_to_cutoff = lines.year<=year
        else:
            # Earlier years are kept entirely; the month bound applies to the cutoff year only.
            # Filtering `year<=Y` and `month<=M` independently would discard, for example,
            # December of every earlier year.
            up_to_cutoff = (lines.year<year) | ((lines.year==year) & (lines.month<=month))
        lines = lines.filter(up_to_cutoff)
    inputs = lines.select("inputs")
    return inputs.rdd.flatMap(_extract_in).toDF(["src","dst"])
    

###create a list of unique addresses to have int IDs
def _to_uniq_vals(row):
    return [(row.src,),(row.dst,)]


    
    
def _dfZipWithIndex(df, offset=1, colName="rowId"):
    '''
        Prepend a sequential index column to a dataframe, using rdd.zipWithIndex()
        for the numbering and keeping the existing columns and their schema.

        :param df: source dataframe
        :param offset: value added to the zero-based zipWithIndex() position
        :param colName: name of the new index column (placed first)
        :return: new dataframe with schema [colName: long] + df.schema
    '''
    index_field = StructField(colName, LongType(), True)
    indexed_schema = StructType([index_field] + df.schema.fields)

    def _prepend_index(pair):
        # zipWithIndex() yields (row, position) pairs.
        row, position = pair
        return [position + offset] + list(row)

    indexed_rows = df.rdd.zipWithIndex().map(_prepend_index)
    return spark.createDataFrame(indexed_rows, indexed_schema)

def _create_ad2id(co_spending):
    """
    Build the address -> integer id table from a co-spending edge list.

    :param co_spending: DataFrame whose rows expose ``src`` and ``dst``
    :return: DataFrame with columns ``id`` (index from ``_dfZipWithIndex``) and ``ad``
             (one row per distinct endpoint value)
    """
    endpoint_rows = co_spending.rdd.flatMap(_to_uniq_vals)
    unique_addresses = endpoint_rows.toDF(["ad"]).distinct()
    return _dfZipWithIndex(unique_addresses, colName="id")


def _convert_co_spending_to_id(temp_dir,co_spending,ad2id,output_co_spending_file):
    """
    Rewrite a (src, dst) edge list into (src_id, dst_id) using the ad2id table.

    Done as two successive ``replace_column`` joins: the first replaces ``src`` and
    writes an intermediate result to ``temp_dir + "/temp_co_spending"``, the second
    replaces ``dst`` and writes the final result.

    :param temp_dir: directory that receives the intermediate result
    :param co_spending: edge list (DataFrame or path) with columns ``src`` and ``dst``
    :param ad2id: lookup (DataFrame or path) with columns ``ad`` and ``id``
    :param output_co_spending_file: destination of the id-based edge list
    """
    intermediate = temp_dir+"/temp_co_spending"

    # Pass 1: src -> src_id
    replace_column(
        df_original=co_spending,
        df_dict=ad2id,
        output_file=intermediate,
        key_original="src",
        key_dict="ad",
        renaming_new_col=("id","src_id"),
        replace=True,
    )

    # Pass 2: dst -> dst_id, reading back what pass 1 wrote
    replace_column(
        df_original=intermediate,
        df_dict=ad2id,
        output_file=output_co_spending_file,
        key_original="dst",
        key_dict="ad",
        renaming_new_col=("id","dst_id"),
        replace=True,
    )
    

def _write_clusters(clusters,output_file,ID=True,ID2A=None):
    output_file=open(output_file, "w+")
    for i,CnCom in enumerate(clusters):
        for nodeID in CnCom:
            if ID:
                to_write=str(nodeID)
            else:
                to_write=str(ID2A[nodeID])
            output_file.write(to_write+"\t"+str(i)+"\n")
    output_file.close()
    
def _compute_with_snap(co_spend_id,id2cl_file):
    """
    Load an edge-list file into a snap ``TUNGraph``, print its size, and write the
    components returned by ``GetSccs()`` to ``id2cl_file``.

    :param co_spend_id: path of the edge-list file (columns 0 and 1 are the endpoints)
    :param id2cl_file: path of the ``node<TAB>cluster`` text file to write
    """
    graph = snap.LoadEdgeList(snap.TUNGraph, co_spend_id, 0, 1)
    node_count = graph.GetNodes()
    print("Number of Nodes: %d" % node_count)
    edge_count = graph.GetEdges()
    print("Number of edges: %d" % edge_count)
    _write_clusters(graph.GetSccs(), id2cl_file, True)

    
def _convert_clusters_from_id_to_addresses(ad2id,id2cl,output):
    """
    Join the ad2id table with the id2cl table on ``id`` and save the result without
    the ``id`` column.

    :param ad2id: DataFrame or path with an ``id`` column
    :param id2cl: DataFrame or path with an ``id`` column
    :param output: destination of the joined table
    """
    replace_column(
        df_original=ad2id,
        df_dict=id2cl,
        output_file=output,
        key_original="id",
        key_dict="id",
        replace=True,
    )
def compute_clusters(current_dir,year="",month=""):
    """
    Run the whole clustering pipeline and store every intermediate result under ``current_dir``.

    Steps, in order:
      1. co-spending edge list            -> current_dir/co_spending
      2. address -> id table              -> current_dir/ad2id
      3. id-based edge list               -> current_dir/co_spending_id
      4. same edge list as a single CSV   -> current_dir/co_spend_id.csv
      5. components computed with snap    -> current_dir/id2cl.ssv, then current_dir/id2cl
      6. address -> cluster table         -> current_dir/ad2cl

    :param current_dir: working directory for all outputs
    :param year: forwarded to ``_create_co_spending``
    :param month: forwarded to ``_create_co_spending``
    """
    co_spending_path = current_dir+"/co_spending"
    ad2id_path = current_dir+"/ad2id"
    co_spending_id_path = current_dir+"/co_spending_id"
    edge_csv_dir = current_dir+"/co_spend_id.csv"
    id2cl_text_path = current_dir+"/id2cl.ssv"
    id2cl_path = current_dir+"/id2cl"

    # 1. edge list, written then read back from disk
    _create_co_spending(current_dir,year,month).write.save(co_spending_path)

    # 2. address -> id
    ad2id = _create_ad2id(spark.read.load(co_spending_path))
    ad2id.write.save(ad2id_path)

    # 3. edges expressed with ids
    _convert_co_spending_to_id(current_dir,co_spending_path,ad2id_path,co_spending_id_path)

    # 4. single space-separated CSV part file for snap
    edge_writer = spark.read.load(co_spending_id_path).coalesce(1).write.format("csv")
    edge_writer.option("header", "false").option("sep"," ").save(edge_csv_dir)
    edge_csv_file = get_files_from_dir(edge_csv_dir,"part")[0]

    # 5. components
    _compute_with_snap(edge_csv_file,id2cl_text_path)
    spark.read.csv(id2cl_text_path,sep="\t").toDF("id","cl").write.save(id2cl_path)

    # 6. address -> cluster
    _convert_clusters_from_id_to_addresses(ad2id_path,id2cl_path,current_dir+"/ad2cl")
                     
                
          

In [None]:
def create_ad2identity(dir_temp,hints,ad2cl):
    """
    Join the labelled addresses with the address -> cluster table and save the result
    to ``dir_temp + "/cls2identities"``.

    :param dir_temp: directory that receives the ``cls2identities`` output
    :param hints: DataFrame or path whose ``adresse`` column is the join key
    :param ad2cl: DataFrame or path whose ``ad`` column is the join key
    """
    replace_column(
        df_original=hints,
        df_dict=ad2cl,
        output_file=dir_temp+"/cls2identities",
        key_original="adresse",
        key_dict="ad",
        replace=False,
    )

In [55]:
compute_clusters("/media/ssd2/bitcoinRemy/to_end")#,year=2011,month=1)

In [11]:
#lines = spark.read.csv("/media/ssd2/bitcoinRemy/matched/data_part1_matched/transactions_00250000_00259999.json",sep="\t")
#lines = spark.read.csv("/media/ssd2/bitcoinRemy/matched/*/*.json",sep="\t")


In [104]:
walletExplorer_hints = spark.read.csv("/media/DD2/labeledUsers/walletexplorer.csv",header=True).drop("page")
#    .write.save("/media/ssd2/bitcoinRemy/ad2identity")
# create_ad2identity takes (dir_temp, hints, ad2cl); the previous call passed only the hints
# and raised a TypeError. The paths below are the ones the rest of the notebook uses:
# the result lands in to_2021/cls2identities and is joined against to_2021/ad2cl.
create_ad2identity("/media/ssd2/bitcoinRemy/to_2021",
                   walletExplorer_hints,
                   "/media/ssd2/bitcoinRemy/to_2021/ad2cl")

In [None]:
# Convert files from the minimal format to parquet files
def create_initial_files():
    """
    Convert the tab-separated transaction files into a dataset partitioned by day.

    Reads ``/media/ssd2/bitcoinRemy/matched/*/*.json`` as tab-separated text, names the
    six columns, derives ``date``/``year``/``month``/``day`` from the unix ``timestamp``
    and saves the result under ``/media/ssd2/bitcoinRemy/whole_dataset`` partitioned by
    (year, month, day).
    """
    column_names = ("hash","bloc","timestamp","fee","inputs","outputs")
    raw = spark.read.csv("/media/ssd2/bitcoinRemy/matched/*/*.json",sep="\t").toDF(*column_names)
    dated = raw.withColumn("date", to_date(from_unixtime("timestamp")))
    partitioned = (
        dated
        .withColumn("year", year("date"))
        .withColumn("month", month("date"))
        .withColumn("day", dayofmonth("date"))
    )
    partitioned.write.partitionBy("year", "month","day").save("/media/ssd2/bitcoinRemy/whole_dataset")

In [None]:
# Create files ad2ad
createAd2Ad()

In [77]:
#lines.repartition(1000,"timestamp").write.bucketBy(1000, "timestamp").sortBy("timestamp").saveAsTable("transactions_by_timestamp")

In [144]:
compute_clusters("/media/ssd2/bitcoinRemy/to_2021/")

DataFrame[hash: string, bloc: string, timestamp: string, fee: string, inputs: string, outputs: string, date: date, year: int, month: int, day: int]

# create graph for snap

In [15]:
#ad2id = ad2id.withColumn("id", monotonically_increasing_id())


In [28]:
#co_spending_test = co_spending.limit(100)

In [None]:
#spark.read.load("/media/ssd2/bitcoinRemy/all_addresses").distinct().count()

In [None]:
### first join

In [10]:
ad2id=spark.read.load("/media/ssd2/bitcoinRemy/ad2id")

In [None]:
co_spending=spark.read.load("/media/ssd2/bitcoinRemy/co_spending__")

In [15]:
co_spending_joined = co_spending.join(ad2id.alias("b"),co_spending.src ==  ad2id.ad,how='left')

In [16]:
co_spending_joined.select(["dst","id"]).withColumnRenamed("id","id2").write.save("/media/ssd2/bitcoinRemy/co_spend_temp")

In [17]:
### second join

In [18]:
co_spending_joined=spark.read.load("/media/ssd2/bitcoinRemy/co_spend_temp")

In [19]:
co_spending_joined2 = co_spending_joined.join(ad2id.alias("c"),co_spending_joined.dst ==  ad2id.ad,how='left')

In [20]:
co_spending_joined2.select(["id","id2"]).write.save("/media/ssd2/bitcoinRemy/co_spend_id")

In [21]:
# Export the id-based edge list as one space-separated, header-less CSV part file.
co_spending_joined2=spark.read.load("/media/ssd2/bitcoinRemy/co_spend_id")
(
    co_spending_joined2.coalesce(1)
    .write.format("csv")
    .option("header", "false")
    .option("sep"," ")
    .save("/media/ssd2/bitcoinRemy/co_spend_id.csv")
)

In [22]:
#test =spark.read.load("/media/ssd2/bitcoinRemy/co_spend_id").toDF("src","dst")

In [23]:
#test2=test.rdd.flatMap(to_uniq_vals).toDF(["ad"])

In [24]:
#test2.distinct().count()

In [25]:
#co_spending_joined = co_spending_joined.join(ad2id.alias("c"),co_spending_test.src ==  ad2id.ad,how='left')

In [26]:
#test = co_spending_joined.select(col("b.id").alias("id1"),col("c.id").alias("id2"))

In [27]:
#co_spending = co_spending.join(ad2id,co_spending.ad ===  ad2id.ad,how='left')
#co_spending.select("src_id","dst_id").write.save("/media/ssd2/bitcoinRemy/co_spending_id")

### compute with snap

In [30]:
G0 = snap.LoadEdgeList(snap.TUNGraph, "/media/ssd2/bitcoinRemy/co_spend_id.csv/part-00000-85d18c15-3539-47f7-b5a1-006aa011b3f2-c000.csv", 0,1)

Number of Nodes: 483216193
Number of edges: 644216163


### Write clusters

In [132]:
#id2cl=spark.read.csv("/media/ssd2/bitcoinRemy/id2cl",sep="\t").toDF("id_n","cl")
#df = id2cl.join(ad2id,id2cl.id_n ==  ad2id.id_u,how='left')
#df.select(["ad","cl"]).write.save("/media/ssd2/bitcoinRemy/ad2cl")
# Join ad2id with id2cl on "id" and drop the id, giving the address -> cluster table.
# The stray positional "cl" that used to be passed as `renaming_new_col` has been removed:
# that parameter expects an (old_name, new_name) pair, and the string was indexed as
# ("c", "l"), i.e. a rename of a column "c" that is not part of either table.
replace_column("/media/ssd2/bitcoinRemy/to_2021/ad2id",
               "/media/ssd2/bitcoinRemy/to_2021/id2cl",
               "/media/ssd2/bitcoinRemy/to_2021/ad2cl",
               "id",
               "id",
               replace=True
              )

In [None]:
### create the cl2cl file ?

### Create the cl2identity file

In [60]:
#spark.read.csv("/media/DD2/labeledUsers/walletexplorer.csv",header=True)\
#    .drop("page")\
#    .write.save("/media/ssd2/bitcoinRemy/ad2identity")

In [146]:
# Reduce the labelled-address join to one row per service and save it as the
# cluster -> identity table.
# NOTE: the output directory name "cl2idenity" is misspelled, but the cell that builds
# ad2identity_temp reads this exact path, so the spelling is kept.
(
    spark.read.load("/media/ssd2/bitcoinRemy/to_2021/cls2identities")
    .na.drop()
    .dropDuplicates(["service"])
    .sort("service")
    .drop("adresse")
    .withColumnRenamed("service","identity")
    .write.save("/media/ssd2/bitcoinRemy/to_2021/cl2idenity")
)



### create ad2identity

In [168]:
# Attach the identity of each cluster (when one is known) to every address, keeping "cl".
replace_column(
    df_original="/media/ssd2/bitcoinRemy/to_2021/ad2cl",
    df_dict="/media/ssd2/bitcoinRemy/to_2021/cl2idenity",
    output_file="/media/ssd2/bitcoinRemy/to_2021/ad2identity_temp",
    key_original="cl",
    key_dict="cl",
    replace=False,
)

In [181]:
coalescing = spark.read.load("/media/ssd2/bitcoinRemy/to_2021/ad2identity_temp")
# Addresses whose cluster has no known identity fall back to the cluster id as their label.
coalescing =coalescing.withColumn("identity",coalesce(coalescing.identity,coalescing.cl)) 
# `current_dir` is not defined at notebook level (it only exists as a function parameter),
# so the destination is spelled out; this is the path the cl2cl cells read ad2identity from.
coalescing.drop("cl").write.save("/media/ssd2/bitcoinRemy/to_2021/ad2identity")

### create cl2cl

In [178]:
# First pass of cl2cl: attach the identity of the source address as "src_identity".
replace_column(
    df_original="/media/ssd2/bitcoinRemy/to_2021/tr_ad2ad",
    df_dict="/media/ssd2/bitcoinRemy/to_2021/ad2identity",
    output_file="/media/ssd2/bitcoinRemy/to_2021/cl2cl_temp",
    key_original="src",
    key_dict="ad",
    renaming_new_col=("identity","src_identity"),
    replace=False,
)

In [182]:
# Second pass of cl2cl: attach the identity of the destination address as "dst_identity".
# The result still carries the raw src/dst columns, so it is written to "cl2cl_detailed",
# which is the input the coalescing cell reads. Writing it to "cl2cl" would collide with
# the final table that the coalescing cell saves under that name.
replace_column("/media/ssd2/bitcoinRemy/to_2021/cl2cl_temp",
               "/media/ssd2/bitcoinRemy/to_2021/ad2identity",
               "/media/ssd2/bitcoinRemy/to_2021/cl2cl_detailed",
               "dst",
               "ad",
               ("identity","dst_identity"),
               replace=False
              )

In [184]:
coalescing = spark.read.load("/media/ssd2/bitcoinRemy/to_2021/cl2cl_detailed")
# Endpoints without a known identity fall back to the raw src/dst value.
coalescing =coalescing.withColumn("src_identity",coalesce(coalescing.src_identity,coalescing.src)) 
coalescing =coalescing.withColumn("dst_identity",coalesce(coalescing.dst_identity,coalescing.dst)) 
# `current_dir` is not defined at notebook level (it only exists as a function parameter),
# so the destination is spelled out; this is the path the following cells read cl2cl from.
coalescing.drop("src").drop("dst").drop("cl").write.save("/media/ssd2/bitcoinRemy/to_2021/cl2cl")

In [185]:
spark.read.load("/media/ssd2/bitcoinRemy/to_2021/cl2cl").show()

+---------+----------+--------------------+--------------------+
|    value|      time|        src_identity|        dst_identity|
+---------+----------+--------------------+--------------------+
|  1994582|1494969173|39JYTisRzC8m8CFtu...|             6042538|
|222078315|1520174706|            69460615|35QFbAGF3hjWEEAkN...|
|   682430|1603581836|1EhxPSn1LgXCjRwEj...|35QFga7ogoFRqSPYq...|
|   690300|1603341168|bc1q4vsqfj96e9jku...|35QFga7ogoFRqSPYq...|
|200117191|1601202621|3Q9hKECtkE7yWBsFi...|35QFgmDGxeaHTq4B2...|
|   589405|1568842803|            24327129|                 500|
|   980926|1549062281|                 682|                 682|
| 13456692|1536258642|3PY3rEtGCeXCeY4i7...|35QG4R9MK6tBGHdvV...|
|  1530000|1469663070|            60793105|                 817|
| 29118471|1541694641|3EJTgXC4T6zmKpREx...|35QG6tnatN1CU7tnf...|
|  1612733|1513957200|1HvuJVXqGNqaXmYyd...|             3324535|
| 17635964|1513649725|17VNNNNny2HP9E13T...|             3324535|
|  2950000|1609980311|   

In [192]:
dates =spark.read.csv("/media/ssd2/bitcoinRemy/btc_prices.csv",sep=",",header=True)

In [200]:
dates.select(["date","PriceUSD"]).show()

+----------+--------+
|      date|PriceUSD|
+----------+--------+
|2009-01-03|    null|
|2009-01-04|    null|
|2009-01-05|    null|
|2009-01-06|    null|
|2009-01-07|    null|
|2009-01-08|    null|
|2009-01-09|    null|
|2009-01-10|    null|
|2009-01-11|    null|
|2009-01-12|    null|
|2009-01-13|    null|
|2009-01-14|    null|
|2009-01-15|    null|
|2009-01-16|    null|
|2009-01-17|    null|
|2009-01-18|    null|
|2009-01-19|    null|
|2009-01-20|    null|
|2009-01-21|    null|
|2009-01-22|    null|
+----------+--------+
only showing top 20 rows



In [211]:
test = spark.read.load("/media/ssd2/bitcoinRemy/to_2021/cl2cl").limit(100)

In [213]:
test.select(date_trunc("Day",to_date(from_unixtime("time","yyyy-MM-dd")))).show()

+-------------------------------------------------------------+
|date_trunc(Day, to_date(from_unixtime(`time`, 'yyyy-MM-dd')))|
+-------------------------------------------------------------+
|                                          2017-05-16 00:00:00|
|                                          2018-03-04 00:00:00|
|                                          2020-10-25 00:00:00|
|                                          2020-10-22 00:00:00|
|                                          2020-09-27 00:00:00|
|                                          2019-09-18 00:00:00|
|                                          2019-02-02 00:00:00|
|                                          2018-09-06 00:00:00|
|                                          2016-07-28 00:00:00|
|                                          2018-11-08 00:00:00|
|                                          2017-12-22 00:00:00|
|                                          2017-12-19 00:00:00|
|                                       

In [215]:
test.withColumn("date",from_unixtime("time","yyyy-MM-dd")).show()

+---------+----------+--------------------+--------------------+----------+
|    value|      time|        src_identity|        dst_identity|      date|
+---------+----------+--------------------+--------------------+----------+
|  1994582|1494969173|39JYTisRzC8m8CFtu...|             6042538|2017-05-16|
|222078315|1520174706|            69460615|35QFbAGF3hjWEEAkN...|2018-03-04|
|   682430|1603581836|1EhxPSn1LgXCjRwEj...|35QFga7ogoFRqSPYq...|2020-10-25|
|   690300|1603341168|bc1q4vsqfj96e9jku...|35QFga7ogoFRqSPYq...|2020-10-22|
|200117191|1601202621|3Q9hKECtkE7yWBsFi...|35QFgmDGxeaHTq4B2...|2020-09-27|
|   589405|1568842803|            24327129|                 500|2019-09-18|
|   980926|1549062281|                 682|                 682|2019-02-02|
| 13456692|1536258642|3PY3rEtGCeXCeY4i7...|35QG4R9MK6tBGHdvV...|2018-09-06|
|  1530000|1469663070|            60793105|                 817|2016-07-28|
| 29118471|1541694641|3EJTgXC4T6zmKpREx...|35QG6tnatN1CU7tnf...|2018-11-08|
|  1612733|1

In [216]:
# Attach the daily PriceUSD to every cl2cl row by joining on the calendar date
# derived from the unix "time" column.
replace_column(
    df_original=spark.read.load("/media/ssd2/bitcoinRemy/to_2021/cl2cl").withColumn("date",from_unixtime("time","yyyy-MM-dd")),
    df_dict=spark.read.csv("/media/ssd2/bitcoinRemy/btc_prices.csv",sep=",",header=True).select(["date","PriceUSD"]),
    output_file="/media/ssd2/bitcoinRemy/to_2021/cl2cl_price",
    key_original="date",
    key_dict="date",
    replace=False,
)

In [217]:
spark.read.load("/media/ssd2/bitcoinRemy/to_2021/cl2cl_price").show()

+--------+----------+--------------------+------------+----------+----------------+
|   value|      time|        src_identity|dst_identity|      date|        PriceUSD|
+--------+----------+--------------------+------------+----------+----------------+
| 6978720|1483956499|               16380|       16380|2017-01-09|903.508666803039|
|26476520|1484583610|               16380|       16380|2017-01-16|830.838035067212|
| 3380704|1485595749|3CvaaQVhPxajAFYxQ...|       16380|2017-01-28|922.774948568089|
|37473008|1485605195|3CvaaQVhPxajAFYxQ...|       16380|2017-01-28|922.774948568089|
| 9492345|1485999286|3Do32f6fxJgkfYPiY...|       16380|2017-02-02| 1007.7043659848|
|58047903|1485935679|3FfSD5HuLJi7t2ZvP...|       16380|2017-02-01|987.532590414962|
|34874467|1484068913|3FqJGUsVHWh2Taiv5...|       16380|2017-01-10|906.498947399182|
|10396387|1484075961|3FqJGUsVHWh2Taiv5...|       16380|2017-01-10|906.498947399182|
|10400146|1484083659|3FqJGUsVHWh2Taiv5...|       16380|2017-01-10|906.498947

In [4]:
# Re-save cl2cl_price partitioned by (year, month, day); the "date" column is dropped
# once the three partition columns have been derived from it.
(
    spark.read.load("/media/ssd2/bitcoinRemy/to_2021/cl2cl_price")
    .withColumn("year", year("date"))
    .withColumn("month", month("date"))
    .withColumn("day", dayofmonth("date"))
    .repartition("year", "month","day")
    .drop("date")
    .write.partitionBy("year", "month","day")
    .save("/media/ssd2/bitcoinRemy/to_2021/time_split")
)

In [219]:
spark.read.load("/media/ssd2/bitcoinRemy/to_2021/time_split_save/").show()

+----------+----------+--------------------+--------------------+----------+----------------+----+-----+---+
|     value|      time|        src_identity|        dst_identity|      date|        PriceUSD|year|month|day|
+----------+----------+--------------------+--------------------+----------+----------------+----+-----+---+
|  18940000|1436324273|18myXyFoz9mZmEyfY...|1DPebXtSMsZ2Hz7oH...|2015-07-08|270.576142898889|2015|    7|  8|
|   2162766|1436359326|3K3vFvt6ogZoCdUWo...|              876777|2015-07-08|270.576142898889|2015|    7|  8|
| 108657180|1436344489|12fb47BVWMuZnSXxG...|1DQNxkjhoet9LHfbC...|2015-07-08|270.576142898889|2015|    7|  8|
| 428472233|1436383322|            68955964|        Bitstamp.net|2015-07-08|270.576142898889|2015|    7|  8|
|    116000|1436389427|13Z6QL2hzDJW7xn5N...|1DRFPc67dKAjsYxPN...|2015-07-08|270.576142898889|2015|    7|  8|
|       546|1436320713|3FJnkvu2wPbRFkdxH...|3AFnL1Vj5qtSH63S2...|2015-07-08|270.576142898889|2015|    7|  8|
|  38132000|1436342

In [None]:
from graphframes import *
sc.setCheckpointDir("/media/ssd2/tmp")
def create_co_spending(output_file,year="",month=""):
    """
    Build the co-spending edge list, save it, and compute its connected components.

    Transactions are read from ``/media/ssd2/bitcoinRemy/whole_dataset/``. When ``year``
    is given, only transactions up to and including that (year, month) are used; an
    empty ``month`` keeps the whole of ``year``. The edge list is saved to
    ``/media/ssd2/bitcoinRemy/co_spending_<year>_<month>`` and then handed to ``CC_direct``.

    :param output_file: destination of the connected-components CSV (see ``CC_direct``)
    :param year: last year to include, or "" for no date filter
    :param month: last month of ``year`` to include, or "" for the full year
    """
    print("computing co_spending")
    lines = spark.read.load("/media/ssd2/bitcoinRemy/whole_dataset/")
    if year!="":
        if month=="":
            up_to_cutoff = lines.year<=year
        else:
            # Earlier years are kept entirely; the month bound applies to the cutoff year only.
            # Filtering `year<=Y` and `month<=M` independently would discard, for example,
            # December of every earlier year.
            up_to_cutoff = (lines.year<year) | ((lines.year==year) & (lines.month<=month))
        lines = lines.filter(up_to_cutoff)
    inputs = lines.select("inputs")
    # NOTE(review): `extract_in` is not defined in this notebook (only `_extract_in` is);
    # presumably it comes from `bitcoin_utils` - confirm.
    co_spending = inputs.rdd.flatMap(extract_in).toDF(["src","dst"])
    co_spending.write.save("/media/ssd2/bitcoinRemy/co_spending_"+str(year)+"_"+str(month))
    CC_direct(co_spending,output_file)
def CC_direct(df,output_file):
    """
    Compute connected components of an edge list with GraphFrames and save them as CSV.

    :param df: edge DataFrame with columns ``src`` and ``dst``
    :param output_file: destination passed to ``DataFrame.write.csv``
    """
    print("computing CC")

    edges = df.distinct()
    # Vertex table: every distinct value appearing as src or dst, in a column named "id".
    sources = edges.select("src")
    targets = edges.select("dst")
    vertices = sources.union(targets).distinct().withColumnRenamed('src', 'id')

    components = GraphFrame(vertices,edges).connectedComponents()#("graphx")
    components.write.csv(output_file)
# `computeCC` is not defined in this notebook; the entry point defined in this cell that
# takes a single output path is `create_co_spending`, which builds the edge list and then
# runs CC_direct on it.
create_co_spending("/media/ssd2/bitcoinRemy/CC_all")

In [98]:
search=spark.read.load("/media/ssd2/bitcoinRemy/to_2021/co_spending__/")

In [99]:
search.show()

+--------------------+--------------------+
|                 src|                 dst|
+--------------------+--------------------+
|1JGK6a2XV6DGUtLqW...|1Q1a4nFf9d1dRBLhH...|
|1Q1a4nFf9d1dRBLhH...|13PDuxTEQVcSzCV75...|
|13PDuxTEQVcSzCV75...|1LeE9ZMREk55rRkzw...|
|1LeE9ZMREk55rRkzw...|1E6tEeAfHBKxGDQog...|
|1E6tEeAfHBKxGDQog...|1Q1a4nFf9d1dRBLhH...|
|1Q1a4nFf9d1dRBLhH...|19DxwonooHC1uXDge...|
|15ofj41FxqJhpnn6V...|1F22Br7NGp636QMMa...|
|1F22Br7NGp636QMMa...|1F22Br7NGp636QMMa...|
|1F22Br7NGp636QMMa...|1BM26fpYsdABP7LGP...|
|1K3LBchJoaSXN1KCD...|1EiQKL3hGNCxqcikw...|
|16uVJBEwgEcmVa37R...|1PM4gau767jm2KuFw...|
|1PM4gau767jm2KuFw...|1Lfvy8T1kMu8knmu5...|
|1Pg14Qgeg1bqri6yB...|186DWXvp9WYw6iyfm...|
|186DWXvp9WYw6iyfm...|1EDFUxqD7s2moa4mW...|
|1EDFUxqD7s2moa4mW...|1JFnopCRHNpTmUubs...|
|13xxf6R1vh7Xmegy4...|12KwdDvjo7zg9N9uw...|
|3Adq16vtCd2iMxkxp...|34G1ofv8EqwEQZ1pi...|
|1A8QR2mmgeREMwn3Z...|1KUTADsb149Y1EPVb...|
|1KUTADsb149Y1EPVb...|1GnoH1N2a25z4K9MA...|
|1PtABw4TFM7HD8q5E...|1KchXsg8oj

In [102]:
search.filter(search["src"] == "1zVHpyQV6ALDyQQDd8JMtwGKifADQ7RLE").show(truncate=False)

+---------------------------------+----------------------------------+
|src                              |dst                               |
+---------------------------------+----------------------------------+
|1zVHpyQV6ALDyQQDd8JMtwGKifADQ7RLE|1FeDtFhARLxjKUPPkQqEBL78tisenc9znS|
+---------------------------------+----------------------------------+



In [104]:
search=spark.read.load("/media/ssd2/bitcoinRemy/to_2021/ad2cl")
search.filter((col("ad") == "1zVHpyQV6ALDyQQDd8JMtwGKifADQ7RLE") | (col("ad") == "1FeDtFhARLxjKUPPkQqEBL78tisenc9znS")).show(truncate=False)


+---------+----------------------------------+------+
|key1     |ad                                |cl    |
+---------+----------------------------------+------+
|343534150|1FeDtFhARLxjKUPPkQqEBL78tisenc9znS|250059|
|482592975|1zVHpyQV6ALDyQQDd8JMtwGKifADQ7RLE |null  |
+---------+----------------------------------+------+



In [105]:
search=spark.read.load("/media/ssd2/bitcoinRemy/to_2021/id2cl")
search.filter((col("id") == "482592975") | (col("id")== "343534150")).show(truncate=False)

+---------+------+
|id       |cl    |
+---------+------+
|343534150|250059|
+---------+------+



In [110]:
search=spark.read.load("/media/ssd2/bitcoinRemy/to_2021/co_spend_id/")
search.filter(col("id2") == "482592975").show(truncate=False)

+---------+---------+
|id       |id2      |
+---------+---------+
|343534150|482592975|
+---------+---------+



In [118]:
search=spark.read.csv("/media/ssd2/bitcoinRemy/to_2021/co_spend_id.csv/",sep=" ").toDF("src","dst")
search.filter(col("src") == "482592975").show(truncate=False)

+---------+---------+
|src      |dst      |
+---------+---------+
|482592975|343534150|
+---------+---------+



In [122]:
# Keep the DataFrame in `search` and display it separately: `.show()` returns None,
# so chaining it onto the assignment left `search` unusable for the follow-up filter.
search=spark.read.csv("/media/ssd2/bitcoinRemy/to_2021/id2cl.ssv",sep="\t")
search.show()
#search.filter(col("_c0") == "482592975").show(truncate=False)

+---------+---+
|      _c0|_c1|
+---------+---+
|160919439|  0|
|276896091|  0|
| 15863686|  0|
|267230831|  0|
| 59392913|  0|
|421803123|  0|
|359018837|  0|
|354189328|  0|
|173001544|  0|
| 44884215|  0|
|264818081|  0|
|144002764|  0|
|397657143|  0|
|397657142|  0|
| 88405918|  0|
| 95908898|  0|
| 86237741|  0|
|226412158|  0|
|143262864|  0|
| 46028271|  0|
+---------+---+
only showing top 20 rows



In [121]:
search

483216194

In [None]:
pd.re