Skip to content

Commit

Permalink
IGNITE-14240 Re-factor tests
Browse files Browse the repository at this point in the history
Handle authentication error.
Fix infinite recursion on failed connection on handshake.
Skip affinity test if server doesn't support protocol.
Remove travis.

This closes #19
  • Loading branch information
ivandasch authored and isapego committed Feb 25, 2021
1 parent 672a767 commit b2030be
Show file tree
Hide file tree
Showing 32 changed files with 472 additions and 494 deletions.
48 changes: 0 additions & 48 deletions .travis.yml

This file was deleted.

25 changes: 19 additions & 6 deletions pyignite/connection/connection.py
Expand Up @@ -34,7 +34,7 @@

from pyignite.constants import *
from pyignite.exceptions import (
HandshakeError, ParameterError, SocketError, connection_errors,
HandshakeError, ParameterError, SocketError, connection_errors, AuthenticationError,
)
from pyignite.datatypes import Byte, Int, Short, String, UUIDObject
from pyignite.datatypes.internal import Struct
Expand All @@ -43,6 +43,8 @@
from .ssl import wrap
from ..stream import BinaryStream, READ_BACKWARD

CLIENT_STATUS_AUTH_FAILURE = 2000


class Connection:
"""
Expand Down Expand Up @@ -180,7 +182,7 @@ def read_response(self) -> Union[dict, OrderedDict]:
('length', Int),
('op_code', Byte),
])
with BinaryStream(self, self.recv()) as stream:
with BinaryStream(self, self.recv(reconnect=False)) as stream:
start_class = response_start.parse(stream)
start = stream.read_ctype(start_class, direction=READ_BACKWARD)
data = response_start.to_python(start)
Expand All @@ -191,6 +193,7 @@ def read_response(self) -> Union[dict, OrderedDict]:
('version_minor', Short),
('version_patch', Short),
('message', String),
('client_status', Int)
])
elif self.get_protocol_version() >= (1, 4, 0):
response_end = Struct([
Expand Down Expand Up @@ -267,7 +270,7 @@ def _connect_version(

with BinaryStream(self) as stream:
hs_request.from_python(stream)
self.send(stream.getbuffer())
self.send(stream.getbuffer(), reconnect=False)

hs_response = self.read_response()
if hs_response['op_code'] == 0:
Expand All @@ -291,6 +294,8 @@ def _connect_version(
client_patch=protocol_version[2],
**hs_response
)
elif hs_response['client_status'] == CLIENT_STATUS_AUTH_FAILURE:
raise AuthenticationError(error_text)
raise HandshakeError((
hs_response['version_major'],
hs_response['version_minor'],
Expand All @@ -313,12 +318,13 @@ def reconnect(self):
except connection_errors:
pass

def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
def send(self, data: Union[bytes, bytearray, memoryview], flags=None, reconnect=True):
"""
Send data down the socket.
:param data: bytes to send,
:param flags: (optional) OS-specific flags.
:param reconnect: (optional) reconnect on failure, default True.
"""
if self.closed:
raise SocketError('Attempt to use closed connection.')
Expand All @@ -334,7 +340,13 @@ def send(self, data: Union[bytes, bytearray, memoryview], flags=None):
self.reconnect()
raise

def recv(self, flags=None) -> bytearray:
def recv(self, flags=None, reconnect=True) -> bytearray:
"""
Receive data from the socket.
:param flags: (optional) OS-specific flags.
:param reconnect: (optional) reconnect on failure, default True.
"""
def _recv(buffer, num_bytes):
bytes_to_receive = num_bytes
while bytes_to_receive > 0:
Expand All @@ -344,7 +356,8 @@ def _recv(buffer, num_bytes):
raise SocketError('Connection broken.')
except connection_errors:
self.failed = True
self.reconnect()
if reconnect:
self.reconnect()
raise

buffer = buffer[bytes_rcvd:]
Expand Down
2 changes: 1 addition & 1 deletion pyignite/constants.py
Expand Up @@ -49,7 +49,7 @@
PROTOCOL_STRING_ENCODING = 'utf-8'
PROTOCOL_CHAR_ENCODING = 'utf-16le'

SSL_DEFAULT_VERSION = ssl.PROTOCOL_TLSv1_1
SSL_DEFAULT_VERSION = ssl.PROTOCOL_TLSv1_2
SSL_DEFAULT_CIPHERS = ssl._DEFAULT_CIPHERS

FNV1_OFFSET_BASIS = 0x811c9dc5
Expand Down
9 changes: 9 additions & 0 deletions pyignite/exceptions.py
Expand Up @@ -25,6 +25,15 @@ class ParseError(Exception):
pass


class AuthenticationError(Exception):
"""
This exception is raised on authentication failure.
"""

def __init__(self, message: str):
self.message = message


class HandshakeError(SocketError):
"""
This exception is raised on Ignite binary protocol handshake failure,
Expand Down
2 changes: 1 addition & 1 deletion requirements/install.txt
@@ -1,3 +1,3 @@
# these pip packages are necessary for the pyignite to run

attrs==18.1.0
attrs==20.3.0
2 changes: 1 addition & 1 deletion requirements/setup.txt
@@ -1,3 +1,3 @@
# additional package for integrating pytest in setuptools

pytest-runner==4.2
pytest-runner==5.3.0
8 changes: 4 additions & 4 deletions requirements/tests.txt
@@ -1,7 +1,7 @@
# these packages are used for testing

