Skip to content

Commit

Permalink
Merge pull request #1 from OpenMatchmaking/feature-tests-for-workers
Browse files Browse the repository at this point in the history
Tests for AMQP workers
  • Loading branch information
Relrin committed May 26, 2018
2 parents 889d6a7 + 1dd6684 commit 0d68122
Show file tree
Hide file tree
Showing 9 changed files with 475 additions and 22 deletions.
4 changes: 2 additions & 2 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ services:
- MONGODB_USERNAME=user
- MONGODB_PASSWORD=password
- MONGODB_HOST=mongodb
- MONGODB_DATABASE=auth
- MONGODB_DATABASE=game_servers_pool
- WAIT_FOR_MONGODB=30
- WAIT_FOR_REDIS=30
- WAIT_FOR_RABBITMQ=30
Expand Down Expand Up @@ -80,7 +80,7 @@ services:
environment:
- MONGODB_USERNAME=user
- MONGODB_PASSWORD=password
- MONGODB_DATABASE=auth
- MONGODB_DATABASE=game_servers_pool
- MONGODB_ROOT_PASSWORD=root
networks:
- app-tier
Expand Down
3 changes: 1 addition & 2 deletions game-servers-pool/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sanic_mongodb_ext import MongoDbExtension
from sanic_amqp_ext import AmqpExtension

from app.workers import GetServerWorker, MicroserviceRegisterWorker, RegisterServerWorker
from app.workers import GetServerWorker, RegisterServerWorker


app = Sanic('microservice-auth')
Expand All @@ -16,7 +16,6 @@

# RabbitMQ workers
app.amqp.register_worker(GetServerWorker(app))
app.amqp.register_worker(MicroserviceRegisterWorker(app))
app.amqp.register_worker(RegisterServerWorker(app))


Expand Down
10 changes: 10 additions & 0 deletions game-servers-pool/app/commands/run_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
from sanic_script import Command, Option

from app import app
from app.workers.microservice_register import MicroserviceRegisterWorker


class RunServerCommand(Command):
Expand All @@ -14,7 +16,15 @@ class RunServerCommand(Command):
Option('--port', '-p', dest='port'),
)

def register_microservice(self):
loop = asyncio.get_event_loop()
worker = MicroserviceRegisterWorker(self.app)
loop.run_until_complete(worker.run(loop=loop))
loop.stop()
loop.close()

def run(self, *args, **kwargs):
self.register_microservice()
self.app.run(
host=kwargs.get('host', None) or self.app.config["APP_HOST"],
port=kwargs.get('port', None) or self.app.config["APP_PORT"],
Expand Down
30 changes: 27 additions & 3 deletions game-servers-pool/app/game_servers/schemas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from marshmallow import Schema, fields, validate
from bson import ObjectId
from marshmallow import Schema, fields, validate, validates, ValidationError

from app import app

Expand All @@ -23,12 +24,35 @@ class RequestGetServerSchema(Schema):
)


class GameServerSchema(GameServer.schema.as_marshmallow_schema()):
class RegisterGameServerSchema(GameServer.schema.as_marshmallow_schema()):
id = fields.String(required=False)

@validates('id')
def validate_id(self, value):
if not ObjectId.is_valid(value):
raise ValidationError(
"'{}' is not a valid ObjectId, it must be a 12-byte "
"input or a 24-character hex string.".format(value)
)

class Meta:
model = GameServer
fields = (
'id',
'host',
'port',
'available_slots',
'credentials',
'game_mode',
)


class RetrieveGameServerSchema(GameServer.schema.as_marshmallow_schema()):

class Meta:
model = GameServer
fields = (
'host',
'port',
'credentials'
'credentials',
)
19 changes: 12 additions & 7 deletions game-servers-pool/app/workers/get_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ class GetServerWorker(AmqpWorker):
def __init__(self, app, *args, **kwargs):
super(GetServerWorker, self).__init__(app, *args, **kwargs)
from app.game_servers.documents import GameServer
from app.game_servers.schemas import RequestGetServerSchema, GameServerSchema
from app.game_servers.schemas import RequestGetServerSchema, RetrieveGameServerSchema
self.game_server_document = GameServer
self.schema = GameServerSchema
self.schema = RetrieveGameServerSchema
self.request_schema = RequestGetServerSchema

