Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions integration_tests/test_client_it.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,19 @@ def test_csv_import(self):
self.assertEqual(3, len(response.results))
self.assertEqual(target, [result.row.columns[0] for result in response.results])

# test clear import
reader = csv_column_reader(StringIO(text))
client.import_field(field, reader, clear=True)
bq = self.index.batch_query(
field.row(2),
field.row(7),
field.row(10),
)
response = client.query(bq)
self.assertEqual(3, len(response.results))
for result in response.results:
self.assertEqual([], result.row.columns)

def test_csv_import_fast(self):
client = self.get_client()
text = u"""
Expand All @@ -328,6 +341,21 @@ def test_csv_import_fast(self):
self.assertEqual(3, len(response.results))
self.assertEqual(target, [result.row.columns[0] for result in response.results])

# test clear import
reader = csv_column_reader(StringIO(text))
field = self.index.field("importfield-fast")
client.ensure_field(field)
client.import_field(field, reader, fast_import=True, clear=True)
bq = self.index.batch_query(
field.row(2),
field.row(7),
field.row(10),
)
response = client.query(bq)
self.assertEqual(3, len(response.results))
for result in response.results:
self.assertEqual([], result.row.columns)

def test_csv_import_row_keys(self):
client = self.get_client()
text = u"""
Expand Down
25 changes: 14 additions & 11 deletions pilosa/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,16 @@ def sync_schema(self, schema):
if field_name not in RESERVED_FIELDS:
local_index._fields[field_name] = field

def import_field(self, field, bit_reader, batch_size=100000, fast_import=False, clear=False):
    """Imports a field using the given bit reader.

    Reads bits from *bit_reader*, groups them into per-shard batches of
    at most *batch_size*, and sends each batch to the server.

    :param field: target field object; its type selects the import path
    :param bit_reader: iterable source of bits/columns to import
    :param batch_size: maximum number of bits sent per request
    :param fast_import: when True, use the roaring fast-import endpoint
        where applicable (set fields with row id/column id data)
    :param clear: clear bits instead of setting them
    """
    for shard, columns in batch_columns(bit_reader, batch_size):
        self._import_data(field, shard, columns, fast_import, clear)

def http_request(self, method, path, data=None, headers=None):
"""Sends an HTTP request to the Pilosa server
Expand All @@ -280,7 +281,7 @@ def http_request(self, method, path, data=None, headers=None):
"""
return self.__http_request(method, path, data=data, headers=headers)

def _import_data(self, field, shard, data, fast_import):
def _import_data(self, field, shard, data, fast_import, clear):
if field.field_type != "int":
# sort by row_id then by column_id
if not field.index.keys:
Expand All @@ -302,13 +303,13 @@ def _import_data(self, field, shard, data, fast_import):
for node in nodes:
client = Client(URI.address(node.url), **client_params)
if field.field_type == "int":
client._import_node(_ImportValueRequest(field, shard, data))
client._import_node(_ImportValueRequest(field, shard, data), clear)
else:
req = _ImportRequest(field, shard, data)
if fast_import and field.field_type == "set" and req.format == csv_row_id_column_id:
client._import_node_fast(req)
client._import_node_fast(req, clear)
else:
client._import_node(req)
client._import_node(req, clear)

def _fetch_fragment_nodes(self, index_name, shard):
path = "/internal/fragment/nodes?shard=%d&index=%s" % (shard, index_name)
Expand All @@ -331,23 +332,25 @@ def _fetch_coordinator_node(self):
return _Node(uri["scheme"], uri["host"], uri["port"])
raise PilosaServerError(response)

def _import_node(self, import_request, clear):
    """POSTs a protobuf-encoded import request to this node.

    :param import_request: request object exposing ``to_protobuf()``,
        ``index_name`` and ``field_name``
    :param clear: when True, append ``?clear=true`` so the server clears
        the given bits instead of setting them
    """
    data = import_request.to_protobuf()
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Accept': 'application/x-protobuf',
    }
    # The clear flag is carried as a query-string parameter on the import URL.
    clear_str = "?clear=true" if clear else ""
    path = "/index/%s/field/%s/import%s" % (import_request.index_name, import_request.field_name, clear_str)
    self.__http_request("POST", path, data=data, headers=headers)

def _import_node_fast(self, import_request, clear):
    """POSTs a roaring-bitmap-encoded import request to this node.

    Uses the ``import-roaring`` endpoint, which takes the target shard in
    the URL and the serialized bitmap as the request body.

    :param import_request: request object exposing ``to_bitmap()``,
        ``index_name``, ``field_name`` and ``shard``
    :param clear: when True, append ``?clear=true`` so the server clears
        the given bits instead of setting them
    """
    data = import_request.to_bitmap()
    headers = {
        'Content-Type': 'application/x-binary',
        'Accept': 'application/x-protobuf',
    }
    # The clear flag is carried as a query-string parameter on the import URL.
    clear_str = "?clear=true" if clear else ""
    path = "/index/%s/field/%s/import-roaring/%d%s" % \
        (import_request.index_name, import_request.field_name, import_request.shard, clear_str)
    self.__http_request("POST", path, data=data, headers=headers)

def __http_request(self, method, path, data=None, headers=None, use_coordinator=False):
Expand Down
10 changes: 0 additions & 10 deletions pilosa/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,16 +747,6 @@ def serialize(self):
return SerializedQuery(text, has_keys)


def id_key_format(name, id_key, id_fmt, key_fmt):
    """Selects the format string matching the type of *id_key*.

    Integers and booleans select *id_fmt*; strings are validated first and
    select *key_fmt*.  Any other type raises ValidationError, with *name*
    used in the error message.
    """
    if isinstance(id_key, (bool, int)):
        return id_fmt
    if isinstance(id_key, _basestring):
        validate_key(id_key)
        return key_fmt
    raise ValidationError("%s must be an integer, boolean or string" % name)


def idkey_as_str(id_key):
if isinstance(id_key, bool):
return "true" if id_key else "false"
Expand Down