IGNITE-14211 Remove existing cache requirement from SQL API
This closes #18
isapego committed Feb 19, 2021
1 parent e5ca3fc commit 672a767
Showing 6 changed files with 98 additions and 39 deletions.
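In short: before this change, Client.sql() resolved its schema argument through get_cache(), so a query against a plain SQL table failed unless a cache of that name already existed. After it, a query can be addressed by schema alone (cache ID 0 on the wire) or through the new optional cache argument. A minimal sketch of the new call shape, assuming a local node on the default port (connection details are illustrative, not part of the diff):

    from pyignite import Client

    client = Client()
    client.connect('127.0.0.1', 10800)

    # No pre-existing cache is needed: the query is routed by schema name.
    client.sql("CREATE TABLE People (id INT PRIMARY KEY, name VARCHAR)", schema='PUBLIC')
    client.sql("INSERT INTO People (id, name) VALUES (?, ?)", query_args=[1, 'John Doe'], schema='PUBLIC')
    for row in client.sql("SELECT name FROM People", schema='PUBLIC'):
        print(row)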
23 changes: 9 additions & 14 deletions pyignite/api/sql.py
@@ -283,36 +283,31 @@ def sql_fields(
     Performs SQL fields query.
 
     :param conn: connection to Ignite server,
-    :param cache: name or ID of the cache,
+    :param cache: name or ID of the cache. If zero, then schema is used.
     :param query_str: SQL query string,
     :param page_size: cursor page size,
     :param query_args: (optional) query arguments. List of values or
      (value, type hint) tuples,
-    :param schema: (optional) schema for the query. Defaults to `PUBLIC`,
+    :param schema: schema for the query.
     :param statement_type: (optional) statement type. Can be:
      * StatementType.ALL − any type (default),
      * StatementType.SELECT − select,
      * StatementType.UPDATE − update.
-    :param distributed_joins: (optional) distributed joins. Defaults to False,
+    :param distributed_joins: (optional) distributed joins.
     :param local: (optional) pass True if this query should be executed
-     on local node only. Defaults to False,
+     on local node only.
     :param replicated_only: (optional) whether query contains only
-     replicated tables or not. Defaults to False,
-    :param enforce_join_order: (optional) enforce join order. Defaults
-     to False,
+     replicated tables or not.
+    :param enforce_join_order: (optional) enforce join order.
     :param collocated: (optional) whether your data is co-located or not.
-     Defaults to False,
-    :param lazy: (optional) lazy query execution. Defaults to False,
+    :param lazy: (optional) lazy query execution.
     :param include_field_names: (optional) include field names in result.
-     Defaults to False,
-    :param max_rows: (optional) query-wide maximum of rows. Defaults to -1
-     (all rows),
+    :param max_rows: (optional) query-wide maximum of rows.
     :param timeout: (optional) non-negative timeout value in ms. Zero disables
-     timeout (default),
+     timeout.
     :param binary: (optional) pass True to keep the value in binary form.
-     False by default,
     :param query_id: (optional) a value generated by client and returned as-is
      in response.query_id. When the parameter is omitted, a random value
      is generated,
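At the wire level, the convention is now: cache ID 0 means "no cache, resolve the query through the schema". A hedged sketch of a direct sql_fields call under the new signature (conn stands for an established node connection, e.g. client.random_node; the query and names are illustrative):

    from pyignite.api import sql_fields

    result = sql_fields(
        conn,                 # active connection to an Ignite node
        0,                    # cache ID 0: no cache, use the schema instead
        'SELECT 1',           # SQL query string
        1024,                 # cursor page size
        schema='PUBLIC',
        include_field_names=True,
    )
    assert result.status == 0, result.message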
18 changes: 12 additions & 6 deletions pyignite/client.py
@@ -58,7 +58,7 @@
     BinaryTypeError, CacheError, ReconnectError, SQLError, connection_errors,
 )
 from .utils import (
-    capitalize, entity_id, schema_id, process_delimiter,
+    cache_id, capitalize, entity_id, schema_id, process_delimiter,
     status_to_exception, is_iterable,
 )
 from .binary import GenericObjectMeta
@@ -513,13 +513,14 @@ def get_cache_names(self) -> list:
         return cache_get_names(self.random_node)
 
     def sql(
-        self, query_str: str, page_size: int = 1024, query_args: Iterable = None,
-        schema: Union[int, str] = 'PUBLIC',
+        self, query_str: str, page_size: int = 1024,
+        query_args: Iterable = None, schema: str = 'PUBLIC',
         statement_type: int = 0, distributed_joins: bool = False,
         local: bool = False, replicated_only: bool = False,
         enforce_join_order: bool = False, collocated: bool = False,
         lazy: bool = False, include_field_names: bool = False,
         max_rows: int = -1, timeout: int = 0,
+        cache: Union[int, str, Cache] = None
     ):
         """
         Runs an SQL query and returns its result.
@@ -553,6 +554,8 @@ def sql(
          (all rows),
         :param timeout: (optional) non-negative timeout value in ms.
          Zero disables timeout (default),
+        :param cache: (optional) name or ID of the cache to use to infer schema.
+         If set, the `schema` argument is ignored,
         :return: generator with result rows as lists. If
          `include_field_names` was set, the first row will hold field names.
         """
@@ -580,10 +583,13 @@ def generate_result(value):

         conn = self.random_node
 
-        schema = self.get_cache(schema)
+        c_id = cache.cache_id if isinstance(cache, Cache) else cache_id(cache)
+
+        if c_id != 0:
+            schema = None
+
         result = sql_fields(
-            conn, schema.cache_id, query_str,
-            page_size, query_args, schema.name,
+            conn, c_id, query_str, page_size, query_args, schema,
             statement_type, distributed_joins, local, replicated_only,
             enforce_join_order, collocated, lazy, include_field_names,
             max_rows, timeout,
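The rewritten body above makes the precedence explicit: a non-zero cache ID wins and the schema string is dropped; otherwise the schema travels with cache ID 0. A sketch of both addressing modes (assuming a cache named 'my_cache' backing a table MY_TABLE; both names are illustrative):

    # Schema-based: no cache has to exist up front.
    rows = client.sql('SELECT * FROM People', schema='PUBLIC')

    # Cache-based: a Cache object, a cache name, or a numeric cache ID
    # all work, and `schema` is then ignored.
    cache = client.get_cache('my_cache')
    rows = client.sql('SELECT * FROM MY_TABLE', cache=cache)
    rows = client.sql('SELECT * FROM MY_TABLE', cache='my_cache')
    rows = client.sql('SELECT * FROM MY_TABLE', cache=cache.cache_id)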
3 changes: 3 additions & 0 deletions pyignite/utils.py
@@ -105,6 +105,9 @@ def hashcode(data: Union[str, bytes, bytearray, memoryview]) -> int:


 def __hashcode_fallback(data: Union[str, bytes, bytearray, memoryview]) -> int:
+    if data is None:
+        return 0
+
     if isinstance(data, str):
         """
         For strings we iterate over code point which are of the int type
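This None guard is what makes the client change work end to end: Client.sql() now calls cache_id(cache) with a default of None, and hashing None must come out as the protocol's "no cache" sentinel 0 rather than raising. A sketch of the intended behavior on the pure-Python fallback path (assuming cache_id() hashes cache names via hashcode(), as pyignite/utils.py does):

    from pyignite.utils import cache_id, hashcode

    assert hashcode(None) == 0   # new branch added above
    assert cache_id(None) == 0   # so the default cache=None means "use schema"
    assert cache_id('my_cache') == hashcode('my_cache')  # names hash as before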
5 changes: 0 additions & 5 deletions tests/test_binary.py
@@ -63,8 +63,6 @@


 def test_sql_read_as_binary(client):
-
-    client.get_or_create_cache(scheme_name)
     client.sql(drop_query)
 
     # create table
@@ -92,9 +90,6 @@ def test_sql_write_as_binary(client):


 def test_sql_write_as_binary(client):
-
-    client.get_or_create_cache(scheme_name)
-
     # configure cache as an SQL table
     type_name = table_cache_name
 
4 changes: 1 addition & 3 deletions tests/test_cache_class.py
@@ -62,9 +62,7 @@ def test_cache_remove(client):


 def test_cache_get(client):
-    client.get_or_create_cache('my_cache')
-
-    my_cache = client.get_cache('my_cache')
+    my_cache = client.get_or_create_cache('my_cache')
     assert my_cache.settings[PROP_NAME] == 'my_cache'
     my_cache.destroy()
 
84 changes: 73 additions & 11 deletions tests/test_sql.py
@@ -20,12 +20,12 @@
     sql, sql_cursor_get_page,
     cache_get_configuration,
 )
+from pyignite.datatypes.cache_config import CacheMode
 from pyignite.datatypes.prop_codes import *
 from pyignite.exceptions import SQLError
 from pyignite.utils import entity_id
 from pyignite.binary import unwrap_binary
 
 
 initial_data = [
     ('John', 'Doe', 5),
     ('Jane', 'Roe', 4),
@@ -59,9 +59,10 @@ def test_sql(client):

     result = sql_fields(
         conn,
-        'PUBLIC',
+        0,
         create_query,
         page_size,
+        schema='PUBLIC',
         include_field_names=True
     )
     assert result.status == 0, result.message
@@ -70,9 +71,10 @@
         fname, lname, grade = data_line
         result = sql_fields(
             conn,
-            'PUBLIC',
+            0,
             insert_query,
             page_size,
+            schema='PUBLIC',
             query_args=[i, fname, lname, grade],
             include_field_names=True
         )
@@ -108,7 +110,7 @@ def test_sql(client):
     assert data.type_id == entity_id(binary_type_name)
 
     # repeat cleanup
-    result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
+    result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
     assert result.status == 0


Expand All @@ -121,9 +123,10 @@ def test_sql_fields(client):

     result = sql_fields(
         conn,
-        'PUBLIC',
+        0,
         create_query,
         page_size,
+        schema='PUBLIC',
         include_field_names=True
     )
     assert result.status == 0, result.message
@@ -132,19 +135,21 @@ def test_sql_fields(client):
         fname, lname, grade = data_line
         result = sql_fields(
             conn,
-            'PUBLIC',
+            0,
             insert_query,
             page_size,
+            schema='PUBLIC',
             query_args=[i, fname, lname, grade],
             include_field_names=True
         )
         assert result.status == 0, result.message
 
     result = sql_fields(
         conn,
-        'PUBLIC',
+        0,
         select_query,
         page_size,
+        schema='PUBLIC',
         include_field_names=True
     )
     assert result.status == 0
@@ -159,7 +164,7 @@ def test_sql_fields(client):
     assert result.value['more'] is False
 
     # repeat cleanup
-    result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
+    result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
     assert result.status == 0


Expand All @@ -176,7 +181,7 @@ def test_long_multipage_query(client):

     client.sql('DROP TABLE LongMultipageQuery IF EXISTS')
 
-    client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" % \
+    client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" %
                (fields[0] + " INT(11) PRIMARY KEY", ",".join(map(lambda f: f + " INT(11)", fields[1:]))))
 
     for id in range(1, 21):
@@ -193,6 +198,63 @@ def test_long_multipage_query(client):
     client.sql(drop_query)
 
 
-def test_sql_not_create_cache(client):
+def test_sql_not_create_cache_with_schema(client):
     with pytest.raises(SQLError, match=r".*Cache does not exist.*"):
-        client.sql(schema='IS_NOT_EXISTING', query_str='select * from IsNotExisting')
+        client.sql(schema=None, cache='NOT_EXISTING', query_str='select * from NotExisting')
+
+
+def test_sql_not_create_cache_with_cache(client):
+    with pytest.raises(SQLError, match=r".*Failed to set schema.*"):
+        client.sql(schema='NOT_EXISTING', query_str='select * from NotExisting')
+
+
+def test_query_with_cache(client):
+    test_key = 42
+    test_value = 'Lorem ipsum'
+
+    cache_name = test_query_with_cache.__name__.upper()
+    schema_name = f'{cache_name}_schema'.upper()
+    table_name = f'{cache_name}_table'.upper()
+
+    cache = client.create_cache({
+        PROP_NAME: cache_name,
+        PROP_SQL_SCHEMA: schema_name,
+        PROP_CACHE_MODE: CacheMode.PARTITIONED,
+        PROP_QUERY_ENTITIES: [
+            {
+                'table_name': table_name,
+                'key_field_name': 'KEY',
+                'value_field_name': 'VALUE',
+                'key_type_name': 'java.lang.Long',
+                'value_type_name': 'java.lang.String',
+                'query_indexes': [],
+                'field_name_aliases': [],
+                'query_fields': [
+                    {
+                        'name': 'KEY',
+                        'type_name': 'java.lang.Long',
+                        'is_key_field': True,
+                        'is_notnull_constraint_field': True,
+                    },
+                    {
+                        'name': 'VALUE',
+                        'type_name': 'java.lang.String',
+                    },
+                ],
+            },
+        ],
+    })
+
+    cache.put(test_key, test_value)
+
+    args_to_check = [
+        ('schema', schema_name),
+        ('cache', cache),
+        ('cache', cache.name),
+        ('cache', cache.cache_id)
+    ]
+
+    for param, value in args_to_check:
+        page = client.sql(f'select value from {table_name}', **{param: value})
+        received = next(page)[0]
+        assert test_value == received
