In [84]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from graphframes import *
from pyspark.sql.functions import col, asc, desc, expr, split, sum, count, size

spark_conf = SparkConf()
spark_conf.setAll([
    ('spark.master', "local[*]"),
    # ('spark.master', "spark://spark-master:7077"),
    ('spark.app.name', 'bigdata-graphx'),
    ('spark.submit.deployMode', 'client'),
    ("spark.jars", "neo4j-connector-apache-spark_2.12-5.0.0_for_spark_3.jar"),
    ("spark.jars.packages", "graphframes:graphframes:0.8.1-spark3.0-s_2.12"),
])

spark = (
    SparkSession
    .builder
    .config(conf=spark_conf)
    .getOrCreate()
)

In [5]:
transactions_df = (
    spark.read
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://neo4j:7687")
    .option("query", "MATCH (t:Transaction) RETURN t.txid as id")
    .load()
)

depends_df = (
    spark.read
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://neo4j:7687")
    .option("query", "MATCH (src:Transaction) -[r:Depends]- (dst:Transaction) RETURN src.txid as src, dst.txid as dst")
    .load()
)

In [6]:
transactions_df.show()
depends_df.show()

+--------------------+
|                  id|
+--------------------+
|39704e617bfdfd221...|
|491efc402a77a1357...|
|bf77749b40ef1ea0a...|
|3b125da136a7d7bd3...|
|253507ece0a3288c0...|
|3c7c7497b5b29147d...|
|1d90e51bb3d874bdc...|
|b85d97ef3a6533338...|
|a8c940be35c23e9bb...|
|f090f5d02ebdd07a5...|
|d1815dd261c6f82cc...|
|c540bd1a2c935b369...|
|db14a0b2f1b4a93d7...|
|fe6f5d32629b91853...|
|481f7cc9e183919ec...|
|6613dca6ce1b4db3a...|
|45c53cfe6c7e7c0bb...|
|8ceeb21ce50c918fa...|
|25506175ac667075d...|
|7c1c73c9e6bf99606...|
+--------------------+
only showing top 20 rows

+--------------------+--------------------+
|                 src|                 dst|
+--------------------+--------------------+
|253507ece0a3288c0...|8b4c1dedec8261111...|
|253507ece0a3288c0...|8b4c1dedec8261111...|
|253507ece0a3288c0...|c1498dff2a424069c...|
|3c7c7497b5b29147d...|8dc053f141a2b4330...|
|3c7c7497b5b29147d...|8dc053f141a2b4330...|
|3c7c7497b5b29147d...|7c1c73c9e6bf99606...|
|3c7c7497b5b29147d...|475e

In [8]:
graph = GraphFrame(transactions_df, depends_df)
graph.degrees.sort(col('degree').desc()).show()



+--------------------+------+
|                  id|degree|
+--------------------+------+
|dddba850442bc1f57...|    58|
|09e4f33d2c096cddd...|    52|
|81312f4c21f97eeaa...|    52|
|945bb3e35cb88b204...|    42|
|b534ac542b6785757...|    40|
|c21c910fb499431ed...|    24|
|dac027764110d5007...|    24|
|b5d73bd817905afe9...|    20|
|4ed8f86075d677763...|    14|
|d506c987e46d6ac44...|    14|
|933509337ce88ee7c...|    14|
|d0a618f2b2c753a8e...|    14|
|581d457391b2ff4dc...|    14|
|f0eeb33e9b1e0c074...|    12|
|a75e25d6af6f6b71e...|    12|
|02a96836568b6e587...|    12|
|8dc053f141a2b4330...|    12|
|f00d2a8c1c2222592...|    12|
|7326ae68fa17cbd5e...|    12|
|8e01e8faed57e6a4c...|    12|
+--------------------+------+
only showing top 20 rows



In [86]:
#tx graph diameter
landmarks = graph.vertices.rdd.map(lambda x: x.id).collect()
transactions_paths = graph.shortestPaths(landmarks = landmarks)
(
    transactions_paths.select("id","distances")
    .transform(lambda tx: tx.withColumn("distanse_length", size(col("distances"))))
    .orderBy(col('distanse_length').desc())
    .show(truncate=True)
)

