diff --git a/bulk_insert.py b/bulk_insert.py
index 4d2af52..f6e5743 100644
--- a/bulk_insert.py
+++ b/bulk_insert.py
@@ -1,8 +1,11 @@
 import csv
 import os
+import io
 import struct
+from timeit import default_timer as timer
 import redis
 import click
+from backports import csv
 
 # Global variables
 CONFIGS = None # thresholds for batching Redis queries
@@ -55,6 +58,9 @@ def __init__(self, graphname, client):
         self.labels = [] # List containing all pending Label objects
         self.reltypes = [] # List containing all pending RelationType objects
 
+        self.nodes_created = 0 # Total number of nodes created
+        self.relations_created = 0 # Total number of relations created
+
     # Send all pending inserts to Redis
     def send_buffer(self):
         # Do nothing if we have no entities
@@ -68,7 +74,9 @@ def send_buffer(self):
             self.initial_query = False
 
         result = self.client.execute_command("GRAPH.BULK", self.graphname, *args)
-        print(result)
+        stats = result.split(', '.encode())
+        self.nodes_created += int(stats[0].split(' '.encode())[0])
+        self.relations_created += int(stats[1].split(' '.encode())[0])
 
         self.clear_buffer()
 
@@ -83,13 +91,17 @@ def clear_buffer(self):
         del self.labels[:]
         del self.reltypes[:]
 
+    def report_completion(self, runtime):
+        print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds"
+              % (self.graphname, self.nodes_created, self.relations_created, runtime))
+
 # Superclass for label and relation CSV files
 class EntityFile(object):
     def __init__(self, filename):
         # The label or relation type string is the basename of the file
-        self.entity_str = os.path.splitext(os.path.basename(filename))[0].encode("ascii")
+        self.entity_str = os.path.splitext(os.path.basename(filename))[0].encode('utf-8')
         # Input file handling
-        self.infile = open(filename, 'rt')
+        self.infile = io.open(filename, 'rt', encoding='utf-8')
         # Initialize CSV reader that ignores leading whitespace in each field
         # and does not modify input quote characters
         self.reader = csv.reader(self.infile, skipinitialspace=True, quoting=csv.QUOTE_NONE)
@@ -100,6 +112,16 @@ def __init__(self, filename):
         self.packed_header = ""
         self.binary_entities = []
         self.binary_size = 0 # size of binary token
+        self.count_entities() # count the number of entities (rows) in the file
+
+    # Count the number of entities (rows) in the file, excluding the header row.
+    def count_entities(self):
+        self.entities_count = sum(1 for line in self.infile)
+        # Discard the header row
+        self.entities_count -= 1
+        # Seek back to the beginning of the file
+        self.infile.seek(0)
+        return self.entities_count
 
     # Simple input validations for each row of a CSV file
     def validate_row(self, expected_col_count, row):
@@ -119,9 +141,10 @@ def pack_header(self, header):
         # String format
         fmt = "=%dsI" % (len(self.entity_str) + 1) # Unaligned native, entity_string, count of properties
         args = [self.entity_str, prop_count]
-        for prop in header[self.prop_offset:]:
+        for p in header[self.prop_offset:]:
+            prop = p.encode('utf-8')
             fmt += "%ds" % (len(prop) + 1) # encode string with a null terminator
-            args += [str.encode(prop)]
+            args.append(prop)
         return struct.pack(fmt, *args)
 
     # Convert a list of properties into a binary string
@@ -160,31 +183,35 @@ def process_entities(self, expected_col_count):
         global TOP_NODE_ID
         global QUERY_BUF
 
-        for row in self.reader:
-            self.validate_row(expected_col_count, row)
-            # Add identifier->ID pair to dictionary if we are building relations
-            if NODE_DICT is not None:
-                if row[0] in NODE_DICT:
-                    print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
-                          % (row[0], self.infile.name, self.reader.line_num))
-                    exit(1)
-                NODE_DICT[row[0]] = TOP_NODE_ID
-                TOP_NODE_ID += 1
-            row_binary = self.pack_props(row)
-            row_binary_len = len(row_binary)
-            # If the addition of this entity will make the binary token grow too large,
-            # send the buffer now.
-            if self.binary_size + row_binary_len > CONFIGS.max_token_size:
-                QUERY_BUF.labels.append(self.to_binary())
-                QUERY_BUF.send_buffer()
-                self.reset_partial_binary()
-                # Push the label onto the query buffer again, as there are more entities to process.
-                QUERY_BUF.labels.append(self.to_binary())
-
-            QUERY_BUF.node_count += 1
-            self.binary_size += row_binary_len
-            self.binary_entities.append(row_binary)
-        QUERY_BUF.labels.append(self.to_binary())
+        entities_created = 0
+        with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
+            for row in reader:
+                self.validate_row(expected_col_count, row)
+                # Add identifier->ID pair to dictionary if we are building relations
+                if NODE_DICT is not None:
+                    if row[0] in NODE_DICT:
+                        print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
+                              % (row[0], self.infile.name, self.reader.line_num))
+                        exit(1)
+                    NODE_DICT[row[0]] = TOP_NODE_ID
+                    TOP_NODE_ID += 1
+                row_binary = self.pack_props(row)
+                row_binary_len = len(row_binary)
+                # If the addition of this entity will make the binary token grow too large,
+                # send the buffer now.
+                if self.binary_size + row_binary_len > CONFIGS.max_token_size:
+                    QUERY_BUF.labels.append(self.to_binary())
+                    QUERY_BUF.send_buffer()
+                    self.reset_partial_binary()
+                    # Push the label onto the query buffer again, as there are more entities to process.
+                    QUERY_BUF.labels.append(self.to_binary())
+
+                QUERY_BUF.node_count += 1
+                entities_created += 1
+                self.binary_size += row_binary_len
+                self.binary_entities.append(row_binary)
+        QUERY_BUF.labels.append(self.to_binary())
+        print("%d nodes created with label '%s'" % (entities_created, self.entity_str))
 
 # Handler class for processing relation csv files.
 class RelationType(EntityFile):
@@ -211,31 +238,34 @@ def process_header(self):
         return expected_col_count
 
     def process_entities(self, expected_col_count):
-        for row in self.reader:
-            self.validate_row(expected_col_count, row)
-
-            try:
-                src = NODE_DICT[row[0]]
-                dest = NODE_DICT[row[1]]
-            except KeyError as e:
-                print("Relationship specified a non-existent identifier.")
-                raise e
-            fmt = "=QQ" # 8-byte unsigned ints for src and dest
-            row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
-            row_binary_len = len(row_binary)
-            # If the addition of this entity will make the binary token grow too large,
-            # send the buffer now.
-            if self.binary_size + row_binary_len > CONFIGS.max_token_size:
-                QUERY_BUF.reltypes.append(self.to_binary())
-                QUERY_BUF.send_buffer()
-                self.reset_partial_binary()
-                # Push the reltype onto the query buffer again, as there are more entities to process.
-                QUERY_BUF.reltypes.append(self.to_binary())
-
-            QUERY_BUF.relation_count += 1
-            self.binary_size += row_binary_len
-            self.binary_entities.append(row_binary)
-        QUERY_BUF.reltypes.append(self.to_binary())
+        entities_created = 0
+        with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader:
+            for row in reader:
+                self.validate_row(expected_col_count, row)
+                try:
+                    src = NODE_DICT[row[0]]
+                    dest = NODE_DICT[row[1]]
+                except KeyError as e:
+                    print("Relationship specified a non-existent identifier.")
+                    raise e
+                fmt = "=QQ" # 8-byte unsigned ints for src and dest
+                row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
+                row_binary_len = len(row_binary)
+                # If the addition of this entity will make the binary token grow too large,
+                # send the buffer now.
+                if self.binary_size + row_binary_len > CONFIGS.max_token_size:
+                    QUERY_BUF.reltypes.append(self.to_binary())
+                    QUERY_BUF.send_buffer()
+                    self.reset_partial_binary()
+                    # Push the reltype onto the query buffer again, as there are more entities to process.
+                    QUERY_BUF.reltypes.append(self.to_binary())
+
+                QUERY_BUF.relation_count += 1
+                entities_created += 1
+                self.binary_size += row_binary_len
+                self.binary_entities.append(row_binary)
+        QUERY_BUF.reltypes.append(self.to_binary())
+        print("%d relations created for type '%s'" % (entities_created, self.entity_str))
 
 # Convert a single CSV property field into a binary stream.
 # Supported property types are string, numeric, boolean, and NULL.
@@ -261,8 +291,9 @@ def prop_to_binary(prop_str):
 
     # If we've reached this point, the property is a string
     # Encoding len+1 adds a null terminator to the string
-    format_str += "%ds" % (len(prop_str) + 1)
-    return struct.pack(format_str, Type.STRING, str.encode(prop_str))
+    encoded_str = prop_str.encode('utf-8')
+    format_str += "%ds" % (len(encoded_str) + 1)
+    return struct.pack(format_str, Type.STRING, encoded_str)
 
 # For each node input file, validate contents and convert to binary format.
 # If any buffer limits have been reached, flush all enqueued inserts to Redis.
@@ -293,7 +324,7 @@ def process_entity_csvs(cls, csvs):
 
 @click.option('--relations', '-r', multiple=True, help='Path to relation csv file')
 # Buffer size restrictions
 @click.option('--max-token-count', '-c', default=1024, help='max number of processed CSVs to send per query (default 1024)')
-@click.option('--max-buffer-size', '-b', default=4096, help='max buffer size in megabytes (default 4096)')
+@click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)')
 @click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')
 def bulk_insert(graph, host, port, password, nodes, relations, max_token_count, max_buffer_size, max_token_size):
@@ -303,9 +334,10 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
     global QUERY_BUF
 
     TOP_NODE_ID = 0 # reset global ID variable (in case we are calling bulk_insert from unit tests)
     CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size)
+    start_time = timer()
 
     # Attempt to connect to Redis server
     try:
         client = redis.StrictRedis(host=host, port=port, password=password)
 
@@ -339,5 +371,8 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
     # Send all remaining tokens to Redis
     QUERY_BUF.send_buffer()
 
+    end_time = timer()
+    QUERY_BUF.report_completion(end_time - start_time)
+
 if __name__ == '__main__':
     bulk_insert()
diff --git a/requirements.txt b/requirements.txt
index 5772c06..7672fba 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 redis==2.10.6
 click>=6.7
+backports.csv
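
Note on the reply parsing in send_buffer(): the new code assumes GRAPH.BULK
replies with a byte string of the form b'N nodes created, M relations created'.
A minimal standalone sketch of that parsing under the same assumption (the
helper name parse_bulk_reply is hypothetical, not part of the patch):

    # Hypothetical helper mirroring the stats parsing added to send_buffer().
    # Assumes a reply shaped like b'10 nodes created, 2 relations created'.
    def parse_bulk_reply(result):
        # '...'.encode() is used instead of b'...' literals to match the
        # patch's Python 2/3-compatible style; both yield bytes on Python 3.
        stats = result.split(', '.encode())
        nodes_created = int(stats[0].split(' '.encode())[0])
        relations_created = int(stats[1].split(' '.encode())[0])
        return nodes_created, relations_created

    assert parse_bulk_reply(b'10 nodes created, 2 relations created') == (10, 2)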
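
The UTF-8 changes in pack_header() and prop_to_binary() both rely on struct's
'%ds' format padding short input with null bytes: packing an encoded string
into a field of len + 1 bytes appends the terminator the loader expects. A
quick sketch of that behavior:

    import struct

    name = u'caf\u00e9'.encode('utf-8')  # 5 bytes of UTF-8, non-ASCII
    packed = struct.pack('=%ds' % (len(name) + 1), name)
    assert packed == name + b'\x00'      # padded with a null terminator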