In [1]:
# Notebook parameters: the date window to pull and the names of every
# BigQuery table / Spark temporary view used by the cells below.

# Date window as ISO "YYYY-MM-DD" strings.
start_date, end_date = "2022-12-14", "2022-12-21"

# Dataset and base table that receive the raw query result.
dataset_id = "transaction_network"
table_name = "graph_week_12_21"

# Tables derived from the base name: full-graph nodes/edges, then the
# "_sub" variants that hold the filtered sub-graph.
node_table_name = f"{table_name}_node"
edge_table_name = f"{table_name}_edge"
sub_node_table_name = f"{node_table_name}_sub"
sub_edge_table_name = f"{edge_table_name}_sub"

# Names of the Spark temporary views registered for SQL queries.
edge_temp_view_name = "edge_temp_data"
temp_view_name = "temp_data"

In [2]:
# Shared imports (standard library first, then third-party packages).
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from google.cloud import bigquery

# Connect to BigQuery. No arguments are passed, so the project and
# credentials are whatever the client library resolves by default.
client = bigquery.Client()

In [3]:
# Configure Spark and GraphFrames

import os
import sys

import findspark

# findspark has to run *before* pyspark is imported: its job is to make the
# Spark installation's Python packages importable in the first place.
findspark.init()

import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession

# Must be in the environment before the SparkContext is started, so that the
# GraphFrames package is requested when Spark launches.
SUBMIT_ARGS = "--packages graphframes:graphframes:0.8.2-spark3.1-s_2.12 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

# The spark-bigquery connector jar is supplied through spark.jars.
conf = SparkConf().setAll([('spark.jars', 'gs://spark-lib/bigquery/spark-3.1-bigquery-0.27.1-preview.jar')])

# getOrCreate keeps this cell re-runnable: a plain SparkContext(conf=conf)
# raises if a context already exists in the kernel. Note that an existing
# context is returned as-is, so `conf` only takes effect on a fresh kernel.
sc = SparkContext.getOrCreate(conf=conf)

# Make the Python files shipped with the submitted packages importable.
# The setting may be absent (None); skip empty entries and anything that is
# already on sys.path so re-running the cell does not add duplicates.
pyfiles = [path for path in (sc.getConf().get('spark.submit.pyFiles') or '').split(',') if path]
sys.path.extend(path for path in pyfiles if path not in sys.path)

sqlContext = SQLContext(sparkContext=sc)
spark = sqlContext.sparkSession

# GCS bucket handed to the BigQuery connector through 'temporaryGcsBucket'.
bucket = "bd6893_data_yq"
spark.conf.set('temporaryGcsBucket', bucket)

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-925083e0-7798-46f0-982c-b90494bcd7be;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.1-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 182ms :: artifacts dl 4ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.1-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	-----------------------------------------------

In [4]:
# Prepare a reference to the dataset that stores the query results.
dataset_id_full = f"{client.project}.{dataset_id}"
dataset = bigquery.Dataset(dataset_id_full)

# Create the BigQuery dataset. exists_ok=True turns a re-run into a no-op
# instead of raising a Conflict error when the dataset is already there.
dataset = client.create_dataset(dataset, exists_ok=True)

# Configure the query job: results go to a destination table, and
# WRITE_TRUNCATE replaces any rows left by a previous run so that the cell can
# be executed again with the same table name.
job_config = bigquery.QueryJobConfig()
job_config.destination = f"{dataset_id_full}.{table_name}"
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

# Select the transactions whose block date lies in [start_date, end_date) and
# that have both a recipient address and a gas price.
post_merge_query = f"""
    SELECT * FROM bigquery-public-data.crypto_ethereum.transactions
    WHERE DATE(block_timestamp) >= "{start_date}" AND DATE(block_timestamp) < "{end_date}"
    AND (to_address) IS NOT NULL
    AND (gas_price) IS NOT NULL
"""
post_merge = client.query(post_merge_query, job_config=job_config)
# Block until the job has finished, so the table exists before it is read.
post_merge.result()

<google.cloud.bigquery.table.RowIterator at 0x7ff292feaf10>

In [5]:
# Load the query result table into Spark and expose it as a temporary view.
# The project comes from the BigQuery client rather than a hard-coded id, so
# the table is read from the same project the query job wrote it to.

temp_data = (
    spark.read.format('bigquery')
    .option('table', f'{client.project}:{dataset_id}.{table_name}')
    .load()
)

temp_data.createOrReplaceTempView(temp_view_name)

In [6]:
# Aggregate edge attributes: one row per (from_address, to_address) pair with
# the summed value, the minimum gas price and the number of transactions.
# COUNT(*) counts every transaction in the group; COUNT(input) would silently
# skip any row whose `input` column is NULL.

