Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize MAC addresses #16916

Merged
merged 2 commits into from Nov 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions homeassistant/helpers/device_registry.py
Expand Up @@ -38,6 +38,25 @@ class DeviceEntry:
id = attr.ib(type=str, default=attr.Factory(lambda: uuid.uuid4().hex))


def format_mac(mac):
"""Format the mac address string for entry into dev reg."""
to_test = mac

if len(to_test) == 17 and to_test.count(':') == 5:
return to_test.lower()
elif len(to_test) == 17 and to_test.count('-') == 5:
to_test = to_test.replace('-', '')
elif len(to_test) == 14 and to_test.count('.') == 2:
to_test = to_test.replace('.', '')

if len(to_test) == 12:
# no : included
return ':'.join(to_test.lower()[i:i + 2] for i in range(0, 12, 2))

# Not sure how formatted, return original
return mac


class DeviceRegistry:
"""Class to hold a registry of devices."""

Expand Down Expand Up @@ -71,6 +90,12 @@ def async_get_or_create(self, *, config_entry_id, connections=None,
if connections is None:
connections = set()

connections = {
(key, format_mac(value)) if key == CONNECTION_NETWORK_MAC
else (key, value)
for key, value in connections
}

device = self.async_get_device(identifiers, connections)

if device is None:
Expand Down
99 changes: 84 additions & 15 deletions tests/helpers/test_device_registry.py
Expand Up @@ -17,20 +17,26 @@ async def test_get_or_create_returns_same_entry(registry):
"""Make sure we do not duplicate entries."""
entry = registry.async_get_or_create(
config_entry_id='1234',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers={('bridgeid', '0123')},
sw_version='sw-version',
name='name',
manufacturer='manufacturer',
model='model')
entry2 = registry.async_get_or_create(
config_entry_id='1234',
connections={('ethernet', '11:22:33:44:55:66:77:88')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '11:22:33:66:77:88')
},
identifiers={('bridgeid', '0123')},
manufacturer='manufacturer', model='model')
entry3 = registry.async_get_or_create(
config_entry_id='1234',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')}
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
}
)

assert len(registry.devices) == 1
Expand All @@ -48,7 +54,9 @@ async def test_requirement_for_identifier_or_connection(registry):
"""Make sure we do require some descriptor of device."""
entry = registry.async_get_or_create(
config_entry_id='1234',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers=set(),
manufacturer='manufacturer', model='model')
entry2 = registry.async_get_or_create(
Expand All @@ -72,17 +80,23 @@ async def test_multiple_config_entries(registry):
"""Make sure we do not get duplicate entries."""
entry = registry.async_get_or_create(
config_entry_id='123',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers={('bridgeid', '0123')},
manufacturer='manufacturer', model='model')
entry2 = registry.async_get_or_create(
config_entry_id='456',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers={('bridgeid', '0123')},
manufacturer='manufacturer', model='model')
entry3 = registry.async_get_or_create(
config_entry_id='123',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers={('bridgeid', '0123')},
manufacturer='manufacturer', model='model')

Expand Down Expand Up @@ -112,7 +126,7 @@ async def test_loading_from_storage(hass, hass_storage):
'identifiers': [
[
'serial',
'12:34:56:78:90:AB:CD:EF'
'12:34:56:AB:CD:EF'
]
],
'manufacturer': 'manufacturer',
Expand All @@ -129,7 +143,7 @@ async def test_loading_from_storage(hass, hass_storage):
entry = registry.async_get_or_create(
config_entry_id='1234',
connections={('Zigbee', '01.23.45.67.89')},
identifiers={('serial', '12:34:56:78:90:AB:CD:EF')},
identifiers={('serial', '12:34:56:AB:CD:EF')},
manufacturer='manufacturer', model='model')
assert entry.id == 'abcdefghijklm'
assert isinstance(entry.config_entries, set)
Expand All @@ -139,17 +153,23 @@ async def test_removing_config_entries(registry):
"""Make sure we do not get duplicate entries."""
entry = registry.async_get_or_create(
config_entry_id='123',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers={('bridgeid', '0123')},
manufacturer='manufacturer', model='model')
entry2 = registry.async_get_or_create(
config_entry_id='456',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers={('bridgeid', '0123')},
manufacturer='manufacturer', model='model')
entry3 = registry.async_get_or_create(
config_entry_id='123',
connections={('ethernet', '34:56:78:90:AB:CD:EF:12')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '34:56:78:CD:EF:12')
},
identifiers={('bridgeid', '4567')},
manufacturer='manufacturer', model='model')

Expand All @@ -170,7 +190,9 @@ async def test_specifying_hub_device_create(registry):
"""Test specifying a hub and updating."""
hub = registry.async_get_or_create(
config_entry_id='123',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers={('hue', '0123')},
manufacturer='manufacturer', model='hub')

Expand All @@ -197,7 +219,9 @@ async def test_specifying_hub_device_update(registry):

hub = registry.async_get_or_create(
config_entry_id='123',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers={('hue', '0123')},
manufacturer='manufacturer', model='hub')

Expand All @@ -215,7 +239,9 @@ async def test_loading_saving_data(hass, registry):
"""Test that we load/save data correctly."""
orig_hub = registry.async_get_or_create(
config_entry_id='123',
connections={('ethernet', '12:34:56:78:90:AB:CD:EF')},
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
identifiers={('hue', '0123')},
manufacturer='manufacturer', model='hub')

Expand Down Expand Up @@ -259,3 +285,46 @@ async def test_no_unnecessary_changes(registry):

assert entry.id == entry2.id
assert len(mock_save.mock_calls) == 0


async def test_format_mac(registry):
"""Make sure we normalize mac addresses."""
entry = registry.async_get_or_create(
config_entry_id='1234',
connections={
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:AB:CD:EF')
},
)
for mac in [
'123456ABCDEF',
'123456abcdef',
'12:34:56:ab:cd:ef',
'1234.56ab.cdef',
]:
test_entry = registry.async_get_or_create(
config_entry_id='1234',
connections={
(device_registry.CONNECTION_NETWORK_MAC, mac)
},
)
assert test_entry.id == entry.id, mac
assert test_entry.connections == {
(device_registry.CONNECTION_NETWORK_MAC, '12:34:56:ab:cd:ef')
}

# This should not raise
for invalid in [
'invalid_mac',
'123456ABCDEFG', # 1 extra char
'12:34:56:ab:cdef', # not enough :
'12:34:56:ab:cd:e:f', # too many :
'1234.56abcdef', # not enough .
'123.456.abc.def', # too many .
]:
invalid_mac_entry = registry.async_get_or_create(
config_entry_id='1234',
connections={
(device_registry.CONNECTION_NETWORK_MAC, invalid)
},
)
assert list(invalid_mac_entry.connections)[0][1] == invalid