Skip to content
131 changes: 113 additions & 18 deletions src/batcontrol/inverter/fronius.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,20 @@

logger_auth = logging.getLogger("batcontrol.inverter.fronius.auth")

def hash_utf8(x):
"""Hash a string or bytes object."""
def hash_utf8(x, algorithm="MD5"):
"""Hash a string or bytes object.

Args:
x: String or bytes to hash
algorithm: Hash algorithm to use ("MD5" or "SHA256")
"""
if isinstance(x, str):
x = x.encode("utf-8")
return hashlib.md5(x).hexdigest()

if algorithm.upper() == "SHA256":
return hashlib.sha256(x).hexdigest()
else: # Default to MD5 for backward compatibility
return hashlib.md5(x).hexdigest()


def strip_dict(original):
Expand Down Expand Up @@ -76,6 +85,7 @@ class FroniusApiConfig:
config_timeofuse_path: str
commands_login_path: str
commands_logout_path: str
auth_algorithm: str = "SHA256" # Authentication algorithm: "MD5" or "SHA256"


# Alle Konfigurationen in einer Liste
Expand All @@ -92,6 +102,7 @@ class FroniusApiConfig:
config_timeofuse_path='/config/timeofuse',
commands_login_path='/commands/Login',
commands_logout_path='/commands/Logout',
auth_algorithm="MD5",
),
FroniusApiConfig(
from_version=version.parse("1.28.7-1"),
Expand All @@ -105,9 +116,24 @@ class FroniusApiConfig:
config_timeofuse_path='/config/timeofuse',
commands_login_path='/commands/Login',
commands_logout_path='/commands/Logout',
auth_algorithm="MD5",
),
FroniusApiConfig(
from_version=version.parse("1.36"),
to_version=version.parse("1.38.6-1"),
version_path='/api/status/version',
powerflow_path='/solar_api/v1/GetPowerFlowRealtimeData.fcgi',
storage_path='/solar_api/v1/GetStorageRealtimeData.cgi',
config_battery_path='/api/config/batteries',
config_powerunit_path='/api/config/powerunit',
config_solar_api_path='/api/config/solar_api',
config_timeofuse_path='/api/config/timeofuse',
commands_login_path='/api/commands/Login',
commands_logout_path='/api/commands/Logout',
auth_algorithm="MD5",
),
FroniusApiConfig(
from_version=version.parse("1.38.6-1"),
to_version=version.parse("9999.99.99"),
version_path='/api/status/version',
powerflow_path='/solar_api/v1/GetPowerFlowRealtimeData.fcgi',
Expand All @@ -118,6 +144,7 @@ class FroniusApiConfig:
config_timeofuse_path='/api/config/timeofuse',
commands_login_path='/api/commands/Login',
commands_logout_path='/api/commands/Logout',
auth_algorithm="SHA256",
),
]

Expand All @@ -136,9 +163,21 @@ class FroniusWR(InverterBaseclass):

def __init__(self, config: dict) -> None:
super().__init__(config)

# We are doing three login tests during first login.
# As MD5 was the default on the old firmware, the latest
# retries should be MD5.
self.usable_password_hash_methods = [
"SHA256", # First try: SHA256
"MD5", # Second try: MD5
"MD5" # Third try: MD5 again (retry with same method)
]
self._last_password_hash_method_index = -1
self.password_hash = None

self.subsequent_login = False
self.ncvalue_num = 1
self.cnonce = "NaN"
self.cnonce = hashlib.md5(os.urandom(8)).hexdigest()
self.login_attempts = 0
self.address = config['address']
self.capacity = -1
Expand Down Expand Up @@ -198,7 +237,7 @@ def __init__(self, config: dict) -> None:
self.previous_battery_config['HYB_BACKUP_RESERVED']
)
self.max_soc = self.previous_battery_config['BAT_M0_SOC_MAX']
self.backup_time_of_use() # save timesofuse
self.backup_time_of_use() # save timesofuse
self.set_allow_grid_charging(True)

