Merged
153 changes: 94 additions & 59 deletions bulk_insert.py
@@ -1,8 +1,11 @@
import csv
import os
import io
import struct
from timeit import default_timer as timer
import redis
import click
from backports import csv

# Global variables
CONFIGS = None # thresholds for batching Redis queries
@@ -55,6 +58,9 @@ def __init__(self, graphname, client):
self.labels = [] # List containing all pending Label objects
self.reltypes = [] # List containing all pending RelationType objects

self.nodes_created = 0 # Total number of nodes created
self.relations_created = 0 # Total number of relations created

# Send all pending inserts to Redis
def send_buffer(self):
# Do nothing if we have no entities
@@ -68,7 +74,9 @@ def send_buffer(self):
self.initial_query = False

result = self.client.execute_command("GRAPH.BULK", self.graphname, *args)
print(result)
stats = result.split(', '.encode())
self.nodes_created += int(stats[0].split(' '.encode())[0])
self.relations_created += int(stats[1].split(' '.encode())[0])

self.clear_buffer()
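
# A standalone sketch of the reply parsing added above, assuming GRAPH.BULK
# returns a byte string of the form shown here (the counts are hypothetical):
result = b'10 nodes created, 4 relations created'
stats = result.split(', '.encode())                       # [b'10 nodes created', b'4 relations created']
nodes_created = int(stats[0].split(' '.encode())[0])      # 10
relations_created = int(stats[1].split(' '.encode())[0])  # 4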

@@ -83,13 +91,17 @@ def clear_buffer(self):
del self.labels[:]
del self.reltypes[:]

def report_completion(self, runtime):
print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds"
% (self.graphname, self.nodes_created, self.relations_created, runtime))

# Superclass for label and relation CSV files
class EntityFile(object):
def __init__(self, filename):
# The label or relation type string is the basename of the file
self.entity_str = os.path.splitext(os.path.basename(filename))[0].encode("ascii")
self.entity_str = os.path.splitext(os.path.basename(filename))[0].encode('utf-8')
# Input file handling
self.infile = open(filename, 'rt')
self.infile = io.open(filename, 'rt', encoding='utf-8')
# Initialize CSV reader that ignores leading whitespace in each field
# and does not modify input quote characters
self.reader = csv.reader(self.infile, skipinitialspace=True, quoting=csv.QUOTE_NONE)
@@ -100,6 +112,17 @@ def __init__(self, filename):
self.packed_header = ""
self.binary_entities = []
self.binary_size = 0 # size of binary token
self.count_entities() # count the number of entity rows in the file

# Count number of rows in file.
def count_entities(self):
self.entities_count = 0
self.entities_count = sum(1 for line in self.infile)
# discard header row
self.entities_count -= 1
# seek back
self.infile.seek(0)
return self.entities_count
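
# The same counting pattern in isolation, on a hypothetical in-memory file:
# read every line once, subtract the header, then rewind for the CSV reader.
import io
buf = io.StringIO(u"id,name\n0,Alice\n1,Bob\n")  # stand-in for the open CSV file
count = sum(1 for line in buf) - 1               # total lines minus the header row
buf.seek(0)                                      # seek back so reading starts at the top
assert count == 2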

# Simple input validations for each row of a CSV file
def validate_row(self, expected_col_count, row):
@@ -119,9 +142,10 @@ def pack_header(self, header):
# String format
fmt = "=%dsI" % (len(self.entity_str) + 1) # Unaligned native, entity_string, count of properties
args = [self.entity_str, prop_count]
for prop in header[self.prop_offset:]:
for p in header[self.prop_offset:]:
prop = p.encode('utf-8')
fmt += "%ds" % (len(prop) + 1) # encode string with a null terminator
args += [str.encode(prop)]
args.append(prop)
return struct.pack(fmt, *args)
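
# A minimal sketch of the layout pack_header() builds, with hypothetical
# label and property names (byte values shown are for a little-endian host):
import struct
entity_str = u'Person'.encode('utf-8')          # label string
prop = u'name'.encode('utf-8')                  # one property name
fmt = "=%dsI" % (len(entity_str) + 1)           # null-terminated label, then property count
fmt += "%ds" % (len(prop) + 1)                  # each property name is null-terminated too
packed = struct.pack(fmt, entity_str, 1, prop)  # b'Person\x00\x01\x00\x00\x00name\x00'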

# Convert a list of properties into a binary string
@@ -160,31 +184,35 @@ def process_entities(self, expected_col_count):
global TOP_NODE_ID
global QUERY_BUF

for row in self.reader:
self.validate_row(expected_col_count, row)
# Add identifier->ID pair to dictionary if we are building relations
if NODE_DICT is not None:
if row[0] in NODE_DICT:
print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
% (row[0], self.infile.name, self.reader.line_num))
exit(1)
NODE_DICT[row[0]] = TOP_NODE_ID
TOP_NODE_ID += 1
row_binary = self.pack_props(row)
row_binary_len = len(row_binary)
# If the addition of this entity will make the binary token grow too large,
# send the buffer now.
if self.binary_size + row_binary_len > CONFIGS.max_token_size:
QUERY_BUF.labels.append(self.to_binary())
QUERY_BUF.send_buffer()
self.reset_partial_binary()
# Push the label onto the query buffer again, as there are more entities to process.
QUERY_BUF.labels.append(self.to_binary())

QUERY_BUF.node_count += 1
self.binary_size += row_binary_len
self.binary_entities.append(row_binary)
QUERY_BUF.labels.append(self.to_binary())
entities_created = 0
with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
for row in reader:
self.validate_row(expected_col_count, row)
# Add identifier->ID pair to dictionary if we are building relations
if NODE_DICT is not None:
if row[0] in NODE_DICT:
print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
% (row[0], self.infile.name, self.reader.line_num))
exit(1)
NODE_DICT[row[0]] = TOP_NODE_ID
TOP_NODE_ID += 1
row_binary = self.pack_props(row)
row_binary_len = len(row_binary)
# If the addition of this entity will make the binary token grow too large,
# send the buffer now.
if self.binary_size + row_binary_len > CONFIGS.max_token_size:
QUERY_BUF.labels.append(self.to_binary())
QUERY_BUF.send_buffer()
self.reset_partial_binary()
# Push the label onto the query buffer again, as there are more entities to process.
QUERY_BUF.labels.append(self.to_binary())

QUERY_BUF.node_count += 1
entities_created += 1
self.binary_size += row_binary_len
self.binary_entities.append(row_binary)
QUERY_BUF.labels.append(self.to_binary())
print("%d nodes created with label '%s'" % (entities_created, self.entity_str))
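
# The click.progressbar pattern above, in a self-contained form; the wrapped
# iterable is consumed as usual while the bar advances on each iteration:
import click
rows = [['0', 'Alice'], ['1', 'Bob']]  # stand-in for the CSV reader
with click.progressbar(rows, length=len(rows), label='Person') as bar:
    for row in bar:
        pass  # process the row here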

# Handler class for processing relation csv files.
class RelationType(EntityFile):
@@ -211,31 +239,34 @@ def process_header(self):
return expected_col_count

def process_entities(self, expected_col_count):
for row in self.reader:
self.validate_row(expected_col_count, row)

try:
src = NODE_DICT[row[0]]
dest = NODE_DICT[row[1]]
except KeyError as e:
print("Relationship specified a non-existent identifier.")
raise e
fmt = "=QQ" # 8-byte unsigned ints for src and dest
row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
row_binary_len = len(row_binary)
# If the addition of this entity will make the binary token grow too large,
# send the buffer now.
if self.binary_size + row_binary_len > CONFIGS.max_token_size:
QUERY_BUF.reltypes.append(self.to_binary())
QUERY_BUF.send_buffer()
self.reset_partial_binary()
# Push the reltype onto the query buffer again, as there are more entities to process.
QUERY_BUF.reltypes.append(self.to_binary())

QUERY_BUF.relation_count += 1
self.binary_size += row_binary_len
self.binary_entities.append(row_binary)
QUERY_BUF.reltypes.append(self.to_binary())
entities_created = 0
with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
for row in reader:
self.validate_row(expected_col_count, row)
try:
src = NODE_DICT[row[0]]
dest = NODE_DICT[row[1]]
except KeyError as e:
print("Relationship specified a non-existent identifier.")
raise e
fmt = "=QQ" # 8-byte unsigned ints for src and dest
row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
row_binary_len = len(row_binary)
# If the addition of this entity will make the binary token grow too large,
# send the buffer now.
if self.binary_size + row_binary_len > CONFIGS.max_token_size:
QUERY_BUF.reltypes.append(self.to_binary())
QUERY_BUF.send_buffer()
self.reset_partial_binary()
# Push the reltype onto the query buffer again, as there are more entities to process.
QUERY_BUF.reltypes.append(self.to_binary())

QUERY_BUF.relation_count += 1
entities_created += 1
self.binary_size += row_binary_len
self.binary_entities.append(row_binary)
QUERY_BUF.reltypes.append(self.to_binary())
print("%d relations created for type '%s'" % (entities_created, self.entity_str))
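
# Each relation row is prefixed with its endpoints as two 8-byte unsigned
# integers; the "=QQ" packing in isolation, with hypothetical node IDs:
import struct
src, dest = 0, 1                             # IDs as assigned through NODE_DICT
edge_header = struct.pack("=QQ", src, dest)  # 16 bytes: src first, then dest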

# Convert a single CSV property field into a binary stream.
# Supported property types are string, numeric, boolean, and NULL.
@@ -261,8 +292,9 @@ def prop_to_binary(prop_str):

# If we've reached this point, the property is a string
# Encoding len+1 adds a null terminator to the string
format_str += "%ds" % (len(prop_str) + 1)
return struct.pack(format_str, Type.STRING, str.encode(prop_str))
encoded_str = prop_str.encode('utf-8')
format_str += "%ds" % (len(encoded_str) + 1)
return struct.pack(format_str, Type.STRING, encoded_str)
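
# A sketch of the string branch above, assuming the type tag is packed as a
# single byte ahead of the null-terminated UTF-8 payload; the tag value 4
# merely stands in for Type.STRING, which is defined elsewhere in the file:
import struct
prop_str = u'東京'                                  # non-ASCII text now survives encoding
encoded_str = prop_str.encode('utf-8')
format_str = "=B" + "%ds" % (len(encoded_str) + 1)
packed = struct.pack(format_str, 4, encoded_str)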

# For each node input file, validate contents and convert to binary format.
# If any buffer limits have been reached, flush all enqueued inserts to Redis.
@@ -293,7 +325,7 @@ def process_entity_csvs(cls, csvs):
@click.option('--relations', '-r', multiple=True, help='Path to relation csv file')
# Buffer size restrictions
@click.option('--max-token-count', '-c', default=1024, help='max number of processed CSVs to send per query (default 1024)')
@click.option('--max-buffer-size', '-b', default=4096, help='max buffer size in megabytes (default 4096)')
@click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)')
@click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')

def bulk_insert(graph, host, port, password, nodes, relations, max_token_count, max_buffer_size, max_token_size):
@@ -303,9 +335,9 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
global QUERY_BUF

TOP_NODE_ID = 0 # reset global ID variable (in case we are calling bulk_insert from unit tests)

CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size)

start_time = timer()
# Attempt to connect to Redis server
try:
client = redis.StrictRedis(host=host, port=port, password=password)
@@ -339,5 +371,8 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
# Send all remaining tokens to Redis
QUERY_BUF.send_buffer()

end_time = timer()
QUERY_BUF.report_completion(end_time - start_time)

if __name__ == '__main__':
bulk_insert()
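
A hypothetical invocation of the updated loader; the positional graph name and the --host, --port, and --nodes option names are inferred from the bulk_insert() signature rather than shown in this diff:

python bulk_insert.py GRAPH_DEMO --host 127.0.0.1 --port 6379 --nodes Person.csv --relations KNOWS.csv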
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,2 +1,3 @@
redis==2.10.6
click>=6.7
backports.csv