edge_query = f'''
SELECT from_address AS src, to_address AS dst,
SUM(value) AS total_value, MIN(gas_price) AS min_gas_price, COUNT(*) AS transaction_count
FROM {temp_view_name}
GROUP BY from_address, to_address
'''
edge_df = spark.sql(edge_query)

# Register the edges as a view so that later SQL can aggregate over them.
edge_df.createOrReplaceTempView(edge_temp_view_name)
edge_df.printSchema()

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- total_value: decimal(38,9) (nullable = true)
 |-- min_gas_price: long (nullable = true)
 |-- transaction_count: long (nullable = false)



In [7]:
# Write edge data to BigQuery table.
# 'overwrite' replaces the table when it already exists; the default save mode
# raises an error instead, which makes the cell fail on every re-run.

edge_df.write.format('bigquery') \
  .option('table', f'{dataset_id}.{edge_table_name}') \
  .mode('overwrite') \
  .save()

                                                                                

In [8]:
# Per-address statistics derived from the edge view. Each address is
# summarised twice: once as a sender (src) and once as a receiver (dst).

# Sender side: number of outgoing edges plus value / transaction totals.
node_query_src = f'''
SELECT src AS id, COUNT(src) AS outdegree, 
SUM(total_value) AS out_total_value, SUM(transaction_count) AS out_total_transaction
FROM {edge_temp_view_name}
GROUP BY src
'''

# Receiver side: number of incoming edges plus value / transaction totals.
node_query_dst = f'''
SELECT dst AS id, COUNT(dst) AS indegree, 
SUM(total_value) AS in_total_value, SUM(transaction_count) AS in_total_transaction
FROM {edge_temp_view_name}
GROUP BY dst
'''

# Turn both queries into DataFrames keyed by `id`.
node_df_src = spark.sql(node_query_src)
node_df_dst = spark.sql(node_query_dst)

In [9]:
# Build the node table from the sender-side and receiver-side statistics.

# Full outer join on `id`, so addresses that appear on only one side are kept.
node_joined = node_df_src.join(node_df_dst, on="id", how="full")

# The side an address is missing from comes back as nulls; replace them with 0.
node_filled = node_joined.na.fill(value=0)

# Total degree is the sum of incoming and outgoing edge counts.
node_df = node_filled.withColumn(
    'degree',
    node_filled['indegree'] + node_filled['outdegree'],
)
node_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- outdegree: long (nullable = true)
 |-- out_total_value: decimal(38,9) (nullable = true)
 |-- out_total_transaction: long (nullable = true)
 |-- indegree: long (nullable = true)
 |-- in_total_value: decimal(38,9) (nullable = true)
 |-- in_total_transaction: long (nullable = true)
 |-- degree: long (nullable = true)



In [10]:
# Write node data to BigQuery table.
# 'overwrite' replaces the table when it already exists; the default save mode
# raises an error instead, which makes the cell fail on every re-run.

node_df.write.format('bigquery') \
  .option('table', f'{dataset_id}.{node_table_name}') \
  .mode('overwrite') \
  .save()

                                                                                

In [11]:
# Create Graph

# Import only the name that is used instead of `import *`, so it is clear
# where GraphFrame comes from and nothing else leaks into the namespace.
# NOTE(review): this import presumably has to stay below the Spark setup cell,
# which starts Spark with the graphframes package and extends sys.path — confirm.
from graphframes import GraphFrame

# node_df supplies the vertices (`id` column); edge_df supplies the edges
# (`src` / `dst` columns).
g = GraphFrame(node_df, edge_df)

In [12]:
# Derive a smaller sub-graph in three steps:
#   1. keep vertices whose degree is at least 30,
#   2. keep edges that carry at least 10 transactions,
#   3. drop vertices that are left without any edge.

high_degree_graph = g.filterVertices("degree >= 30")
frequent_edge_graph = high_degree_graph.filterEdges("transaction_count >= 10")
subg = frequent_edge_graph.dropIsolatedVertices()

In [13]:
# Write the subgraph edges and nodes to BigQuery.
# 'overwrite' replaces a table when it already exists; the default save mode
# raises an error instead, which makes the cell fail on every re-run.

subg.vertices.write.format('bigquery') \
  .option('table', f'{dataset_id}.{sub_node_table_name}') \
  .mode('overwrite') \
  .save()

subg.edges.write.format('bigquery') \
  .option('table', f'{dataset_id}.{sub_edge_table_name}') \
  .mode('overwrite') \
  .save()

                                                                                