Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cores/esp32/HEXBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

#include "HEXBuilder.h"
#include <ctype.h>

static uint8_t hex_char_to_byte(uint8_t c) {
return (c >= 'a' && c <= 'f') ? (c - ((uint8_t)'a' - 0xa))
Expand All @@ -26,6 +27,19 @@ static uint8_t hex_char_to_byte(uint8_t c) {
: 0x10; // unknown char is 16
}

bool HEXBuilder::isHexString(const char *str, size_t len) {
for (size_t i = 0; i < len; i++) {
if (isxdigit(str[i]) == 0) {
return false;
}
}
return true;
}

bool HEXBuilder::isHexString(String str) {
return isHexString(str.c_str(), str.length());
}

size_t HEXBuilder::hex2bytes(unsigned char *out, size_t maxlen, String &in) {
return hex2bytes(out, maxlen, in.c_str());
}
Expand Down
3 changes: 3 additions & 0 deletions cores/esp32/HEXBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ class HEXBuilder {

static String bytes2hex(const unsigned char *in, size_t len);
static size_t bytes2hex(char *out, size_t maxlen, const unsigned char *in, size_t len);

static bool isHexString(const char *str, size_t len);
static bool isHexString(String str);
};
#endif
21 changes: 21 additions & 0 deletions libraries/ArduinoOTA/src/ArduinoOTA.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "ArduinoOTA.h"
#include "NetworkClient.h"
#include "ESPmDNS.h"
#include "HEXBuilder.h"
#include "SHA2Builder.h"
#include "PBKDF2_HMACBuilder.h"
#include "Update.h"
Expand Down Expand Up @@ -86,6 +87,26 @@ ArduinoOTAClass &ArduinoOTAClass::setPassword(const char *password) {

ArduinoOTAClass &ArduinoOTAClass::setPasswordHash(const char *password) {
if (_state == OTA_IDLE && password) {
size_t len = strlen(password);
bool is_hex = HEXBuilder::isHexString(password, len);

if (!is_hex) {
log_e("Invalid password hash. Expected hex string (0-9, a-f, A-F).");
return *this;
}

if (len == 32) {
// Warn if MD5 hash is detected (32 hex characters)
log_w("MD5 password hash detected. MD5 is deprecated and insecure.");
log_w("Please use setPassword() with plain text or setPasswordHash() with SHA256 hash (64 chars).");
log_w("To generate SHA256: echo -n 'yourpassword' | sha256sum");
} else if (len == 64) {
log_i("Using SHA256 password hash.");
} else {
log_e("Invalid password hash length. Expected 32 (deprecated MD5) or 64 (SHA256) characters.");
return *this;
}

// Store the pre-hashed password directly
_password.clear();
_password = password;
Expand Down
Binary file modified tools/espota.exe
Binary file not shown.
211 changes: 161 additions & 50 deletions tools/espota.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# Modified since 2015-09-18 from Pascal Gollor (https://github.com/pgollor)
# Modified since 2015-11-09 from Hristo Gochkov (https://github.com/me-no-dev)
# Modified since 2016-01-03 from Matthew O'Gorman (https://githumb.com/mogorman)
# Modified since 2025-09-04 from Lucas Saavedra Vaz (https://github.com/lucasssvaz)
#
# This script will push an OTA update to the ESP
# use it like:
Expand Down Expand Up @@ -36,6 +37,19 @@
# - Incorporated exception handling to catch and handle potential errors.
# - Made variable names more descriptive for better readability.
# - Introduced constants for better code maintainability.
#
# Changes
# 2025-09-04:
# - Changed authentication to use PBKDF2-HMAC-SHA256 for challenge/response
#
# Changes
# 2025-09-18:
# - Fixed authentication when using old images with MD5 passwords
#
# Changes
# 2025-10-07:
# - Fixed authentication when images might use old MD5 hashes stored in the firmware


from __future__ import print_function
import socket
Expand Down Expand Up @@ -81,7 +95,7 @@ def update_progress(progress):
sys.stderr.flush()


def send_invitation_and_get_auth_challenge(remote_addr, remote_port, message, md5_target):
def send_invitation_and_get_auth_challenge(remote_addr, remote_port, message):
"""
Send invitation to ESP device and get authentication challenge.
Returns (success, auth_data, error_message) tuple.
Expand All @@ -107,10 +121,9 @@ def send_invitation_and_get_auth_challenge(remote_addr, remote_port, message, md

sock2.settimeout(TIMEOUT)
try:
if md5_target:
data = sock2.recv(37).decode() # "AUTH " + 32-char MD5 nonce
else:
data = sock2.recv(69).decode() # "AUTH " + 64-char SHA256 nonce
# Try to read up to 69 bytes for new protocol (SHA256)
# If device sends less (37 bytes), it's using old MD5 protocol
data = sock2.recv(69).decode()
sock2.close()
break
except: # noqa: E722
Expand All @@ -127,34 +140,49 @@ def send_invitation_and_get_auth_challenge(remote_addr, remote_port, message, md
return True, data, None


def authenticate(remote_addr, remote_port, password, md5_target, filename, content_size, file_md5, nonce):
def authenticate(
remote_addr, remote_port, password, use_md5_password, use_old_protocol, filename, content_size, file_md5, nonce
):
"""
Perform authentication with the ESP device using either MD5 or SHA256 method.
Perform authentication with the ESP device.

Args:
use_md5_password: If True, hash password with MD5 instead of SHA256
use_old_protocol: If True, use old MD5 challenge/response protocol (pre-3.3.1)

Returns (success, error_message) tuple.
"""
cnonce_text = "%s%u%s%s" % (filename, content_size, file_md5, remote_addr)
remote_address = (remote_addr, int(remote_port))

if md5_target:
if use_old_protocol:
# Generate client nonce (cnonce)
cnonce = hashlib.md5(cnonce_text.encode()).hexdigest()

# MD5 challenge/response protocol (insecure, use only for compatibility with old firmwares)
# 1. Hash the password with MD5 (to match ESP32 storage)
# Old MD5 challenge/response protocol (pre-3.3.1)
# 1. Hash the password with MD5
password_hash = hashlib.md5(password.encode()).hexdigest()

# 2. Create challenge response
challenge = "%s:%s:%s" % (password_hash, nonce, cnonce)
response = hashlib.md5(challenge.encode()).hexdigest()
expected_response_length = 32
else:
# Generate client nonce (cnonce)
# Generate client nonce (cnonce) using SHA256 for new protocol
cnonce = hashlib.sha256(cnonce_text.encode()).hexdigest()

# PBKDF2-HMAC-SHA256 challenge/response protocol
# The ESP32 stores the password as SHA256 hash, so we need to hash the password first
# 1. Hash the password with SHA256 (to match ESP32 storage)
password_hash = hashlib.sha256(password.encode()).hexdigest()
# New PBKDF2-HMAC-SHA256 challenge/response protocol (3.3.1+)
# The password can be hashed with either MD5 or SHA256
if use_md5_password:
# Use MD5 for password hash (for devices that stored MD5 hashes)
logging.warning(
"Using insecure MD5 hash for password due to legacy device support. "
"Please upgrade devices to ESP32 Arduino Core 3.3.1+ for improved security."
)
password_hash = hashlib.md5(password.encode()).hexdigest()
else:
# Use SHA256 for password hash (recommended)
password_hash = hashlib.sha256(password.encode()).hexdigest()

# 2. Derive key using PBKDF2-HMAC-SHA256 with the password hash
salt = nonce + ":" + cnonce
Expand Down Expand Up @@ -189,9 +217,9 @@ def authenticate(remote_addr, remote_port, password, md5_target, filename, conte
return False, str(e)


def serve(
def serve( # noqa: C901
remote_addr, local_addr, remote_port, local_port, password, md5_target, filename, command=FLASH
): # noqa: C901
):
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (local_addr, local_port)
Expand All @@ -210,58 +238,138 @@ def serve(
message = "%d %d %d %s\n" % (command, local_port, content_size, file_md5)

# Send invitation and get authentication challenge
success, data, error = send_invitation_and_get_auth_challenge(remote_addr, remote_port, message, md5_target)
success, data, error = send_invitation_and_get_auth_challenge(remote_addr, remote_port, message)
if not success:
logging.error(error)
return 1

if data != "OK":
if data.startswith("AUTH"):
nonce = data.split()[1]
nonce_length = len(nonce)

# Try authentication with the specified method first
sys.stderr.write("Authenticating...")
sys.stderr.flush()
auth_success, auth_error = authenticate(
remote_addr, remote_port, password, md5_target, filename, content_size, file_md5, nonce
)
# Detect protocol version based on nonce length:
# - 32 chars = Old MD5 protocol (pre-3.3.1)
# - 64 chars = New SHA256 protocol (3.3.1+)

if nonce_length == 32:
# Scenario 1: Old device (pre-3.3.1) using MD5 protocol
logging.info("Detected old MD5 protocol (pre-3.3.1)")
sys.stderr.write("Authenticating (MD5 protocol)...")
sys.stderr.flush()
auth_success, auth_error = authenticate(
remote_addr,
remote_port,
password,
use_md5_password=True,
use_old_protocol=True,
filename=filename,
content_size=content_size,
file_md5=file_md5,
nonce=nonce,
)

if not auth_success:
# If authentication failed and we're not already using MD5, try with MD5
if not md5_target:
if not auth_success:
sys.stderr.write("FAIL\n")
logging.warning("Authentication failed with SHA256, retrying with MD5: %s", auth_error)
logging.error("Authentication Failed: %s", auth_error)
return 1

# Restart the entire process with MD5 to get a fresh nonce
success, data, error = send_invitation_and_get_auth_challenge(
remote_addr, remote_port, message, True
sys.stderr.write("OK\n")
logging.warning("====================================================================")
logging.warning("WARNING: Device is using old MD5 authentication protocol (pre-3.3.1)")
logging.warning("Please update to ESP32 Arduino Core 3.3.1+ for improved security.")
logging.warning("======================================================================")

elif nonce_length == 64:
# New protocol (3.3.1+) - try SHA256 password first, then MD5 if it fails

# Scenario 2: Try SHA256 password hash first (recommended for new devices)
if md5_target:
# User explicitly requested MD5 password hash
logging.info("Using MD5 password hash as requested")
sys.stderr.write("Authenticating (SHA256 protocol with MD5 password)...")
sys.stderr.flush()
auth_success, auth_error = authenticate(
remote_addr,
remote_port,
password,
use_md5_password=True,
use_old_protocol=False,
filename=filename,
content_size=content_size,
file_md5=file_md5,
nonce=nonce,
)
else:
# Try SHA256 password hash first
sys.stderr.write("Authenticating...")
sys.stderr.flush()
auth_success, auth_error = authenticate(
remote_addr,
remote_port,
password,
use_md5_password=False,
use_old_protocol=False,
filename=filename,
content_size=content_size,
file_md5=file_md5,
nonce=nonce,
)
if not success:
logging.error("Failed to re-establish connection for MD5 retry: %s", error)
return 1

if data.startswith("AUTH"):
nonce = data.split()[1]
sys.stderr.write("Retrying with MD5...")
# Scenario 3: If SHA256 fails, try MD5 password hash (for devices with stored MD5 passwords)
if not auth_success:
logging.info("SHA256 password failed, trying MD5 password hash")
sys.stderr.write("Retrying with MD5 password...")
sys.stderr.flush()

# Device is back in OTA_IDLE after auth failure, need to send new invitation
success, data, error = send_invitation_and_get_auth_challenge(remote_addr, remote_port, message)
if not success:
sys.stderr.write("FAIL\n")
logging.error("Failed to get new challenge for MD5 retry: %s", error)
return 1

if not data.startswith("AUTH"):
sys.stderr.write("FAIL\n")
logging.error("Expected AUTH challenge for MD5 retry, got: %s", data)
return 1

# Get new nonce for second attempt
nonce = data.split()[1]

auth_success, auth_error = authenticate(
remote_addr, remote_port, password, True, filename, content_size, file_md5, nonce
remote_addr,
remote_port,
password,
use_md5_password=True,
use_old_protocol=False,
filename=filename,
content_size=content_size,
file_md5=file_md5,
nonce=nonce,
)
else:
auth_success = False
auth_error = "Expected AUTH challenge for MD5 retry, got: " + data

if not auth_success:
sys.stderr.write("FAIL\n")
logging.error("Authentication failed with both SHA256 and MD5: %s", auth_error)
return 1
else:
# Already tried MD5 and it failed
if auth_success:
logging.warning("====================================================================")
logging.warning("WARNING: Device authenticated with MD5 password hash (deprecated)")
logging.warning("MD5 is cryptographically broken and should not be used.")
logging.warning(
"Please update your sketch to use either setPassword() or setPasswordHash()"
)
logging.warning(
"with SHA256, then upload again to migrate to the new secure SHA256 protocol."
)
logging.warning("======================================================================")

if not auth_success:
sys.stderr.write("FAIL\n")
logging.error("Authentication failed: %s", auth_error)
logging.error("Authentication Failed: %s", auth_error)
return 1

sys.stderr.write("OK\n")
sys.stderr.write("OK\n")
else:
logging.error("Invalid nonce length: %d (expected 32 or 64)", nonce_length)
return 1
else:
logging.error("Bad Answer: %s", data)
return 1
Expand Down Expand Up @@ -381,7 +489,10 @@ def parse_args(unparsed_args):
"-m",
"--md5-target",
dest="md5_target",
help="Target device is using MD5 checksum. This is insecure, use only for compatibility with old firmwares.",
help=(
"Use MD5 for password hashing (for devices with stored MD5 passwords). "
"By default, SHA256 is tried first, then MD5 as fallback."
),
action="store_true",
default=False,
)
Expand Down
Loading