From 8cd5a73355e3c8282bcee898f2540840e6527d46 Mon Sep 17 00:00:00 2001 From: Jeffrey Lovitz Date: Mon, 11 May 2020 13:05:46 -0400 Subject: [PATCH 01/26] Port changes from LDBC branch --- redisgraph_bulk_loader/__init__.py | 7 + redisgraph_bulk_loader/bulk_insert.py | 424 ++++-------------------- redisgraph_bulk_loader/configs.py | 9 + redisgraph_bulk_loader/entity_file.py | 194 +++++++++++ redisgraph_bulk_loader/exceptions.py | 7 + redisgraph_bulk_loader/label.py | 65 ++++ redisgraph_bulk_loader/query_buffer.py | 72 ++++ redisgraph_bulk_loader/relation_type.py | 80 +++++ 8 files changed, 500 insertions(+), 358 deletions(-) create mode 100644 redisgraph_bulk_loader/configs.py create mode 100644 redisgraph_bulk_loader/entity_file.py create mode 100644 redisgraph_bulk_loader/exceptions.py create mode 100644 redisgraph_bulk_loader/label.py create mode 100644 redisgraph_bulk_loader/query_buffer.py create mode 100644 redisgraph_bulk_loader/relation_type.py diff --git a/redisgraph_bulk_loader/__init__.py b/redisgraph_bulk_loader/__init__.py index 5cdcdf5..4dd3fd0 100644 --- a/redisgraph_bulk_loader/__init__.py +++ b/redisgraph_bulk_loader/__init__.py @@ -1,3 +1,10 @@ +from .label import Label +from .relation_type import RelationType +from .query_buffer import QueryBuffer +from .exceptions import ( + CSVError, + SchemaError +) from redisgraph_bulk_loader import bulk_insert __all__ = [ diff --git a/redisgraph_bulk_loader/bulk_insert.py b/redisgraph_bulk_loader/bulk_insert.py index 5cd77ec..0ec542b 100644 --- a/redisgraph_bulk_loader/bulk_insert.py +++ b/redisgraph_bulk_loader/bulk_insert.py @@ -1,342 +1,69 @@ -import csv -import os -import io import sys -import math -import struct -import json from timeit import default_timer as timer import redis import click +import configs +import query_buffer as QueryBuffer +from label import Label +from relation_type import RelationType -# Global variables -CONFIGS = None # thresholds for batching Redis queries -NODE_DICT = {} # global node dictionary -TOP_NODE_ID = 0 # next ID to assign to a node -QUERY_BUF = None # Buffer for query being constructed -QUOTING = None -FIELD_TYPES = None - -# Custom error class for invalid inputs -class CSVError(Exception): - pass - -# Official enum support varies widely between 2.7 and 3.x, so we'll use a custom class -class Type: - NULL = 0 - BOOL = 1 - NUMERIC = 2 - STRING = 3 - -# User-configurable thresholds for when to send queries to Redis -class Configs(object): - def __init__(self, max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes, skip_invalid_edges): - # Maximum number of tokens per query - # 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so - # that we can safely ignore tokens that aren't binary strings - # ("GRAPH.BULK", "BEGIN", graph name, counts) - self.max_token_count = min(max_token_count, 1024 * 1023) - # Maximum size in bytes per query - self.max_buffer_size = max_buffer_size * 1000000 - # Maximum size in bytes per token - # 512 megabytes is a hard-coded Redis maximum - self.max_token_size = min(max_token_size * 1000000, 512 * 1000000) - - self.skip_invalid_nodes = skip_invalid_nodes - self.skip_invalid_edges = skip_invalid_edges - -# QueryBuffer is the class that processes input CSVs and emits their binary formats to the Redis client. 
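# --- Illustrative sketch (not part of the patch): how the Configs thresholds in this
# diff translate into concrete limits. Names mirror the Configs fields above; the
# sample values are simply the CLI defaults used elsewhere in the diff (1024, 2048, 500).
def clamp_limits(max_token_count, max_buffer_size_mb, max_token_size_mb):
    # Redis accepts at most 1024 * 1024 arguments per command; reserving a little
    # headroom leaves room for "GRAPH.BULK", "BEGIN", the graph name, and counts.
    token_count = min(max_token_count, 1024 * 1023)
    # Buffer size is configured in megabytes but compared in bytes.
    buffer_size = max_buffer_size_mb * 1000000
    # A single Redis string argument may not exceed 512 MB.
    token_size = min(max_token_size_mb * 1000000, 512 * 1000000)
    return token_count, buffer_size, token_size

if __name__ == '__main__':
    print(clamp_limits(1024, 2048, 500))  # -> (1024, 2048000000, 500000000)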
-class QueryBuffer(object): - def __init__(self, graphname, client): - # Redis client and data for each query - self.client = client - - # Sizes for buffer currently being constructed - self.redis_token_count = 0 - self.buffer_size = 0 - - # The first query should include a "BEGIN" token - self.graphname = graphname - self.initial_query = True - - self.node_count = 0 - self.relation_count = 0 - - self.labels = [] # List containing all pending Label objects - self.reltypes = [] # List containing all pending RelationType objects - - self.nodes_created = 0 # Total number of nodes created - self.relations_created = 0 # Total number of relations created - - # Send all pending inserts to Redis - def send_buffer(self): - # Do nothing if we have no entities - if self.node_count == 0 and self.relation_count == 0: - return - - args = [self.node_count, self.relation_count, len(self.labels), len(self.reltypes)] + self.labels + self.reltypes - # Prepend a "BEGIN" token if this is the first query - if self.initial_query: - args.insert(0, "BEGIN") - self.initial_query = False - - result = self.client.execute_command("GRAPH.BULK", self.graphname, *args) - stats = result.split(', '.encode()) - self.nodes_created += int(stats[0].split(' '.encode())[0]) - self.relations_created += int(stats[1].split(' '.encode())[0]) - - self.clear_buffer() - - # Delete all entities that have been inserted - def clear_buffer(self): - self.redis_token_count = 0 - self.buffer_size = 0 - - # All constructed entities have been inserted, so clear buffers - self.node_count = 0 - self.relation_count = 0 - del self.labels[:] - del self.reltypes[:] - - def report_completion(self, runtime): - print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds" - % (self.graphname, self.nodes_created, self.relations_created, runtime)) - -# Superclass for label and relation CSV files -class EntityFile(object): - def __init__(self, filename, separator): - # The label or relation type string is the basename of the file - self.entity_str = os.path.splitext(os.path.basename(filename))[0] - # Input file handling - self.infile = io.open(filename, 'rt') - # Initialize CSV reader that ignores leading whitespace in each field - # and does not modify input quote characters - self.reader = csv.reader(self.infile, delimiter=separator, skipinitialspace=True, quoting=QUOTING) - - self.prop_offset = 0 # Starting index of properties in row - self.prop_count = 0 # Number of properties per entity - - self.packed_header = b'' - self.binary_entities = [] - self.binary_size = 0 # size of binary token - self.count_entities() # number of entities/row in file. - - # Count number of rows in file. 
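# --- Illustrative sketch (not part of the patch): the argument layout that
# send_buffer() above assembles for GRAPH.BULK, and how its textual reply is folded
# into running totals. The reply bytes shown are an assumed example of the server's
# "<n> nodes created, <m> relations created" format.
def build_bulk_args(graphname, node_count, relation_count, labels, reltypes, initial_query):
    args = [node_count, relation_count, len(labels), len(reltypes)] + labels + reltypes
    # The very first query for a graph is prefixed with a "BEGIN" token.
    if initial_query:
        args.insert(0, "BEGIN")
    return ["GRAPH.BULK", graphname] + args

def parse_bulk_reply(reply):
    stats = reply.split(b', ')
    nodes_created = int(stats[0].split(b' ')[0])
    relations_created = int(stats[1].split(b' ')[0])
    return nodes_created, relations_created

if __name__ == '__main__':
    print(build_bulk_args("mygraph", 10, 4, [b'<label token>'], [b'<reltype token>'], True))
    print(parse_bulk_reply(b'10 nodes created, 4 relations created'))  # -> (10, 4)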
- def count_entities(self): - self.entities_count = 0 - self.entities_count = sum(1 for line in self.infile) - # discard header row - self.entities_count -= 1 - # seek back - self.infile.seek(0) - return self.entities_count - - # Simple input validations for each row of a CSV file - def validate_row(self, expected_col_count, row): - # Each row should have the same number of fields - if len(row) != expected_col_count: - raise CSVError("%s:%d Expected %d columns, encountered %d ('%s')" - % (self.infile.name, self.reader.line_num, expected_col_count, len(row), ','.join(row))) - - # If part of a CSV file was sent to Redis, delete the processed entities and update the binary size - def reset_partial_binary(self): - self.binary_entities = [] - self.binary_size = len(self.packed_header) - - # Convert property keys from a CSV file header into a binary string - def pack_header(self, header): - prop_count = len(header) - self.prop_offset - # String format - entity_bytes = self.entity_str.encode() - fmt = "=%dsI" % (len(entity_bytes) + 1) # Unaligned native, entity name, count of properties - args = [entity_bytes, prop_count] - for p in header[self.prop_offset:]: - prop = p.encode() - fmt += "%ds" % (len(prop) + 1) # encode string with a null terminator - args.append(prop) - return struct.pack(fmt, *args) - - # Convert a list of properties into a binary string - def pack_props(self, line): - props = [] - for num, field in enumerate(line[self.prop_offset:]): - field_type_idx = self.prop_offset+num - try: - FIELD_TYPES[self.entity_str][field_type_idx] - except: - props.append(prop_to_binary(field, None)) - else: - props.append(prop_to_binary(field, FIELD_TYPES[self.entity_str][field_type_idx])) - return b''.join(p for p in props) - - def to_binary(self): - return self.packed_header + b''.join(self.binary_entities) - -# Handler class for processing label csv files. -class Label(EntityFile): - def __init__(self, infile, separator): - super(Label, self).__init__(infile, separator) - expected_col_count = self.process_header() - self.process_entities(expected_col_count) - self.infile.close() - - def process_header(self): - # Header format: - # node identifier (which may be a property key), then all other property keys - header = next(self.reader) - expected_col_count = len(header) - # If identifier field begins with an underscore, don't add it as a property. - if header[0][0] == '_': - self.prop_offset = 1 - self.packed_header = self.pack_header(header) - self.binary_size += len(self.packed_header) - return expected_col_count - - def process_entities(self, expected_col_count): - global NODE_DICT - global TOP_NODE_ID - global QUERY_BUF - - entities_created = 0 - with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader: - for row in reader: - self.validate_row(expected_col_count, row) - # Add identifier->ID pair to dictionary if we are building relations - if NODE_DICT is not None: - if row[0] in NODE_DICT: - sys.stderr.write("Node identifier '%s' was used multiple times - second occurrence at %s:%d\n" - % (row[0], self.infile.name, self.reader.line_num)) - if CONFIGS.skip_invalid_nodes is False: - exit(1) - NODE_DICT[row[0]] = TOP_NODE_ID - TOP_NODE_ID += 1 - row_binary = self.pack_props(row) - row_binary_len = len(row_binary) - # If the addition of this entity will make the binary token grow too large, - # send the buffer now. 
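# --- Illustrative sketch (not part of the patch): a standalone version of the
# pack_header() logic above, showing the binary layout of a label/relation header:
# NUL-terminated entity name, 4-byte property count, then each property key
# NUL-terminated. "Person", "name" and "age" are assumed sample data.
import struct

def pack_header(entity_str, prop_keys):
    entity_bytes = entity_str.encode()
    # '=' selects native byte order with no alignment padding;
    # packing into a field of len + 1 bytes leaves a trailing NUL terminator.
    fmt = "=%dsI" % (len(entity_bytes) + 1)
    args = [entity_bytes, len(prop_keys)]
    for key in prop_keys:
        key_bytes = key.encode()
        fmt += "%ds" % (len(key_bytes) + 1)
        args.append(key_bytes)
    return struct.pack(fmt, *args)

if __name__ == '__main__':
    header = pack_header("Person", ["name", "age"])
    # e.g. b'Person\x00\x02\x00\x00\x00name\x00age\x00' on a little-endian machine
    print(len(header), header)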
- if self.binary_size + row_binary_len > CONFIGS.max_token_size: - QUERY_BUF.labels.append(self.to_binary()) - QUERY_BUF.send_buffer() - self.reset_partial_binary() - # Push the label onto the query buffer again, as there are more entities to process. - QUERY_BUF.labels.append(self.to_binary()) - - QUERY_BUF.node_count += 1 - entities_created += 1 - self.binary_size += row_binary_len - self.binary_entities.append(row_binary) - QUERY_BUF.labels.append(self.to_binary()) - print("%d nodes created with label '%s'" % (entities_created, self.entity_str)) +def parse_schemas(cls, path_to_csv, csv_tuples): + schemas = [None] * (len(path_to_csv) + len(csv_tuples)) + for idx, in_csv in enumerate(path_to_csv): + # Build entity descriptor from input CSV + schemas[idx] = cls(in_csv, None) -# Handler class for processing relation csv files. -class RelationType(EntityFile): - def __init__(self, infile, separator): - super(RelationType, self).__init__(infile, separator) - expected_col_count = self.process_header() - self.process_entities(expected_col_count) - self.infile.close() + offset = len(path_to_csv) + for idx, csv_tuple in enumerate(csv_tuples): + # Build entity descriptor from input CSV + schemas[idx + offset] = cls(csv_tuple[1], csv_tuple[0]) + return schemas - def process_header(self): - # Header format: - # source identifier, dest identifier, properties[0..n] - header = next(self.reader) - # Assume rectangular CSVs - expected_col_count = len(header) - self.prop_count = expected_col_count - 2 - if self.prop_count < 0: - raise CSVError("Relation file '%s' should have at least 2 elements in header line." - % (self.infile.name)) - self.prop_offset = 2 - self.packed_header = self.pack_header(header) # skip src and dest identifiers - self.binary_size += len(self.packed_header) - return expected_col_count +# For each input file, validate contents and convert to binary format. +# If any buffer limits have been reached, flush all enqueued inserts to Redis. +def process_entities(entities): + for entity in entities: + entity.process_entities() + added_size = entity.binary_size + # Check to see if the addition of this data will exceed the buffer's capacity + if (QueryBuffer.buffer_size + added_size >= configs.max_buffer_size + or QueryBuffer.redis_token_count + len(entity.binary_entities) >= configs.max_token_count): + # Send and flush the buffer if appropriate + QueryBuffer.send_buffer() + # Add binary data to list and update all counts + QueryBuffer.redis_token_count += len(entity.binary_entities) + QueryBuffer.buffer_size += added_size - def process_entities(self, expected_col_count): - entities_created = 0 - with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader: - for row in reader: - self.validate_row(expected_col_count, row) - try: - src = NODE_DICT[row[0]] - dest = NODE_DICT[row[1]] - except KeyError as e: - print("Relationship specified a non-existent identifier. src: %s; dest: %s" % (row[0], row[1])) - if CONFIGS.skip_invalid_edges is False: - raise e - continue - fmt = "=QQ" # 8-byte unsigned ints for src and dest - row_binary = struct.pack(fmt, src, dest) + self.pack_props(row) - row_binary_len = len(row_binary) - # If the addition of this entity will make the binary token grow too large, - # send the buffer now. 
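# --- Illustrative sketch (not part of the patch): how a relation row above is
# serialized - two 8-byte unsigned node IDs resolved through the node dictionary,
# followed by the packed properties. node_dict and the sample row are assumptions;
# pack_props stands in for EntityFile.pack_props() and is stubbed out here.
import struct

def pack_edge_row(row, node_dict, pack_props):
    try:
        src = node_dict[row[0]]
        dest = node_dict[row[1]]
    except KeyError:
        # Mirrors the skip_invalid_edges behaviour: report the bad row and skip it.
        print("Relationship specified a non-existent identifier. src: %s; dest: %s" % (row[0], row[1]))
        return None
    return struct.pack("=QQ", src, dest) + pack_props(row)

if __name__ == '__main__':
    node_dict = {"alice": 0, "bob": 1}
    stub_props = lambda row: b''  # this sample edge carries no properties
    print(pack_edge_row(["alice", "bob"], node_dict, stub_props))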
- if self.binary_size + row_binary_len > CONFIGS.max_token_size: - QUERY_BUF.reltypes.append(self.to_binary()) - QUERY_BUF.send_buffer() - self.reset_partial_binary() - # Push the reltype onto the query buffer again, as there are more entities to process. - QUERY_BUF.reltypes.append(self.to_binary()) - QUERY_BUF.relation_count += 1 - entities_created += 1 - self.binary_size += row_binary_len - self.binary_entities.append(row_binary) - QUERY_BUF.reltypes.append(self.to_binary()) - print("%d relations created for type '%s'" % (entities_created, self.entity_str)) +def Config_Set(max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes, skip_invalid_edges, separator, quoting): + # Maximum number of tokens per query + # 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so + # that we can safely ignore tokens that aren't binary strings + # ("GRAPH.BULK", "BEGIN", graph name, counts) + configs.max_token_count = min(max_token_count, 1024 * 1023) + # Maximum size in bytes per query + configs.max_buffer_size = max_buffer_size * 1000000 + # Maximum size in bytes per token + # 512 megabytes is a hard-coded Redis maximum + configs.max_token_size = min(max_token_size * 1000000, 512 * 1000000) -# Convert a single CSV property field into a binary stream. -# Supported property types are string, numeric, boolean, and NULL. -# type is either Type.NUMERIC, Type.BOOL or Type.STRING, and explicitly sets the value to this type if possible -def prop_to_binary(prop_val, type): - # All format strings start with an unsigned char to represent our Type enum - format_str = "=B" - if prop_val is None: - # An empty field indicates a NULL property - return struct.pack(format_str, Type.NULL) + configs.skip_invalid_nodes = skip_invalid_nodes + configs.skip_invalid_edges = skip_invalid_edges + configs.separator = separator + configs.quoting = quoting - # If field can be cast to a float, allow it - if type is None or type == Type.NUMERIC: - try: - numeric_prop = float(prop_val) - if not math.isnan(numeric_prop) and not math.isinf(numeric_prop): # Don't accept non-finite values. - return struct.pack(format_str + "d", Type.NUMERIC, numeric_prop) - except: - pass - if type is None or type == Type.BOOL: - # If field is 'false' or 'true', it is a boolean - if prop_val.lower() == 'false': - return struct.pack(format_str + '?', Type.BOOL, False) - elif prop_val.lower() == 'true': - return struct.pack(format_str + '?', Type.BOOL, True) +def QueryBuf_Set(graphname, client, has_relations): + # Redis client and data for each query + QueryBuffer.client = client + QueryBuffer.graphname = graphname - if type is None or type == Type.STRING: - # If we've reached this point, the property is a string - encoded_str = str.encode(prop_val) # struct.pack requires bytes objects as arguments - # Encoding len+1 adds a null terminator to the string - format_str += "%ds" % (len(encoded_str) + 1) - return struct.pack(format_str, Type.STRING, encoded_str) + # Create a node dictionary if we're building relations and as such require unique identifiers + if has_relations: + QueryBuffer.nodes = {} - ## if it hasn't returned by this point, it is trying to set it to a type that it can't adopt - raise Exception("unable to parse [" + prop_val + "] with type ["+repr(type)+"]") - -# For each node input file, validate contents and convert to binary format. -# If any buffer limits have been reached, flush all enqueued inserts to Redis. 
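# --- Illustrative sketch (not part of the patch): a trimmed, standalone version of
# the prop_to_binary() type inference above - a one-byte type tag followed by the
# value. Only the automatic (untyped) path is shown; the explicit field-type path
# from the diff is omitted for brevity.
import math
import struct

NULL, BOOL, NUMERIC, STRING = 0, 1, 2, 3

def prop_to_binary(prop_val):
    if prop_val is None:
        # An empty field indicates a NULL property.
        return struct.pack("=B", NULL)
    try:
        numeric = float(prop_val)
        if not math.isnan(numeric) and not math.isinf(numeric):  # reject non-finite values
            return struct.pack("=Bd", NUMERIC, numeric)
    except ValueError:
        pass
    if prop_val.lower() == 'true':
        return struct.pack("=B?", BOOL, True)
    if prop_val.lower() == 'false':
        return struct.pack("=B?", BOOL, False)
    # Anything else is stored as a NUL-terminated string.
    encoded = prop_val.encode()
    return struct.pack("=B%ds" % (len(encoded) + 1), STRING, encoded)

if __name__ == '__main__':
    for value in (None, "3.14", "true", "RedisGraph"):
        print(value, prop_to_binary(value))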
-def process_entity_csvs(cls, csvs, separator): - global QUERY_BUF - for in_csv in csvs: - # Build entity descriptor from input CSV - entity = cls(in_csv, separator) - added_size = entity.binary_size - # Check to see if the addition of this data will exceed the buffer's capacity - if (QUERY_BUF.buffer_size + added_size >= CONFIGS.max_buffer_size - or QUERY_BUF.redis_token_count + len(entity.binary_entities) >= CONFIGS.max_token_count): - # Send and flush the buffer if appropriate - QUERY_BUF.send_buffer() - # Add binary data to list and update all counts - QUERY_BUF.redis_token_count += len(entity.binary_entities) - QUERY_BUF.buffer_size += added_size # Command-line arguments @click.command() @@ -347,39 +74,23 @@ def process_entity_csvs(cls, csvs, separator): @click.option('--password', '-a', default=None, help='Redis server password') # CSV file paths @click.option('--nodes', '-n', required=True, multiple=True, help='Path to node csv file') +@click.option('--nodes-with-label', '-N', nargs=2, multiple=True, help='Label string followed by path to node csv file') @click.option('--relations', '-r', multiple=True, help='Path to relation csv file') +@click.option('--relations-with-type', '-R', nargs=2, multiple=True, help='Relation type string followed by path to relation csv file') @click.option('--separator', '-o', default=',', help='Field token separator in csv file') # Buffer size restrictions @click.option('--max-token-count', '-c', default=1024, help='max number of processed CSVs to send per query (default 1024)') @click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)') @click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)') @click.option('--quote', '-q', default=3, help='the quoting format used in the CSV file. QUOTE_MINIMAL=0,QUOTE_ALL=1,QUOTE_NONNUMERIC=2,QUOTE_NONE=3') -@click.option('--field-types', '-f', default=None, help='json to set explicit types for each field, format {