diff --git a/src/batcontrol/inverter/fronius.py b/src/batcontrol/inverter/fronius.py index b8741485..2dcf8430 100644 --- a/src/batcontrol/inverter/fronius.py +++ b/src/batcontrol/inverter/fronius.py @@ -33,11 +33,20 @@ logger_auth = logging.getLogger("batcontrol.inverter.fronius.auth") -def hash_utf8(x): - """Hash a string or bytes object.""" +def hash_utf8(x, algorithm="MD5"): + """Hash a string or bytes object. + + Args: + x: String or bytes to hash + algorithm: Hash algorithm to use ("MD5" or "SHA256") + """ if isinstance(x, str): x = x.encode("utf-8") - return hashlib.md5(x).hexdigest() + + if algorithm.upper() == "SHA256": + return hashlib.sha256(x).hexdigest() + else: # Default to MD5 for backward compatibility + return hashlib.md5(x).hexdigest() def strip_dict(original): @@ -76,6 +85,7 @@ class FroniusApiConfig: config_timeofuse_path: str commands_login_path: str commands_logout_path: str + auth_algorithm: str = "SHA256" # Authentication algorithm: "MD5" or "SHA256" # Alle Konfigurationen in einer Liste @@ -92,6 +102,7 @@ class FroniusApiConfig: config_timeofuse_path='/config/timeofuse', commands_login_path='/commands/Login', commands_logout_path='/commands/Logout', + auth_algorithm="MD5", ), FroniusApiConfig( from_version=version.parse("1.28.7-1"), @@ -105,9 +116,24 @@ class FroniusApiConfig: config_timeofuse_path='/config/timeofuse', commands_login_path='/commands/Login', commands_logout_path='/commands/Logout', + auth_algorithm="MD5", ), FroniusApiConfig( from_version=version.parse("1.36"), + to_version=version.parse("1.38.6-1"), + version_path='/api/status/version', + powerflow_path='/solar_api/v1/GetPowerFlowRealtimeData.fcgi', + storage_path='/solar_api/v1/GetStorageRealtimeData.cgi', + config_battery_path='/api/config/batteries', + config_powerunit_path='/api/config/powerunit', + config_solar_api_path='/api/config/solar_api', + config_timeofuse_path='/api/config/timeofuse', + commands_login_path='/api/commands/Login', + commands_logout_path='/api/commands/Logout', + auth_algorithm="MD5", + ), + FroniusApiConfig( + from_version=version.parse("1.38.6-1"), to_version=version.parse("9999.99.99"), version_path='/api/status/version', powerflow_path='/solar_api/v1/GetPowerFlowRealtimeData.fcgi', @@ -118,6 +144,7 @@ class FroniusApiConfig: config_timeofuse_path='/api/config/timeofuse', commands_login_path='/api/commands/Login', commands_logout_path='/api/commands/Logout', + auth_algorithm="SHA256", ), ] @@ -136,9 +163,21 @@ class FroniusWR(InverterBaseclass): def __init__(self, config: dict) -> None: super().__init__(config) + + # We are doing three login tests during first login. + # As MD5 was the default on the old firmware, the latest + # retries should be MD5. + self.usable_password_hash_methods = [ + "SHA256", # First try: SHA256 + "MD5", # Second try: MD5 + "MD5" # Third try: MD5 again (retry with same method) + ] + self._last_password_hash_method_index = -1 + self.password_hash = None + self.subsequent_login = False self.ncvalue_num = 1 - self.cnonce = "NaN" + self.cnonce = hashlib.md5(os.urandom(8)).hexdigest() self.login_attempts = 0 self.address = config['address'] self.capacity = -1 @@ -198,7 +237,7 @@ def __init__(self, config: dict) -> None: self.previous_battery_config['HYB_BACKUP_RESERVED'] ) self.max_soc = self.previous_battery_config['BAT_M0_SOC_MAX'] - self.backup_time_of_use() # save timesofuse + self.backup_time_of_use() # save timesofuse self.set_allow_grid_charging(True) def get_firmware_version(self) -> version: @@ -653,6 +692,8 @@ def __send_one_http_request(self, path, method='GET', payload="", if auth: headers['Authorization'] = self.get_auth_header( method=method, path=fullpath) + logger_auth.debug("Requesting %s , header: %s", + fullpath, headers) for i in range(3): # 3 retries if connection can't be established @@ -687,13 +728,15 @@ def login(self): """Login to Fronius API""" logger_auth.debug("Logging in") path = self.api_config.commands_login_path - self.cnonce = "NaN" + self.cnonce = hashlib.md5(os.urandom(8)).hexdigest() self.ncvalue_num = 1 self.login_attempts = 0 for i in range(3): self.login_attempts += 1 response = self.__send_one_http_request(path, auth=True) if response.status_code == 200: + if not self.subsequent_login: + self.__store_latest_password_hash_method() self.subsequent_login = True logger_auth.info('Login successful %s', response) logger_auth.debug("Response: %s", response.headers) @@ -705,7 +748,7 @@ def login(self): logger_auth.error( 'Login -%d- failed, Response: %s', i, response) - logger_auth.error('Response-raw: %s', response.raw) + logger_auth.error('Response: %s ; %s', response.headers, response) if self.subsequent_login: logger_auth.info( "Retrying login in 10 seconds") @@ -743,11 +786,15 @@ def __retrieve_auth_from_response(self, response): self.ncvalue_num = 1 if auth_dict.get('cnonce'): self.cnonce = auth_dict['cnonce'] - else: - self.cnonce = "NaN" if auth_dict.get('nonce'): self.nonce = auth_dict['nonce'] + logger_auth.debug("nc: %s, cnonce: %s, nonce: %s", + self.ncvalue_num , + self.cnonce, + self.nonce + ) + def __split_response_auth_header(self, response): """ Split the response header into a dictionary.""" auth_dict = {} @@ -764,14 +811,20 @@ def __split_response_auth_header(self, response): 'No authentication header found in response') return auth_dict - auth_list = auth_string.replace(" ", "").replace('"', '').split(',') + # Remove quotes and split by comma + auth_list = auth_string.replace('"', '').split(',') logger_auth.debug("Authentication header: %s", auth_list) auth_dict = {} for item in auth_list: - key, value = item.split("=") - auth_dict[key] = value - logger_auth.debug( - "Authentication header key-value pair - %s: %s", key, value) + # Strip whitespace from each item and check if it contains '=' + item = item.strip() + if '=' in item: + key, value = item.split("=", 1) # Split only on first '=' + key = key.strip() + value = value.strip() + auth_dict[key] = value + logger_auth.debug( + "Authentication header key-value pair - %s: %s", key, value) return auth_dict def get_auth_header(self, method, path) -> str: @@ -782,6 +835,11 @@ def get_auth_header(self, method, path) -> str: cnonce = self.cnonce user = self.user password = self.password + algorithm = self.api_config.auth_algorithm + password_algorithm = algorithm + + password_algorithm = self.__get_password_hash_method() + if len(self.user) < 4: raise RuntimeError("User needed for Authorization") if len(self.password) < 4: @@ -789,15 +847,52 @@ def get_auth_header(self, method, path) -> str: a1 = f"{user}:{realm}:{password}" a2 = f"{method}:{path}" - ha1 = hash_utf8(a1) - ha2 = hash_utf8(a2) + ha1 = hash_utf8(a1, password_algorithm) + ha2 = hash_utf8(a2, algorithm) noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{ha2}" - respdig = hash_utf8(f"{ha1}:{noncebit}") + respdig = hash_utf8(f"{ha1}:{noncebit}", algorithm) auth_header = f'Digest username="{user}", realm="{realm}", nonce="{nonce}", uri="{path}", ' - auth_header += f'algorithm="MD5", qop=auth, nc={ncvalue}, cnonce="{cnonce}", ' + auth_header += f'algorithm="{algorithm}", qop=auth, nc={ncvalue}, cnonce="{cnonce}", ' auth_header += f'response="{respdig}"' return auth_header + + def __get_password_hash_method(self) -> str: + """ Figure out the password hash method during first login.""" + # If we already found a working method, use it + if self.password_hash is not None: + return self.password_hash + + # Index is initialized to -1. Increment to get the next method. + password_algorithm = "" + if self.api_config.auth_algorithm == "SHA256": + self._last_password_hash_method_index += 1 + if self._last_password_hash_method_index >= len(self.usable_password_hash_methods): + self._last_password_hash_method_index = 0 + password_algorithm = self.usable_password_hash_methods[ + self._last_password_hash_method_index + ] + logger_auth.debug("Trying password hash method %s", password_algorithm) + else: + # Fallback to MD5 only for older firmwares + password_algorithm = "MD5" + # Set password_hash immediately for MD5 since there's only one option + # Setting this here prevents __store_latest_password_hash_method from changing it later + self.password_hash = password_algorithm + + return password_algorithm + + def __store_latest_password_hash_method(self): + """ Save the password hash method to use after a successful login.""" + if self.password_hash is not None: + # We already have a working method, do not change it + return + self.password_hash = self.usable_password_hash_methods[ + self._last_password_hash_method_index + ] + logger_auth.debug("Password hash method set to %s", + self.password_hash) + def __set_em(self, mode=None, power=None): """ Change Energy Management """ settings = {} diff --git a/tests/batcontrol/inverter/__init__.py b/tests/batcontrol/inverter/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/batcontrol/inverter/test_fronius_auth.py b/tests/batcontrol/inverter/test_fronius_auth.py new file mode 100644 index 00000000..422c3b2a --- /dev/null +++ b/tests/batcontrol/inverter/test_fronius_auth.py @@ -0,0 +1,86 @@ +"""Tests for Fronius inverter authentication functionality.""" +import unittest +import hashlib +from packaging import version + +from batcontrol.inverter.fronius import hash_utf8, get_api_config, FroniusApiConfig + + +class TestFroniusAuthentication(unittest.TestCase): + """Test authentication algorithm selection based on firmware version.""" + + def test_hash_utf8_md5_default(self): + """Test that hash_utf8 defaults to MD5.""" + test_string = "test_string" + result = hash_utf8(test_string) + expected = hashlib.md5(test_string.encode('utf-8')).hexdigest() + self.assertEqual(result, expected) + + def test_hash_utf8_md5_explicit(self): + """Test hash_utf8 with explicit MD5 algorithm.""" + test_string = "test_string" + result = hash_utf8(test_string, "MD5") + expected = hashlib.md5(test_string.encode('utf-8')).hexdigest() + self.assertEqual(result, expected) + + def test_hash_utf8_sha256(self): + """Test hash_utf8 with SHA256 algorithm.""" + test_string = "test_string" + result = hash_utf8(test_string, "SHA256") + expected = hashlib.sha256(test_string.encode('utf-8')).hexdigest() + self.assertEqual(result, expected) + + def test_hash_utf8_case_insensitive(self): + """Test that algorithm parameter is case insensitive.""" + test_string = "test_string" + result_upper = hash_utf8(test_string, "SHA256") + result_lower = hash_utf8(test_string, "sha256") + result_mixed = hash_utf8(test_string, "Sha256") + self.assertEqual(result_upper, result_lower) + self.assertEqual(result_upper, result_mixed) + + def test_api_config_version_before_1_38_6_1_uses_md5(self): + """Test that firmware versions before 1.38.6-1 use MD5.""" + test_versions = ["1.35.0", "1.36.0", "1.37.0", "1.38.5"] + for v in test_versions: + with self.subTest(version=v): + parsed_version = version.parse(v) + config = get_api_config(parsed_version) + self.assertEqual(config.auth_algorithm, "MD5") + + def test_api_config_version_1_38_6_1_and_later_uses_sha256(self): + """Test that firmware version 1.38.6-1 and later use SHA256.""" + test_versions = ["1.38.6-1", "1.38.7", "1.39.0", "2.0.0"] + for v in test_versions: + with self.subTest(version=v): + parsed_version = version.parse(v) + config = get_api_config(parsed_version) + self.assertEqual(config.auth_algorithm, "SHA256") + + def test_api_config_boundary_version(self): + """Test the exact boundary version 1.38.6-1.""" + # Version just before the boundary should use MD5 + version_before = version.parse("1.38.5") + config_before = get_api_config(version_before) + self.assertEqual(config_before.auth_algorithm, "MD5") + + # The boundary version should use SHA256 + boundary_version = version.parse("1.38.6-1") + config_boundary = get_api_config(boundary_version) + self.assertEqual(config_boundary.auth_algorithm, "SHA256") + + def test_hash_utf8_with_bytes_input(self): + """Test hash_utf8 with bytes input.""" + test_bytes = b"test_bytes" + result_md5 = hash_utf8(test_bytes, "MD5") + result_sha256 = hash_utf8(test_bytes, "SHA256") + + expected_md5 = hashlib.md5(test_bytes).hexdigest() + expected_sha256 = hashlib.sha256(test_bytes).hexdigest() + + self.assertEqual(result_md5, expected_md5) + self.assertEqual(result_sha256, expected_sha256) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file