In [1]:
import os

import pymongo
from pymongo import MongoClient
import pyspark
from pyspark.sql.types import  *
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession

In [2]:
# connect to the MongoDB instance that holds both database layers used below
client = MongoClient(host="172.23.149.210", port=27017)
db = client.get_database('cardano_silver')    # silver layer: e.g. the `addresses` collection
db2 = client.get_database('cardano_bronze')   # bronze layer: node.* tables, checkpoints, temporaries

### Stage New `tx_out` Records in MongoDB for the Address Update

In [3]:
# source collection plus the checkpoint collection recording how far it was processed
tx_out = db2.get_collection("node.public.tx_out")
addr_last_ind = db2.get_collection("last_index_address")

# staging collection that is rebuilt from scratch on every run
addr_tmp = db2.get_collection("address_temporary")

In [4]:
# checkpoint document of the address job; `last_index` is the number of tx_out
# documents that earlier runs have already consumed
_addr_checkpoint = addr_last_ind.find_one(filter={'collection': 'address'})
addr_last_processed = _addr_checkpoint['last_index']

In [5]:
# snapshot the current size of tx_out; the slice below stops here and this value
# becomes the new checkpoint at the end of the run
# NOTE(review): estimated_document_count() reads collection metadata rather than
# counting documents; confirm it is accurate enough to be used as a checkpoint.
count_addr = tx_out.estimated_document_count()

In [7]:
# select the records which haven't been processed yet: positions addr_last_processed
# up to, but not including, count_addr
# NOTE(review): find() has no sort, so these positions rely on MongoDB's natural
# order staying stable between runs — confirm, or sort on a monotonic key.
addr_df = tx_out.find()[addr_last_processed:count_addr]

In [8]:
# drop the previous run's records so the temporary collection holds only this batch
addr_tmp.drop()

In [9]:
# load the pending records into the temporary collection.
# insert_many() rejects an empty batch, so skip the insert when nothing new
# arrived since the last checkpoint instead of failing the whole run.
addr_docs = list(addr_df)
if addr_docs:
    addr_tmp.insert_many(addr_docs)

<pymongo.results.InsertManyResult at 0x7f3b35b96710>

### Start Spark Session

In [23]:
# Spark / connector settings for this session. Neo4j credentials can be supplied via
# the NEO4J_USER / NEO4J_PASSWORD environment variables; the previously hardcoded
# values remain only as a fallback so existing runs keep working.
# TODO: remove the fallback password once the environment variable is provisioned.
# NOTE(review): the `spark.mongodb.output.writeConcern.*` keys look like 3.x connector
# settings; mongo-spark-connector 10.x may ignore them — confirm.
config = pyspark.SparkConf().setAll([
    ('spark.executor.memory', '30g'),
    ('spark.executor.cores', '5'), # number of cores to use on each executor
    ('spark.cores.max', '15'), # the maximum amount of CPU cores to request for the application from across the cluster
    ('spark.driver.memory','20g'),
    ('spark.driver.maxResultSize', '4g'),
    ('spark.executor.instances', '3'),
    ('spark.worker.cleanup.enabled', 'true'),
    ('spark.worker.cleanup.interval', '60'),
    ('spark.worker.cleanup.appDataTtl', '60'),
    ('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:10.0.2'),
    ('spark.mongodb.output.writeConcern.wTimeoutMS','1000000'),
    ('spark.mongodb.output.writeConcern.socketTimeoutMS','1000000'),
    ('spark.mongodb.output.writeConcern.connectTimeoutMS','1000000'),
    ("neo4j.url", "bolt://172.23.149.210:7687"),
    ("neo4j.authentication.type", "basic"),
    ("neo4j.authentication.basic.username", os.environ.get("NEO4J_USER", "neo4j")),
    ("neo4j.authentication.basic.password", os.environ.get("NEO4J_PASSWORD", "cardano")),
])

In [24]:
# attach to the standalone Spark master with the configuration defined above
spark = (
    SparkSession.builder
    .config(conf=config)
    .appName("Neo4j-Stream-Address-Update")
    .master("spark://172.23.149.210:7077")
    .getOrCreate()
)

