Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up remaining rflink tests #19551

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 36 additions & 51 deletions tests/components/test_rflink.py
@@ -1,6 +1,5 @@
"""Common functions for Rflink component tests and generic platform tests."""
"""Common functions for RFLink component tests and generic platform tests."""

import asyncio
from unittest.mock import Mock

from homeassistant.bootstrap import async_setup_component
Expand All @@ -10,23 +9,19 @@
ATTR_ENTITY_ID, SERVICE_TURN_OFF, SERVICE_STOP_COVER)


@asyncio.coroutine
def mock_rflink(hass, config, domain, monkeypatch, failures=None):
"""Create mock Rflink asyncio protocol, test component setup."""
async def mock_rflink(hass, config, domain, monkeypatch, failures=None):
"""Create mock RFLink asyncio protocol, test component setup."""
transport, protocol = (Mock(), Mock())

@asyncio.coroutine
def send_command_ack(*command):
async def send_command_ack(*command):
return True
protocol.send_command_ack = Mock(wraps=send_command_ack)

@asyncio.coroutine
def send_command(*command):
return True
protocol.send_command = Mock(wraps=send_command)

@asyncio.coroutine
def create_rflink_connection(*args, **kwargs):
async def create_rflink_connection(*args, **kwargs):
"""Return mocked transport and protocol."""
# failures can be a list of booleans indicating in which sequence
# creating a connection should success or fail
Expand All @@ -45,7 +40,7 @@ def create_rflink_connection(*args, **kwargs):
'rflink.protocol.create_rflink_connection',
mock_create)

yield from async_setup_component(hass, domain, config)
await async_setup_component(hass, domain, config)

# hook into mock config for injecting events
event_callback = mock_create.call_args_list[0][1]['event_callback']
Expand All @@ -57,8 +52,7 @@ def create_rflink_connection(*args, **kwargs):
return event_callback, mock_create, protocol, disconnect_callback


@asyncio.coroutine
def test_version_banner(hass, monkeypatch):
async def test_version_banner(hass, monkeypatch):
"""Test sending unknown commands doesn't cause issues."""
# use sensor domain during testing main platform
domain = 'sensor'
Expand All @@ -73,7 +67,7 @@ def test_version_banner(hass, monkeypatch):
}

