diff --git a/integration_tests/test_client_it.py b/integration_tests/test_client_it.py index a4f9ef7..8204dac 100644 --- a/integration_tests/test_client_it.py +++ b/integration_tests/test_client_it.py @@ -306,6 +306,19 @@ def test_csv_import(self): self.assertEqual(3, len(response.results)) self.assertEqual(target, [result.row.columns[0] for result in response.results]) + # test clear import + reader = csv_column_reader(StringIO(text)) + client.import_field(field, reader, clear=True) + bq = self.index.batch_query( + field.row(2), + field.row(7), + field.row(10), + ) + response = client.query(bq) + self.assertEqual(3, len(response.results)) + for result in response.results: + self.assertEqual([], result.row.columns) + def test_csv_import_fast(self): client = self.get_client() text = u""" @@ -328,6 +341,21 @@ def test_csv_import_fast(self): self.assertEqual(3, len(response.results)) self.assertEqual(target, [result.row.columns[0] for result in response.results]) + # test clear import + reader = csv_column_reader(StringIO(text)) + field = self.index.field("importfield-fast") + client.ensure_field(field) + client.import_field(field, reader, fast_import=True, clear=True) + bq = self.index.batch_query( + field.row(2), + field.row(7), + field.row(10), + ) + response = client.query(bq) + self.assertEqual(3, len(response.results)) + for result in response.results: + self.assertEqual([], result.row.columns) + def test_csv_import_row_keys(self): client = self.get_client() text = u""" diff --git a/pilosa/client.py b/pilosa/client.py index f6aedf8..562430d 100644 --- a/pilosa/client.py +++ b/pilosa/client.py @@ -256,15 +256,16 @@ def sync_schema(self, schema): if field_name not in RESERVED_FIELDS: local_index._fields[field_name] = field - def import_field(self, field, bit_reader, batch_size=100000, fast_import=False): + def import_field(self, field, bit_reader, batch_size=100000, fast_import=False, clear=False): """Imports a field using the given bit reader :param field: :param bit_reader: :param batch_size: + :param clear: clear bits instead of setting them """ for shard, columns in batch_columns(bit_reader, batch_size): - self._import_data(field, shard, columns, fast_import) + self._import_data(field, shard, columns, fast_import, clear) def http_request(self, method, path, data=None, headers=None): """Sends an HTTP request to the Pilosa server @@ -280,7 +281,7 @@ def http_request(self, method, path, data=None, headers=None): """ return self.__http_request(method, path, data=data, headers=headers) - def _import_data(self, field, shard, data, fast_import): + def _import_data(self, field, shard, data, fast_import, clear): if field.field_type != "int": # sort by row_id then by column_id if not field.index.keys: @@ -302,13 +303,13 @@ def _import_data(self, field, shard, data, fast_import): for node in nodes: client = Client(URI.address(node.url), **client_params) if field.field_type == "int": - client._import_node(_ImportValueRequest(field, shard, data)) + client._import_node(_ImportValueRequest(field, shard, data), clear) else: req = _ImportRequest(field, shard, data) if fast_import and field.field_type == "set" and req.format == csv_row_id_column_id: - client._import_node_fast(req) + client._import_node_fast(req, clear) else: - client._import_node(req) + client._import_node(req, clear) def _fetch_fragment_nodes(self, index_name, shard): path = "/internal/fragment/nodes?shard=%d&index=%s" % (shard, index_name) @@ -331,23 +332,25 @@ def _fetch_coordinator_node(self): return _Node(uri["scheme"], uri["host"], uri["port"]) raise PilosaServerError(response) - def _import_node(self, import_request): + def _import_node(self, import_request, clear): data = import_request.to_protobuf() headers = { 'Content-Type': 'application/x-protobuf', 'Accept': 'application/x-protobuf', } - path = "/index/%s/field/%s/import" % (import_request.index_name, import_request.field_name) + clear_str = "?clear=true" if clear else "" + path = "/index/%s/field/%s/import%s" % (import_request.index_name, import_request.field_name, clear_str) self.__http_request("POST", path, data=data, headers=headers) - def _import_node_fast(self, import_request): + def _import_node_fast(self, import_request, clear): data = import_request.to_bitmap() headers = { 'Content-Type': 'application/x-binary', 'Accept': 'application/x-protobuf', } - path = "/index/%s/field/%s/import-roaring/%d" % \ - (import_request.index_name, import_request.field_name, import_request.shard) + clear_str = "?clear=true" if clear else "" + path = "/index/%s/field/%s/import-roaring/%d%s" % \ + (import_request.index_name, import_request.field_name, import_request.shard, clear_str) self.__http_request("POST", path, data=data, headers=headers) def __http_request(self, method, path, data=None, headers=None, use_coordinator=False): diff --git a/pilosa/orm.py b/pilosa/orm.py index 3903ab6..aa40258 100644 --- a/pilosa/orm.py +++ b/pilosa/orm.py @@ -747,16 +747,6 @@ def serialize(self): return SerializedQuery(text, has_keys) -def id_key_format(name, id_key, id_fmt, key_fmt): - if isinstance(id_key, bool) or isinstance(id_key, int): - return id_fmt - elif isinstance(id_key, _basestring): - validate_key(id_key) - return key_fmt - else: - raise ValidationError("%s must be an integer, boolean or string" % name) - - def idkey_as_str(id_key): if isinstance(id_key, bool): return "true" if id_key else "false"