23/01/28 14:46:30 WARN Utils: Your hostname, cardano-druid resolves to a loopback address: 127.0.0.1; using 172.23.149.210 instead (on interface ens3)
23/01/28 14:46:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.mongodb.spark#mongo-spark-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0fa4ac8d-b357-446b-a713-f1911de37cf5;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector;10.0.2 in central
	found org.mongodb#mongodb-driver-sync;4.5.1 in central
	[4.5.1] org.mongodb#mongodb-driver-sync;[4.5.0,4.5.99)
	found org.mongodb#bson;4.5.1 in central
	found org.mongodb#mongodb-driver-core;4.5.1 in central
:: resolution report :: resolve 2512ms :: artifacts dl 7ms
	:: modules in use:
	org.mongodb#bson;4.5.1 from central in [default]
	org.mongodb#mongodb-driver-core;4.5.1 from central in [default]
	org.mongodb#mongodb-driver-sync;4.5.1 from central in [default]
	org.mongodb.spark#mongo-spark-connector;10.0.2 from central in [default]
	---------------------------------------------------------------------
	|                  |  

#### Define schemas for the different collections

In [25]:
# single nullable string column: `address`
schema = StructType().add("address", StringType(), True)

In [26]:
# address plus its integer id (both nullable)
schema2 = (
    StructType()
    .add("address", StringType(), True)
    .add("id", IntegerType(), True)
)

In [27]:
# schema for transaction_network rows (presumably one row per input/output address
# pair of a transaction).
# ADA amounts are read as 64-bit integers: IntegerType only holds values up to
# 2**31 - 1, which large amounts (and any lovelace-denominated value above
# ~2147 ADA) exceed.
# NOTE(review): if these fields are stored as fractional ADA, neither integer type
# keeps the decimals — confirm the stored BSON type.
schema3 = StructType([
    StructField("input_addr", StringType(), True),
    StructField("output_addr", StringType(), True),
    StructField("tx_hash", StringType(), True),
    StructField("input_ADA_value", LongType(), True),
    StructField("output_ADA_value", LongType(), True),
])

In [28]:
# transaction hash plus its integer id (both nullable)
schema4 = (
    StructType()
    .add("hash", StringType(), True)
    .add("id", IntegerType(), True)
)

### Address Update

In [29]:
# read this batch's staged tx_out documents, keeping only the `address` column
temp_addresses = (
    spark.read.format("mongodb")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_bronze')
    .option('spark.mongodb.collection', 'address_temporary')
    .option('spark.mongodb.read.readPreference.name', 'primaryPreferred')
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(schema)
    .load()
)

In [30]:
# register the staged batch so it can be queried with Spark SQL
temp_addresses.createOrReplaceTempView("temp_addresses")

In [31]:
# collapse repeated addresses within the batch to one row each
temp_addresses2 = spark.sql("SELECT DISTINCT address FROM temp_addresses")

In [32]:
# persist the de-duplicated batch addresses to a second staging collection.
# "overwrite" replaces anything an earlier, interrupted run left behind (this
# collection is otherwise only dropped at the very end of the job), so a re-run
# does not accumulate duplicate addresses.
(
    temp_addresses2.write.format("mongodb")
    .mode("overwrite")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_bronze')
    .option('spark.mongodb.collection', 'address_temporary_2')
    .option("forceDeleteTempCheckpointLocation", "true")
    .save()
)

23/01/28 14:46:42 WARN CaseInsensitiveStringMap: Converting duplicated key forcedeletetempcheckpointlocation into CaseInsensitiveStringMap.
                                                                                

In [33]:
# re-read the de-duplicated batch from address_temporary_2; this replaces the earlier
# `temp_addresses` DataFrame, which pointed at address_temporary
temp_addresses = (
    spark.read.format("mongodb")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_bronze')
    .option('spark.mongodb.collection', 'address_temporary_2')
    .option('spark.mongodb.read.readPreference.name', 'primaryPreferred')
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(schema)
    .load()
)

