diff --git a/bulk_insert.py b/bulk_insert.py
index 6c8bd67..8a8eab9 100644
--- a/bulk_insert.py
+++ b/bulk_insert.py
@@ -26,7 +26,7 @@ class Type:
 
 # User-configurable thresholds for when to send queries to Redis
 class Configs(object):
-    def __init__(self, max_token_count, max_buffer_size, max_token_size):
+    def __init__(self, max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes):
         # Maximum number of tokens per query
         # 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so
         # that we can safely ignore tokens that aren't binary strings
@@ -38,6 +38,8 @@ def __init__(self, max_token_count, max_buffer_size, max_token_size):
         # 512 megabytes is a hard-coded Redis maximum
         self.max_token_size = min(max_token_size * 1000000, 512 * 1000000)
 
+        self.skip_invalid_nodes = skip_invalid_nodes
+
 # QueryBuffer is the class that processes input CSVs and emits their binary formats to the Redis client.
 class QueryBuffer(object):
     def __init__(self, graphname, client):
@@ -192,9 +194,10 @@ def process_entities(self, expected_col_count):
             # Add identifier->ID pair to dictionary if we are building relations
             if NODE_DICT is not None:
                 if row[0] in NODE_DICT:
-                    print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
+                    sys.stderr.write("Node identifier '%s' was used multiple times - second occurrence at %s:%d\n"
                           % (row[0], self.infile.name, self.reader.line_num))
-                    exit(1)
+                    if CONFIGS.skip_invalid_nodes is False:
+                        exit(1)
                 NODE_DICT[row[0]] = TOP_NODE_ID
                 TOP_NODE_ID += 1
             row_binary = self.pack_props(row)
@@ -329,8 +332,9 @@ def process_entity_csvs(cls, csvs):
 @click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)')
 @click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')
 @click.option('--quote-minimal/--no-quote-minimal', '-q/-d', default=False, help='only quote those fields which contain special characters such as delimiter, quotechar or any of the characters in lineterminator')
+@click.option('--skip-invalid-nodes', '-s', default=False, is_flag=True, help='ignore nodes that use previously defined IDs')
-def bulk_insert(graph, host, port, password, nodes, relations, max_token_count, max_buffer_size, max_token_size, quote_minimal):
+def bulk_insert(graph, host, port, password, nodes, relations, max_token_count, max_buffer_size, max_token_size, quote_minimal, skip_invalid_nodes):
     global CONFIGS
     global NODE_DICT
     global TOP_NODE_ID
@@ -346,7 +350,7 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
         QUOTING=csv.QUOTE_NONE
 
     TOP_NODE_ID = 0 # reset global ID variable (in case we are calling bulk_insert from unit tests)
-    CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size)
+    CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes)
 
     start_time = timer()
     # Attempt to connect to Redis server
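
For reference, here is a minimal standalone sketch of the duplicate-identifier handling this patch introduces in process_entities. The Configs stub, the example rows, and the "example.csv" name are illustrative only and not part of the patch; the branching mirrors the added lines in the @@ -192,9 +194,10 @@ hunk:

    import sys

    class Configs(object):
        def __init__(self, skip_invalid_nodes):
            self.skip_invalid_nodes = skip_invalid_nodes

    CONFIGS = Configs(skip_invalid_nodes=True)
    NODE_DICT = {}
    TOP_NODE_ID = 0

    rows = [["alice", "29"], ["bob", "31"], ["alice", "30"]]  # third row reuses an identifier

    for line_num, row in enumerate(rows, start=1):
        if row[0] in NODE_DICT:
            # The warning goes to stderr either way; only the exit is now conditional
            sys.stderr.write("Node identifier '%s' was used multiple times - second occurrence at %s:%d\n"
                             % (row[0], "example.csv", line_num))
            if CONFIGS.skip_invalid_nodes is False:
                sys.exit(1)  # the script calls the bare exit(1) builtin; sys.exit is equivalent here
        NODE_DICT[row[0]] = TOP_NODE_ID
        TOP_NODE_ID += 1

Note that when skip_invalid_nodes is set, execution falls through to the NODE_DICT assignment, so the duplicate row still overwrites the identifier's earlier ID mapping and consumes a new node ID: the row is warned about rather than discarded.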