In [None]:
# start zookeeper
# ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

# start kafka
# ./bin/kafka-server-start ./etc/kafka/server.properties

# List topics
#./bin/kafka-topics --zookeeper localhost:2181 --list

# Delete a topic
#./bin/kafka-topics --zookeeper localhost:2181 --delete --topic write-to-neo4j

# librdkafka configuration reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

# start neo4j
# ./bin/neo4j restart

### Create Demo Neo4j Graph, with Neo4j BOLT Protocol Connector

In [18]:
# Make sure the APOC procedures are installed in the Neo4j plugins folder:
# query1 calls apoc.generate.ba.

import os

from neo4j.v1 import GraphDatabase, basic_auth, TRUST_ON_FIRST_USE, CypherError
from string import Template


# Total Cinema nodes to create, generated in `graphs` chunks of
# `nodes_per_graph` nodes each (one apoc.generate.ba call per chunk).
nodes = 150000

nodes_per_graph = 5000

graphs = nodes // nodes_per_graph

# Start from an empty database.
query0 = 'MATCH (n) DETACH DELETE n'

# Generate one chunk of Cinema nodes connected by HAS_LOCATION relationships.
query1 = Template('CALL apoc.generate.ba( ${nodes_per_graph}, 1, "Cinema", "HAS_LOCATION") '
                  ).substitute(nodes_per_graph=nodes_per_graph)

# Give every Cinema a business key derived from its internal node id
# (the Kafka producer adds the same +1000000 offset to its accountIds).
query2 = '''
MATCH (c:Cinema) SET c.cinemaId = id(c)+1000000
;
'''
query3 = '''
CREATE CONSTRAINT ON (c:Cinema) ASSERT c.cinemaId IS UNIQUE
;
'''
query4 = '''
CREATE CONSTRAINT ON (a:Account) ASSERT a.accountId IS UNIQUE
;
'''

query5 = '''
CREATE INDEX on :DailyBoxOffice(accountId)
;    
'''

# Attach one Account (accountId = cinemaId) to every Cinema that lacks one.
query6 = '''
MATCH (c:Cinema) 
WHERE NOT EXISTS ( (c)-[:HAS_ACCOUNT]->() )
CREATE (a:Account)<-[:HAS_ACCOUNT]-(c) SET a.accountId = c.cinemaId
;
'''


def _run_and_print_counters(driver, query):
    """Run `query` in a fresh auto-commit session and print its update counters.

    The session is always closed, even when the query fails; the summary is
    returned for callers that want to inspect it.
    """
    session = driver.session()
    try:
        summary = session.run(query).consume()
    finally:
        session.close()
    print(summary.counters)
    return summary


driver = GraphDatabase.driver(os.environ.get("NEO4J_URI", "bolt://localhost"),
                              auth=basic_auth(os.environ.get("NEO4J_USER", "neo4j"),
                                              os.environ.get("NEO4J_PASSWORD", "neo4j")),
                              encrypted=False,
                              trust=TRUST_ON_FIRST_USE)
try:
    _run_and_print_counters(driver, query0)

    # Build the graph chunk by chunk in a single session; per-chunk counters
    # are not printed to keep the output short.
    session = driver.session()
    try:
        for _ in range(graphs):
            session.run(query1).consume()
    finally:
        session.close()

    for query in (query2, query3, query4, query5, query6):
        _run_and_print_counters(driver, query)

except Exception as e:
    # session.run() uses auto-commit (implicit) transactions, so there is
    # nothing to roll back here: statements that already succeeded stay applied.
    print('*** Got exception', e)

finally:
    driver.close()
    print('*** Done!')

{}
{'properties_set': 150000}
{'constraints_added': 1}
{'constraints_added': 1}
{'indexes_added': 1}
{'relationships_created': 150000, 'nodes_created': 150000, 'labels_added': 150000, 'properties_set': 150000}
*** Done!


### Set Topic, Configure Messages, Timers

In [19]:
# Initializations.
import random
import time

# Kafka connection settings: point bootstrap_servers at your own brokers if
# they are not running locally.
bootstrap_servers = 'localhost:9092'
kafka_topic = 'neo4j-150K-demo2'

# Total number of messages the producer will generate.
msg_count = 150000

