bulk_insert.py: 14 changes (9 additions, 5 deletions)
@@ -26,7 +26,7 @@ class Type:
 
 # User-configurable thresholds for when to send queries to Redis
 class Configs(object):
-    def __init__(self, max_token_count, max_buffer_size, max_token_size):
+    def __init__(self, max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes):
         # Maximum number of tokens per query
         # 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so
         # that we can safely ignore tokens that aren't binary strings
@@ -38,6 +38,8 @@ def __init__(self, max_token_count, max_buffer_size, max_token_size):
         # 512 megabytes is a hard-coded Redis maximum
         self.max_token_size = min(max_token_size * 1000000, 512 * 1000000)
 
+        self.skip_invalid_nodes = skip_invalid_nodes
+
 # QueryBuffer is the class that processes input CSVs and emits their binary formats to the Redis client.
 class QueryBuffer(object):
     def __init__(self, graphname, client):
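A side note on the unchanged context above: max_token_size is specified in megabytes and clamped to Redis's hard-coded 512-megabyte maximum. A standalone illustration of that clamping (not part of the diff):

# Illustrative only: mirrors the clamping in Configs.__init__ above.
requested_mb = 600                                    # above the Redis cap
clamped = min(requested_mb * 1000000, 512 * 1000000)  # both sides in bytes
assert clamped == 512 * 1000000                       # capped at 512 MB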
@@ -192,9 +194,10 @@ def process_entities(self, expected_col_count):
             # Add identifier->ID pair to dictionary if we are building relations
             if NODE_DICT is not None:
                 if row[0] in NODE_DICT:
-                    print("Node identifier '%s' was used multiple times - second occurrence at %s:%d"
+                    sys.stderr.write("Node identifier '%s' was used multiple times - second occurrence at %s:%d\n"
                           % (row[0], self.infile.name, self.reader.line_num))
-                    exit(1)
+                    if CONFIGS.skip_invalid_nodes is False:
+                        exit(1)
                 NODE_DICT[row[0]] = TOP_NODE_ID
                 TOP_NODE_ID += 1
             row_binary = self.pack_props(row)
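To make the new control flow easier to read outside the diff, here is a minimal standalone sketch of the duplicate-identifier handling. register_node and its parameters are illustrative names; the stderr message and skip semantics mirror the hunk above:

import sys

def register_node(identifier, node_dict, top_node_id, skip_invalid_nodes,
                  filename='nodes.csv', line_num=0):
    # Duplicates are always reported on stderr.
    if identifier in node_dict:
        sys.stderr.write("Node identifier '%s' was used multiple times - second occurrence at %s:%d\n"
                         % (identifier, filename, line_num))
        # Without --skip-invalid-nodes the load aborts; with it, the
        # identifier is remapped to a fresh internal ID and loading continues.
        if skip_invalid_nodes is False:
            exit(1)
    node_dict[identifier] = top_node_id
    return top_node_id + 1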
Expand Down Expand Up @@ -329,8 +332,9 @@ def process_entity_csvs(cls, csvs):
@click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)')
@click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')
@click.option('--quote-minimal/--no-quote-minimal', '-q/-d', default=False, help='only quote those fields which contain special characters such as delimiter, quotechar or any of the characters in lineterminator')
@click.option('--skip-invalid-nodes', '-s', default=False, is_flag=True, help='ignore nodes that use previously defined IDs')

def bulk_insert(graph, host, port, password, nodes, relations, max_token_count, max_buffer_size, max_token_size, quote_minimal):
def bulk_insert(graph, host, port, password, nodes, relations, max_token_count, max_buffer_size, max_token_size, quote_minimal, skip_invalid_nodes):
global CONFIGS
global NODE_DICT
global TOP_NODE_ID
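Because the new option is declared with is_flag=True and default=False, existing invocations keep their old behavior; the flag only takes effect when passed explicitly. A minimal standalone sketch of that click pattern (toy command, illustrative name demo):

import click

@click.command()
@click.option('--skip-invalid-nodes', '-s', default=False, is_flag=True,
              help='ignore nodes that use previously defined IDs')
def demo(skip_invalid_nodes):
    # Prints False unless -s/--skip-invalid-nodes is on the command line.
    click.echo('skip_invalid_nodes=%s' % skip_invalid_nodes)

if __name__ == '__main__':
    demo()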
@@ -346,7 +350,7 @@ def bulk_insert(graph, host, port, password, nodes, relations, max_token_count,
         QUOTING=csv.QUOTE_NONE
 
     TOP_NODE_ID = 0 # reset global ID variable (in case we are calling bulk_insert from unit tests)
-    CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size)
+    CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes)
 
     start_time = timer()
     # Attempt to connect to Redis server
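The comment on TOP_NODE_ID above mentions calling bulk_insert from unit tests; with click that is usually done via CliRunner. A hedged sketch of exercising the new flag that way, assuming a reachable Redis server with the graph module loaded, and assuming the graph argument and --nodes option defined in parts of the script outside this diff:

from click.testing import CliRunner

from bulk_insert import bulk_insert

runner = CliRunner()
# Invocation shape is assumed; 'TestGraph' and --nodes come from parts
# of the script not shown in this diff.
result = runner.invoke(bulk_insert, ['TestGraph',
                                     '--nodes', 'nodes_with_duplicates.csv',
                                     '--skip-invalid-nodes'])
# With the flag set, duplicate identifiers are reported but not fatal.
assert result.exit_code == 0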