In [34]:
# read the addresses already known to the silver layer (address column only)
addresses = (
    spark.read.format("mongodb")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_silver')
    .option('spark.mongodb.collection', 'addresses')
    .option('spark.mongodb.read.readPreference.name', 'primaryPreferred')
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(schema)
    .load()
)

In [35]:
# expose the existing silver addresses and the de-duplicated batch to Spark SQL;
# `temp_addresses` now refers to the address_temporary_2 read
addresses.createOrReplaceTempView("addresses")
temp_addresses.createOrReplaceTempView("temp_addresses")

In [36]:
# keep only batch addresses that are not yet in the silver `addresses` collection.
# The previous uncorrelated `NOT EXISTS (SELECT address FROM addresses)` was false for
# every row as soon as `addresses` held anything, so no new address was ever added.
new_addresses = spark.sql(
    "SELECT t.address FROM temp_addresses t "
    "LEFT ANTI JOIN addresses a ON t.address = a.address"
)

In [37]:
# stage the genuinely new addresses so ids can be assigned to them with pymongo.
# "overwrite" discards rows an interrupted earlier run may have left behind;
# with "append" those rows would receive ids again and be inserted twice.
(
    new_addresses.write.format("mongodb")
    .mode("overwrite")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_bronze')
    .option('spark.mongodb.collection', 'new_addresses_temporary')
    .option("forceDeleteTempCheckpointLocation", "true")
    .save()
)

23/01/28 14:46:49 WARN CaseInsensitiveStringMap: Converting duplicated key forcedeletetempcheckpointlocation into CaseInsensitiveStringMap.
                                                                                

In [38]:
# find the first id to hand out to new addresses: one past the largest numeric id
# already stored in the silver `addresses` collection (0 when there is none).
# estimated_document_count() only equals the next free id when ids are exactly
# 0..N-1 and the metadata count is exact; otherwise new ids collide with old ones.
# NOTE(review): without an index on `id` this find_one scans the collection once.
addr = db["addresses"]
last_addr = addr.find_one(
    {"id": {"$type": "number"}},
    projection={"id": 1},
    sort=[("id", pymongo.DESCENDING)],
)
count = last_addr["id"] + 1 if last_addr else 0

In [39]:
# assign consecutive ids, starting at `count`, to the staged new addresses.
# The _ids are collected first, so the collection is not modified while a cursor
# over it is still open. Each update is keyed on `_id` instead of the whole
# document, and all updates are sent to the server in one bulk write.
new_addr_coll = db2.new_addresses_temporary
new_addr_ids = [doc["_id"] for doc in new_addr_coll.find({}, {"_id": 1})]
id_updates = [
    pymongo.UpdateOne({"_id": doc_id}, {"$set": {"id": count + offset}})
    for offset, doc_id in enumerate(new_addr_ids)
]
if id_updates:  # bulk_write() rejects an empty list of operations
    new_addr_coll.bulk_write(id_updates, ordered=False)
count += len(new_addr_ids)

In [40]:
# load the staged new addresses together with the ids just assigned to them
final_new_addresses = (
    spark.read.format("mongodb")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_bronze')
    .option('spark.mongodb.collection', 'new_addresses_temporary')
    .option('spark.mongodb.read.readPreference.name', 'primaryPreferred')
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(schema2)
    .load()
)

In [41]:
# add the new, id-tagged addresses to the silver `addresses` collection
(
    final_new_addresses.write
    .format("mongodb")
    .mode("append")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_silver')
    .option('spark.mongodb.collection', 'addresses')
    .option("forceDeleteTempCheckpointLocation", "true")
    .save()
)

23/01/28 14:48:21 WARN CaseInsensitiveStringMap: Converting duplicated key forcedeletetempcheckpointlocation into CaseInsensitiveStringMap.
23/01/28 14:48:21 WARN Partitioner: Unable to get collection stats (collstats) returning a single partition.
23/01/28 14:48:21 WARN Partitioner: Unable to get collection stats (collstats) returning a single partition.


