In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!pip install graphviz

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 49.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=6930c08de99cda03d8a32c4983775417e7a772f98787a9f4093d0f55c777adaf
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import pandas as pd
import numpy as np
import pyspark
import requests
import csv
import graphviz
import pprint
import codecs
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from google.colab import drive

In [3]:
GDRIVE_MOUNT = "/content/gdrive/"

drive.mount(GDRIVE_MOUNT, force_remount=True)
spark = SparkSession.builder.getOrCreate()

Mounted at /content/gdrive/


In [4]:
block_height = 0
response = requests.get('https://blockchain.info/block-height/{}'.format(block_height))
pprint.PrettyPrinter().pprint(response.json())

{'blocks': [{'bits': 486604799,
             'block_index': 0,
             'fee': 0,
             'hash': '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f',
             'height': 0,
             'main_chain': True,
             'mrkl_root': '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b',
             'n_tx': 1,
             'next_block': ['00000000839a8e6886ab5951d76f411475428afc90947ee320161bbf18eb6048'],
             'nonce': 2083236893,
             'prev_block': '0000000000000000000000000000000000000000000000000000000000000000',
             'size': 285,
             'time': 1231006505,
             'tx': [{'block_height': 0,
                     'block_index': 0,
                     'double_spend': False,
                     'fee': 0,
                     'hash': '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b',
                     'inputs': [{'index': 0,
                                 'prev_out': {'n': 4294967295,
    

In [5]:
string = response.json()['blocks'][0]['tx'][0]['inputs'][0]['script']
codecs.decode(string, "hex")

b'\x04\xff\xff\x00\x1d\x01\x04EThe Times 03/Jan/2009 Chancellor on brink of second bailout for banks'

In [6]:
def create_dataset(start_block, end_block, dir):

    v_columns = ['id', 'name', 'block_height', 'block_hash', 'fee', 'n_input', 'amount_input', 'n_output', 'amount_output']
    e_columns = ['src', 'dst', 'src_position', 'dst_position', 'address', 'value']

    with open(dir + 'vertices-{}-{}.csv'.format(start_block, end_block), 'w', encoding='UTF8') as v_file:
        with open(dir + 'edges-{}-{}.csv'.format(start_block, end_block), 'w', encoding='UTF8') as e_file:

            csv.writer(v_file).writerow(v_columns)
            csv.writer(e_file).writerow(e_columns)

            count_special = 0

            for block_height in range(start_block, end_block+1):

                response = requests.get('https://blockchain.info/block-height/{}'.format(block_height))
                block_hash = response.json()['blocks'][0]['hash']

                for tx in response.json()['blocks'][0]['tx']:

                    tx_index = tx['tx_index']
                    tx_hash = tx['hash']
                    fee = tx['fee']
                    n_input = 0
                    amount_input = 0
                    n_output = 0
                    amount_output = 0

                    for incoming_edge in tx['inputs']:

                        src_tx_index = incoming_edge['prev_out']['tx_index']
                        src_position = incoming_edge['prev_out']['n']
                        if src_tx_index == 0:
                            address = 'coinbase'
                        else:
                            try:
                                address = incoming_edge['prev_out']['addr']
                            except KeyError:
                                address = 'special' + str(count_special)
                                count_special += 1
                        value = incoming_edge['prev_out']['value']
                        dst_index = tx_index
                        dst_position = incoming_edge['index']
                        n_input += 1
                        amount_input += value

                        if src_tx_index != 0:
                            csv.writer(e_file).writerow([src_tx_index, tx_index, src_position, dst_position, address, value])

                    for outgoing_edge in tx['out']:
                        
                        src_tx_index = tx_index
                        src_position = outgoing_edge['n']
                        try:
                            address = outgoing_edge['addr']
                        except KeyError:
                            address = 'special' + str(count_special)
                            count_special += 1
                        value = outgoing_edge['value']
                        if outgoing_edge['spending_outpoints'] == []:
                            dst_tx_index = -1 #'unspent'
                            dst_position = -1 #'unspent'
                        else:
                            dst_tx_index = outgoing_edge['spending_outpoints'][0]['tx_index']
                            dst_position = outgoing_edge['spending_outpoints'][0]['n']
                        n_output += 1
                        amount_output += value

                        if dst_tx_index != -1:
                            csv.writer(e_file).writerow([src_tx_index, tx_index, src_position, dst_position, address, value])

                    csv.writer(v_file).writerow([tx_index, tx_hash, block_height, block_hash, fee, n_input, amount_input, n_output, amount_output])

    v_df = pd.read_csv(dir + 'vertices-{}-{}.csv'.format(start_block, end_block))
    e_df = pd.read_csv(dir + 'edges-{}-{}.csv'.format(start_block, end_block))

    new_txs_indices = pd.concat([e_df['src'], e_df['dst'], v_df['id'], v_df['id']]).drop_duplicates(keep=False)

    with open(dir + 'vertices-{}-{}.csv'.format(start_block, end_block), 'a', encoding='UTF8') as v_file:

        for i, tx_index in new_txs_indices.iteritems():
            csv.writer(v_file).writerow([tx_index, 'unknown', -1, 'unknown', -1, -1, -1, -1, -1])
    return

In [7]:
start_block = 100000
end_block = 105000
GDRIVE_DIR = GDRIVE_MOUNT + "MyDrive/Big Data Project/dataset/"

#create_dataset(start_block, end_block, GDRIVE_DIR)

VERTICES_DATASET_PATH = GDRIVE_DIR + "vertices-{}-{}.csv".format(start_block, end_block)
EDGES_DATASET_PATH = GDRIVE_DIR + "edges-{}-{}.csv".format(start_block, end_block)

In [8]:
v_df = spark.read.load(VERTICES_DATASET_PATH, 
                         format="csv", 
                         sep=",", 
                         inferSchema="true", 
                         header="true"
                         )

In [9]:
e_df = spark.read.load(EDGES_DATASET_PATH, 
                         format="csv", 
                         sep=",", 
                         inferSchema="true", 
                         header="true"
                         )

In [10]:
print("The shape of the vertices dataset is {:d} rows by {:d} columns".format(v_df.count(), len(v_df.columns)))
print("The shape of the edges dataset is {:d} rows by {:d} columns".format(e_df.count(), len(e_df.columns)))

The shape of the vertices dataset is 34453 rows by 9 columns
The shape of the edges dataset is 83174 rows by 6 columns


In [11]:
v_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- block_height: integer (nullable = true)
 |-- block_hash: string (nullable = true)
 |-- fee: integer (nullable = true)
 |-- n_input: integer (nullable = true)
 |-- amount_input: long (nullable = true)
 |-- n_output: integer (nullable = true)
 |-- amount_output: long (nullable = true)



In [12]:
e_df.printSchema()

root
 |-- src: long (nullable = true)
 |-- dst: long (nullable = true)
 |-- src_position: integer (nullable = true)
 |-- dst_position: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- value: long (nullable = true)



In [13]:
v_df.show(20, truncate=False)

+----------------+----------------------------------------------------------------+------------+----------------------------------------------------------------+---+-------+------------+--------+-------------+
|id              |name                                                            |block_height|block_hash                                                      |fee|n_input|amount_input|n_output|amount_output|
+----------------+----------------------------------------------------------------+------------+----------------------------------------------------------------+---+-------+------------+--------+-------------+
|4764983090866501|8c14f0db3df150123e6f3dbbf30f8b955a8249b62ac1d1ff16284aefa3d06d87|100000      |000000000003ba27aa200b1cecaad478d2b00432346c3f1f3986da1afd33e506|0  |1      |0           |1       |5000000000   |
|6896493393222575|fff2525b8931402dd09222c50775608f75787bd2b87e56995a7bdd30f79702c4|100000      |000000000003ba27aa200b1cecaad478d2b00432346c3f1f3986da1afd33e506

In [14]:
e_df.show(20, truncate=False)

+----------------+----------------+------------+------------+----------------------------------+-----------+
|src             |dst             |src_position|dst_position|address                           |value      |
+----------------+----------------+------------+------------+----------------------------------+-----------+
|4764983090866501|4764983090866501|0           |12          |1HWqMzw1jfpXb3xyuUZ4uWXY4tqL2cW47J|5000000000 |
|111905863111945 |6896493393222575|0           |0           |1BNwxHGaFbeUBitpjy2AsKpJ29Ybxntqvb|5000000000 |
|6896493393222575|6896493393222575|0           |6           |1JqDybm2nWTENrHvMyafbSXXtTk5Uv5QAn|556000000  |
|6896493393222575|6896493393222575|1           |0           |1EYTGtG4LnFfiMvjJdsU7GMGCQvsRSjYhx|4444000000 |
|6869576823660862|6911274329242193|1           |0           |15vScfMHNrXN4QvWe54q5hwfVoYwG79CS1|300000000  |
|6911274329242193|6911274329242193|0           |0           |1H8ANdafjpqYntniT3Ddxh4xPBMCSz33pj|1000000    |
|6911274329242193|6

In [15]:
graph = graphviz.Digraph()

for  i, row in e_df.toPandas().iterrows():
    graph.node(str(row.src))
    graph.node(str(row.dst))
    graph.edge(str(row.src),str(row.dst))

In [16]:
graph.render(directory=GDRIVE_DIR) 

KeyboardInterrupt: ignored