# Real-time blockchain monitoring

In [None]:
import json
from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.transforms import trigger
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.io.textio import WriteToText
import google.auth
import datetime

# GCP resources used throughout this notebook.
project_id = "forrest-test-project-333203"
# Bigtable instance/table holding per-address metadata, keyed by ETH address.
instance_id = "my-bigtable"
table_id = "eth_address_meta"
# Base GCS path; the export job derives its temp/staging locations from it.
dataflow_gcs_location = 'gs://forrest-bigdata-bucket/dataflow'

In [None]:
# Setting up the Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Sets the pipeline mode to streaming, so we can stream the data from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# google.auth.default() returns (credentials, project_id); only the project id is kept.
# NOTE(review): the pipeline below reads from an existing Pub/Sub subscription, so
# this project is not used to create one; it is the project the GCP IOs/runner see.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

In [None]:
# Limit how long Interactive Beam records unbounded sources (such as the
# Pub/Sub read below) when previewing results with ib.show: 2 minutes.
ib.options.recording_duration = '2m'

## ETH address enrichment

In [23]:
class EnrichAddressMetaFn(beam.DoFn):
    """Attach Bigtable address metadata to an ETH transaction and emit it as JSON.

    Each input element is a transaction dict (as decoded from Pub/Sub). Its
    ``from_address`` and ``to_address`` values are used as Bigtable row keys,
    and the JSON document stored in column ``cf1:meta`` of each row is added
    to the output under ``from_address_meta`` / ``to_address_meta``.

    A placeholder document with every field empty is used instead when:

    * the address is missing or null (presumably contract-creation
      transactions carry a null ``to_address``), or
    * no row exists for the address, or
    * the row has no ``cf1:meta`` cell.

    The ``nonce`` and ``input`` fields are left out of the output.

    Args:
        project_id: GCP project that owns the Bigtable instance.
        instance_id: Bigtable instance ID.
        table_id: Bigtable table whose rows are keyed by address.

    Yields:
        The enriched transaction serialized with ``json.dumps``.
    """

    _COLUMN_FAMILY_ID = "cf1"
    _COLUMN_ID = b"meta"
    # Fields removed from the transaction before it is emitted.
    _DROPPED_FIELDS = ("nonce", "input")

    def __init__(self, project_id, instance_id, table_id):
        self.project_id = project_id
        self.instance_id = instance_id
        self.table_id = table_id
        self.table = None

    def setup(self):
        # Build the Bigtable client once per DoFn instance instead of once per
        # bundle. Streaming bundles are small, so per-bundle clients are wasteful.
        from google.cloud import bigtable

        client = bigtable.Client(project=self.project_id)
        instance = client.instance(self.instance_id)
        self.table = instance.table(self.table_id)

    @staticmethod
    def _default_meta():
        """Return a fresh placeholder metadata document for unknown addresses."""
        return {
            "name": None,
            "account_type": None,
            "contract_type": None,
            "entity": None,
            "label": None,
            "tags": [],
            "created_at": None
        }

    def _lookup_meta(self, address):
        """Return the decoded ``cf1:meta`` JSON for ``address``, or a placeholder."""
        import json

        if not address:
            return self._default_meta()
        bt_row = self.table.read_row(address.encode())
        if bt_row is None:
            return self._default_meta()
        cells = bt_row.cells.get(self._COLUMN_FAMILY_ID, {}).get(self._COLUMN_ID)
        if not cells:
            return self._default_meta()
        # Bigtable returns cell versions newest-first, so [0] is the latest.
        return json.loads(cells[0].value.decode('utf-8'))

    def process(self, elem):
        import json

        # Build a new dict rather than popping from ``elem``: Beam DoFns must
        # not mutate their input elements. Missing nonce/input are tolerated.
        enriched = {
            key: value
            for key, value in elem.items()
            if key not in self._DROPPED_FIELDS
        }
        enriched["from_address_meta"] = self._lookup_meta(elem.get("from_address"))
        enriched["to_address_meta"] = self._lookup_meta(elem.get("to_address"))

        yield json.dumps(enriched)

In [None]:
# Runner selection: uncomment the following code for interactive beam
# (local preview with ib.show against a test subscription).
# p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)
# subscription = "projects/forrest-test-project-333203/subscriptions/ethTransactionTest"

# Active by default: pipeline intended for the Dataflow runner (submitted below).
p = beam.Pipeline(options=options)
subscription = "projects/forrest-test-project-333203/subscriptions/ethTransactionParser"

# Pub/Sub topic that receives the enriched transactions.
target_topic = "projects/forrest-test-project-333203/topics/blockchain.eth.transactions_enriched"

# Pub/Sub bytes -> dict -> enriched JSON string -> UTF-8 bytes -> Pub/Sub.
enrich_pipeline = (
    p
    | "read from pubsub" >> beam.io.ReadFromPubSub(subscription=subscription)
    | 'transform to json' >> beam.Map(json.loads)
    | 'enrich with address meta' >> beam.ParDo(EnrichAddressMetaFn(project_id, instance_id, table_id))
    | 'UTF-8 encode' >> beam.Map(lambda s: s.encode("utf-8"))
    # Publish to the topic defined above (previously referenced an undefined name).
    | 'write to pubsub' >> beam.io.WriteToPubSub(topic=target_topic)
)

In [None]:
# Preview the enriched output, including window info, via Interactive Beam.
# Recording of the Pub/Sub source is capped by ib.options.recording_duration.
# NOTE(review): ib.show is intended for the InteractiveRunner pipeline (the
# commented-out option above); confirm it behaves as expected with the
# Dataflow-targeted pipeline that is currently active.
ib.show(enrich_pipeline, include_window_info=True)