In [42]:
# stream the new addresses to Neo4j as :Address nodes
# NOTE(review): per the Neo4j Spark Connector docs, `node.keys` drives a MERGE only in
# Overwrite mode; in Append mode nodes are created, so re-running this cell on the same
# batch would likely duplicate nodes — confirm before relying on idempotence.
(
    final_new_addresses.write
    .format("org.neo4j.spark.DataSource")
    .mode("append")
    .option("labels", "Address")
    .option("node.keys", "id")
    .save()
)

23/01/28 14:48:21 WARN Partitioner: Unable to get collection stats (collstats) returning a single partition.
23/01/28 14:48:21 WARN Partitioner: Unable to get collection stats (collstats) returning a single partition.
                                                                                

In [43]:
# remove this run's intermediate collections now that their contents are persisted
for _tmp_name in ("address_temporary_2", "new_addresses_temporary"):
    db2.drop_collection(_tmp_name)

In [44]:
# advance the address checkpoint to the tx_out size captured before this batch was read
addr_last_ind.update_one(
    filter={"collection": "address"},
    update={"$set": {"last_index": count_addr}},
)

<pymongo.results.UpdateResult at 0x7f210c1bdb10>

In [None]:
# stop the address-update Spark session
spark.stop()

### Stage New `transaction_network` Records in MongoDB for the Network Update

In [None]:
# source collection (silver layer) plus the checkpoint collection for the Neo4j stream
# NOTE(review): `tx` is rebound to a Spark DataFrame further down in this notebook;
# the count/slice cells below must not be re-run after that happens.
tx = db.get_collection("transaction_network")
tx_last_ind = db2.get_collection("last_index_neo4j_stream")

# staging collection that is rebuilt from scratch on every run
tx_tmp = db2.get_collection("neo4j_stream_temporary")

In [None]:
# checkpoint document of the Neo4j stream; `last_index` is the number of
# transaction_network documents that earlier runs have already streamed
_tx_checkpoint = tx_last_ind.find_one(filter={'collection': 'neo4j_stream'})
tx_last_processed = _tx_checkpoint['last_index']

In [None]:
# snapshot the current size of transaction_network; it bounds this batch and becomes
# the new checkpoint at the end of the run
# NOTE(review): estimated_document_count() reads collection metadata rather than
# counting documents; confirm it is accurate enough to be used as a checkpoint.
count_tx = tx.estimated_document_count()

In [None]:
# select the transaction_network records which haven't been processed yet:
# positions tx_last_processed up to, but not including, count_tx
# NOTE(review): find() has no sort, so these positions rely on MongoDB's natural
# order staying stable between runs — confirm, or sort on a monotonic key.
tx_df = tx.find()[tx_last_processed:count_tx]

In [None]:
# drop the previous run's records so the temporary collection holds only this batch
tx_tmp.drop()

In [None]:
# load the pending records into the temporary collection.
# insert_many() rejects an empty batch, so skip the insert when nothing new
# arrived since the last checkpoint instead of failing the whole run.
tx_docs = list(tx_df)
if tx_docs:
    tx_tmp.insert_many(tx_docs)

### Start Spark Session


In [None]:
# Spark / connector settings for this session. Neo4j credentials can be supplied via
# the NEO4J_USER / NEO4J_PASSWORD environment variables; the previously hardcoded
# values remain only as a fallback so existing runs keep working.
# TODO: remove the fallback password once the environment variable is provisioned.
# NOTE(review): the `spark.mongodb.output.writeConcern.*` keys look like 3.x connector
# settings; mongo-spark-connector 10.x may ignore them — confirm.
config = pyspark.SparkConf().setAll([
    ('spark.executor.memory', '30g'),
    ('spark.executor.cores', '5'), # number of cores to use on each executor
    ('spark.cores.max', '15'), # the maximum amount of CPU cores to request for the application from across the cluster
    ('spark.driver.memory','20g'),
    ('spark.driver.maxResultSize', '4g'),
    ('spark.executor.instances', '3'),
    ('spark.worker.cleanup.enabled', 'true'),
    ('spark.worker.cleanup.interval', '60'),
    ('spark.worker.cleanup.appDataTtl', '60'),
    ('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:10.0.2'),
    ('spark.mongodb.output.writeConcern.wTimeoutMS','1000000'),
    ('spark.mongodb.output.writeConcern.socketTimeoutMS','1000000'),
    ('spark.mongodb.output.writeConcern.connectTimeoutMS','1000000'),
    ("neo4j.url", "bolt://172.23.149.210:7687"),
    ("neo4j.authentication.type", "basic"),
    ("neo4j.authentication.basic.username", os.environ.get("NEO4J_USER", "neo4j")),
    ("neo4j.authentication.basic.password", os.environ.get("NEO4J_PASSWORD", "cardano")),
])

