In [1]:
# Importing spark and dependencies
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
# Pointing to our Google Project credentials
credentials_location = '/home/fenniez/key/gcp-creds.json'

# Setting up spark cluster configurations
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

25/04/04 17:04:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Setting up spark cluster
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [4]:
# reading data as spark dataframes
df_blocks = spark.read.parquet('gs://blockchain-data-pipeline-bucket/blocks/2025/*/*')

                                                                                

In [7]:
df_transactions = spark.read.parquet('gs://blockchain-data-pipeline-bucket/transactions/2025/01/*')

                                                                                

In [8]:
# Reduce partitions to improve efficiency
df_transactions = df_transactions.repartition(50)  # Adjust partition count based on performance testing

In [5]:
df_blocks.count()

                                                                                

13243

In [9]:
df_transactions.count()

                                                                                

11581705

In [11]:
df_blocks.head(5)

[Row(hash='000000000000000000001e4d78f5c579cb010f0d8a9492cbee89a966df7d4134', size=350, stripped_size=314, weight=1292, number=890103, version=805306368, merkle_root='190febd0deafadcc1a681024ebc5a7068af309e74a3bb34c0b93734d54ee81d5', timestamp=datetime.datetime(2025, 3, 30, 10, 53, 38), timestamp_month=datetime.date(2025, 3, 1), nonce='c8891e07', bits='1702796c', coinbase_param='03f7940d04b222e967537069646572506f6f6c2f484b3130393130362ff955009b5c53ff1e8a14010000000000', transaction_count=1),
 Row(hash='000000000000000000023d80ea035a2cbf50406df889edb970df92ad2644e628', size=326, stripped_size=326, weight=1304, number=889406, version=561135616, merkle_root='d431e064660db5f6603aad25fd4fb8cc4413a41f3d06a06b3e364c1fa0c99913', timestamp=datetime.datetime(2025, 3, 25, 17, 51, 32), timestamp_month=datetime.date(2025, 3, 1), nonce='52c6a05a', bits='1702796c', coinbase_param='033e920d082f5669614254432f2cfabe6d6d004818462632b650640cb4da24fbfcf706aa4a0311653645917ca393c8ed6dd7100000000000000010ebd

In [12]:
df_transactions.head(1)

                                                                                

[Row(hash='b66d7772ba8d29fbb5e49b82b4594f4ce3cb562b1fb8f47cede109a6ed9ad548', size=222, virtual_size=141, version=2, lock_time=0, block_hash='000000000000000000026097dff69f4a68a173804d05de56775b49e99631e252', block_number=877481, block_timestamp=datetime.datetime(2025, 1, 2, 12, 21, 48), block_timestamp_month=datetime.date(2025, 1, 1), input_count=1, output_count=2, input_value=Decimal('222367.000000000'), output_value=Decimal('221885.000000000'), is_coinbase=False, fee=Decimal('482.000000000'), inputs=[Row(index=0, spent_transaction_hash='4a58e9a167c4be852bbca5bfb5b77f2ae27064ee4410358c6d780d9d0b7deeec', spent_output_index=0, script_asm='', script_hex='', sequence=4294967295, required_signatures=None, type='witness_v0_keyhash', addresses=['bc1qw5rkjyle5ge7768zs34krzlq5x9rfhs2dwm6f7'], value=Decimal('222367.000000000'))], outputs=[Row(index=0, script_asm='0 de39e936260f78a0461feb6c4bf03449a8ee4f91', script_hex='0014de39e936260f78a0461feb6c4bf03449a8ee4f91', required_signatures=None, ty

In [6]:
df_blocks.coalesce(1).write.parquet('blocks.parquet', mode='overwrite')

                                                                                

In [10]:
df_transactions.coalesce(1).write.parquet('transactions.parquet', mode='overwrite')

                                                                                