def get_firmware_version(self) -> version:
Expand Down Expand Up @@ -653,6 +692,8 @@ def __send_one_http_request(self, path, method='GET', payload="",
if auth:
headers['Authorization'] = self.get_auth_header(
method=method, path=fullpath)
logger_auth.debug("Requesting %s , header: %s",
fullpath, headers)

for i in range(3):
# 3 retries if connection can't be established
Expand Down Expand Up @@ -687,13 +728,15 @@ def login(self):
"""Login to Fronius API"""
logger_auth.debug("Logging in")
path = self.api_config.commands_login_path
self.cnonce = "NaN"
self.cnonce = hashlib.md5(os.urandom(8)).hexdigest()
self.ncvalue_num = 1
self.login_attempts = 0
for i in range(3):
self.login_attempts += 1
response = self.__send_one_http_request(path, auth=True)
if response.status_code == 200:
if not self.subsequent_login:
self.__store_latest_password_hash_method()
self.subsequent_login = True
logger_auth.info('Login successful %s', response)
logger_auth.debug("Response: %s", response.headers)
Expand All @@ -705,7 +748,7 @@ def login(self):

logger_auth.error(
'Login -%d- failed, Response: %s', i, response)
logger_auth.error('Response-raw: %s', response.raw)
logger_auth.error('Response: %s ; %s', response.headers, response)
if self.subsequent_login:
logger_auth.info(
"Retrying login in 10 seconds")
Expand Down Expand Up @@ -743,11 +786,15 @@ def __retrieve_auth_from_response(self, response):
self.ncvalue_num = 1
if auth_dict.get('cnonce'):
self.cnonce = auth_dict['cnonce']
else:
self.cnonce = "NaN"
if auth_dict.get('nonce'):
self.nonce = auth_dict['nonce']

logger_auth.debug("nc: %s, cnonce: %s, nonce: %s",
self.ncvalue_num ,
self.cnonce,
self.nonce
)

def __split_response_auth_header(self, response):
""" Split the response header into a dictionary."""
auth_dict = {}
Expand All @@ -764,14 +811,20 @@ def __split_response_auth_header(self, response):
'No authentication header found in response')
return auth_dict

auth_list = auth_string.replace(" ", "").replace('"', '').split(',')
# Remove quotes and split by comma
auth_list = auth_string.replace('"', '').split(',')
logger_auth.debug("Authentication header: %s", auth_list)
auth_dict = {}
for item in auth_list:
key, value = item.split("=")
auth_dict[key] = value
logger_auth.debug(
"Authentication header key-value pair - %s: %s", key, value)
# Strip whitespace from each item and check if it contains '='
item = item.strip()
if '=' in item:
key, value = item.split("=", 1) # Split only on first '='
key = key.strip()
value = value.strip()
auth_dict[key] = value
logger_auth.debug(
"Authentication header key-value pair - %s: %s", key, value)
return auth_dict

def get_auth_header(self, method, path) -> str:
Expand All @@ -782,22 +835,64 @@ def get_auth_header(self, method, path) -> str:
cnonce = self.cnonce
user = self.user
password = self.password
algorithm = self.api_config.auth_algorithm
password_algorithm = algorithm

password_algorithm = self.__get_password_hash_method()

if len(self.user) < 4:
raise RuntimeError("User needed for Authorization")
if len(self.password) < 4:
raise RuntimeError("Password needed for Authorization")

a1 = f"{user}:{realm}:{password}"
a2 = f"{method}:{path}"
ha1 = hash_utf8(a1)
ha2 = hash_utf8(a2)
ha1 = hash_utf8(a1, password_algorithm)
ha2 = hash_utf8(a2, algorithm)
noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{ha2}"
respdig = hash_utf8(f"{ha1}:{noncebit}")
respdig = hash_utf8(f"{ha1}:{noncebit}", algorithm)
auth_header = f'Digest username="{user}", realm="{realm}", nonce="{nonce}", uri="{path}", '
auth_header += f'algorithm="MD5", qop=auth, nc={ncvalue}, cnonce="{cnonce}", '
auth_header += f'algorithm="{algorithm}", qop=auth, nc={ncvalue}, cnonce="{cnonce}", '
auth_header += f'response="{respdig}"'
return auth_header


def __get_password_hash_method(self) -> str:
""" Figure out the password hash method during first login."""
# If we already found a working method, use it
if self.password_hash is not None:
return self.password_hash

