# Remotely Using the BulkImport API via PyArrow

The BulkImport API (the machinery underneath `neo4j-admin import`) is the fastest way to bootstrap a new database because it builds the record store offline, without transactions.

## Challenges with `neo4j-admin import`

The tooling presupposes:

1. Access to the filesystem on the Neo4j server 😬
2. Data formatted into CSVs 🤮

## The PyArrow / `neo4j-arrow` Solution

1. Remotely stream data (nodes and edges) to the `neo4j-arrow` service from any client.
2. Data can start in any format PyArrow can turn into a Table (Pandas DataFrames, Python dicts, Parquet, CSV, JSON, etc.).

## Demo Time!

In [1]:
import neo4j_arrow as na

client = na.Neo4jArrow('neo4j', 'password', ('voutila-arrow-test', 9999), tls=True, verifyTls=False)
[action for action in client.list_actions() if action[0] == 'import.bulk']

[ActionType(type='import.bulk', description='Use neo4j bulk import to bootstrap a new database.')]

### Let's set up some data...
`pyimport` is a bit of scripting that reads a directory of CSVs created via `gds.beta.graph.export.csv`.

It has some caveats, but it's fine for demo purposes.
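
The source of `pyimport` isn't shown here, so the following is only a guess at the kind of grouping it does: bucket the numbered part files by their shared prefix (the "import target"). The regex and the `group_by_target` name are hypothetical.

```python
import re
from collections import defaultdict

# Matches e.g. "nodes_0.csv" or "relationships_SIMILAR_3.csv": everything up
# to the trailing "<number>.csv" is treated as the import target.
PART_FILE = re.compile(r"^(?P<target>.+_)(?P<part>\d+)\.csv$")


def group_by_target(filenames):
    """Group numbered CSV part files by prefix, ordered by part number."""
    groups = defaultdict(list)
    for name in filenames:
        match = PART_FILE.match(name)
        if match:  # skip anything that isn't a numbered part file
            groups[match.group("target")].append((int(match.group("part")), name))
    return {target: [name for _, name in sorted(parts)]
            for target, parts in groups.items()}


files = ["nodes_0.csv", "nodes_2.csv", "nodes_1.csv",
         "relationships_SIMILAR_1.csv", "relationships_SIMILAR_0.csv"]
print(group_by_target(files))
```

Sorting by the numeric part keeps `nodes_10.csv` after `nodes_9.csv`, which a plain string sort would not.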

In [3]:
import pyimport as pi

node, rels = pi.load_dir('./random')

Loading from ./random
import targets: {'nodes_', 'relationships_SIMILAR_', 'relationships_REL_'}
files = ['nodes_0.csv', 'nodes_2.csv', 'nodes_3.csv', 'nodes_1.csv']
fields = [Field(name='ID', type=<FieldType.NODE_ID: 'ID'>, id_space='Global'), Field(name='louvain', type=<FieldType.LONG: 'long'>, id_space='Global')]

reading ./random/nodes_0.csv...
reading ./random/nodes_2.csv...
reading ./random/nodes_3.csv...
reading ./random/nodes_1.csv...
files = ['relationships_SIMILAR_2.csv', 'relationships_SIMILAR_0.csv', 'relationships_SIMILAR_3.csv', 'relationships_SIMILAR_1.csv']
fields = [Field(name='START_ID', type=<FieldType.START_ID: 'START_ID'>, id_space='Global'), Field(name='END_ID', type=<FieldType.END_ID: 'END_ID'>, id_space='Global'), Field(name='score', type=<FieldType.DOUBLE: 'double'>, id_space='Global')]

reading ./random/relationships_SIMILAR_2.csv...
reading ./random/relationships_SIMILAR_0.csv...
reading ./random/relationships_SIMILAR_3.csv...
reading ./random/relationships_S

### At this point, we've got 2 PyArrow Tables!

One caveat for now is that the PyArrow CSV readers don't handle array columns. This can be worked around with Pandas + NumPy, but for this demo just pretend.

In [4]:
node.schema

ID: int64
louvain: int64
_labels_: list<item: string>
  child 0, item: string

In [5]:
rels.schema

START_ID: int64
END_ID: int64
score: double
_type_: string

### Sending the Data

We do a dance similar to the other `neo4j-arrow` routines:

1. Define and create a new server-side Job
2. Stream our Nodes Table to the server
3. Stream our Relationships Table to the server

The _NEW_ thing with this bulk import job is the concept of 1 Job but 2 Streams!

In [6]:
ticket = client.bulk_import(database='demotime123', 
                            idField='ID', labelsField='_labels_',
                            sourceField='START_ID', targetField='END_ID', typeField='_type_')

print(f'Sending {len(node):,} nodes...')
client.put(ticket, node, metadata={'stream.type': 'node'})

print(f'Sending {len(rels):,} relationships...')
client.put(ticket, rels, metadata={'stream.type': 'rels'})

print('Waiting for job completion...')
client.wait_for_job(ticket, desired=na.JobStatus.COMPLETE, timeout=600)

print('Done!')

Sending 10,000,000 nodes...
Sending 140,159,916 relationships...
Waiting for job completion...
Done!
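
With 150 million rows in flight, a typo in a field name is an expensive mistake. A cheap pre-flight check, run before creating the job, might look like the sketch below. `missing_fields` is a hypothetical helper, not part of `neo4j_arrow`; the column lists are what `node.schema.names` and `rels.schema.names` would return for the schemas shown earlier.

```python
def missing_fields(column_names, required):
    """Return the required field names that are absent from a table's columns."""
    present = set(column_names)
    return [name for name in required if name not in present]


# Column names as reported by node.schema.names and rels.schema.names.
node_columns = ["ID", "louvain", "_labels_"]
rels_columns = ["START_ID", "END_ID", "score", "_type_"]

# The same names that get passed to bulk_import as idField, labelsField, etc.
node_problems = missing_fields(node_columns, ["ID", "_labels_"])
rels_problems = missing_fields(rels_columns, ["START_ID", "END_ID", "_type_"])

if node_problems or rels_problems:
    raise ValueError(f"missing fields: {node_problems + rels_problems}")
print("field names look good")
```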


---

## Future Work

1. The above is very unoptimized (single-threaded CSV reads, no tuning of stream batch sizes, etc.)
  - Concurrency _can_ be increased. The lowest-hanging fruit is sending nodes and rels concurrently.
  - CSVs suck...enough said.
    
2. No _incremental_ import
  - A PRD exists: https://docs.google.com/document/d/1l3MbrrZlEG_2QRFAnNu76kBJzUZ3Z1GrkUPUmQjJqcE/edit#
  - Thought experiment: _is it faster to build a new database and use Fabric?_
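
As a rough sketch of the "send nodes and rels concurrently" idea from item 1: submit both `put` calls to a thread pool and wait on both. `StubClient` is a hypothetical stand-in so the snippet runs without a server; whether the real client is thread-safe, and whether the server accepts relationships before all nodes have arrived, are assumptions that would need checking.

```python
from concurrent.futures import ThreadPoolExecutor


class StubClient:
    """Hypothetical stand-in for the real client; just records what was sent."""

    def __init__(self):
        self.sent = []

    def put(self, ticket, table, metadata=None):
        self.sent.append((ticket, metadata["stream.type"], len(table)))


def put_concurrently(client, ticket, node, rels):
    """Send the node and relationship streams from two worker threads."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(client.put, ticket, node, metadata={"stream.type": "node"}),
            pool.submit(client.put, ticket, rels, metadata={"stream.type": "rels"}),
        ]
        for future in futures:
            future.result()  # re-raise any exception from a worker thread


client = StubClient()
put_concurrently(client, "ticket-1", [1, 2, 3], [(1, 2), (2, 3)])
print(sorted(client.sent))
```

Calling `future.result()` on each future matters: without it, a failed stream would be silently swallowed and the job would just sit there waiting.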
    