+--------------------+--------------------+---------------+
|                  id|           distances|distanse_length|
+--------------------+--------------------+---------------+
|a17891e701c872e63...|{e7d09c85437d92da...|            282|
|11bcf9c547597b50b...|{e7d09c85437d92da...|            282|
|253507ece0a3288c0...|{e7d09c85437d92da...|            282|
|7517d5ba7f46e5b2e...|{e7d09c85437d92da...|            282|
|1d90e51bb3d874bdc...|{e7d09c85437d92da...|            282|
|3c7c7497b5b29147d...|{e7d09c85437d92da...|            282|
|b85d97ef3a6533338...|{e7d09c85437d92da...|            282|
|481f7cc9e183919ec...|{e7d09c85437d92da...|            282|
|fe6f5d32629b91853...|{e7d09c85437d92da...|            282|
|7c1c73c9e6bf99606...|{e7d09c85437d92da...|            282|
|37941a7a4c54978dd...|{e7d09c85437d92da...|            282|
|bb3fc79cd7148d2bc...|{e7d09c85437d92da...|            282|
|f2e9991efb16c57a3...|{e7d09c85437d92da...|            282|
|9c212ab0fcd23db28...|{e7d09c85437d92da.

In [31]:
# depends_df.show()

maxUtxoCount = (
    depends_df
    .groupBy("src")
    .agg(count("dst").alias("utxos_count"))
    .orderBy(col("utxos_count").desc())
    .show()
)

maxOutputsCount = (
    depends_df
    .groupBy("dst")
    .agg(count("src").alias("outputs_count"))
    .orderBy(col("outputs_count").desc())
    .show()
)

+--------------------+-----------+
|                 src|utxos_count|
+--------------------+-----------+
|dddba850442bc1f57...|         29|
|09e4f33d2c096cddd...|         26|
|81312f4c21f97eeaa...|         26|
|945bb3e35cb88b204...|         21|
|b534ac542b6785757...|         20|
|dac027764110d5007...|         12|
|c21c910fb499431ed...|         12|
|b5d73bd817905afe9...|         10|
|4ed8f86075d677763...|          7|
|933509337ce88ee7c...|          7|
|d506c987e46d6ac44...|          7|
|d0a618f2b2c753a8e...|          7|
|581d457391b2ff4dc...|          7|
|8dc053f141a2b4330...|          6|
|02a96836568b6e587...|          6|
|8e01e8faed57e6a4c...|          6|
|3f8afc226c20e2cf5...|          6|
|8a788b6f4505dd361...|          6|
|f0eeb33e9b1e0c074...|          6|
|61763bb94c642fa2d...|          6|
+--------------------+-----------+
only showing top 20 rows

+--------------------+-------------+
|                 dst|outputs_count|
+--------------------+-------------+
|dddba850442bc1f57...| 

In [36]:
accounts_df = (
    spark.read
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://neo4j:7687")
    .option("query", "MATCH (t:Account) RETURN a.address as id")
    .load()
)

transactions_df = (
    spark.read
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://neo4j:7687")
    .option("query", "MATCH (src:Account) -[s:Send]-> (t:Transaction) -[r:Receive]-> (dst:Account) RETURN src.address as src, dst.address as dst, t.txid as txid")
    .load()
)

In [76]:
# maximum number of unique senders in 1 transaction

# transactions_df.show()

maxSenders = (
    transactions_df
    .dropDuplicates(["src", "txid"])
    .groupBy("txid")
    .agg(count("src").alias("senders_count"))
    .orderBy(col("senders_count").desc())
)

maxSenders.show(truncate=True)

maxRecipients = (
    transactions_df
    .dropDuplicates(["dst", "txid"])
    .groupBy("txid")
    .agg(count("dst").alias("recipients_count"))
    .orderBy(col("recipients_count").desc())
)

maxRecipients.show(truncate=True)

+--------------------+-------------+
|                txid|senders_count|
+--------------------+-------------+
|dddba850442bc1f57...|           26|
|81312f4c21f97eeaa...|           24|
|09e4f33d2c096cddd...|           22|
|945bb3e35cb88b204...|           20|
|68f40f4a9cf422dd9...|           17|
|a0e5bbc6c348b9256...|           17|
|4008d74217e2121f9...|           16|
|c21c910fb499431ed...|           13|
|dac027764110d5007...|           13|
|b534ac542b6785757...|           13|
|9f87948d2c89556d9...|           10|
|2ce2e39c18be37239...|            8|
|d6d2537e1a72a11f2...|            5|
|2b333194f53c5be4d...|            5|
|b76f80a4484345441...|            5|
|a5124297548d4675a...|            5|
|7020d5196c717b3cd...|            5|
|fbfa112d510e93607...|            4|
|1f7ee75cefac13cdf...|            4|
|9535487e8e34e5c15...|            4|
+--------------------+-------------+
only showing top 20 rows

+--------------------+----------------+
|                txid|recipients_count|
+-----