bulk_insert.py: 28 changes (14 additions, 14 deletions)
@@ -3,16 +3,17 @@
 import io
 import sys
 import struct
+import json
 from timeit import default_timer as timer
 import redis
 import click
-import json
 
 # Global variables
 CONFIGS = None       # thresholds for batching Redis queries
 NODE_DICT = {}       # global node dictionary
 TOP_NODE_ID = 0      # next ID to assign to a node
 QUERY_BUF = None     # Buffer for query being constructed
+QUOTING = None
 
 FIELD_TYPES = None
 
@@ -204,7 +205,7 @@ def process_entities(self, expected_col_count):
         if NODE_DICT is not None:
             if row[0] in NODE_DICT:
                 sys.stderr.write("Node identifier '%s' was used multiple times - second occurrence at %s:%d\n"
-                    % (row[0], self.infile.name, self.reader.line_num))
+                                 % (row[0], self.infile.name, self.reader.line_num))
                 if CONFIGS.skip_invalid_nodes is False:
                     exit(1)
             NODE_DICT[row[0]] = TOP_NODE_ID
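As context for this hunk, a minimal sketch of the identifier bookkeeping around `NODE_DICT` and `TOP_NODE_ID` (the identifiers here are invented; the real loader reads them from the first CSV column):

```python
# Hypothetical illustration: each new identifier gets the next sequential
# node ID, and a repeated identifier is flagged as a duplicate.
NODE_DICT = {}
TOP_NODE_ID = 0

for identifier in ["alice", "bob", "alice"]:
    if identifier in NODE_DICT:
        print("Node identifier '%s' was used multiple times" % identifier)
        continue
    NODE_DICT[identifier] = TOP_NODE_ID
    TOP_NODE_ID += 1

print(NODE_DICT)  # {'alice': 0, 'bob': 1}
```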
@@ -263,8 +264,7 @@ def process_entities(self, expected_col_count):
print("Relationship specified a non-existent identifier. src: %s; dest: %s" % (row[0], row[1]))
if CONFIGS.skip_invalid_edges is False:
raise e
else:
continue
continue
fmt = "=QQ" # 8-byte unsigned ints for src and dest
row_binary = struct.pack(fmt, src, dest) + self.pack_props(row)
row_binary_len = len(row_binary)
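A small aside on the `"=QQ"` format used above: it packs the source and destination IDs as two native-endian unsigned 64-bit integers with no padding, so every edge record starts with a fixed 16-byte header. A sketch (the ID values are made up):

```python
import struct

src, dest = 0, 42  # hypothetical node IDs looked up in NODE_DICT
edge_header = struct.pack("=QQ", src, dest)
assert len(edge_header) == 16  # 8 bytes per unsigned long long
```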
@@ -287,37 +287,37 @@ def process_entities(self, expected_col_count):
 # Convert a single CSV property field into a binary stream.
 # Supported property types are string, numeric, boolean, and NULL.
 # type is either Type.NUMERIC, Type.BOOL or Type.STRING, and explicitly sets the value to this type if possible
-def prop_to_binary(prop_str, type):
+def prop_to_binary(prop_val, type):
     # All format strings start with an unsigned char to represent our Type enum
     format_str = "=B"
-    if not prop_str:
+    if prop_val is None:
         # An empty field indicates a NULL property
         return struct.pack(format_str, Type.NULL)
 
     # If field can be cast to a float, allow it
     if type == None or type == Type.NUMERIC:
         try:
-            numeric_prop = float(prop_str)
+            numeric_prop = float(prop_val)
             return struct.pack(format_str + "d", Type.NUMERIC, numeric_prop)
         except:
             pass
 
     if type == None or type == Type.BOOL:
         # If field is 'false' or 'true', it is a boolean
-        if prop_str.lower() == 'false':
+        if prop_val.lower() == 'false':
             return struct.pack(format_str + '?', Type.BOOL, False)
-        elif prop_str.lower() == 'true':
+        elif prop_val.lower() == 'true':
             return struct.pack(format_str + '?', Type.BOOL, True)
 
     if type == None or type == Type.STRING:
         # If we've reached this point, the property is a string
-        encoded_str = str.encode(prop_str) # struct.pack requires bytes objects as arguments
+        encoded_str = str.encode(prop_val) # struct.pack requires bytes objects as arguments
         # Encoding len+1 adds a null terminator to the string
         format_str += "%ds" % (len(encoded_str) + 1)
         return struct.pack(format_str, Type.STRING, encoded_str)
 
     ## if it hasn't returned by this point, it is trying to set it to a type that it can't adopt
-    raise Exception("unable to parse [" + prop_str + "] with type ["+repr(type)+"]")
+    raise Exception("unable to parse [" + prop_val + "] with type ["+repr(type)+"]")
 
 # For each node input file, validate contents and convert to binary format.
 # If any buffer limits have been reached, flush all enqueued inserts to Redis.
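To make the encoding easier to review: each property is serialized as a one-byte type tag followed by a type-specific payload. A self-contained sketch, with `Type` values taken from the field-types error message in the last hunk (0 null, 1 bool, 2 numeric, 3 string); the enum definition itself is outside this diff, so treat it as an assumption:

```python
import struct
from enum import IntEnum

class Type(IntEnum):  # values per the field-types help text; definition assumed
    NULL = 0
    BOOL = 1
    NUMERIC = 2
    STRING = 3

print(struct.pack("=B", Type.NULL))               # NULL: tag byte only
print(struct.pack("=B?", Type.BOOL, True))        # bool: tag + 1 byte
print(struct.pack("=Bd", Type.NUMERIC, 5.0))      # numeric: tag + 8-byte double
s = b"hello"
print(struct.pack("=B%ds" % (len(s) + 1), Type.STRING, s))  # string: tag + null-terminated bytes
```

Note that the rename also tightens the NULL check: `if not prop_str` treated an empty string as NULL, while `if prop_val is None` lets empty strings fall through and be encoded as ordinary string values.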
Expand Down Expand Up @@ -368,13 +368,13 @@ def bulk_insert(graph, host, port, password, nodes, relations, separator, max_to
     if sys.version_info[0] < 3:
         raise Exception("Python 3 is required for the RedisGraph bulk loader.")
 
-    if field_types != None:
-        try :
+    if field_types is not None:
+        try:
             FIELD_TYPES = json.loads(field_types)
         except:
             raise Exception("Problem parsing field-types. Use the format {<label>:[<col1 type>, <col2 type> ...]} where type can be 0(null),1(bool),2(numeric),3(string) ")
 
-    QUOTING=int(quote)
+    QUOTING = int(quote)
 
     TOP_NODE_ID = 0 # reset global ID variable (in case we are calling bulk_insert from unit tests)
     CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes, skip_invalid_edges)
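For reference, the field-types format described by the error message above, as a hypothetical invocation (the label and column types are invented for illustration):

```python
import json

# One entry per label; column types use 0(null), 1(bool), 2(numeric), 3(string).
field_types = '{"Person": [3, 2]}'  # e.g. a string name column and a numeric age column
FIELD_TYPES = json.loads(field_types)
print(FIELD_TYPES["Person"])  # [3, 2]
```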