pytest==3.6.1
pytest-cov==2.5.1
teamcity-messages==1.21
psutil==5.6.5
pytest==6.2.2
pytest-cov==2.11.1
teamcity-messages==1.28
psutil==5.8.0
jinja2==2.11.3
72 changes: 72 additions & 0 deletions tests/affinity/conftest.py
@@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from pyignite import Client
from pyignite.api import cache_create, cache_destroy
from tests.util import start_ignite_gen


@pytest.fixture(scope='module', autouse=True)
def server1():
yield from start_ignite_gen(1)


@pytest.fixture(scope='module', autouse=True)
def server2():
yield from start_ignite_gen(2)


@pytest.fixture(scope='module', autouse=True)
def server3():
yield from start_ignite_gen(3)


@pytest.fixture
def client():
client = Client(partition_aware=True)

client.connect([('127.0.0.1', 10800 + i) for i in range(1, 4)])

yield client

client.close()


@pytest.fixture
def client_not_connected():
client = Client(partition_aware=True)
yield client
client.close()


@pytest.fixture
def cache(connected_client):
cache_name = 'my_bucket'
conn = connected_client.random_node

cache_create(conn, cache_name)
yield cache_name
cache_destroy(conn, cache_name)


@pytest.fixture(scope='module', autouse=True)
def skip_if_no_affinity(request, server1):
client = Client(partition_aware=True)
client.connect('127.0.0.1', 10801)

if not client.partition_awareness_supported_by_protocol:
pytest.skip(f'skipped {request.node.name}, partition awareness is not supported.')
43 changes: 16 additions & 27 deletions tests/test_affinity.py → tests/affinity/test_affinity.py
Expand Up @@ -27,12 +27,11 @@
from pyignite.datatypes.prop_codes import *


def test_get_node_partitions(client_partition_aware):
def test_get_node_partitions(client):
conn = client.random_node

conn = client_partition_aware.random_node

cache_1 = client_partition_aware.get_or_create_cache('test_cache_1')
cache_2 = client_partition_aware.get_or_create_cache({
cache_1 = client.get_or_create_cache('test_cache_1')
cache_2 = client.get_or_create_cache({
PROP_NAME: 'test_cache_2',
PROP_CACHE_KEY_CONFIGURATION: [
{
Expand All @@ -41,9 +40,9 @@ def test_get_node_partitions(client_partition_aware):
}
],
})
cache_3 = client_partition_aware.get_or_create_cache('test_cache_3')
cache_4 = client_partition_aware.get_or_create_cache('test_cache_4')
cache_5 = client_partition_aware.get_or_create_cache('test_cache_5')
client.get_or_create_cache('test_cache_3')
client.get_or_create_cache('test_cache_4')
client.get_or_create_cache('test_cache_5')

result = cache_get_node_partitions(
conn,
Expand Down Expand Up @@ -115,9 +114,8 @@ def test_get_node_partitions(client_partition_aware):
],
)
def test_affinity(client_partition_aware, key, key_hint):

cache_1 = client_partition_aware.get_or_create_cache({
def test_affinity(client, key, key_hint):
cache_1 = client.get_or_create_cache({
PROP_NAME: 'test_cache_1',
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
Expand All @@ -126,7 +124,7 @@ def test_affinity(client_partition_aware, key, key_hint):

best_node = cache_1.get_best_node(key, key_hint=key_hint)

for node in filter(lambda n: n.alive, client_partition_aware._nodes):
for node in filter(lambda n: n.alive, client._nodes):
result = cache_local_peek(
node, cache_1.cache_id, key, key_hint=key_hint,
)
Expand All @@ -142,9 +140,8 @@ def test_affinity(client_partition_aware, key, key_hint):
cache_1.destroy()


def test_affinity_for_generic_object(client_partition_aware):

cache_1 = client_partition_aware.get_or_create_cache({
def test_affinity_for_generic_object(client):
cache_1 = client.get_or_create_cache({
PROP_NAME: 'test_cache_1',
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
Expand All @@ -166,7 +163,7 @@ class KeyClass(

best_node = cache_1.get_best_node(key, key_hint=BinaryObject)

for node in filter(lambda n: n.alive, client_partition_aware._nodes):
for node in filter(lambda n: n.alive, client._nodes):
result = cache_local_peek(
node, cache_1.cache_id, key, key_hint=BinaryObject,
)
Expand All @@ -182,16 +179,8 @@ class KeyClass(
cache_1.destroy()


def test_affinity_for_generic_object_without_type_hints(client_partition_aware):

if not client_partition_aware.partition_awareness_supported_by_protocol:
pytest.skip(
'Best effort affinity is not supported by the protocol {}.'.format(
client_partition_aware.protocol_version
)
)

cache_1 = client_partition_aware.get_or_create_cache({
def test_affinity_for_generic_object_without_type_hints(client):
cache_1 = client.get_or_create_cache({
PROP_NAME: 'test_cache_1',
PROP_CACHE_MODE: CacheMode.PARTITIONED,
})
Expand All @@ -213,7 +202,7 @@ class KeyClass(

best_node = cache_1.get_best_node(key)

for node in filter(lambda n: n.alive, client_partition_aware._nodes):
for node in filter(lambda n: n.alive, client._nodes):
result = cache_local_peek(
node, cache_1.cache_id, key
)
Expand Down

0 comments on commit b2030be

Please sign in to comment.