# Every generated message is one CSV record "accountId,revenue,timestamp",
# simulating a row coming from the source database. `i` is the message index
# used to build the example message shown below.
i = 0
def generate_message(i):
    """Build one CSV-encoded Kafka message for message index `i`.

    Fields, comma-separated: account id (``i + 1000000``), a random revenue
    in the range 1000.0-9999.99, and the current Unix timestamp from
    ``time.time()``. The record is returned as bytes.
    """
    fields = [
        str(i + 1000000),
        str(random.randrange(100000, 1000000) / 100),
        str(time.time()),
    ]
    return ','.join(fields).encode()

# Build one sample message to show the record format and to estimate the
# per-message size used by the throughput calculation.
example_message = generate_message(i)
# Measure the message that is printed instead of generating a second random
# one, so the reported size describes the example. Message lengths vary
# slightly (random revenue and timestamp digits), so this is an estimate.
msg_bytes = len(example_message)

print("Example message: " + str(example_message))
print("Message size (bytes): " + str(msg_bytes))


# Timing results in seconds for the producer and consumer runs, keyed by
# client name, so throughput can be reported for each side.
producer_timings = {}
consumer_timings = {}



# function to calc throughput based on msg count and length

def calculate_thoughput(timing, n_messages=None, msg_size=None):
    """Print message count, MB/s and messages/s for a timed Kafka run.

    Args:
        timing: elapsed wall-clock time in seconds.
        n_messages: number of messages processed; ``None`` means the current
            value of the module-level ``msg_count``.
        msg_size: approximate size of one message in bytes; ``None`` means
            the current value of the module-level ``msg_bytes``.

    Defaults are looked up at call time, so changing ``msg_count`` or the
    message format in another cell is picked up without redefining this.
    """
    if n_messages is None:
        n_messages = msg_count
    if msg_size is None:
        msg_size = msg_bytes
    print("Processed {0} messages in {1:.2f} seconds".format(n_messages, timing))
    if timing <= 0:
        # A run too fast for the timer would otherwise raise ZeroDivisionError.
        print("Elapsed time too small to compute throughput")
        return
    print("{0:.2f} MB/s".format((msg_size * n_messages) / timing / (1024 * 1024)))
    print("{0:.2f} Msgs/s".format(n_messages / timing))


# Correctly spelled alias; the original name is kept for existing callers.
calculate_throughput = calculate_thoughput

Example message: b'1000000,9611.06,1483655125.567612'
Message size (bytes): 33


### Kafka Message Producer using Confluent_Kafka Client

In [20]:
# kafka producer function, simulates ETL data stream for graph updates

from confluent_kafka import Producer, KafkaException, KafkaError
import random
import time


topic = kafka_topic


def confluent_kafka_producer_performance():
    """Produce ``msg_count`` generated messages to ``kafka_topic`` and time it.

    Prints how many messages could not be queued (BufferErrors).

    Returns:
        Elapsed wall-clock seconds, including the final ``flush()``, so the
        timer stops only after the producer queue has been drained.
    """
    # librdkafka buffers messages locally before sending; size the queue so a
    # full run fits. If it still fills up, the BufferError branch below waits
    # for deliveries and retries once before counting the message as lost.
    conf = {'bootstrap.servers': bootstrap_servers,
            'queue.buffering.max.messages': 200000
    }

    # Use the Producer class imported in this cell: the `confluent_kafka`
    # module name is only imported by a later cell, so it is undefined here
    # on a fresh kernel.
    producer = Producer(**conf)
    # Read the topic name directly rather than the module-level `topic`,
    # which other cells may rebind.
    topic_name = kafka_topic
    messages_overflow = 0
    producer_start = time.time()
    for i in range(msg_count):
        msg_payload = generate_message(i)
        try:
            producer.produce(topic_name, value=msg_payload)
        except BufferError:
            # Local queue is full: serve delivery reports for up to 1s, retry once.
            producer.poll(1)
            try:
                producer.produce(topic_name, value=msg_payload)
            except BufferError:
                messages_overflow += 1
        # Serve delivery callbacks so the local queue keeps draining.
        producer.poll(0)

    # checking for overflow
    print('BufferErrors: ' + str(messages_overflow))

    producer.flush()

    return time.time() - producer_start

### Run Producer

In [21]:
# Time a full producer run, store it under the client's name, and report throughput.
producer_timings.update(confluent_kafka_producer=confluent_kafka_producer_performance())
calculate_thoughput(producer_timings['confluent_kafka_producer'])