async def validate_data(self, raw_data):
Expand Down Expand Up @@ -46,12 +46,17 @@ async def get_game_server(self, raw_data):
{'available_slots': {'$gte': data['required_slots']}},
{"game_mode": data['game_mode']}
]
}}
}},
{'$sample': {'size': 1}}
]
instance = await self.game_server_document.collection.aggregate(pipeline).limit(1)

serializer = self.schema()
serialized_instance = serializer.dump(instance).data
result = await self.game_server_document.collection.aggregate(pipeline).to_list(1)

if result:
instance = result[0]
serializer = self.schema()
serialized_instance = serializer.dump(instance).data
else:
serialized_instance = None
return Response.with_content(serialized_instance)

async def process_request(self, channel, body, envelope, properties):
Expand Down
13 changes: 6 additions & 7 deletions game-servers-pool/app/workers/register_server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import json
from uuid import uuid4


from aioamqp import AmqpClosedConnection
from bson import ObjectId
from marshmallow import ValidationError
from sanic_amqp_ext import AmqpWorker
from sage_utils.constants import VALIDATION_ERROR
Expand All @@ -18,15 +17,15 @@ class RegisterServerWorker(AmqpWorker):
def __init__(self, app, *args, **kwargs):
super(RegisterServerWorker, self).__init__(app, *args, **kwargs)
from app.game_servers.documents import GameServer
from app.game_servers.schemas import RegisterGameServerSchema
self.game_server_document = GameServer
self.schema = GameServer.schema.as_marshmallow_schema()
self.schema = RegisterGameServerSchema

async def validate_data(self, raw_data):
try:
data = json.loads(raw_data.strip())
except json.decoder.JSONDecodeError:
data = {}

deserializer = self.schema()
result = deserializer.load(data)
if result.errors:
Expand All @@ -40,12 +39,12 @@ async def register_game_server(self, raw_data):
except ValidationError as exc:
return Response.from_error(VALIDATION_ERROR, exc.normalized_messages())

game_server_id = data.get('id', str(uuid4()))
object_id = ObjectId(data['id']) if 'id' in data.keys() else ObjectId()
await self.game_server_document.collection.replace_one(
{'_id': game_server_id}, replacement=data, upsert=True
{'_id': object_id}, replacement=data, upsert=True
)

return Response.with_content({'id': game_server_id})
return Response.with_content({'id': str(object_id)})

async def process_request(self, channel, body, envelope, properties):
response = await self.register_game_server(body)
Expand Down
209 changes: 209 additions & 0 deletions game-servers-pool/tests/test_get_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import pytest
from sage_utils.amqp.clients import RpcAmqpClient
from sage_utils.constants import VALIDATION_ERROR
from sage_utils.wrappers import Response

from app.game_servers.documents import GameServer
from app.workers.get_server import GetServerWorker


REQUEST_QUEUE = GetServerWorker.QUEUE_NAME
REQUEST_EXCHANGE = GetServerWorker.REQUEST_EXCHANGE_NAME
RESPONSE_EXCHANGE = GetServerWorker.RESPONSE_EXCHANGE_NAME


async def create_game_servers(init_data_list):
objects = []
for create_data in init_data_list:
game_server = GameServer(**create_data)
await game_server.commit()
objects.append(game_server)
return objects


@pytest.mark.asyncio
async def test_worker_returns_one_existing_server_for_one_server_in_list(sanic_server):
await GameServer.collection.delete_many({})

create_data = {
'host': '127.0.0.1',
'port': 9000,
'available_slots': 100,
'credentials': {
'token': 'super_secret_token'
},
'game_mode': '1v1'
}
objects = await create_game_servers([create_data, ])

client = RpcAmqpClient(
sanic_server.app,
routing_key=REQUEST_QUEUE,
request_exchange=REQUEST_EXCHANGE,
response_queue='',
response_exchange=RESPONSE_EXCHANGE
)
response = await client.send(payload={
'required_slots': 20,
'game_mode': "1v1"
})

assert Response.EVENT_FIELD_NAME in response.keys()
assert Response.CONTENT_FIELD_NAME in response.keys()
content = response[Response.CONTENT_FIELD_NAME]

assert len(list(content.keys())) == 3
assert set(content.keys()) == {'host', 'port', 'credentials'}

assert content['host'] == objects[0].host
assert content['port'] == objects[0].port
assert content['credentials'] == objects[0].credentials

await GameServer.collection.delete_many({})


@pytest.mark.asyncio
async def test_worker_returns_a_random_server_from_a_list(sanic_server):
await GameServer.collection.delete_many({})