In [None]:
# attach to the standalone Spark master with the configuration defined above
spark = (
    SparkSession.builder
    .config(conf=config)
    .appName("Neo4j-Stream-Networks-Update")
    .master("spark://172.23.149.210:7077")
    .getOrCreate()
)

#### Define schemas for the different collections

In [None]:
# single nullable string column: `address`
schema = StructType().add("address", StringType(), True)

In [None]:
# address plus its integer id (both nullable)
schema2 = (
    StructType()
    .add("address", StringType(), True)
    .add("id", IntegerType(), True)
)

In [None]:
# schema for transaction_network rows (presumably one row per input/output address
# pair of a transaction).
# ADA amounts are read as 64-bit integers: IntegerType only holds values up to
# 2**31 - 1, which large amounts (and any lovelace-denominated value above
# ~2147 ADA) exceed.
# NOTE(review): if these fields are stored as fractional ADA, neither integer type
# keeps the decimals — confirm the stored BSON type.
schema3 = StructType([
    StructField("input_addr", StringType(), True),
    StructField("output_addr", StringType(), True),
    StructField("tx_hash", StringType(), True),
    StructField("input_ADA_value", LongType(), True),
    StructField("output_ADA_value", LongType(), True),
])

In [None]:
# transaction hash plus its integer id (both nullable)
schema4 = (
    StructType()
    .add("hash", StringType(), True)
    .add("id", IntegerType(), True)
)

### Transaction Network & Clusters Update

In [45]:
# read this batch's staged transaction_network rows
TxNetwork = (
    spark.read.format("mongodb")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_bronze')
    .option('spark.mongodb.collection', 'neo4j_stream_temporary')
    .option('spark.mongodb.read.readPreference.name', 'primaryPreferred')
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(schema3)
    .load()
)

In [46]:
# register the staged batch for the Spark SQL joins below
TxNetwork.createOrReplaceTempView("TxNetwork")

In [47]:
# read node.public.tx (hash and id) so each transaction hash can be resolved to its id
# NOTE(review): this rebinds `tx`, which earlier held the pymongo `transaction_network`
# collection; cells calling tx.find() / tx.estimated_document_count() must not be
# re-run after this one.
tx = (
    spark.read.format("mongodb")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_bronze')
    .option('spark.mongodb.collection', 'node.public.tx')
    .option('spark.mongodb.read.readPreference.name', 'primaryPreferred')
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(schema4)
    .load()
)

In [48]:
# register the node.public.tx DataFrame as the `tx` view
tx.createOrReplaceTempView("tx")

In [49]:
# attach the numeric transaction id by joining on the transaction hash; rows without a
# match in node.public.tx keep a NULL tx_id
transactions_id = spark.sql(
    "SELECT tx_hash, id as tx_id, input_addr, output_addr, "
    "input_ADA_value, output_ADA_value "
    "FROM TxNetwork LEFT JOIN tx on TxNetwork.tx_hash = tx.hash"
)

In [50]:
# register the id-resolved transactions for the address joins below
transactions_id.createOrReplaceTempView("transactions_id")

In [51]:
# read every known address together with its id, for the address -> id lookups
addresses = (
    spark.read.format("mongodb")
    .option('spark.mongodb.connection.uri', 'mongodb://172.23.149.210:27017')
    .option('spark.mongodb.database', 'cardano_silver')
    .option('spark.mongodb.collection', 'addresses')
    .option('spark.mongodb.read.readPreference.name', 'primaryPreferred')
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true')
    .option("forceDeleteTempCheckpointLocation", "true")
    .schema(schema2)
    .load()
)