BufferErrors: 0
Processed 150000 messsages in 1.56 seconds
3.03 MB/s
96150.94 Msgs/s


### Validate Produced Messages by Inspecting Offsets

In [22]:
from pykafka import KafkaClient

# Cross-check the produced messages with an independent client: per
# partition, latest minus earliest offset is the number of stored messages
# (assuming nothing has been deleted or compacted).
client = KafkaClient(hosts=bootstrap_servers)
# Use a distinct name: the module-level `topic` holds the topic *name*
# string, and rebinding it to a pykafka Topic object breaks any code that
# still reads `topic` as a name.
pykafka_topic = client.topics[kafka_topic.encode()]
print(pykafka_topic.earliest_available_offsets())
print(pykafka_topic.latest_available_offsets())

{0: OffsetPartitionResponse(offset=[0], err=0)}
{0: OffsetPartitionResponse(offset=[150000], err=0)}


### Kafka Message Consumer using Confluent_Kafka, with Neo4j BOLT Protocol Connector

In [23]:
import confluent_kafka
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
from pprint import pformat
import uuid
from neo4j.v1 import GraphDatabase, basic_auth, TRUST_ON_FIRST_USE, CypherError
#import pandas as pd  #uncomment if you want to write messages to a file



def confluent_kafka_consume_batch(consumer, batch_size, timeout=None):
    """Poll up to ``batch_size`` times and collect the decoded message records.

    Args:
        consumer: a subscribed confluent_kafka Consumer (anything with a
            compatible ``poll()``).
        batch_size: maximum number of ``poll()`` calls. Partition-EOF events
            and messages without a payload use up a poll but add no record.
        timeout: seconds to wait in each ``poll()``. ``None`` (the default)
            calls ``consumer.poll()`` with no argument, as before. Pass a
            number so a short final batch returns instead of waiting for
            more messages.

    Returns:
        ``(batch_list, batch_msg_consumed)``: ``batch_list`` holds one list of
        string fields per message (payload split on commas), and
        ``batch_msg_consumed`` is ``len(batch_list)``.

    Raises:
        KafkaException: for any message error other than partition EOF.
    """
    batch_list = []

    for _ in range(batch_size):
        msg = consumer.poll() if timeout is None else consumer.poll(timeout)

        if msg is None:
            # Nothing arrived before the poll returned: hand back what we have.
            break

        err = msg.error()
        if err:
            if err.code() == KafkaError._PARTITION_EOF:
                # End of partition event
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                 (msg.topic(), msg.partition(), msg.offset()))
                continue
            raise KafkaException(err)

        value = msg.value()
        if value is None:
            # Message without a payload: nothing to write to the graph.
            continue

        # Decode the raw bytes instead of slicing str(bytes), which leaves
        # escape sequences (e.g. for non-ASCII characters) in the fields.
        batch_list.append(value.decode('utf-8', errors='replace').split(','))

    return batch_list, len(batch_list)

        