In [None]:
# Submit the streaming enrichment pipeline `p` to Google Cloud Dataflow.
# NOTE(review): unlike the export job further down, these options set no
# region, temp/staging location, or network; confirm Dataflow can resolve
# them (e.g. from gcloud defaults) before relying on this cell.
from apache_beam.runners import DataflowRunner
runner = DataflowRunner()
runner.run_pipeline(p, options=options)

# Export from BigQuery to Bigtable

In [None]:
# class for building a Bigtable row from a BigQuery record
class CreateRowFn(beam.DoFn):
    """Convert a BigQuery address-tag record into a Bigtable ``DirectRow``.

    The record's ``address`` becomes the row key. The remaining fields are
    JSON-encoded into column ``cf1:meta``. ``created_at`` is converted to
    integer epoch seconds so that it is JSON-serializable; a null
    ``created_at`` is stored as null instead of raising.

    The input element is copied, not mutated, as Beam requires of DoFns.

    Yields:
        One ``google.cloud.bigtable.row.DirectRow`` per input record.
    """

    def process(self, elem):
        from google.cloud.bigtable import row
        import datetime
        import json

        meta = dict(elem)
        row_key = meta.pop("address")
        created_at = meta.get("created_at")
        if created_at is not None:
            # NOTE(review): a naive datetime would be interpreted as local time
            # here; presumably ReadFromBigQuery yields tz-aware UTC values — confirm.
            meta["created_at"] = int(datetime.datetime.timestamp(created_at))

        direct_row = row.DirectRow(row_key=row_key)
        direct_row.set_cell(
            'cf1',
            'meta',
            json.dumps(meta).encode())

        yield direct_row

In [None]:
# class for printing a row: serializes a BigQuery record to a JSON string
class JsonToStringFn(beam.DoFn):
    """Serialize a BigQuery address-tag record to a JSON string for inspection.

    The ``address`` field is left out. ``created_at`` is converted to integer
    epoch seconds; a null ``created_at`` is kept as null instead of raising.
    The input element is copied, not mutated, as Beam requires of DoFns.

    Yields:
        The record serialized with ``json.dumps``.
    """

    def process(self, elem):
        import datetime
        import json

        record = dict(elem)
        record.pop("address")
        created_at = record.get("created_at")
        if created_at is not None:
            record["created_at"] = int(datetime.datetime.timestamp(created_at))

        yield json.dumps(record)

In [None]:
# Fresh options for the one-off BigQuery -> Bigtable export job.
options = pipeline_options.PipelineOptions()
gcp_options = options.view_as(GoogleCloudOptions)
_, gcp_options.project = google.auth.default()
# ReadFromBigQuery falls back to temp_location for its export files.
gcp_options.temp_location = f'{dataflow_gcs_location}/temp'

address_tag_query = 'select * from forrest-test-project-333203.crypto.eth_address_tag'

p = beam.Pipeline(options=options)

# BigQuery rows -> Bigtable DirectRows (row key = address, cf1:meta = JSON).
export_pipeline = (
    p
    | "read from bigquery" >> beam.io.ReadFromBigQuery(
        query=address_tag_query,
        use_standard_sql=True)
    # To inspect records instead of writing them, replace the two steps below with:
    #   | "json to string" >> beam.ParDo(JsonToStringFn())
    #   | "print" >> beam.Map(print)
    | "create bigtable row" >> beam.ParDo(CreateRowFn())
    | 'Write to bigtable' >> WriteToBigTable(
        project_id=project_id,
        instance_id=instance_id,
        table_id=table_id)
)

In [None]:
# Run the export pipeline `p` with the options built above. No runner is set
# there, so this uses Beam's default (local) runner.
# NOTE(review): the next cell submits the same `p` to Dataflow as well;
# running both cells performs the export twice.
p.run()

In [None]:
from apache_beam.runners import DataflowRunner

# Submit the BigQuery -> Bigtable export pipeline `p` to Google Cloud Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# Unique job name per submission. Use a tz-aware UTC "now": datetime.utcnow()
# returns a naive value whose .timestamp() is interpreted as local time, which
# skews the epoch seconds by the machine's UTC offset (and is deprecated).
options.view_as(GoogleCloudOptions).job_name = (
    f"export-to-bigtable-job-{int(datetime.datetime.now(datetime.timezone.utc).timestamp())}"
)
options.view_as(GoogleCloudOptions).service_account_email = "notebook@forrest-test-project-333203.iam.gserviceaccount.com"
# Launch a new job rather than updating a running one.
options.view_as(GoogleCloudOptions).update = False
# Run workers on the bigdata-network VPC's dataflow-network subnetwork, without public IPs.
options.view_as(pipeline_options.WorkerOptions).network = "bigdata-network"
options.view_as(pipeline_options.WorkerOptions).subnetwork = "regions/us-central1/subnetworks/dataflow-network"
options.view_as(pipeline_options.WorkerOptions).use_public_ips = False
# CreateRowFn imports its dependencies inside process(), so the notebook's
# main session does not need to be shipped to the workers.
options.view_as(pipeline_options.SetupOptions).save_main_session = False

runner = DataflowRunner()
runner.run_pipeline(p, options=options)

In [None]:
%%bigquery --use_rest_api
-- Spot-check the address-tag table that the export pipeline reads.
select * from forrest-test-project-333203.crypto.eth_address_tag limit 10