# Index is initialized to -1. Increment to get the next method.
password_algorithm = ""
if self.api_config.auth_algorithm == "SHA256":
self._last_password_hash_method_index += 1
if self._last_password_hash_method_index >= len(self.usable_password_hash_methods):
self._last_password_hash_method_index = 0
password_algorithm = self.usable_password_hash_methods[
self._last_password_hash_method_index
]
logger_auth.debug("Trying password hash method %s", password_algorithm)
else:
# Fallback to MD5 only for older firmwares
password_algorithm = "MD5"
# Set password_hash immediately for MD5 since there's only one option
# Setting this here prevents __store_latest_password_hash_method from changing it later
self.password_hash = password_algorithm

return password_algorithm

def __store_latest_password_hash_method(self):
""" Save the password hash method to use after a successful login."""
if self.password_hash is not None:
# We already have a working method, do not change it
return
self.password_hash = self.usable_password_hash_methods[
self._last_password_hash_method_index
]
logger_auth.debug("Password hash method set to %s",
self.password_hash)

def __set_em(self, mode=None, power=None):
""" Change Energy Management """
settings = {}
Expand Down
Empty file.
86 changes: 86 additions & 0 deletions tests/batcontrol/inverter/test_fronius_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Tests for Fronius inverter authentication functionality."""
import unittest
import hashlib
from packaging import version

from batcontrol.inverter.fronius import hash_utf8, get_api_config, FroniusApiConfig


class TestFroniusAuthentication(unittest.TestCase):
"""Test authentication algorithm selection based on firmware version."""

def test_hash_utf8_md5_default(self):
"""Test that hash_utf8 defaults to MD5."""
test_string = "test_string"
result = hash_utf8(test_string)
expected = hashlib.md5(test_string.encode('utf-8')).hexdigest()
self.assertEqual(result, expected)

def test_hash_utf8_md5_explicit(self):
"""Test hash_utf8 with explicit MD5 algorithm."""
test_string = "test_string"
result = hash_utf8(test_string, "MD5")
expected = hashlib.md5(test_string.encode('utf-8')).hexdigest()
self.assertEqual(result, expected)

def test_hash_utf8_sha256(self):
"""Test hash_utf8 with SHA256 algorithm."""
test_string = "test_string"
result = hash_utf8(test_string, "SHA256")
expected = hashlib.sha256(test_string.encode('utf-8')).hexdigest()
self.assertEqual(result, expected)

def test_hash_utf8_case_insensitive(self):
"""Test that algorithm parameter is case insensitive."""
test_string = "test_string"
result_upper = hash_utf8(test_string, "SHA256")
result_lower = hash_utf8(test_string, "sha256")
result_mixed = hash_utf8(test_string, "Sha256")
self.assertEqual(result_upper, result_lower)
self.assertEqual(result_upper, result_mixed)

def test_api_config_version_before_1_38_6_1_uses_md5(self):
"""Test that firmware versions before 1.38.6-1 use MD5."""
test_versions = ["1.35.0", "1.36.0", "1.37.0", "1.38.5"]
for v in test_versions:
with self.subTest(version=v):
parsed_version = version.parse(v)
config = get_api_config(parsed_version)
self.assertEqual(config.auth_algorithm, "MD5")

def test_api_config_version_1_38_6_1_and_later_uses_sha256(self):
"""Test that firmware version 1.38.6-1 and later use SHA256."""
test_versions = ["1.38.6-1", "1.38.7", "1.39.0", "2.0.0"]
for v in test_versions:
with self.subTest(version=v):
parsed_version = version.parse(v)
config = get_api_config(parsed_version)
self.assertEqual(config.auth_algorithm, "SHA256")

def test_api_config_boundary_version(self):
"""Test the exact boundary version 1.38.6-1."""
# Version just before the boundary should use MD5
version_before = version.parse("1.38.5")
config_before = get_api_config(version_before)
self.assertEqual(config_before.auth_algorithm, "MD5")

# The boundary version should use SHA256
boundary_version = version.parse("1.38.6-1")
config_boundary = get_api_config(boundary_version)
self.assertEqual(config_boundary.auth_algorithm, "SHA256")

def test_hash_utf8_with_bytes_input(self):
"""Test hash_utf8 with bytes input."""
test_bytes = b"test_bytes"
result_md5 = hash_utf8(test_bytes, "MD5")
result_sha256 = hash_utf8(test_bytes, "SHA256")

expected_md5 = hashlib.md5(test_bytes).hexdigest()
expected_sha256 = hashlib.sha256(test_bytes).hexdigest()

self.assertEqual(result_md5, expected_md5)
self.assertEqual(result_sha256, expected_sha256)


if __name__ == '__main__':
unittest.main()