def confluent_kafka_consumer_performance():
    """Consume ``msg_count`` messages from ``kafka_topic`` and write them to Neo4j.

    Messages are read in batches of ``batch_size``. Each batch is written in
    one explicit Bolt transaction that MERGEs a ``DailyBoxOffice`` node onto
    the ``Account`` node whose ``accountId`` matches the first field.
    A failed batch write is reported and skipped; the loop moves on.

    Returns:
        Elapsed wall-clock seconds from just before subscribing until the
        loop ends (all messages consumed, or interrupted with Ctrl-C).

    Raises:
        KafkaException: propagated from the batch reader; the Kafka consumer
        and Neo4j driver are still closed.
    """
    import os

    topic = kafka_topic
    msg_consumed_count = 0
    batch_size = 10000

    driver = GraphDatabase.driver(os.environ.get("NEO4J_URI", "bolt://localhost"),
                                  auth=basic_auth(os.environ.get("NEO4J_USER", "neo4j"),
                                                  os.environ.get("NEO4J_PASSWORD", "neo4j")),
                                  encrypted=False,
                                  trust=TRUST_ON_FIRST_USE)

    # Each row of batch_list is [accountId, revenueUSD, createdDate] as strings.
    update_query = '''
    WITH  {batch_list} AS batch_list
    UNWIND batch_list AS rows
    WITH rows, toInteger(rows[0]) AS acctid
    MATCH (a:Account {accountId: acctid}) 
    MERGE (a)-[r:HAS_DAILY_REVENUE]->(n:DailyBoxOffice {accountId: toInteger(rows[0])})
    ON CREATE SET n.revenueUSD = toFloat(rows[1]), n.createdDate = toFloat(rows[2])
    '''

    conf = {'bootstrap.servers': bootstrap_servers,
            # A fresh group id each run, so 'earliest' re-reads the whole topic.
            'group.id': str(uuid.uuid1()),
            'session.timeout.ms': 60000,
            'enable.auto.commit': 'true',
            'default.topic.config': {
                'auto.offset.reset': 'earliest'
            }
    }

    consumer = confluent_kafka.Consumer(**conf)

    consumer_start = time.time()

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    try:
        # Subscribe to topics
        consumer.subscribe([topic], on_assign=print_assignment)

        # Use the loop condition rather than a `break` inside the try below:
        # a failed final batch must not send us back to poll() for messages
        # that will never arrive.
        while msg_consumed_count < msg_count:
            batch_list, batch_msg_consumed = confluent_kafka_consume_batch(consumer, batch_size)
            msg_consumed_count += batch_msg_consumed

            # Neo4j graph update using an explicit Bolt transaction (recommended for writes).
            session = driver.session()
            try:
                with session.begin_transaction() as tx:
                    result = tx.run(update_query, {"batch_list": batch_list})
                    summary = result.consume()
                    # Mark for commit only after the result was consumed without
                    # error; otherwise leaving the `with` block does not commit.
                    tx.success = True

                nodes = summary.counters.nodes_created
                rels = summary.counters.relationships_created
                print("%s %s %s %s" % ("Messages consumed:", msg_consumed_count, "Batch size:", len(batch_list)), end=" ")
                print("%s %s %s %s" % ("Nodes created:", nodes, "Rels created:", rels))

            except Exception as e:
                # The transaction context manager handles rollback; just report.
                print('*** Got exception', e)

            finally:
                session.close()

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        consumer_timing = time.time() - consumer_start
        consumer.close()
        driver.close()

    # Returned outside `finally` so unexpected exceptions are not swallowed.
    return consumer_timing

### Run Consumer, Update Neo4j Graph in Batches

In [24]:
# Run the consumer throughput test: time reading every message and writing it
# into Neo4j, store the timing under the client's name, then report the rate.
consumer_timings.update(confluent_kafka_consumer=confluent_kafka_consumer_performance())
calculate_thoughput(consumer_timings['confluent_kafka_consumer'])


Assignment: [TopicPartition{topic=neo4j-150K-demo2,partition=0,offset=-1001,error=None}]
Messages consumed: 10000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 20000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 30000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 40000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 50000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 60000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 70000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 80000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 90000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 100000 Batch size: 10000 Nodes created: 10000 Rels created: 10000
Messages consumed: 110000 Batch size: 10000 Nodes created: 10000 Rels 

### Cleanup, in case you want to run again

In [80]:
from neo4j.v1 import GraphDatabase, basic_auth, TRUST_ON_FIRST_USE, CypherError



import os

driver = GraphDatabase.driver(os.environ.get("NEO4J_URI", "bolt://localhost"),
                              auth=basic_auth(os.environ.get("NEO4J_USER", "neo4j"),
                                              os.environ.get("NEO4J_PASSWORD", "neo4j")),
                              encrypted=False,
                              trust=TRUST_ON_FIRST_USE)

# Remove only the DailyBoxOffice nodes (and their relationships) written by the
# consumer; the Cinema/Account graph is kept so the demo can be run again.
cleanup = '''
MATCH (n:DailyBoxOffice) DETACH DELETE n
'''

try:
    session = driver.session()
    try:
        summary = session.run(cleanup).consume()
    finally:
        # Always release the session, even if the delete fails.
        session.close()
    print(summary.counters)

except Exception as e:
    # session.run() is an auto-commit (implicit) transaction: there is no
    # explicit transaction to roll back here.
    print('*** Got exception', e)

finally:
    driver.close()
    print('*** Done!')

{'relationships_deleted': 150000, 'nodes_deleted': 150000}
*** Done!
