31 changes: 31 additions & 0 deletions .gitignore
@@ -1 +1,32 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
.vscode

datasets
428 changes: 0 additions & 428 deletions bulk_insert.py

This file was deleted.

7 changes: 7 additions & 0 deletions bulk_insert/__init__.py
@@ -0,0 +1,7 @@
from .label import Label
from .relation_type import RelationType
from .query_buffer import QueryBuffer
from .exceptions import (
CSVError,
SchemaError
)
136 changes: 136 additions & 0 deletions bulk_insert/bulk_insert.py
@@ -0,0 +1,136 @@
import sys
from timeit import default_timer as timer
import redis
import click
import configs
from query_buffer import QueryBuffer
from label import Label
from relation_type import RelationType


def parse_schemas(cls, path_to_csv, csv_tuples):
schemas = [None] * (len(path_to_csv) + len(csv_tuples))
for idx, in_csv in enumerate(path_to_csv):
# Build entity descriptor from input CSV
schemas[idx] = cls(in_csv, None)

offset = len(path_to_csv)
for idx, csv_tuple in enumerate(csv_tuples):
# Build entity descriptor from input CSV
schemas[idx + offset] = cls(csv_tuple[1], csv_tuple[0])
return schemas
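# As a quick sketch (hypothetical file paths), parse_schemas(Label, ('people.csv',), (('Actor', 'actors.csv'),))
# would build [Label('people.csv', None), Label('actors.csv', 'Actor')]: unlabeled CSVs first,
# then the (label, path) tuples with their explicit labels.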


# For each input file, validate contents and convert to binary format.
# If any buffer limits have been reached, flush all enqueued inserts to Redis.
def process_entities(entities):
for entity in entities:
entity.process_entities()
added_size = entity.binary_size
# Check to see if the addition of this data will exceed the buffer's capacity
if (QueryBuffer.buffer_size + added_size >= configs.max_buffer_size
or QueryBuffer.redis_token_count + len(entity.binary_entities) >= configs.max_token_count):
# Send and flush the buffer if appropriate
QueryBuffer.send_buffer()
# Add binary data to list and update all counts
QueryBuffer.redis_token_count += len(entity.binary_entities)
QueryBuffer.buffer_size += added_size


def Config_Set(max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes, skip_invalid_edges, separator, quoting):
# Maximum number of tokens per query
# 1024 * 1024 is the hard-coded Redis maximum. We'll set a slightly lower limit so
# that we can safely ignore tokens that aren't binary strings
# ("GRAPH.BULK", "BEGIN", graph name, counts)
configs.max_token_count = min(max_token_count, 1024 * 1023)
# Maximum size in bytes per query
configs.max_buffer_size = max_buffer_size * 1000000
# Maximum size in bytes per token
# 512 megabytes is a hard-coded Redis maximum
configs.max_token_size = min(max_token_size * 1000000, 512 * 1000000)

configs.skip_invalid_nodes = skip_invalid_nodes
configs.skip_invalid_edges = skip_invalid_edges
configs.separator = separator
configs.quoting = quoting
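# As a worked example with the CLI defaults below (1024 tokens, 2048 MB buffer, 500 MB token),
# this yields max_token_count = 1024, max_buffer_size = 2,048,000,000 bytes and
# max_token_size = 500,000,000 bytes, all under the hard Redis limits noted above.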


def QueryBuf_Set(graphname, client, has_relations):
# Redis client and data for each query
QueryBuffer.client = client
QueryBuffer.graphname = graphname

# Create a node dictionary if we're building relations, since relations require unique node identifiers
if has_relations:
QueryBuffer.nodes = {}


# Command-line arguments
@click.command()
@click.argument('graph')
# Redis server connection settings
@click.option('--host', '-h', default='127.0.0.1', help='Redis server host')
@click.option('--port', '-p', default=6379, help='Redis server port')
@click.option('--password', '-a', default=None, help='Redis server password')
# CSV file paths
@click.option('--nodes', '-n', required=True, multiple=True, help='Path to node csv file')
@click.option('--nodes-with-label', '-N', nargs=2, multiple=True, help='Label string followed by path to node csv file')
@click.option('--relations', '-r', multiple=True, help='Path to relation csv file')
@click.option('--relations-with-type', '-R', nargs=2, multiple=True, help='Relation type string followed by path to relation csv file')
@click.option('--separator', '-o', default=',', help='Field token separator in csv file')
# Buffer size restrictions
@click.option('--max-token-count', '-c', default=1024, help='max number of processed CSVs to send per query (default 1024)')
@click.option('--max-buffer-size', '-b', default=2048, help='max buffer size in megabytes (default 2048)')
@click.option('--max-token-size', '-t', default=500, help='max size of each token in megabytes (default 500, max 512)')
@click.option('--quote', '-q', default=3, help='the quoting format used in the CSV file. QUOTE_MINIMAL=0,QUOTE_ALL=1,QUOTE_NONNUMERIC=2,QUOTE_NONE=3')
@click.option('--skip-invalid-nodes', '-s', default=False, is_flag=True, help='ignore nodes that use previously defined IDs')
@click.option('--skip-invalid-edges', '-e', default=False, is_flag=True, help='ignore invalid edges: print an error message and continue loading rather than stopping after an edge failure')
def bulk_insert(graph, host, port, password, nodes, nodes_with_label, relations, relations_with_type, separator, max_token_count, max_buffer_size, max_token_size, quote, skip_invalid_nodes, skip_invalid_edges):
if sys.version_info[0] < 3:
raise Exception("Python 3 is required for the RedisGraph bulk loader.")

# Initialize configurations with command-line arguments
Config_Set(max_token_count, max_buffer_size, max_token_size, skip_invalid_nodes, skip_invalid_edges, separator, int(quote))

start_time = timer()
# Attempt to connect to Redis server
try:
client = redis.StrictRedis(host=host, port=port, password=password)
except redis.exceptions.ConnectionError as e:
print("Could not connect to Redis server.")
raise e

# Attempt to verify that RedisGraph module is loaded
try:
module_list = client.execute_command("MODULE LIST")
if not any(b'graph' in module_description for module_description in module_list):
print("RedisGraph module not loaded on connected server.")
sys.exit(1)
except redis.exceptions.ResponseError:
# Ignore check if the connected server does not support the "MODULE LIST" command
pass

# Verify that the graph name is not already used in the Redis database
key_exists = client.execute_command("EXISTS", graph)
if key_exists:
print("Graph with name '%s', could not be created, as Redis key '%s' already exists." % (graph, graph))
sys.exit(1)

QueryBuf_Set(graph, client, bool(relations) or bool(relations_with_type))

# Read the header rows of each input CSV and save its schema.
labels = parse_schemas(Label, nodes, nodes_with_label)
reltypes = parse_schemas(RelationType, relations, relations_with_type)

process_entities(labels)
process_entities(reltypes)

# Send all remaining tokens to Redis
QueryBuffer.send_buffer()

end_time = timer()
QueryBuffer.report_completion(end_time - start_time)


if __name__ == '__main__':
bulk_insert()
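# Example invocation (hypothetical CSV paths; a sketch rather than project documentation):
#   python bulk_insert.py MyGraph -n people.csv -N Actor actors.csv -r knows.csv -p 6379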
9 changes: 9 additions & 0 deletions bulk_insert/configs.py
@@ -0,0 +1,9 @@
# Default values for command-line arguments

max_token_count = 1024 * 1023
max_buffer_size = 0
max_token_size = 512 * 1000000
skip_invalid_nodes = False
skip_invalid_edges = False
separator = ','
quoting = 3
194 changes: 194 additions & 0 deletions bulk_insert/entity_file.py
@@ -0,0 +1,194 @@
import os
import io
import csv
import math
import struct
import configs
from exceptions import CSVError, SchemaError


class Type:
NULL = 0
BOOL = 1
DOUBLE = 2
STRING = 3
LONG = 4
ID = 5
START_ID = 8
END_ID = 9
IGNORE = 10


def convert_schema_type(in_type):
try:
return {
'null': Type.NULL,
'boolean': Type.BOOL,
'double': Type.DOUBLE,
'string': Type.STRING,
'string[]': Type.STRING, # TODO tmp
'integer': Type.LONG,
'int': Type.LONG,
'long': Type.LONG,
'id': Type.ID,
'start_id': Type.START_ID,
'end_id': Type.END_ID
}[in_type]
except KeyError:
# TODO tmp
if in_type.startswith('id('):
return Type.ID
elif in_type.startswith('start_id('):
return Type.START_ID
elif in_type.startswith('end_id('):
return Type.END_ID
else:
raise SchemaError("Encountered invalid field type '%s'" % in_type)
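# For instance, convert_schema_type('string') returns Type.STRING, convert_schema_type('int')
# returns Type.LONG, and a parameterized field such as 'start_id(people)' (the group name here
# is illustrative) maps to Type.START_ID.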


# Convert a single CSV property field into a binary stream.
# Supported property types are string, numeric, boolean, and NULL.
# prop_type, if given, restricts the value to that type (Type.DOUBLE, Type.LONG/Type.ID, Type.BOOL or Type.STRING); None lets the type be inferred
def prop_to_binary(prop_val, prop_type):
# All format strings start with an unsigned char to represent our prop_type enum
format_str = "=B"
if prop_val is None:
# An empty field indicates a NULL property
return struct.pack(format_str, Type.NULL)

# If field can be cast to a float, allow it
if prop_type is None or prop_type == Type.DOUBLE:
try:
numeric_prop = float(prop_val)
if not math.isnan(numeric_prop) and not math.isinf(numeric_prop): # Don't accept non-finite values.
return struct.pack(format_str + "d", Type.DOUBLE, numeric_prop)
except ValueError:
raise SchemaError("Could not parse '%s' as a double" % prop_val)

# TODO add support for non-integer ID types
if prop_type is None or prop_type == Type.LONG or prop_type == Type.ID:
try:
numeric_prop = int(float(prop_val))
return struct.pack(format_str + "q", Type.LONG, numeric_prop)
except (ValueError, OverflowError):
raise SchemaError("Could not parse '%s' as a long" % prop_val)

if prop_type is None or prop_type == Type.BOOL:
# If field is 'false' or 'true', it is a boolean
if prop_val.lower() == 'false':
return struct.pack(format_str + '?', Type.BOOL, False)
elif prop_val.lower() == 'true':
return struct.pack(format_str + '?', Type.BOOL, True)

if prop_type is None or prop_type == Type.STRING:
# If we've reached this point, the property is a string
encoded_str = str.encode(prop_val) # struct.pack requires bytes objects as arguments
# Encoding len+1 adds a null terminator to the string
format_str += "%ds" % (len(encoded_str) + 1)
return struct.pack(format_str, Type.STRING, encoded_str)

# If we reach this point, the value could not be converted to the requested type
raise SchemaError("Unable to parse '%s' with type %s" % (prop_val, repr(prop_type)))
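# A small sketch of the resulting byte layouts (each value is prefixed by its type byte):
#   prop_to_binary("5", None)            -> b'\x02' + an 8-byte double   (Type.DOUBLE)
#   prop_to_binary("true", Type.BOOL)    -> b'\x01\x01'                  (Type.BOOL)
#   prop_to_binary("hello", Type.STRING) -> b'\x03' + b'hello\x00'       (null-terminated string)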


# Superclass for label and relation CSV files
class EntityFile(object):
def __init__(self, filename, label):
# Use the provided label or relation type string; otherwise default to the file's basename
if label:
self.entity_str = label
else:
self.entity_str = os.path.splitext(os.path.basename(filename))[0]
# Input file handling
self.infile = io.open(filename, 'rt')

# Initialize CSV reader that ignores leading whitespace in each field
# and does not modify input quote characters
self.reader = csv.reader(self.infile, delimiter=configs.separator, skipinitialspace=True, quoting=configs.quoting)

self.packed_header = b''
self.binary_entities = []
self.binary_size = 0 # size of binary token

self.convert_header() # Extract data from header row.
self.count_entities() # Count the number of entity rows in the file.

# Count number of rows in file.
def count_entities(self):
self.entities_count = sum(1 for line in self.infile)
# Seek back to the beginning of the file
self.infile.seek(0)
return self.entities_count

# Simple input validations for each row of a CSV file
def validate_row(self, row):
# Each row should have the same number of fields
if len(row) != self.column_count:
raise CSVError("%s:%d Expected %d columns, encountered %d ('%s')"
% (self.infile.name, self.reader.line_num, self.column_count, len(row), configs.separator.join(row)))

# If part of a CSV file was sent to Redis, delete the processed entities and update the binary size
def reset_partial_binary(self):
self.binary_entities = []
self.binary_size = len(self.packed_header)

# Convert property keys from a CSV file header into a binary string
def pack_header(self):
# String format
entity_bytes = self.entity_str.encode()
fmt = "=%dsI" % (len(entity_bytes) + 1) # Unaligned native, entity name, count of properties
args = [entity_bytes, self.prop_count]
for idx in range(self.column_count):
if self.skip_offsets[idx]:
continue
prop = self.column_names[idx].encode()
fmt += "%ds" % (len(prop) + 1) # encode string with a null terminator
args.append(prop)
return struct.pack(fmt, *args)
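# As an illustration (hypothetical header), a 'Person' entity with properties 'name' and 'age'
# packs to b'Person\x00', a 4-byte unsigned property count of 2, then b'name\x00' + b'age\x00'.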

# Extract column names and types from a header row
def convert_header(self):
header = next(self.reader)
self.column_count = len(header)
self.column_names = [None] * self.column_count # Property names of every column.
self.types = [None] * self.column_count # Value type of every column.
self.skip_offsets = [False] * self.column_count # Whether column at any offset should not be stored as a property.

for idx, field in enumerate(header):
pair = field.split(':')
if len(pair) > 2:
raise CSVError("Field '%s' had %d colons" % field, len(field))

if len(pair[0]) == 0: # Drop the empty leading element of an unnamed column such as ":LABEL"
del pair[0]

if len(pair) < 2:
self.types[idx] = convert_schema_type(pair[0].casefold())
self.skip_offsets[idx] = True
if self.types[idx] not in (Type.ID, Type.START_ID, Type.END_ID, Type.IGNORE):
# Any other field should have 2 elements
raise SchemaError("Each property in the header should be a colon-separated pair")
else:
self.column_names[idx] = pair[0]
self.types[idx] = convert_schema_type(pair[1].casefold())
if self.types[idx] in (Type.START_ID, Type.END_ID, Type.IGNORE):
self.skip_offsets[idx] = True

# The number of properties is equal to the number of non-skipped columns.
self.prop_count = self.skip_offsets.count(False)
self.packed_header = self.pack_header()
self.binary_size += len(self.packed_header)
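# For example, a hypothetical header row ':ID,name:string,age:int' yields column_names
# [None, 'name', 'age'], types [Type.ID, Type.STRING, Type.LONG], skip_offsets [True, False, False]
# and prop_count 2; the ID column is recognized but not packed as a property.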

# Convert a list of properties into a binary string
def pack_props(self, line):
props = []
for idx, field in enumerate(line):
if self.skip_offsets[idx]:
continue
if self.column_names[idx]:
props.append(prop_to_binary(field, self.types[idx]))
return b''.join(props)

def to_binary(self):
return self.packed_header + b''.join(self.binary_entities)
7 changes: 7 additions & 0 deletions bulk_insert/exceptions.py
@@ -0,0 +1,7 @@
# Custom error class for invalid inputs
class CSVError(Exception):
pass


class SchemaError(Exception):
pass