1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
.vscode
15 changes: 14 additions & 1 deletion README.md
@@ -26,7 +26,8 @@ bulk_insert.py GRAPHNAME [OPTIONS]
| -c | --max-token-count INT | max number of tokens sent in each Redis query (default 1024) |
| -b | --max-buffer-size INT | max batch size (MB) of each Redis query (default 2048) |
| -t | --max-token-size INT | max size (MB) of each token sent to Redis (default 500) |
| -q | --quote-minimal | enable smart quoting for items within the CSV |
| -q | --quote | the quoting format used in the CSV file: QUOTE_MINIMAL=0, QUOTE_ALL=1, QUOTE_NONNUMERIC=2, QUOTE_NONE=3 |
| -f | --field-types | JSON to set explicit types for each field, in the format {<label>: [<col1 type>, <col2 type> ...]} where type can be 0 (null), 1 (bool), 2 (numeric), 3 (string) |


The only required arguments are the name to give the newly-created graph (which can appear anywhere) and at least one node CSV file.
@@ -39,6 +40,15 @@ python bulk_insert.py GRAPH_DEMO -n example/Person.csv -n example/Country.csv -r
```
The label (for nodes) or relationship type (for relationships) is derived from the base name of the input CSV file. In this example, we'll construct two sets of nodes, labeled `Person` and `Country`, and two types of relationships - `KNOWS` and `VISITED`.

The default behaviour is to infer each field's type from its value, which can cause type mismatches, for example when a string property contains values such as 'false', 'true', or numbers. To avoid this, use --field-types to explicitly set the type of each column in the CSV. For example, to explicitly set a column to string:

```
python3 bulk_insert.py ROBOTS -f '{"Robots" : [3]}' -q1 -n example2/Robots.csv
```

Note that when -f isn't used, the robot name "30165" would be inserted as a number rather than a string, which causes problems in RedisGraph when searching.
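
If several columns need pinning, list one type per column. As a hypothetical sketch (the `CONTACTS` graph, `People` label, and CSV file below are illustrative, not part of this repository), a string name, a numeric age, and a postcode that must remain a string would be:

```
python3 bulk_insert.py CONTACTS -f '{"People": [3, 2, 3]}' -q1 -n example2/People.csv
```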

## Input constraints
### Node identifiers
- If both nodes and relations are being created, each node must be associated with a unique identifier.
@@ -55,6 +65,8 @@ The label (for nodes) or relationship type (for relationships) is derived from t
- `numeric`: an unquoted value that can be read as a floating-point or integer type.
- `string`: any field that is either quote-interpolated or cannot be cast to a numeric or boolean type.
- `NULL`: an empty field.
- The default behaviour is to infer the property type, attempting to cast it to null, numeric, boolean, or string, in that order (see the sketch after this list).
- If an explicit type is required, for example when a value such as "1234" must not be inferred as a number, use the -f option to specify the type of each column being imported.
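
As a rough sketch of that inference order (illustrative only, not the loader's actual code):

```
def infer(value):
    if not value:
        return None                     # empty field -> NULL
    try:
        return float(value)             # unquoted integers and floats -> numeric
    except ValueError:
        pass
    if value.lower() in ('true', 'false'):
        return value.lower() == 'true'  # 'true'/'false' -> boolean
    return value                        # everything else -> string
```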

### Label file format:
- Each row must have the same number of fields.
@@ -68,3 +80,4 @@ The label (for nodes) or relationship type (for relationships) is derived from t
- The first two fields of each row are the source and destination node identifiers. The names of these fields in the header do not matter.
- If the file has more than 2 fields, all subsequent fields are relationship properties that adhere to the same rules as node properties.
- Described relationships are always considered to be directed (source->destination).
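
For example, a hypothetical `KNOWS.csv` (the names and property below are illustrative) pairs source and destination identifiers with an optional property:

```
src_person,dst_person,since
"Jeffrey","Filipe",2015
"Filipe","Roi",2018
```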

73 changes: 47 additions & 26 deletions bulk_insert.py
@@ -6,13 +6,16 @@
from timeit import default_timer as timer
import redis
import click
import json

# Global variables
CONFIGS = None # thresholds for batching Redis queries
NODE_DICT = {} # global node dictionary
TOP_NODE_ID = 0 # next ID to assign to a node
QUERY_BUF = None # Buffer for query being constructed

FIELD_TYPES = None # explicit per-label column types parsed from --field-types

# Custom error class for invalid inputs
class CSVError(Exception):
    pass
@@ -155,8 +158,13 @@ def pack_header(self, header):
    # Convert a list of properties into a binary string
    def pack_props(self, line):
        props = []
        for field in line[self.prop_offset:]:
            props.append(prop_to_binary(field))
        for num, field in enumerate(line[self.prop_offset:]):
            try:
                field_type = FIELD_TYPES[self.entity_str][num]
            except (TypeError, KeyError, IndexError):
                # No explicit type configured for this column; infer it from the value
                field_type = None
            props.append(prop_to_binary(field, field_type))

        return b''.join(p for p in props)
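    # Illustration (not part of the module): for a row with properties ["30165", "true"]
    # and no FIELD_TYPES entry, this concatenates [Type byte][double][Type byte][bool];
    # with FIELD_TYPES {"Robots": [3]}, column 0 would be packed as a string instead.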

@@ -278,31 +286,39 @@ def process_entities(self, expected_col_count):

# Convert a single CSV property field into a binary stream.
# Supported property types are string, numeric, boolean, and NULL.
def prop_to_binary(prop_str):
# prop_type is Type.NUMERIC, Type.BOOL, or Type.STRING; the value is explicitly cast to that type if possible, or inferred when None
def prop_to_binary(prop_str, prop_type):
    # All format strings start with an unsigned char to represent our Type enum
    format_str = "=B"
    if not prop_str:
        # An empty field indicates a NULL property
        return struct.pack(format_str, Type.NULL)

    # If the field can be cast to a float, allow it
    try:
        numeric_prop = float(prop_str)
        return struct.pack(format_str + "d", Type.NUMERIC, numeric_prop)
    except:
        pass

    # If the field is 'false' or 'true', it is a boolean
    if prop_str.lower() == 'false':
        return struct.pack(format_str + '?', Type.BOOL, False)
    elif prop_str.lower() == 'true':
        return struct.pack(format_str + '?', Type.BOOL, True)

    # If we've reached this point, the property is a string
    encoded_str = str.encode(prop_str) # struct.pack requires bytes objects as arguments
    # Encoding len+1 adds a null terminator to the string
    format_str += "%ds" % (len(encoded_str) + 1)
    return struct.pack(format_str, Type.STRING, encoded_str)

    if prop_type is None or prop_type == Type.NUMERIC:
        # If the field can be cast to a float, allow it
        try:
            numeric_prop = float(prop_str)
            return struct.pack(format_str + "d", Type.NUMERIC, numeric_prop)
        except ValueError:
            pass

    if prop_type is None or prop_type == Type.BOOL:
        # If the field is 'false' or 'true', it is a boolean
        if prop_str.lower() == 'false':
            return struct.pack(format_str + '?', Type.BOOL, False)
        elif prop_str.lower() == 'true':
            return struct.pack(format_str + '?', Type.BOOL, True)

    if prop_type is None or prop_type == Type.STRING:
        # If we've reached this point, the property is a string
        encoded_str = str.encode(prop_str) # struct.pack requires bytes objects as arguments
        # Encoding len+1 adds a null terminator to the string
        format_str += "%ds" % (len(encoded_str) + 1)
        return struct.pack(format_str, Type.STRING, encoded_str)

    # If we haven't returned by this point, the value cannot be represented as the requested type
    raise Exception("unable to parse [%s] with type [%r]" % (prop_str, prop_type))

# For each node input file, validate contents and convert to binary format.
# If any buffer limits have been reached, flush all enqueued inserts to Redis.
@@ -336,25 +352,30 @@ def process_entity_csvs(cls, csvs, separator):
@click.option('--max-token-count', '-c', default=1024, help='max number of processed CSVs to send per query (default 1024)')
@click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)')
@click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')
@click.option('--quote-minimal/--no-quote-minimal', '-q/-d', default=False, help='only quote those fields which contain special characters such as delimiter, quotechar or any of the characters in lineterminator')
@click.option('--quote', '-q', default=3, help='the quoting format used in the CSV file. QUOTE_MINIMAL=0,QUOTE_ALL=1,QUOTE_NONNUMERIC=2,QUOTE_NONE=3')
@click.option('--field-types', '-f', default=None, help='json to set explicit types for each field, format {<label>:[<col1 type>, <col2 type> ...]} where type can be 0(null),1(bool),2(numeric),3(string)')
@click.option('--skip-invalid-nodes', '-s', default=False, is_flag=True, help='ignore nodes that use previously defined IDs')
@click.option('--skip-invalid-edges', '-e', default=False, is_flag=True, help='ignore invalid edges, print an error message and continue loading (True), or stop loading after an edge loading failure (False)')


def bulk_insert(graph, host, port, password, nodes, relations, separator, max_token_count, max_buffer_size, max_token_size, quote_minimal, skip_invalid_nodes, skip_invalid_edges):
def bulk_insert(graph, host, port, password, nodes, relations, separator, max_token_count, max_buffer_size, max_token_size, quote, field_types, skip_invalid_nodes, skip_invalid_edges):
    global CONFIGS
    global NODE_DICT
    global TOP_NODE_ID
    global QUERY_BUF
    global QUOTING
    global FIELD_TYPES

    if sys.version_info[0] < 3:
        raise Exception("Python 3 is required for the RedisGraph bulk loader.")

    if quote_minimal:
        QUOTING = csv.QUOTE_MINIMAL
    else:
        QUOTING = csv.QUOTE_NONE
    if field_types is not None:
        try:
            # e.g. --field-types '{"Robots": [3]}' parses to {"Robots": [3]},
            # packing column 0 of every Robots row as a string
            FIELD_TYPES = json.loads(field_types)
        except ValueError:
            raise Exception("Problem parsing field-types. Use the format {<label>: [<col1 type>, <col2 type> ...]} where type can be 0 (null), 1 (bool), 2 (numeric), 3 (string)")

    # The -q value maps directly onto the csv module's quoting constants (QUOTE_MINIMAL=0 ... QUOTE_NONE=3)
    QUOTING = int(quote)

    TOP_NODE_ID = 0 # reset global ID variable (in case we are calling bulk_insert from unit tests)
    CONFIGS = Configs(max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes, skip_invalid_edges)
5 changes: 5 additions & 0 deletions example2/Robots.csv
@@ -0,0 +1,5 @@
"name"
"Beebop"
"30165"
"Chappy"
"Wal-e"