# setup mocking rflink module
event_callback, _, _, _ = yield from mock_rflink(
event_callback, _, _, _ = await mock_rflink(
hass, config, domain, monkeypatch)

event_callback({
Expand All @@ -84,8 +78,7 @@ def test_version_banner(hass, monkeypatch):
})


@asyncio.coroutine
def test_send_no_wait(hass, monkeypatch):
async def test_send_no_wait(hass, monkeypatch):
"""Test command sending without ack."""
domain = 'switch'
config = {
Expand All @@ -105,19 +98,18 @@ def test_send_no_wait(hass, monkeypatch):
}

# setup mocking rflink module
_, _, protocol, _ = yield from mock_rflink(
_, _, protocol, _ = await mock_rflink(
hass, config, domain, monkeypatch)

hass.async_add_job(
hass.async_create_task(
hass.services.async_call(domain, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'switch.test'}))
yield from hass.async_block_till_done()
await hass.async_block_till_done()
assert protocol.send_command.call_args_list[0][0][0] == 'protocol_0_0'
assert protocol.send_command.call_args_list[0][0][1] == 'off'


@asyncio.coroutine
def test_cover_send_no_wait(hass, monkeypatch):
async def test_cover_send_no_wait(hass, monkeypatch):
"""Test command sending to a cover device without ack."""
domain = 'cover'
config = {
Expand All @@ -137,19 +129,18 @@ def test_cover_send_no_wait(hass, monkeypatch):
}

# setup mocking rflink module
_, _, protocol, _ = yield from mock_rflink(
_, _, protocol, _ = await mock_rflink(
hass, config, domain, monkeypatch)

hass.async_add_job(
hass.async_create_task(
hass.services.async_call(domain, SERVICE_STOP_COVER,
{ATTR_ENTITY_ID: 'cover.test'}))
yield from hass.async_block_till_done()
await hass.async_block_till_done()
assert protocol.send_command.call_args_list[0][0][0] == 'RTS_0100F2_0'
assert protocol.send_command.call_args_list[0][0][1] == 'STOP'


@asyncio.coroutine
def test_send_command(hass, monkeypatch):
async def test_send_command(hass, monkeypatch):
"""Test send_command service."""
domain = 'rflink'
config = {
Expand All @@ -159,21 +150,20 @@ def test_send_command(hass, monkeypatch):
}

# setup mocking rflink module
_, _, protocol, _ = yield from mock_rflink(
_, _, protocol, _ = await mock_rflink(
hass, config, domain, monkeypatch)

hass.async_add_job(
hass.async_create_task(
hass.services.async_call(domain, SERVICE_SEND_COMMAND,
{'device_id': 'newkaku_0000c6c2_1',
'command': 'on'}))
yield from hass.async_block_till_done()
await hass.async_block_till_done()
assert (protocol.send_command_ack.call_args_list[0][0][0]
== 'newkaku_0000c6c2_1')
assert protocol.send_command_ack.call_args_list[0][0][1] == 'on'


@asyncio.coroutine
def test_send_command_invalid_arguments(hass, monkeypatch):
async def test_send_command_invalid_arguments(hass, monkeypatch):
"""Test send_command service."""
domain = 'rflink'
config = {
Expand All @@ -183,25 +173,24 @@ def test_send_command_invalid_arguments(hass, monkeypatch):
}

# setup mocking rflink module
_, _, protocol, _ = yield from mock_rflink(
_, _, protocol, _ = await mock_rflink(
hass, config, domain, monkeypatch)

# one argument missing
hass.async_add_job(
hass.async_create_task(
hass.services.async_call(domain, SERVICE_SEND_COMMAND,
{'command': 'on'}))
hass.async_add_job(
hass.async_create_task(
hass.services.async_call(domain, SERVICE_SEND_COMMAND,
{'device_id': 'newkaku_0000c6c2_1'}))
# no arguments
hass.async_add_job(
hass.async_create_task(
hass.services.async_call(domain, SERVICE_SEND_COMMAND, {}))
yield from hass.async_block_till_done()
await hass.async_block_till_done()
assert protocol.send_command_ack.call_args_list == []


@asyncio.coroutine
def test_reconnecting_after_disconnect(hass, monkeypatch):
async def test_reconnecting_after_disconnect(hass, monkeypatch):
"""An unexpected disconnect should cause a reconnect."""
domain = 'sensor'
config = {
Expand All @@ -215,22 +204,21 @@ def test_reconnecting_after_disconnect(hass, monkeypatch):
}

# setup mocking rflink module
_, mock_create, _, disconnect_callback = yield from mock_rflink(
_, mock_create, _, disconnect_callback = await mock_rflink(
hass, config, domain, monkeypatch)

assert disconnect_callback, 'disconnect callback not passed to rflink'

# rflink initiated disconnect
disconnect_callback(None)

yield from hass.async_block_till_done()
await hass.async_block_till_done()

# we expect 2 call, the initial and reconnect
assert mock_create.call_count == 2


@asyncio.coroutine
def test_reconnecting_after_failure(hass, monkeypatch):
async def test_reconnecting_after_failure(hass, monkeypatch):
"""A failure to reconnect should be retried."""
domain = 'sensor'
config = {
Expand All @@ -247,22 +235,21 @@ def test_reconnecting_after_failure(hass, monkeypatch):
failures = [False, True, False]

# setup mocking rflink module
_, mock_create, _, disconnect_callback = yield from mock_rflink(
_, mock_create, _, disconnect_callback = await mock_rflink(
hass, config, domain, monkeypatch, failures=failures)

# rflink initiated disconnect
disconnect_callback(None)

# wait for reconnects to have happened
yield from hass.async_block_till_done()
yield from hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_block_till_done()

# we expect 3 calls, the initial and 2 reconnects
assert mock_create.call_count == 3


@asyncio.coroutine
def test_error_when_not_connected(hass, monkeypatch):
async def test_error_when_not_connected(hass, monkeypatch):
"""Sending command should error when not connected."""
domain = 'switch'
config = {
Expand All @@ -285,14 +272,12 @@ def test_error_when_not_connected(hass, monkeypatch):
failures = [False, True, False]

# setup mocking rflink module
_, _, _, disconnect_callback = yield from mock_rflink(
_, _, _, disconnect_callback = await mock_rflink(
hass, config, domain, monkeypatch, failures=failures)

# rflink initiated disconnect
disconnect_callback(None)

yield from asyncio.sleep(0, loop=hass.loop)

success = yield from hass.services.async_call(
success = await hass.services.async_call(
domain, SERVICE_TURN_OFF, {ATTR_ENTITY_ID: 'switch.test'})
assert not success, 'changing state should not succeed when disconnected'