Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for Archer C6 v2.0 #42

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
257 changes: 256 additions & 1 deletion custom_components/tplink_router/device_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
import string, random
import time

from Crypto.Cipher import PKCS1_OAEP
from Crypto.Cipher import AES
from Crypto import Random
import json
from codecs import encode

from aiohttp.hdrs import (
ACCEPT,
COOKIE,
Expand Down Expand Up @@ -46,7 +52,8 @@

def get_scanner(hass, config):
"""Validate the configuration and return a TP-Link scanner."""
for cls in [VR600TplinkDeviceScanner,
for cls in [C6TplinkDeviceScanner,
VR600TplinkDeviceScanner,
EAP225TplinkDeviceScanner,
N600TplinkDeviceScanner,
C7TplinkDeviceScanner,
Expand Down Expand Up @@ -668,3 +675,251 @@ def _update_info(self):
self.token = ''
return False
return True

class C6TplinkDeviceScanner(TplinkDeviceScanner):
"""This class queries the Archer C9 router with version 150811 or high."""

def __init__(self, config):
"""Initialize the scanner."""
self.stok = ''
self.sysauth = ''

self.login = True
self.seq = ''
self.hash = ''

self.nn = ''
self.ee = ''

self.pwdNN = ''
self.pwdEE = ''

self.encryption = EncryptionWrapper()

super(C6TplinkDeviceScanner, self).__init__(config)

def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
self._log_out()
return self.last_results.keys()

def get_device_name(self, device):
"""Get the firmware doesn't save the name of the wireless device.
We are forced to use the MAC address as name here.
"""
return self.last_results.get(device)

def prepare_data(self, data):
encryptedData = self.encryption.AESEncrypt(data)
dataLen = len(encryptedData)

sign = self.encryption.getSignature(int(self.seq) + dataLen, self.login, self.hash, self.nn, self.ee)

return {'sign': sign, 'data': encryptedData}

def _get_auth_tokens(self):
name = 'admin'
pwd = 'admin'
self.hash = hashlib.md5((name + pwd).encode()).hexdigest()

"""Retrieve auth tokens from the router."""
_LOGGER.info("Retrieving auth tokens...")
referer = 'http://{}/webpages/login.html?t=1596185370610'.format(self.host)

if self.pwdNN == '':
url = 'http://{}/cgi-bin/luci/;stok=/login?form=keys' \
.format(self.host)

# If possible implement RSA encryption of password here.
response = requests.post(
url, params={'operation': 'read'},
headers={REFERER: referer}, timeout=4)

jsonData = response.json()

if jsonData['success'] == False:
raise Exception('Unkown error: ' + jsonData)

args = jsonData['data']['password']

self.pwdNN = args[0]
self.pwdEE = args[1]

if self.seq == '':
url = 'http://{}/cgi-bin/luci/;stok=/login?form=auth' \
.format(self.host)

# If possible implement RSA encryption of password here.
response = requests.post(
url, params={'operation': 'read'},
headers={REFERER: referer}, timeout=4)

jsonData = response.json()

if jsonData['success'] == False:
raise Exception('Unkown error: ' + jsonData)

self.seq = jsonData['data']['seq']
args = jsonData['data']['key']

self.nn = args[0]
self.ee = args[1]

url = 'http://{}/cgi-bin/luci/;stok=/login?form=login' \
.format(self.host)

cryptedPwd = self.encryption.RSAEncrypt(self.password, self.pwdNN, self.pwdEE)
data = 'operation=login&password={}&confirm=true'.format(cryptedPwd)

body = self.prepare_data(data)

response = requests.post(
url, data=body,
headers={REFERER: referer, 'Content-Type': 'application/x-www-form-urlencoded'}, timeout=4)

try:
jsonData = response.json()

encryptedResponseData = jsonData['data']
responseData = self.encryption.AESDecrypt(encryptedResponseData)

responseDict = json.loads(responseData)

if responseDict['success'] == False:
raise Exception(responseDict['errorcode'])

self.stok = responseDict['data']['stok']
#_LOGGER.info(self.stok)
regex_result = re.search(
'sysauth=(.*);', response.headers['set-cookie'])
self.sysauth = regex_result.group(1)
#_LOGGER.info(self.sysauth)
self.login = False
return True
except (ValueError, KeyError, AttributeError) as _:
_LOGGER.error("Couldn't fetch auth tokens! Response was: %s",
response.text)
return False

def _update_info(self):
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
_LOGGER.info("[C6] Loading wireless clients...")

if (self.stok == '') or (self.sysauth == ''):
self._get_auth_tokens()

url = ('http://{}/cgi-bin/luci/;stok={}/admin/wireless?'
'form=statistics').format(self.host, self.stok)
referer = 'http://{}/webpages/index.html'.format(self.host)

response = requests.post(
url, params={'operation': 'load'}, headers={REFERER: referer},
cookies={'sysauth': self.sysauth}, timeout=5)

try:
json_response = response.json()

data = json_response['data']
data = self.encryption.AESDecrypt(data)

json_response = json.loads(data)

if json_response['success']:
result = json_response['data']
else:
if json_response['errorcode'] == 'timeout':
_LOGGER.info("Token timed out. Relogging on next scan")
self.stok = ''
self.sysauth = ''
return False
_LOGGER.error(
"An unknown error happened while fetching data")
return False
except ValueError:
_LOGGER.error("Router didn't respond with JSON. "
"Check if credentials are correct")
return False

if result:
self.last_results = {
device['mac'].replace('-', ':'): device['mac']
for device in result
}
return True

return False

def _log_out(self):
name = 'admin'
pwd = self.password
self.hash = hashlib.md5((name + pwd).encode()).hexdigest()
_LOGGER.info("Logging out of router admin interface...")

url = ('http://{}/cgi-bin/luci/;stok={}/admin/system?form=logout').format(self.host, self.stok)
referer = 'http://{}/webpages/index.1596185370610.html'.format(self.host)

body = self.prepare_data('operation=write')
response = requests.post(
url, data=body, headers={REFERER: referer, 'Content-Type': 'application/x-www-form-urlencoded'},
cookies={'sysauth': self.sysauth})

self.stok = ''
self.sysauth = ''
self.login = True

class EncryptionWrapper():
def __init__(self):
self.iv = binascii.b2a_hex(Random.get_random_bytes(8))
self.key = binascii.b2a_hex(Random.get_random_bytes(8))

def AESEncrypt(self, raw):
raw = self._pad(raw)
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
return base64.b64encode(cipher.encrypt(raw.encode())).decode()

def AESDecrypt(self, enc):
enc = base64.b64decode(enc)
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
decrypted = cipher.decrypt(enc)
result = self._unpad(decrypted)
return result.decode()

def _pad(self, s):
return s + (AES.block_size - len(s) % AES.block_size) * chr(AES.block_size - len(s) % AES.block_size)

@staticmethod
def _unpad(s):
return s[:-ord(s[len(s)-1:])]

def getAESString(self):
return 'k={}&i={}'.format(self.key.decode(), self.iv.decode())

def RSAEncrypt(self, data, nn, ee):
e = int(ee, 16)
n = int(nn, 16)

key = construct((n, e))
cipher = PKCS1_v1_5.new(key)

result = cipher.encrypt(data.encode())
return binascii.b2a_hex(result).decode()

def getSignature(self, seq, isLogin, hash, nn, ee):
s = ''

if isLogin:
s = '{}&h={}&s={}'.format(self.getAESString(), hash, seq)
else:
s = 'h={}&s={}'.format(hash, seq)

sign = ''
pos = 0

while pos < len(s):
sign = sign + self.RSAEncrypt(s[pos:pos+53], nn, ee)
pos = pos + 53

return sign