In [52]:
# register the address -> id lookup table
addresses.createOrReplaceTempView("addresses")

In [53]:
# resolve the input address of every row to its id (NULL input_id when unknown)
input_addresses = spark.sql(
    "SELECT transactions_id.tx_hash, transactions_id.tx_id, "
    "transactions_id.input_addr, transactions_id.output_addr, "
    "transactions_id.input_ADA_value, transactions_id.output_ADA_value, "
    "addresses.id as input_id "
    "FROM transactions_id LEFT JOIN addresses on transactions_id.input_addr = addresses.address"
)

In [54]:
# register the rows that now carry input_id
input_addresses.createOrReplaceTempView("input_addresses")

In [55]:
# resolve the output address of every row to its id (NULL output_id when unknown)
TxNet = spark.sql(
    "SELECT input_addresses.tx_hash, input_addresses.tx_id, "
    "input_addresses.input_addr, input_addresses.output_addr, "
    "input_addresses.input_ADA_value, input_addresses.output_ADA_value, "
    "input_addresses.input_id, addresses.id as output_id "
    "FROM input_addresses LEFT JOIN addresses on input_addresses.output_addr = addresses.address"
)

### Neo4j Tx Network & Clusters Streaming

In [None]:
# stream the transaction network: one TRANSACTED_WITH relationship per row, between the
# :Address nodes whose ids equal input_id and output_id; rows with a NULL id cannot
# satisfy the MATCH and therefore create nothing
# NOTE(review): CREATE is not idempotent — re-running this cell on the same batch
# duplicates relationships.
(
    TxNet.write
    .format("org.neo4j.spark.DataSource")
    .mode("append")
    .option("url", "bolt://172.23.149.210:7687")
    .option(
        "query",
        "MATCH (a1:Address {id: event.input_id}) "
        "MATCH (a2:Address {id: event.output_id}) "
        "CREATE (a1)-[:TRANSACTED_WITH {tx_hash: event.tx_hash, tx_id: event.tx_id, "
        "input_ADA_value: event.input_ADA_value, output_ADA_value: event.output_ADA_value}]->(a2)",
    )
    .save()
)

In [57]:
# register the fully id-resolved network for the clustering self-join
TxNet.createOrReplaceTempView("TxNet")

In [58]:
# pair up different input addresses that occur in the same transaction (presumably the
# common-input-ownership heuristic). The WHERE clause discards the NULL-padded rows, so
# this LEFT JOIN behaves like an inner join; each pair is emitted in both directions.
clusters = spark.sql(
    "SELECT t1.tx_id, t1.input_id as input1, t2.input_id as input2 "
    "FROM TxNet t1 LEFT JOIN TxNet t2 on t1.tx_id = t2.tx_id "
    "WHERE t1.input_id != t2.input_id"
)

In [59]:
# register the raw input-pair rows for de-duplication
clusters.createOrReplaceTempView("clusters")

In [60]:
# one row per (tx_id, input1, input2); the self-join can repeat a pair once per
# matching row of the same transaction
clusters_final = spark.sql("SELECT distinct tx_id, input1, input2 FROM clusters")

In [None]:
# stream the clustering network: one ASSOCIATED_WITH relationship per input pair
# NOTE(review): CREATE is not idempotent, and the same pair seen in several
# transactions gets several identical relationships (tx_id is not stored on them).
(
    clusters_final.write
    .format("org.neo4j.spark.DataSource")
    .mode("append")
    .option("url", "bolt://172.23.149.210:7687")
    .option(
        "query",
        "MATCH (a1:Address {id: event.input1}) "
        "MATCH (a2:Address {id: event.input2}) "
        "CREATE (a1)-[:ASSOCIATED_WITH]->(a2)",
    )
    .save()
)

In [62]:
# advance the Neo4j-stream checkpoint to the transaction_network size captured
# before this batch was read
tx_last_ind.update_one(
    filter={"collection": "neo4j_stream"},
    update={"$set": {"last_index": count_tx}},
)

<pymongo.results.UpdateResult at 0x7f210c1bb450>

In [63]:
# stop the network-update Spark session
spark.stop()