objects = await create_game_servers([
{
'host': '127.0.0.1',
'port': 9000,
'available_slots': 100,
'credentials': {
'token': 'super_secret_token'
},
'game_mode': 'team-deathmatch'
},
{
'host': '127.0.0.1',
'port': 9001,
'available_slots': 50,
'credentials': {
'token': 'super_secret_token2'
},
'game_mode': 'team-deathmatch'
},
{
'host': '127.0.0.1',
'port': 9002,
'available_slots': 10,
'credentials': {
'token': 'super_secret_token3'
},
'game_mode': 'team-deathmatch'
},
])

client = RpcAmqpClient(
sanic_server.app,
routing_key=REQUEST_QUEUE,
request_exchange=REQUEST_EXCHANGE,
response_queue='',
response_exchange=RESPONSE_EXCHANGE
)
response = await client.send(payload={
'required_slots': 10,
'game_mode': 'team-deathmatch'
})

assert Response.EVENT_FIELD_NAME in response.keys()
assert Response.CONTENT_FIELD_NAME in response.keys()
content = response[Response.CONTENT_FIELD_NAME]

assert len(list(content.keys())) == 3
assert set(content.keys()) == {'host', 'port', 'credentials'}

filter_func = lambda obj: obj.host == content['host'] and obj.port == content['port'] # NOQA
extracted_server = list(filter(filter_func, objects))[0]

assert content['host'] == extracted_server.host
assert content['port'] == extracted_server.port
assert content['credentials'] == extracted_server.credentials

await GameServer.collection.delete_many({})


@pytest.mark.asyncio
async def test_worker_returns_none_for_an_empty_list_of_servers(sanic_server):
await GameServer.collection.delete_many({})

client = RpcAmqpClient(
sanic_server.app,
routing_key=REQUEST_QUEUE,
request_exchange=REQUEST_EXCHANGE,
response_queue='',
response_exchange=RESPONSE_EXCHANGE
)
response = await client.send(payload={
'required_slots': 10,
'game_mode': 'team-deathmatch'
})

assert Response.EVENT_FIELD_NAME in response.keys()
assert Response.CONTENT_FIELD_NAME in response.keys()
content = response[Response.CONTENT_FIELD_NAME]

assert content is None

await GameServer.collection.delete_many({})


@pytest.mark.asyncio
async def test_worker_returns_none_for_an_non_existing_server_type(sanic_server):
await GameServer.collection.delete_many({})

client = RpcAmqpClient(
sanic_server.app,
routing_key=REQUEST_QUEUE,
request_exchange=REQUEST_EXCHANGE,
response_queue='',
response_exchange=RESPONSE_EXCHANGE
)
response = await client.send(payload={
'required_slots': 10,
'game_mode': 'battle-royal'
})

assert Response.EVENT_FIELD_NAME in response.keys()
assert Response.CONTENT_FIELD_NAME in response.keys()
content = response[Response.CONTENT_FIELD_NAME]

assert content is None

await GameServer.collection.delete_many({})


@pytest.mark.asyncio
async def test_worker_returns_a_validation_error_for_missing_fields(sanic_server):
await GameServer.collection.delete_many({})

client = RpcAmqpClient(
sanic_server.app,
routing_key=REQUEST_QUEUE,
request_exchange=REQUEST_EXCHANGE,
response_queue='',
response_exchange=RESPONSE_EXCHANGE
)
response = await client.send(payload={})

assert Response.ERROR_FIELD_NAME in response.keys()
error = response[Response.ERROR_FIELD_NAME]

assert Response.ERROR_TYPE_FIELD_NAME in error.keys()
assert error[Response.ERROR_TYPE_FIELD_NAME] == VALIDATION_ERROR

assert Response.ERROR_DETAILS_FIELD_NAME in error.keys()
assert len(error[Response.ERROR_DETAILS_FIELD_NAME]) == 2

for field in ['required_slots', 'game_mode']:
assert field in error[Response.ERROR_DETAILS_FIELD_NAME]
assert len(error[Response.ERROR_DETAILS_FIELD_NAME][field]) == 1
assert error[Response.ERROR_DETAILS_FIELD_NAME][field][0] == 'Missing data for ' \
'required field.'

servers_count = await GameServer.collection.find().count()
assert servers_count == 0

await GameServer.collection.delete_many({})
Loading

0 comments on commit 0d68122

Please sign in to comment.