Tested TP-link C1200 v2 #9

Closed
swwgames opened this issue Feb 7, 2024 · 11 comments

@swwgames
Contributor

swwgames commented Feb 7, 2024

Issue Description:
I tested the TP-Link C1200 v2, and it does not work with the existing code. After troubleshooting, I discovered that this router requires modifications to the client.py code. Specifically, the self._seq and self._pwdNN parts of the code are not needed. To retrieve data, only the stok and sysauth tokens are required.
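
A minimal sketch of the simplified login handling described above, assuming the router answers the login request with a JSON body containing data.stok and a Set-Cookie header carrying sysauth. The helper name parse_login_tokens and the sample values are hypothetical and were not captured from a real router.

```python
import re


def parse_login_tokens(body: dict, set_cookie: str) -> tuple[str, str]:
    """Return (stok, sysauth) from a login response (hypothetical helper)."""
    stok = (body.get('data') or {}).get('stok')
    # Match up to the first ';' so trailing cookie attributes are not captured.
    match = re.search(r'sysauth=([^;]+)', set_cookie)
    if not stok or match is None:
        raise ValueError('login response did not contain stok/sysauth')
    return stok, match.group(1)


# Made-up sample values for illustration only.
tokens = parse_login_tokens(
    {'success': True, 'data': {'stok': 'abc123'}},
    'sysauth=deadbeef; path=/cgi-bin/luci; HttpOnly',
)
print(tokens)
```

The pattern 'sysauth=(.*);' is greedy, so when the header contains more than one ';' it captures everything up to the last one. The sketch stops at the first ';' instead.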

Changes Made:
I've made the necessary modifications to the client.py code.

import hashlib
import re
from collections.abc import Callable
import json
import requests
import macaddress
import ipaddress
from logging import Logger
from tplinkrouterc6u.encryption import EncryptionWrapper
from tplinkrouterc6u.enum import Wifi
from tplinkrouterc6u.dataclass import Firmware, Status, Device, IPv4Reservation, IPv4DHCPLease, IPv4Status


class TplinkRouter:
    def __init__(self, host: str, password: str, username: str = 'admin', logger: Logger = None,
                 verify_ssl: bool = True, timeout: int = 10) -> None:
        self.host = host
        if not (self.host.startswith('http://') or self.host.startswith('https://')):
            self.host = "http://{}".format(self.host)
        self._verify_ssl = verify_ssl
        if self._verify_ssl is False:
            requests.packages.urllib3.disable_warnings()
        self.username = username
        self.password = password
        self.timeout = timeout
        self.single_request_mode = True
        self._logger = logger

        self._stok = ''
        self._sysauth = ''

        self._logged = False
        self._seq = ''
        self._hash = hashlib.md5((self.username + self.password).encode()).hexdigest()

        self.nn = ''
        self.ee = ''

        self._pwdNN = ''
        self._pwdEE = ''

        self._encryption = EncryptionWrapper()

    def get_firmware(self) -> Firmware | None:
        return self._request(self._get_firmware)

    def get_status(self) -> Status | None:
        return self._request(self._get_status)

    def get_ipv4_status(self) -> IPv4Status | None:
        return self._request(self._get_ipv4_status)

    def get_ipv4_reservations(self) -> [IPv4Reservation]:
        return self._request(self._get_ipv4_reservations)

    def get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        return self._request(self._get_ipv4_dhcp_leases)

    def query(self, query, operation='operation=read'):
        def callback():
            return self._get_data(query, operation)

        return self._request(callback)

    def get_full_info(self) -> tuple[Firmware, Status] | None:
        def callback():
            firmware = self._get_firmware()
            status = self._get_status()

            return firmware, status

        return self._request(callback)

    def set_wifi(self, wifi: Wifi, enable: bool) -> None:
        def callback():
            path = f"admin/wireless?&form=guest&form={wifi.value}"
            data = f"operation=write&{wifi.value}_enable={'on' if enable else 'off'}"
            self._send_data(path, data)

        self._request(callback)

    def reboot(self) -> None:
        def callback():
            self._send_data('admin/system?form=reboot', 'operation=write')

        self._request(callback)

    def authorize(self) -> bool:
        referer = '{}/webpages/login.html?t=1596185370610'.format(self.host)

        #if self._pwdNN == '':
        #    self._request_pwd(referer)

        #if self._seq == '':
        #    self._request_seq(referer)

        response = self._try_login(referer)

        #if 'text/plain' in response.headers.get('Content-Type'):
        #    self._request_pwd(referer)
        #    self._request_seq(referer)
        #    response = self._try_login(referer)

        try:
            #jsonData = response.json()

            #if 'data' not in jsonData or not jsonData['data']:
            #    raise Exception('No data in response: ' + response.text)

            #encryptedResponseData = jsonData['data']
            #responseData = self._encryption.aes_decrypt(encryptedResponseData)

            #responseDict = json.loads(responseData)

            #if 'success' not in responseDict or not responseDict['success']:
            #    raise Exception('No data in response: ' + responseData)

            self._stok = response.json().get('data').get('stok')
            regex_result = re.search('sysauth=(.*);', response.headers['set-cookie'])
            self._sysauth = regex_result.group(1)
            self._logged = True
            return True
        except (ValueError, KeyError, AttributeError) as e:
            if self._logger:
                self._logger.error("TplinkRouter Integration Exception - Couldn't fetch auth tokens! Response was: %s",
                                   response.text)

        return False

    def logout(self) -> None:
        if self._logged:
            self._send_data('admin/system?form=logout', 'operation=write')
        self.clear()

    def clear(self) -> None:
        self._stok = ''
        self._sysauth = ''
        self._logged = False

    def _get_firmware(self) -> Firmware:
        data = self._get_data('admin/firmware?form=upgrade', 'operation=read')
        firmware = Firmware(data.get('hardware_version', ''), data.get('model', ''), data.get('firmware_version', ''))

        return firmware

    def _get_status(self) -> Status:

        def _calc_cpu_usage(data: dict) -> float | None:
            cpu_usage = (data.get('cpu_usage', 0) + data.get('cpu1_usage', 0)
                         + data.get('cpu2_usage', 0) + data.get('cpu3_usage', 0))
            return cpu_usage / 4 if cpu_usage != 0 else None

        data = self._get_data('admin/status?form=all', 'operation=read')
        status = Status()
        status.devices = []
        status._wan_macaddr = macaddress.EUI48(data['wan_macaddr']) if 'wan_macaddr' in data else None
        status._lan_macaddr = macaddress.EUI48(data['lan_macaddr'])
        status._wan_ipv4_addr = ipaddress.IPv4Address(data['wan_ipv4_ipaddr']) if 'wan_ipv4_ipaddr' in data else None
        status._lan_ipv4_addr = ipaddress.IPv4Address(data['lan_ipv4_ipaddr']) if 'lan_ipv4_ipaddr' in data else None
        status._wan_ipv4_gateway = ipaddress.IPv4Address(
            data['wan_ipv4_gateway']) if 'wan_ipv4_gateway' in data else None
        status.wan_ipv4_uptime = data.get('wan_ipv4_uptime')
        status.mem_usage = data.get('mem_usage')
        status.cpu_usage = _calc_cpu_usage(data)
        status.wired_total = len(data.get('access_devices_wired', []))
        status.wifi_clients_total = len(data.get('access_devices_wireless_host', []))
        status.guest_clients_total = len(data.get('access_devices_wireless_guest', []))
        status.clients_total = status.wired_total + status.wifi_clients_total + status.guest_clients_total
        status.guest_2g_enable = data.get('guest_2g_enable') == 'on'
        status.guest_5g_enable = data.get('guest_5g_enable') == 'on'
        status.iot_2g_enable = data.get('iot_2g_enable') == 'on' if data.get('iot_2g_enable') is not None else None
        status.iot_5g_enable = data.get('iot_5g_enable') == 'on' if data.get('iot_5g_enable') is not None else None
        status.wifi_2g_enable = data.get('wireless_2g_enable') == 'on'
        status.wifi_5g_enable = data.get('wireless_5g_enable') == 'on'

        for item in data.get('access_devices_wireless_host', []):
            type = Wifi.WIFI_2G if '2.4G' == item['wire_type'] else Wifi.WIFI_5G
            status.devices.append(Device(type, macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']),
                                         item['hostname']))

        for item in data.get('access_devices_wireless_guest', []):
            type = Wifi.WIFI_GUEST_2G if '2.4G' == item['wire_type'] else Wifi.WIFI_GUEST_5G
            status.devices.append(Device(type, macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']),
                                         item['hostname']))

        return status

    def _get_ipv4_status(self) -> IPv4Status:
        ipv4_status = IPv4Status()
        data = self._get_data('admin/network?form=status_ipv4', 'operation=read')
        ipv4_status._wan_macaddr = macaddress.EUI48(data['wan_macaddr'])
        ipv4_status._wan_ipv4_ipaddr = ipaddress.IPv4Address(data['wan_ipv4_ipaddr'])
        ipv4_status._wan_ipv4_gateway = ipaddress.IPv4Address(data['wan_ipv4_gateway'])
        ipv4_status.wan_ipv4_conntype = data['wan_ipv4_conntype']
        ipv4_status._wan_ipv4_netmask = ipaddress.IPv4Address(data['wan_ipv4_netmask'])
        ipv4_status._wan_ipv4_pridns = ipaddress.IPv4Address(data['wan_ipv4_pridns'])
        ipv4_status._wan_ipv4_snddns = ipaddress.IPv4Address(data['wan_ipv4_snddns'])
        ipv4_status._lan_macaddr = macaddress.EUI48(data['lan_macaddr'])
        ipv4_status._lan_ipv4_ipaddr = ipaddress.IPv4Address(data['lan_ipv4_ipaddr'])
        ipv4_status.lan_ipv4_dhcp_enable = self._str2bool(data['lan_ipv4_dhcp_enable'])
        ipv4_status._lan_ipv4_netmask = ipaddress.IPv4Address(data['lan_ipv4_netmask'])
        ipv4_status.remote = self._str2bool(data['remote'])

        return ipv4_status

    def _get_ipv4_reservations(self) -> [IPv4Reservation]:
        ipv4_reservations = []
        data = self._get_data('admin/dhcps?form=reservation', 'operation=load')

        for item in data:
            ipv4_reservations.append(
                IPv4Reservation(macaddress.EUI48(item['mac']), ipaddress.IPv4Address(item['ip']), item['comment'],
                                self._str2bool(item['enable'])))

        return ipv4_reservations

    def _get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        dhcp_leases = []
        data = self._get_data('admin/dhcps?form=client', 'operation=load')

        for item in data:
            dhcp_leases.append(
                IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'],
                              item['leasetime']))

        return dhcp_leases

    def _query(self, query, operation):
        data = self._get_data(query, operation)

        # for item in data:
        #    dhcp_leases.append(IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'], item['leasetime']))

        return data

    # TODO
    #        data2 = self._get_data('admin/dhcps?form=setting', 'operation=read')

    def _str2bool(self, v):
        return str(v).lower() in ("yes", "true", "on")

    def _request_pwd(self, referer: str) -> None:
        url = '{}/cgi-bin/luci/;stok=/login?form=keys'.format(self.host)

        # If possible implement RSA encryption of password here.
        response = requests.post(
            url, params={'operation': 'read'},
            headers={'Referer': referer},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

        try:
            data = response.json()

            args = data['data']['password']

            self._pwdNN = args[0]
            self._pwdEE = args[1]
        except json.decoder.JSONDecodeError:
            if self._logger:
                self._logger.error('TplinkRouter Integration Exception - No pwd response - {}'.format(response.text))
            raise Exception('Unsupported router!')
        except Exception as error:
            raise Exception('Unknown error for pwd - {}; Response - {}'.format(error, response.text))

    def _request_seq(self, referer: str) -> None:
        url = '{}/cgi-bin/luci/;stok=/login?form=auth'.format(self.host)

        # If possible implement RSA encryption of password here.
        response = requests.post(
            url,
            params={'operation': 'read'},
            headers={'Referer': referer},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

        try:
            data = response.json()

            self._seq = data['data']['seq']
            args = data['data']['key']

            self.nn = args[0]
            self.ee = args[1]
        except json.decoder.JSONDecodeError:
            if self._logger:
                self._logger.error('TplinkRouter Integration Exception - No seq response - {}'.format(response.text))
            raise Exception('Unsupported router!')
        except Exception as error:
            raise Exception('Unknown error for seq - {}; Response - {}'.format(error, response.text))

    def _try_login(self, referer: str) -> requests.Response:
        url = '{}/cgi-bin/luci/;stok=/login?form=login'.format(self.host)

        #cryptedPwd = self._encryption.rsa_encrypt(self.password, self._pwdNN, self._pwdEE)
        #data = 'operation=login&password={}&confirm=true'.format(cryptedPwd)

        #body = self._prepare_data(data)

        return requests.post(
            url,
            # Use the credentials passed to the constructor instead of hard-coded placeholders.
            params={'operation': 'login', 'username': self.username, 'password': self.password},
            headers={'Referer': referer, 'Content-Type': 'application/x-www-form-urlencoded'},
            timeout=self.timeout,
            #verify=self._verify_ssl,
        )

    def _prepare_data(self, data) -> dict:
        encrypted_data = self._encryption.aes_encrypt(data)
        data_len = len(encrypted_data)

        sign = self._encryption.get_signature(int(self._seq) + data_len, self._logged == False, self._hash, self.nn,
                                              self.ee)

        return {'sign': sign, 'data': encrypted_data}

    def _request(self, callback: Callable):
        if not self.single_request_mode:
            return callback()

        try:
            if self.authorize():
                data = callback()
                self.logout()
                return data
        except Exception as error:
            self._seq = ''
            self._pwdNN = ''
            if self._logger:
                self._logger.error('TplinkRouter Integration Exception - {}'.format(error))
        finally:
            self.clear()

    def _get_data(self, path: str, data: str = 'operation=read') -> dict | None:
        if self._logged is False:
            raise Exception('Not authorised')
        url = '{}/cgi-bin/luci/;stok={}/{}'.format(self.host, self._stok, path)
        referer = '{}/webpages/index.html'.format(self.host)

        response = requests.post(
            url,
            data=data,
            headers={'Referer': referer},
            cookies={'sysauth': self._sysauth},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

        data = response.text
        print(data)
        try:
            json_response = response.json()
            if 'data' not in json_response:
                raise Exception("Router didn't respond with JSON - " + data)
            #data = self._encryption.aes_decrypt(json_response['data'])

            json_response = json.loads(data)

            if 'success' in json_response and json_response['success']:
                return json_response['data']
            else:
                if 'errorcode' in json_response and json_response['errorcode'] == 'timeout':
                    if self._logger:
                        self._logger.info(
                            "TplinkRouter Integration Exception - Token timed out. Relogging on next scan")
                    self._stok = ''
                    self._sysauth = ''
                elif self._logger:
                    self._logger.error(
                        "TplinkRouter Integration Exception - An unknown error happened while fetching data %s", data)
        except ValueError:
            if self._logger:
                self._logger.error(
                    "TplinkRouter Integration Exception - Router didn't respond with JSON. Check if credentials are correct")

        raise Exception('An unknown response - ' + data)

    def _send_data(self, path: str, data: str) -> None:
        if self._logged is False:
            raise Exception('Not authorised')
        url = '{}/cgi-bin/luci/;stok={}/{}'.format(self.host, self._stok, path)
        referer = '{}/webpages/index.1596185370610.html'.format(self.host)

        body = data
        requests.post(
            url,
            data=body,
            headers={'Referer': referer, 'Content-Type': 'application/x-www-form-urlencoded'},
            cookies={'sysauth': self._sysauth},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

This is some other code I used for testing:

import re

import requests

# Settings for TPLink
url = 'http://192.168.0.1/cgi-bin/luci/;stok=/login?form=login'
urlbegin = 'http://192.168.0.1/cgi-bin/luci/;stok='
urlstateind = '/admin/wireless?form=statistics'
urlloeind = '/admin/system?form=logout'

referer = 'http://192.168.0.1/webpages/login.html'

# Credentials TP link
admin = 'admin'
# Encrypted password see https://www.home-assistant.io/components/device_tracker.tplink/
pwd = ''


# Retrieve auth tokens TPlink
stok = ''
sysauth = ''

# If possible, implement RSA encryption of password here.
response = requests.post(url, params={'operation': 'login',
                                      'username': admin, 'password': pwd},
                         headers={'Referer': referer}, timeout=4)

try:
    stok = response.json().get('data').get('stok')
    regex_result = re.search('sysauth=(.*);', response.headers['set-cookie'])
    sysauth = regex_result.group(1)

    urlstat = urlbegin + stok + urlstateind
    response = requests.post(urlstat, params={'operation': 'load'},
                             headers={'Referer': referer},
                             cookies={'sysauth': sysauth}, timeout=5)

    responsstat = re.findall(r'mac":".................', response.text)
    mac_addresses = [mac_address.replace('mac":"', '') for mac_address in responsstat]

    print(mac_addresses)
    goon = 1

    urllo = urlbegin + stok + urlloeind
    response = requests.post(urllo, params={'operation': 'write'},
                              headers={'Referer': referer},
                              cookies={'sysauth': sysauth}, timeout=5)

    goon = 1
except (ValueError, KeyError, AttributeError):
    # AttributeError covers a missing 'data' object or a failed regex match.
    print("Couldn't fetch auth tokens! Response was: %s" % response.text)
    goon = 0

if goon == 0:
    print("quit")
    quit()
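
As an alternative to the regular expression on the raw response text, here is a sketch that parses the JSON and collects every value stored under a "mac" key. It assumes the statistics response is JSON with string values under keys named exactly "mac"; the function name collect_macs and the sample payload are hypothetical.

```python
import json


def collect_macs(node) -> list[str]:
    """Recursively collect string values stored under a 'mac' key."""
    found = []
    if isinstance(node, dict):
        for key, value in node.items():
            if key == 'mac' and isinstance(value, str):
                found.append(value)
            else:
                found.extend(collect_macs(value))
    elif isinstance(node, list):
        for item in node:
            found.extend(collect_macs(item))
    return found


# Made-up payload for illustration; the real response layout may differ.
sample = json.loads('{"success": true, "data": [{"mac": "AA-BB-CC-DD-EE-FF", "type": "2.4GHz"}]}')
print(collect_macs(sample))
```

Unlike the pattern mac":"................., this only matches keys named exactly "mac" and does not depend on the address being 17 characters long.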
@swwgames
Contributor Author

swwgames commented Feb 7, 2024

I forgot this part:
The challenge I'm facing is that I'm unsure about how to implement a fix for this issue. I'm looking forward to a response from the community for guidance or assistance in resolving this.

If you have any questions or need further clarification, feel free to ask.

@AlexandrErohin
Owner

AlexandrErohin commented Feb 8, 2024

Hi!
Great that you have succeeded with your router.
Do the methods get_firmware, get_status, set_wifi, and reboot work for your router after the authorization changes?
Could you also try to run test.py? Do all checks pass?

@swwgames
Contributor Author

swwgames commented Feb 8, 2024

Thank you for your response. I have tested the methods you mentioned and here are my findings:

I started with get_status; that function works as it should.

Then I tested the get_firmware function, which returned an "unknown response" error.
After some debugging I found that the request URL was http://192.168.0.1/cgi-bin/luci/;stok=stok/admin/firmware?form=upgrade.
This failed because the URL needs operation=read at the end, like this: http://192.168.0.1/cgi-bin/luci/;stok=stok/admin/firmware?form=upgrade&operation=read.
With that change the code responds with the right data.
To make this change I adjusted this function in the client.py file:

    def _get_firmware(self) -> Firmware:
        data = self._get_data('admin/firmware?form=upgrade', 'operation=read')
        firmware = Firmware(data.get('hardware_version', ''), data.get('model', ''), data.get('firmware_version', ''))

        return firmware

To this:

    def _get_firmware(self) -> Firmware:
        data = self._get_data('admin/firmware?form=upgrade&operation=read')
        firmware = Firmware(data.get('hardware_version', ''), data.get('model', ''), data.get('firmware_version', ''))

        return firmware
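
The same adjustment could be expressed as a small helper rather than editing each path by hand. This is only a sketch: the name with_operation is hypothetical and not part of tplinkrouterc6u, and it assumes the router only needs operation=... to be present somewhere in the query string.

```python
def with_operation(path: str, operation: str = 'read') -> str:
    """Append operation=<operation> to the query string if it is missing."""
    if 'operation=' in path:
        return path
    separator = '&' if '?' in path else '?'
    return f'{path}{separator}operation={operation}'


print(with_operation('admin/firmware?form=upgrade'))
print(with_operation('admin/dhcps?form=reservation', 'load'))
```

A path that already carries an operation parameter is returned unchanged, so the helper can be applied more than once without duplicating the parameter.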

I also tested the reboot function, which works as expected, as does set_wifi.

I also tested other functions and made necessary adjustments:

get_ipv4_status
This function does get the data (I added a line to print the data immediately after the response for debugging),
but it ends in an error on the key remote. After checking the data this function returns, I found that it doesn't contain the remote field.
So I commented that line out, and now it doesn't return an error.
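
Instead of removing the line, the field could be treated as optional so that routers which do report it keep working. A sketch, assuming None is an acceptable value for a missing flag; optional_bool is a hypothetical helper and str2bool mirrors the _str2bool method in client.py.

```python
from typing import Optional


def str2bool(value) -> bool:
    """Same truthiness rule as _str2bool in client.py."""
    return str(value).lower() in ('yes', 'true', 'on')


def optional_bool(data: dict, key: str) -> Optional[bool]:
    """Return the flag as a bool, or None when the router omits the key."""
    return str2bool(data[key]) if key in data else None


print(optional_bool({'remote': 'on'}, 'remote'))
print(optional_bool({'lan_ipv4_dhcp_enable': 'On'}, 'remote'))
```

With this, the assignment could read ipv4_status.remote = optional_bool(data, 'remote'), provided the IPv4Status dataclass accepts None for that field, which is an assumption here.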

get_ipv4_reservations
This function has the same problem as get_firmware: the URL doesn't contain &operation=load.
So I changed this:

    def _get_ipv4_reservations(self) -> [IPv4Reservation]:
        ipv4_reservations = []
        data = self._get_data('admin/dhcps?form=reservation', 'operation=load')

        for item in data:
            ipv4_reservations.append(
                IPv4Reservation(macaddress.EUI48(item['mac']), ipaddress.IPv4Address(item['ip']), item['comment'],
                                self._str2bool(item['enable'])))

        return ipv4_reservations

To this:

    def _get_ipv4_reservations(self) -> [IPv4Reservation]:
        ipv4_reservations = []
        data = self._get_data('admin/dhcps?form=reservation&operation=load')

        for item in data:
            ipv4_reservations.append(
                IPv4Reservation(macaddress.EUI48(item['mac']), ipaddress.IPv4Address(item['ip']), item['comment'],
                                self._str2bool(item['enable'])))

        return ipv4_reservations

The function get_ipv4_dhcp_leases has the same problem.
I changed this:

    def _get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        dhcp_leases = []
        data = self._get_data('admin/dhcps?form=client', 'operation=load')

        for item in data:
            dhcp_leases.append(
                IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'],
                              item['leasetime']))

        return dhcp_leases

To this:

    def _get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        dhcp_leases = []
        data = self._get_data('admin/dhcps?form=client&operation=load')

        for item in data:
            dhcp_leases.append(
                IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'],
                              item['leasetime']))

        return dhcp_leases

The query function works as expected.

The get_full_info function also works as expected.

Here is the complete edited client.py file:

import hashlib
import re
from collections.abc import Callable
import json
import requests
import macaddress
import ipaddress
from logging import Logger
from tplinkrouterc6u.encryption import EncryptionWrapper
from tplinkrouterc6u.enum import Wifi
from tplinkrouterc6u.dataclass import Firmware, Status, Device, IPv4Reservation, IPv4DHCPLease, IPv4Status


class TplinkRouter:
    def __init__(self, host: str, password: str, username: str = 'admin', logger: Logger = None,
                 verify_ssl: bool = True, timeout: int = 10) -> None:
        self.host = host
        if not (self.host.startswith('http://') or self.host.startswith('https://')):
            self.host = "http://{}".format(self.host)
        self._verify_ssl = verify_ssl
        if self._verify_ssl is False:
            requests.packages.urllib3.disable_warnings()
        self.username = username
        self.password = password
        self.timeout = timeout
        self.single_request_mode = True
        self._logger = logger

        self._stok = ''
        self._sysauth = ''

        self._logged = False
        self._seq = ''
        self._hash = hashlib.md5((self.username + self.password).encode()).hexdigest()

        self.nn = ''
        self.ee = ''

        self._pwdNN = ''
        self._pwdEE = ''

        self._encryption = EncryptionWrapper()

    def get_firmware(self) -> Firmware | None:
        return self._request(self._get_firmware)

    def get_status(self) -> Status | None:
        return self._request(self._get_status)

    def get_ipv4_status(self) -> IPv4Status | None:
        return self._request(self._get_ipv4_status)

    def get_ipv4_reservations(self) -> [IPv4Reservation]:
        return self._request(self._get_ipv4_reservations)

    def get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        return self._request(self._get_ipv4_dhcp_leases)

    def query(self, query, operation='operation=read'):
        def callback():
            return self._get_data(query, operation)

        return self._request(callback)

    def get_full_info(self) -> tuple[Firmware, Status] | None:
        def callback():
            firmware = self._get_firmware()
            status = self._get_status()

            return firmware, status

        return self._request(callback)

    def set_wifi(self, wifi: Wifi, enable: bool) -> None:
        def callback():
            path = f"admin/wireless?&form=guest&form={wifi.value}"
            data = f"operation=write&{wifi.value}_enable={'on' if enable else 'off'}"
            self._send_data(path, data)

        self._request(callback)

    def reboot(self) -> None:
        def callback():
            self._send_data('admin/system?form=reboot', 'operation=write')

        self._request(callback)

    def authorize(self) -> bool:
        referer = '{}/webpages/login.html?t=1596185370610'.format(self.host)

        #if self._pwdNN == '':
        #    self._request_pwd(referer)

        #if self._seq == '':
        #    self._request_seq(referer)

        response = self._try_login(referer)

        #if 'text/plain' in response.headers.get('Content-Type'):
        #    self._request_pwd(referer)
        #    self._request_seq(referer)
        #    response = self._try_login(referer)

        try:
            #jsonData = response.json()

            #if 'data' not in jsonData or not jsonData['data']:
            #    raise Exception('No data in response: ' + response.text)

            #encryptedResponseData = jsonData['data']
            #responseData = self._encryption.aes_decrypt(encryptedResponseData)

            #responseDict = json.loads(responseData)

            #if 'success' not in responseDict or not responseDict['success']:
            #    raise Exception('No data in response: ' + responseData)

            self._stok = response.json().get('data').get('stok')
            regex_result = re.search('sysauth=(.*);', response.headers['set-cookie'])
            self._sysauth = regex_result.group(1)
            self._logged = True
            return True
        except (ValueError, KeyError, AttributeError) as e:
            if self._logger:
                self._logger.error("TplinkRouter Integration Exception - Couldn't fetch auth tokens! Response was: %s",
                                   response.text)

        return False

    def logout(self) -> None:
        if self._logged:
            self._send_data('admin/system?form=logout', 'operation=write')
        self.clear()

    def clear(self) -> None:
        self._stok = ''
        self._sysauth = ''
        self._logged = False

    def _get_firmware(self) -> Firmware:
        data = self._get_data('admin/firmware?form=upgrade&operation=read')
        firmware = Firmware(data.get('hardware_version', ''), data.get('model', ''), data.get('firmware_version', ''))

        return firmware

    def _get_status(self) -> Status:

        def _calc_cpu_usage(data: dict) -> float | None:
            cpu_usage = (data.get('cpu_usage', 0) + data.get('cpu1_usage', 0)
                         + data.get('cpu2_usage', 0) + data.get('cpu3_usage', 0))
            return cpu_usage / 4 if cpu_usage != 0 else None

        data = self._get_data('admin/status?form=all', 'operation=read')
        status = Status()
        status.devices = []
        status._wan_macaddr = macaddress.EUI48(data['wan_macaddr']) if 'wan_macaddr' in data else None
        status._lan_macaddr = macaddress.EUI48(data['lan_macaddr'])
        status._wan_ipv4_addr = ipaddress.IPv4Address(data['wan_ipv4_ipaddr']) if 'wan_ipv4_ipaddr' in data else None
        status._lan_ipv4_addr = ipaddress.IPv4Address(data['lan_ipv4_ipaddr']) if 'lan_ipv4_ipaddr' in data else None
        status._wan_ipv4_gateway = ipaddress.IPv4Address(
            data['wan_ipv4_gateway']) if 'wan_ipv4_gateway' in data else None
        status.wan_ipv4_uptime = data.get('wan_ipv4_uptime')
        status.mem_usage = data.get('mem_usage')
        status.cpu_usage = _calc_cpu_usage(data)
        status.wired_total = len(data.get('access_devices_wired', []))
        status.wifi_clients_total = len(data.get('access_devices_wireless_host', []))
        status.guest_clients_total = len(data.get('access_devices_wireless_guest', []))
        status.clients_total = status.wired_total + status.wifi_clients_total + status.guest_clients_total
        status.guest_2g_enable = data.get('guest_2g_enable') == 'on'
        status.guest_5g_enable = data.get('guest_5g_enable') == 'on'
        status.iot_2g_enable = data.get('iot_2g_enable') == 'on' if data.get('iot_2g_enable') is not None else None
        status.iot_5g_enable = data.get('iot_5g_enable') == 'on' if data.get('iot_5g_enable') is not None else None
        status.wifi_2g_enable = data.get('wireless_2g_enable') == 'on'
        status.wifi_5g_enable = data.get('wireless_5g_enable') == 'on'

        for item in data.get('access_devices_wireless_host', []):
            type = Wifi.WIFI_2G if '2.4G' == item['wire_type'] else Wifi.WIFI_5G
            status.devices.append(Device(type, macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']),
                                         item['hostname']))

        for item in data.get('access_devices_wireless_guest', []):
            type = Wifi.WIFI_GUEST_2G if '2.4G' == item['wire_type'] else Wifi.WIFI_GUEST_5G
            status.devices.append(Device(type, macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']),
                                         item['hostname']))

        return status

    def _get_ipv4_status(self) -> IPv4Status:
        ipv4_status = IPv4Status()
        data = self._get_data('admin/network?form=status_ipv4', 'operation=read')
        ipv4_status._wan_macaddr = macaddress.EUI48(data['wan_macaddr'])
        ipv4_status._wan_ipv4_ipaddr = ipaddress.IPv4Address(data['wan_ipv4_ipaddr'])
        ipv4_status._wan_ipv4_gateway = ipaddress.IPv4Address(data['wan_ipv4_gateway'])
        ipv4_status.wan_ipv4_conntype = data['wan_ipv4_conntype']
        ipv4_status._wan_ipv4_netmask = ipaddress.IPv4Address(data['wan_ipv4_netmask'])
        ipv4_status._wan_ipv4_pridns = ipaddress.IPv4Address(data['wan_ipv4_pridns'])
        ipv4_status._wan_ipv4_snddns = ipaddress.IPv4Address(data['wan_ipv4_snddns'])
        ipv4_status._lan_macaddr = macaddress.EUI48(data['lan_macaddr'])
        ipv4_status._lan_ipv4_ipaddr = ipaddress.IPv4Address(data['lan_ipv4_ipaddr'])
        ipv4_status.lan_ipv4_dhcp_enable = self._str2bool(data['lan_ipv4_dhcp_enable'])
        ipv4_status._lan_ipv4_netmask = ipaddress.IPv4Address(data['lan_ipv4_netmask'])
        #ipv4_status.remote = self._str2bool(data['remote'])

        return ipv4_status

    def _get_ipv4_reservations(self) -> [IPv4Reservation]:
        ipv4_reservations = []
        data = self._get_data('admin/dhcps?form=reservation&operation=load')

        for item in data:
            ipv4_reservations.append(
                IPv4Reservation(macaddress.EUI48(item['mac']), ipaddress.IPv4Address(item['ip']), item['comment'],
                                self._str2bool(item['enable'])))

        return ipv4_reservations

    def _get_ipv4_dhcp_leases(self) -> [IPv4DHCPLease]:
        dhcp_leases = []
        data = self._get_data('admin/dhcps?form=client&operation=load')

        for item in data:
            dhcp_leases.append(
                IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'],
                              item['leasetime']))

        return dhcp_leases

    def _query(self, query, operation):
        data = self._get_data(query, operation)

        # for item in data:
        #    dhcp_leases.append(IPv4DHCPLease(macaddress.EUI48(item['macaddr']), ipaddress.IPv4Address(item['ipaddr']), item['name'], item['leasetime']))

        return data

    # TODO
    #        data2 = self._get_data('admin/dhcps?form=setting', 'operation=read')

    def _str2bool(self, v):
        return str(v).lower() in ("yes", "true", "on")

    def _request_pwd(self, referer: str) -> None:
        url = '{}/cgi-bin/luci/;stok=/login?form=keys'.format(self.host)

        # If possible implement RSA encryption of password here.
        response = requests.post(
            url, params={'operation': 'read'},
            headers={'Referer': referer},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

        try:
            data = response.json()

            args = data['data']['password']

            self._pwdNN = args[0]
            self._pwdEE = args[1]
        except json.decoder.JSONDecodeError:
            if self._logger:
                self._logger.error('TplinkRouter Integration Exception - No pwd response - {}'.format(response.text))
            raise Exception('Unsupported router!')
        except Exception as error:
            raise Exception('Unknown error for pwd - {}; Response - {}'.format(error, response.text))

    def _request_seq(self, referer: str) -> None:
        url = '{}/cgi-bin/luci/;stok=/login?form=auth'.format(self.host)

        # If possible implement RSA encryption of password here.
        response = requests.post(
            url,
            params={'operation': 'read'},
            headers={'Referer': referer},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

        try:
            data = response.json()

            self._seq = data['data']['seq']
            args = data['data']['key']

            self.nn = args[0]
            self.ee = args[1]
        except json.decoder.JSONDecodeError:
            if self._logger:
                self._logger.error('TplinkRouter Integration Exception - No seq response - {}'.format(response.text))
            raise Exception('Unsupported router!')
        except Exception as error:
            raise Exception('Unknown error for seq - {}; Response - {}'.format(error, response.text))

    def _try_login(self, referer: str) -> requests.Response:
        url = '{}/cgi-bin/luci/;stok=/login?form=login'.format(self.host)

        #cryptedPwd = self._encryption.rsa_encrypt(self.password, self._pwdNN, self._pwdEE)
        #data = 'operation=login&password={}&confirm=true'.format(cryptedPwd)

        #body = self._prepare_data(data)

        return requests.post(
            url,
            # self.password must hold the value captured from the web UI login payload
            params={'operation': 'login', 'username': 'admin', 'password': self.password},
            headers={'Referer': referer, 'Content-Type': 'application/x-www-form-urlencoded'},
            timeout=self.timeout,
            #verify=self._verify_ssl,
        )

    def _prepare_data(self, data) -> dict:
        encrypted_data = self._encryption.aes_encrypt(data)
        data_len = len(encrypted_data)

        sign = self._encryption.get_signature(int(self._seq) + data_len, self._logged == False, self._hash, self.nn,
                                              self.ee)

        return {'sign': sign, 'data': encrypted_data}

    def _request(self, callback: Callable):
        if not self.single_request_mode:
            return callback()

        try:
            if self.authorize():
                data = callback()
                self.logout()
                return data
        except Exception as error:
            self._seq = ''
            self._pwdNN = ''
            if self._logger:
                self._logger.error('TplinkRouter Integration Exception - {}'.format(error))
        finally:
            self.clear()

    def _get_data(self, path: str, data: str = 'operation=read') -> dict | None:
        if self._logged is False:
            raise Exception('Not authorised')
        url = '{}/cgi-bin/luci/;stok={}/{}'.format(self.host, self._stok, path)
        referer = '{}/webpages/index.html'.format(self.host)

        response = requests.post(
            url,
            data=data,
            headers={'Referer': referer},
            cookies={'sysauth': self._sysauth},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

        data = response.text
        print(data)
        try:
            json_response = response.json()
            if 'data' not in json_response:
                raise Exception("Router didn't respond with JSON - " + data)
            #data = self._encryption.aes_decrypt(json_response['data'])

            json_response = json.loads(data)

            if 'success' in json_response and json_response['success']:
                return json_response['data']
            else:
                if 'errorcode' in json_response and json_response['errorcode'] == 'timeout':
                    if self._logger:
                        self._logger.info(
                            "TplinkRouter Integration Exception - Token timed out. Relogging on next scan")
                    self._stok = ''
                    self._sysauth = ''
                elif self._logger:
                    self._logger.error(
                        "TplinkRouter Integration Exception - An unknown error happened while fetching data %s", data)
        except ValueError:
            if self._logger:
                self._logger.error(
                    "TplinkRouter Integration Exception - Router didn't respond with JSON. Check if credentials are correct")

        raise Exception('An unknown response - ' + data)

    def _send_data(self, path: str, data: str) -> None:
        if self._logged is False:
            raise Exception('Not authorised')
        url = '{}/cgi-bin/luci/;stok={}/{}'.format(self.host, self._stok, path)
        referer = '{}/webpages/index.1596185370610.html'.format(self.host)

        body = data
        requests.post(
            url,
            data=body,
            headers={'Referer': referer, 'Content-Type': 'application/x-www-form-urlencoded'},
            cookies={'sysauth': self._sysauth},
            timeout=self.timeout,
            verify=self._verify_ssl,
        )

Another thing to note is that I changed the _try_login function.
I commented out a few lines that are not needed to log in to this router.
Instead of the plain password, the router expects what I think is a hashed password.
To get this value, I logged in via the web interface with the browser's developer tools open on the Network tab,
and copied the password from the payload of the login?form=login request.

    def _try_login(self, referer: str) -> requests.Response:
        url = '{}/cgi-bin/luci/;stok=/login?form=login'.format(self.host)

        #cryptedPwd = self._encryption.rsa_encrypt(self.password, self._pwdNN, self._pwdEE)
        #data = 'operation=login&password={}&confirm=true'.format(cryptedPwd)

        #body = self._prepare_data(data)

        return requests.post(
            url,
            params={'operation': 'login', 'username': 'admin', 'password': 'web password here'},
            headers={'Referer': referer, 'Content-Type': 'application/x-www-form-urlencoded'},
            timeout=self.timeout,
            #verify=self._verify_ssl,
        )
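
Since only the stok and sysauth tokens are needed afterwards, reading them from the login response could look roughly like the sketch below. The response shape (a JSON body with `data.stok` and a `sysauth=...` cookie in the `Set-Cookie` header) is an assumption, `extract_tokens` is a hypothetical helper, and the example values are made up.

```python
import re


def extract_tokens(body: dict, set_cookie: str) -> tuple[str, str]:
    """Pull the stok and sysauth values out of a login response.

    Assumes the JSON body looks like {"success": true, "data": {"stok": "..."}}
    and that the Set-Cookie header contains "sysauth=<value>;".
    """
    if not body.get('success'):
        raise ValueError('Login failed: {}'.format(body))
    stok = body['data']['stok']
    match = re.search(r'sysauth=([^;]+)', set_cookie)
    if match is None:
        raise ValueError('No sysauth cookie in response')
    return stok, match.group(1)


# Made-up response values, for illustration only.
example_body = {'success': True, 'data': {'stok': 'abc123'}}
example_cookie = 'sysauth=deadbeef; path=/cgi-bin/luci; HttpOnly'
stok, sysauth = extract_tokens(example_body, example_cookie)
print(stok, sysauth)
```

With a real response, the two arguments would come from `response.json()` and `response.headers['set-cookie']`.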

@AlexandrErohin
Owner

AlexandrErohin commented Feb 8, 2024

Wow. Thank you very much for your work!
It would be great to see you as a contributor to this repo! Could you make a PR that adds your edited code as a new class TplinkC1200Router at the end of client.py?
Then I will try to combine them.

@swwgames
Contributor Author

swwgames commented Feb 8, 2024

I made the pull request. I hope you can merge it. If you find any problems in my code, please let me know.

I can be a contributor, but I don't know how active I will be. I will always be open to testing, though.

@AlexandrErohin
Owner

@swwgames Thank you very much! I have merged your PR and created TplinkRouterProvider, which automatically gives you the right client.
Could you download my updates and run test.py again?
It would be great if you could review my changes and let me know if you find something to fix or change.
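
For readers following along, the idea of a provider that picks the right client can be sketched as below. This is not the actual TplinkRouterProvider code: the class names, the `supports()` check, and `get_client` are all hypothetical stand-ins used only to illustrate the pattern.

```python
class BaseClient:
    """Hypothetical base class: each client reports whether it can talk to the router."""

    def __init__(self, host: str, password: str) -> None:
        self.host = host
        self.password = password

    def supports(self) -> bool:
        raise NotImplementedError


class EncryptedClient(BaseClient):
    def supports(self) -> bool:
        # Stand-in for "the router answered the login?form=keys request".
        return False


class PlainClient(BaseClient):
    def supports(self) -> bool:
        # Stand-in for "the router accepted a login without RSA/AES".
        return True


def get_client(host: str, password: str,
               candidates=(EncryptedClient, PlainClient)) -> BaseClient:
    """Return the first candidate client whose supports() check passes."""
    for client_class in candidates:
        client = client_class(host, password)
        if client.supports():
            return client
    raise RuntimeError('No supported client found for {}'.format(host))


client = get_client('http://192.168.0.1', 'secret')
print(type(client).__name__)
```

In a real client, `supports()` would probe the router over HTTP instead of returning a fixed value.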

@swwgames
Contributor Author

swwgames commented Feb 12, 2024

I tested the new code with test.py and found that the login part works great.
I also checked the output of the script and noticed that some data was missing. After some testing, I figured out that the query sent to the router is not correct: the "&operation=read" or "&operation=load" part is missing from the URL. I modified the test.py file like this:

with open('C:/Users/Sam/Documents/Code/Tplink/TP-Link-Archer-C6U-main/queries.txt') as queries:
    for query in queries:
        query = query.strip()
        if query.startswith('#'):
            continue
        query = '{}&operation=read'.format(query)
        try:
            data = router.query(query)
            print(query)
            tokens = query.split('?')
            folder = "logs" + os.sep + tokens[0]
            Path(folder).mkdir(parents=True, exist_ok=True)
            with open(folder + os.sep + f"{tokens[1]}.log", "w") as log_file:
                pp = pprint.PrettyPrinter(indent=4, stream=log_file)
                pp.pprint(data)
        except Exception as ex:
            print(f"{query} exception {ex}")
            router = TplinkRouter('http://192.168.0.1', password, timeout=10)
        finally:
            pass
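
A small helper could also add the operation only when a query does not already carry one, so that entries in queries.txt with an explicit "operation=load" are left alone. This is just a sketch; `with_operation` is a hypothetical name and is not part of the library.

```python
def with_operation(query: str, operation: str = 'read') -> str:
    """Append "operation=<operation>" unless the query already carries one."""
    if 'operation=' in query:
        return query
    # Use "&" when the query already has parameters, "?" otherwise.
    separator = '&' if '?' in query else '?'
    return '{}{}operation={}'.format(query, separator, operation)


print(with_operation('admin/network?form=status_ipv4'))
print(with_operation('admin/dhcps?form=client', 'load'))
print(with_operation('admin/dhcps?form=reservation&operation=load'))
```

In the loop above, `query = with_operation(query)` would then replace the unconditional `format` call.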

Also, this router has no username, so that part of the code is not necessary.

I also tested my router in AP mode (wireless access point). There are some differences in the data it returns, but nothing major.

I hope I provided you with the right data. Let me know if you need anything else from me.
TplinkC1200Test.zip

@AlexandrErohin
Owner

Thank you very much for your help!

@AlexandrErohin
Owner

@swwgames Could you download my updates and run test.py again? I have simplified the client

@swwgames
Contributor Author

swwgames commented Feb 15, 2024

I tested your update, and it works. The only thing is that the test file doesn't add the operation parameter to the queries, so some data is missing.
logs.zip

@AlexandrErohin
Owner

@swwgames Thank you!
