关于嵌入式软件,错误处理的重要性怎么估计都不为过。 嵌入式系统应该在不同的物理条件下在没有监督的情况下工作,例如控制可能发生故障转移或不总是提供可靠通信线路的外部外部设备。 在许多情况下,系统的故障要么代价高昂,要么就是不安全。
在本章中,我们将了解帮助您编写可靠且容错的嵌入式应用的常见策略和最佳实践。
在本章中,我们将介绍以下食谱:
- 使用错误代码
- 使用异常进行错误处理
- 捕获异常时使用常量引用
- 处理静态对象
- 使用看门狗
- 探索高可用性系统的心跳
- 实现软件去抖动逻辑
这些菜谱将帮助您理解错误处理设计的重要性,学习最佳实践,并避免该领域的陷阱。
在设计新函数时,开发人员通常需要一种机制来指示函数由于某种错误而无法完成其工作。 它可能无效、从外部设备接收到意外结果或资源分配问题。
报告错误状况的最传统和最广泛的方式之一是通过错误代码。 这是一种高效且无处不在的机制,不依赖于编程语言或操作系统。 由于其高效性、通用性和跨平台能力,在嵌入式软件开发中得到了广泛的应用。
设计返回值或错误代码的函数接口可能很棘手,特别是当值和错误代码具有不同类型时。 在本食谱中,我们将探索设计这类函数接口的几种方法。
我们将创建一个简单的程序,其中包含名为Receive
的函数的三个实现。 这三个实现具有相同的行为,但接口不同。 遵循以下步骤:
- 在您的工作目录(即
~/test
)中,创建一个名为errcode
的子目录。 - 使用您喜欢的文本编辑器在
errcode
子目录中创建名为errcode.cpp
的文件。 - 将第一个函数的实现添加到
errcode.cpp
文件:
#include <iostream>
int Receive(int input, std::string& output) {
if (input < 0) {
return -1;
}
output = "Hello";
return 0;
}
- 接下来,我们添加第二个实现:
std::string Receive(int input, int& error) {
if (input < 0) {
error = -1;
return "";
}
error = 0;
return "Hello";
}
Receive
函数的第三个实现如下:
std::pair<int, std::string> Receive(int input) {
std::pair<int, std::string> result;
if (input < 0) {
result.first = -1;
} else {
result.second = "Hello";
}
return result;
}
- 现在,我们定义一个名为
Display
的帮助器函数来显示结果:
void Display(const char* prefix, int err, const std::string& result) {
if (err < 0) {
std::cout << prefix << " error: " << err << std::endl;
} else {
std::cout << prefix << " result: " << result << std::endl;
}
}
- 然后,我们添加一个名为
Test
的函数,该函数调用所有三个实现:
void Test(int input) {
std::string outputResult;
int err = Receive(input, outputResult);
Display(" Receive 1", err, outputResult);
int outputErr = -1;
std::string result = Receive(input, outputErr);
Display(" Receive 2", outputErr, result);
std::pair<int, std::string> ret = Receive(input);
Display(" Receive 3", ret.first, ret.second);
}
main
函数将所有内容联系在一起:
int main() {
std::cout << "Input: -1" << std::endl;
Test(-1);
std::cout << "Input: 1" << std::endl;
Test(1);
return 0;
}
- 最后,我们创建一个
CMakeLists.txt
文件,其中包含程序的构建规则:
cmake_minimum_required(VERSION 3.5.1)
project(errcode)
add_executable(errcode errcode.cpp)
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)
SET(CMAKE_CXX_FLAGS "--std=c++ 11")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
- 现在您可以构建和运行应用了。
在我们的应用中,我们定义了从某个设备接收数据的函数的三种不同实现。 它应该以字符串的形式返回接收到的数据,但是在出现错误的情况下,它应该返回一个表示错误原因的整数错误代码。
由于结果和错误代码具有不同的类型,因此我们不能对两者重用相同的值。 要在 C++ 中返回多个值,我们需要使用输出参数或创建复合数据类型。
我们的实现探索了这两种策略。 我们使用 C++ 函数重载来定义同名但不同类型的参数和返回值的Receive
函数。
第一个实现返回错误代码并将结果存储在输出参数 Result 中:
int Receive(int input, std::string& output)
输出参数是通过引用传递的字符串,用于让函数修改其内容。 第二个实现将参数反转。 它返回一个收到的字符串作为结果,并接受错误代码作为输出参数:
std::string Receive(int input, int& error)
因为我们希望从函数内部设置错误代码,所以我们也通过引用传递它。 最后,第三个实现在 C++ pair
中组合并返回结果和错误代码:
std::pair<int, std::string> Receive(int input)
该函数始终创建一个std::pair<int, std::string>
实例。 因为我们没有将任何值传递给它的构造函数,所以该对象是默认初始化的。 整数元素设置为0
,字符串元素设置为空字符串。
这种方法不需要output
参数,可读性更好,但构造和销毁pair
对象的开销略高。
定义了所有三个实现后,我们将在Test
函数中测试所有实现。 我们将相同的参数传递给每个实现并显示结果。 我们希望它们每一个都能产生相同的结果。
有两次调用Test
。 首先,我们将-1
作为参数传递,这将触发错误路径,然后我们传递1
,这将激活正常操作路径:
std::cout << "Input: -1" << std::endl;
Test(-1);
std::cout << "Input: 1" << std::endl;
Test(1);
当我们运行我们的程序时,我们看到以下输出:
所有三种实现都根据输入参数正确返回结果或错误代码。 您可以根据总体设计指导原则或个人喜好在应用中使用任何方法。
作为 C++ 17 标准的一部分,标准库中添加了一个名为std::optional
的模板。 它可以表示可能缺失的可选值。 它可以用作可能失败的函数的返回值。 但是,它不能表示失败的原因,只能表示一个布尔值,指示该值是否有效。 有关更多信息,请查看位于https://en.cppreference.com/w/cpp/utility/optional的std::optional
参考。
虽然错误代码仍然是嵌入式编程中最广泛使用的错误处理技术,但 C++ 提供了另一种用于此目的的机制,称为异常。
异常旨在简化错误处理并使其更可靠。 使用错误代码时,开发人员必须检查每个函数的结果是否有错误,并将结果传播到调用函数。 这会用大量 if-Else 结构使代码变得混乱,使函数逻辑更加模糊。
使用异常时,开发人员不需要在每次函数调用后检查错误。 异常通过调用堆栈自动传播,直到它们到达可以通过记录、重试或终止应用来正确处理它的代码。
虽然异常是 C++ 标准库的默认错误处理机制,但与外部设备或底层操作系统层的通信仍然涉及错误代码。 在本食谱中,我们将学习如何使用std::system_error
Exception 类将低级错误处理连接到 C++ 异常。
我们将创建一个简单的应用,它通过串行链路与设备通信。 遵循以下步骤:
-
在您的工作目录(即
~/test
)中,创建一个名为except
的子目录。 -
使用您喜欢的文本编辑器在
ex``cept
子目录中创建名为except.cpp
的文件。 -
将所需的包含内容放入
except.cpp
文件中:
#include <iostream>
#include <system_error>
#include <fcntl.h>
#include <unistd.h>
- 接下来,我们定义一个将通信抽象到设备的
Device
类。 我们从构造函数和析构函数开始:
class Device {
int fd;
public:
Device(const std::string& deviceName) {
fd = open(deviceName.c_str(), O_RDWR);
if (fd < 0) {
throw std::system_error(errno, std::system_category(),
"Failed to open device file");
}
}
~Device() {
close(fd);
}
- 然后,我们添加一个向设备发送数据的方法,如下所示:
void Send(const std::string& data) {
size_t offset = 0;
size_t len = data.size();
while (offset < data.size() - 1) {
int sent = write(fd, data.data() + offset,
data.size() - offset);
if (sent < 0) {
throw std::system_error(errno,
std::system_category(),
"Failed to send data");
}
offset += sent;
}
}
};
- 定义类之后,我们添加
main
函数,该函数使用它:
int main() {
try {
Device serial("/dev/ttyUSB0");
serial.Send("Hello");
} catch (std::system_error& e) {
std::cout << "Error: " << e.what() << std::endl;
std::cout << "Code: " << e.code() << " means \""
<< e.code().message()
<< "\"" << std::endl;
}
return 0;
}
- 最后,我们创建一个
CMakeLists.txt
文件,其中包含程序的构建规则:
cmake_minimum_required(VERSION 3.5.1)
project(except)
add_executable(except except.cpp)
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)
SET(CMAKE_CXX_FLAGS "--std=c++ 11")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
- 现在您可以构建和运行应用了。
我们的应用与通过串行链路连接的外部设备通信。 在 POSIX 操作系统中,与设备的通信类似于使用常规文件的操作,并且使用相同的 API;即open
、close
、read
和write
函数。
所有这些函数都返回错误代码以指示各种错误情况。 我们没有直接使用它们,而是将通信包装在一个名为Device
的类中。
其构造函数尝试打开由deviceName
构造函数参数引用的文件。 构造函数检查错误代码,如果指示错误,则创建并抛出std::system_error
异常:
throw std::system_error(errno, std::system_category(),
"Failed to open device file");
我们使用三个参数构造std::system_error
实例。 第一个是我们想要包装在异常中的错误代码。 当open
函数返回错误时,我们使用由open
函数设置的errno
变量的值。 第二个参数是错误类别。 因为我们使用特定于操作系统的错误代码,所以我们使用std::system_category
的实例。 第一个参数是我们希望与异常关联的消息。 它可以是任何可以帮助我们识别错误(如果它发生)的东西。
我们以类似的方式定义Send
函数,该函数将数据发送到设备。 它是write
系统函数的包装器,如果write
返回错误,我们将创建并抛出一个std::system_error
实例。 唯一的区别是消息字符串,因为我们希望在日志中区分这两种情况:
throw std::system_error(errno, std::system_category(),
"Failed to send data");
}
在定义了Device
类之后,我们就可以使用它了。 我们不需要打开设备并检查错误,然后写入设备并再次检查错误,只需创建Device
类的实例并向其发送数据:
Device serial("/dev/ttyUSB0");
serial.Send("Hello");
所有错误处理都位于主逻辑之后的catch
块中。 如果抛出系统错误,我们会将其记录到标准输出中。 此外,我们还打印嵌入在异常中的有关错误代码的信息:
} catch (std::system_error& e) {
std::cout << "Error: " << e.what() << std::endl;
std::cout << "Code: " << e.code() << " means \"" << e.code().message()
<< "\"" << std::endl;
}
当我们构建和运行应用时,如果没有作为/dev/ttyUSB0
连接的设备,它将显示以下输出:
正如预期的那样,检测到了错误条件,我们可以看到所有必需的详细信息,包括底层操作系统错误代码及其描述。 请注意,使用包装器类与设备通信的代码整洁且可读。
C++ 标准库附带了许多预定义的异常和错误类别。 有关更多详细信息,请查看https://en.cppreference.com/w/cpp/error上的 C++ 错误处理参考。
C++ 异常为异常处理设计提供了强大的基础。 它们是灵活的,可以以多种不同的方式使用。 您可以引发任何类型的异常,包括指针和整数。 您可以通过值或引用捕获异常。 在选择数据类型时,错误的选择可能会导致性能下降或资源泄漏。
在本食谱中,我们将分析潜在的陷阱,并学习如何在 CATCH 块中使用常量引用来高效、安全地处理错误。
我们将创建一个抛出和捕获自定义异常的示例应用,并分析数据类型选择如何影响效率。 遵循以下步骤:
- 在您的工作目录(即
~/test
)中,创建一个名为catch
的子目录。 - 使用您喜欢的文本编辑器在
catch
子目录中创建名为catch.cpp
的文件。 - 将
Error
类的定义放入catch.cpp
文件中:
#include <iostream>
class Error {
int code;
public:
Error(int code): code(code) {
std::cout << " Error instance " << code << " was created"
<< std::endl;
}
Error(const Error& other): code(other.code) {
std::cout << " Error instance " << code << " was cloned"
<< std::endl;
}
~Error() {
std::cout << " Error instance " << code << " was destroyed"
<< std::endl;
}
};
- 接下来,我们添加帮助器函数来测试抛出和处理错误的三种不同方式。 我们从通过值捕获异常的函数开始:
void CatchByValue() {
std::cout << "Catch by value" << std::endl;
try {
throw Error(1);
}
catch (Error e) {
std::cout << " Error caught" << std::endl;
}
}
- 然后,我们添加一个抛出指针并通过指针捕获异常的函数,如下所示:
void CatchByPointer() {
std::cout << "Catch by pointer" << std::endl;
try {
throw new Error(2);
}
catch (Error* e) {
std::cout << " Error caught" << std::endl;
}
}
- 接下来,我们添加一个使用
const
引用捕获异常的函数:
void CatchByReference() {
std::cout << "Catch by reference" << std::endl;
try {
throw Error(3);
}
catch (const Error& e) {
std::cout << " Error caught" << std::endl;
}
}
- 在定义了所有帮助器函数之后,我们添加
main
函数将所有内容联系在一起:
int main() {
CatchByValue();
CatchByPointer();
CatchByReference();
return 0;
}
- 我们将应用的构建规则放入
CMakeLists.txt
文件中:
cmake_minimum_required(VERSION 3.5.1)
project(catch)
add_executable(catch catch.cpp)
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)
SET(CMAKE_CXX_FLAGS "--std=c++ 11")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
- 现在我们可以构建和运行应用了。
在我们的应用中,我们定义了一个名为Error
的自定义类,我们将在引发和捕获异常时使用它。 该类提供了一个构造函数、一个复制构造函数和一个仅将信息记录到控制台的析构函数。 我们需要它来评估不同异常捕获方法的效率。
Error
类仅包含code
数据字段,用于区分类的实例:
class Error {
int code;
我们评估了三种异常处理方法。 第一个,CatchByValue
,是最直接的。 我们创建并抛出Error
类的一个实例:
throw Error(1);
然后,我们通过值来捕捉它:
catch (Error e) {
第二个实现CatchByPointer
使用new
运算符动态创建Error
的实例:
throw new Error(2);
我们使用指针来捕获异常:
catch (Error* e) {
最后,CatchByReference
抛出一个类似于CatchByValue
的异常,但它在捕获异常时使用对Error
的const
引用:
catch (const Error& e) {
这有什么不同吗? 当我们运行我们的程序时,我们得到以下输出:
正如您所看到的,当通过值捕获对象时,将创建异常对象的副本。 虽然这种低效在示例应用中并不严重,但在高负载应用中可能会导致性能问题。
通过指针捕获异常并不是低效的,但是我们可以看到对象析构函数没有被调用,从而导致了内存泄漏。 这可以通过从catch
块调用delete
来避免,但这很容易出错,因为并不总是清楚谁负责销毁指针引用的对象。
参考方法是最安全、最有效的方法。 没有内存泄漏和不必要的复制。 此外,设置引用常量会给编译器一个提示,即它不会被更改,因此可以在幕后更好地进行优化。
错误处理是一个复杂的领域,有许多最佳实践、提示和建议。 请考虑阅读https://isocpp.org/wiki/faq/exceptions上的 C++ 异常和错误处理常见问题解答,以掌握您的异常处理技能。
在 C++ 中,如果对象不能正确实例化,对象构造函数就会抛出异常。 通常,这不会导致任何问题。 源自堆栈上构造的对象或使用new
关键字动态创建的对象的异常可以由创建该对象的代码周围的 try-catch 块处理。
不过,对于静态对象来说,情况会变得更加复杂。 这样的对象是在执行进入main
函数之前实例化的,因此它们不能包装在程序的 try-catch 块中。 C++ 编译器通过调用std::terminate
函数来处理这种情况,该函数打印错误消息并终止程序。 即使异常不是致命的,也没有办法恢复。
有几种方法可以避免落入这个陷阱。 一般来说,只应静态分配简单的整型数据类型。 如果您仍然需要一个复杂的静态对象,请确保其构造函数不会引发异常。
在本食谱中,我们将学习如何实现静态对象的构造函数。
我们将创建一个分配指定内存量的自定义类,并静态分配该类的两个实例。 遵循以下步骤:
- 在您的工作目录(即
~/test
)中,创建一个名为static
的子目录。 - 使用您喜欢的文本编辑器在
static
子目录中创建名为static.cpp
的文件。 - 让我们定义一个名为
Complex
的类。 将其私有字段和构造函数放入static.cpp
文件中:
#include <iostream>
#include <stdint.h>
class Complex {
char* ptr;
public:
Complex(size_t size) noexcept {
try {
ptr = new(std::nothrow) char[size];
if (ptr) {
std::cout << "Successfully allocated "
<< size << " bytes" << std::endl;
} else {
std::cout << "Failed to allocate "
<< size << " bytes" << std::endl;
}
} catch (...) {
// Do nothing
}
}
- 然后,定义析构函数和
IsValid
方法:
~Complex() {
try {
if (ptr) {
delete[] ptr;
std::cout << "Deallocated memory" << std::endl;
} else {
std::cout << "Memory was not allocated"
<< std::endl;
}
} catch (...) {
// Do nothing
}
}
bool IsValid() const { return nullptr != ptr; }
};
- 定义类之后,我们定义两个全局对象
small
和large
,以及使用它们的main
函数:
Complex small(100);
Complex large(SIZE_MAX);
int main() {
std::cout << "Small object is "
<< (small.IsValid()? "valid" : "invalid")
<< std::endl;
std::cout << "Large object is "
<< (large.IsValid()? "valid" : "invalid")
<< std::endl;
return 0;
}
- 最后,我们创建一个
CMakeLists.txt
文件,其中包含程序的构建规则:
cmake_minimum_required(VERSION 3.5.1)
project(static)
add_executable(static static.cpp)
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)
SET(CMAKE_CXX_FLAGS "--std=c++ 11")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
- 现在您可以构建和运行应用了。
在这里,我们定义了Complex
类,并且我们打算静态地分配该类的实例。 为了安全起见,我们需要确保该类的构造函数和析构函数都不能抛出异常。
但是,构造函数和析构函数都会调用可能引发异常的操作。 构造函数执行内存分配,而析构函数将日志写入标准输出。
构造函数使用new
运算符分配内存,如果无法分配内存,则会抛出std::bad_alloc
异常。 我们使用std::nothrow
常量来选择new
的非抛出实现。 如果无法分配任何内存,new
将返回nullptr
,而不是抛出异常:
ptr = new(std::nothrow) char[size];
我们将构造函数的主体包装在try
块中,以捕获所有异常。 catch
块为空-如果构造函数失败,我们将无能为力:
} catch (...) {
// Do nothing
}
由于我们不允许任何异常传播到上层,因此我们使用 C++ 关键字(即noexcept
)将构造函数标记为非抛出:
Complex(size_t size) noexcept {
但是,我们需要知道是否正确创建了对象。 为此,我们定义了一个名为IsValid
的方法。 如果内存已分配,则返回true
,否则返回false
:
bool IsValid() const { return nullptr != ptr; }
析构函数执行相反的操作。 它释放内存,并将释放状态记录到控制台。 至于构造函数,我们不希望任何异常传播到上层,因此我们将析构函数体包装在 try-catch 块中:
try {
if (ptr) {
delete[] ptr;
std::cout << "Deallocated memory" << std::endl;
} else {
std::cout << "Memory was not allocated" << std::endl;
}
} catch (...) {
// Do nothing
}
现在,我们声明两个全局对象small
和large
。 全局对象是静态分配的。 对象的大小是以small
对象将被正确分配的方式人工选择的,但是large
对象的分配应该失败:
Complex small(100);
Complex large(SIZE_MAX);
在我们的main
函数中,我们检查并打印对象是否有效:
std::cout << "Small object is " << (small.IsValid()? "valid" : "invalid")
<< std::endl;
std::cout << "Large object is " << (large.IsValid()? "valid" : "invalid")
<< std::endl;
当我们运行我们的程序时,我们看到以下输出:
正如我们所看到的,小对象被正确地分配和释放。 大型对象的初始化失败,但由于它被设计为不抛出任何异常,所以它没有导致应用的异常终止。 您可以对静态分配的对象使用类似的技术来编写健壮而安全的应用。
嵌入式应用是为在没有监督的情况下工作而构建的。 这包括从错误中恢复的能力。 如果应用崩溃,它可以自动重启。 但是,如果应用因进入死循环或死锁而挂起,我们该怎么办呢?
硬件或软件监视程序用于防止此类情况。 应用应该定期通知或馈送它们,以指示它们继续正常运行。 如果看门狗在特定时间间隔内未被馈送,它将终止应用或重新启动系统。
存在许多不同的监视器实现,但它们的接口本质上是相同的。 它们提供应用可以用来重置看门狗定时器的功能。
在本食谱中,我们将学习如何在 POSIX 信号子系统之上创建一个简单的软件看门狗。 同样的技术可用于硬件看门狗定时器或更复杂的软件看门狗服务。
我们将创建一个定义Watchdog
类的应用,并提供其用法示例。 遵循以下步骤:
- 在您的工作目录(即
~/test
)中,创建一个名为watchdog
的子目录。 - 使用您喜欢的文本编辑器在
watchdog
子目录中创建名为watchdog.cpp
的文件。 - 将所需的包含内容放入
watchdog.cpp
文件中:
#include <chrono>
#include <iostream>
#include <thread>
#include <unistd.h>
using namespace std::chrono_literals;
- 接下来,我们定义
Watchdog
类本身:
class Watchdog {
std::chrono::seconds seconds;
public:
Watchdog(std::chrono::seconds seconds):
seconds(seconds) {
feed();
}
~Watchdog() {
alarm(0);
}
void feed() {
alarm(seconds.count());
}
};
- 添加
main
函数,作为我们的 Watchdog 的使用示例:
int main() {
Watchdog watchdog(2s);
std::chrono::milliseconds delay = 700ms;
for (int i = 0; i < 10; i++) {
watchdog.feed();
std::cout << delay.count() << "ms delay" << std::endl;
std::this_thread::sleep_for(delay);
delay += 300ms;
}
}
- 添加包含我们程序的构建规则的
CMakeLists.txt
文件:
cmake_minimum_required(VERSION 3.5.1)
project(watchdog)
add_executable(watchdog watchdog.cpp)
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)
SET(CMAKE_CXX_FLAGS "--std=c++ 14")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
- 现在您可以构建和运行应用了。
我们需要一种机制来在应用挂起时终止它。 虽然我们可以产生一个特殊的监视线程或进程,但还有另一种更简单的方法-POSIX 信号。
在 POSIX 操作系统中运行的任何进程都可以接收许多信号。 为了向进程传递信号,操作系统停止进程的正常执行,并调用相应的信号处理程序。
可以传递给进程的信号之一称为alarm
,默认情况下,它的处理程序只是终止应用。 这正是我们实现看门狗所需要的。
我们的Watchdog
类的构造函数接受一个参数seconds
:
Watchdog(std::chrono::seconds seconds):
这是我们的看门狗的时间间隔,它会立即传递到feed
方法以激活看门狗定时器:
feed();
feed
方法调用设置计时器的 POSIX 函数alarm
。 如果已设置计时器,则会使用新值更新计时器:
void feed() {
alarm(seconds.count());
}
最后,我们在析构函数中调用相同的alarm
函数,通过传递值0
来禁用计时器:
alarm(0);
现在,我们每次调用feed
函数时,都会移动进程接收alarm
信号的时间。 但是,如果我们在计时器到期之前没有调用此函数,它将触发alarm
处理程序,从而终止我们的进程。
为了检验这一点,我们创建了一个简单的示例。 这是一个有 10 次迭代的循环。 在每次迭代中,我们显示一条消息并休眠特定的时间间隔。 该间隔最初为 700 毫秒,在每次迭代中增加 300 毫秒;例如,700 毫秒、1,000 毫秒、1,300 毫秒,依此类推:
delay += 300ms;
我们的看门狗设置为 2 秒间隔:
Watchdog watchdog(2s);
让我们运行应用并检查它是如何工作的。 它会生成以下输出:
正如我们所看到的,应用在第六次迭代后终止,在延迟超过看门狗间隔之后。 而且,由于异常终止,其返回码为非零。 如果应用是由另一个应用或脚本生成的,则这是该应用需要重新启动的指示符。
看门狗技术是构建健壮的嵌入式应用的一种简单而高效的方法。
在前面的食谱中,我们了解了如何使用看门狗计时器防止软件挂起。 可以使用类似的技术来实现高可用性系统,该系统由可以执行相同功能的一个或多个软件或硬件组件组成。 如果其中一个组件出现故障,另一个组件可以接管。
当前处于活动状态的组件应该使用称为心跳的消息定期向其他被动组件通告其健康状态。 当它报告不健康状态或在特定时间内没有报告时,无源组件会检测到它并激活它自己。 当故障组件恢复时,它可以转换到被动模式,监视当前主动组件的故障,或者启动回切过程来声明主动状态。
在本食谱中,我们将学习如何在我们的应用中实现简单的心跳监控器。
我们将创建一个定义Watchdog
类的应用,并提供其用法示例。 遵循以下步骤:
- 在您的工作目录(即
~/test
)中,创建一个名为heartbeat
的子目录。 - 使用您喜欢的文本编辑器在
heartbeat
子目录中创建名为heartbeat.cpp
的文件。 - 将所需的包含内容放入
heatbeat.cpp
文件中:
#include <chrono>
#include <iostream>
#include <system_error>
#include <thread>
#include <unistd.h>
#include <poll.h>
#include <signal.h>
using namespace std::chrono_literals;
- 接下来,我们定义一个
enum
来报告活动工作人员的健康状态:
enum class Health : uint8_t {
Ok,
Unhealthy,
ShutDown
};
- 现在,让我们创建一个封装心跳报告和监视的类。 我们从类定义、它的私有字段和它的构造函数开始:
class Heartbeat {
int channel[2];
std::chrono::milliseconds delay;
public:
Heartbeat(std::chrono::milliseconds delay):
delay(delay) {
int rv = pipe(channel);
if (rv < 0) {
throw std::system_error(errno,
std::system_category(),
"Failed to open pipe");
}
}
- 接下来,我们添加一个报告健康状态的方法:
void Report(Health status) {
int rv = write(channel[1], &status, sizeof(status));
if (rv < 0) {
throw std::system_error(errno,
std::system_category(),
"Failed to report health status");
}
}
- 紧随其后的是运行状况监视方法:
bool Monitor() {
struct pollfd fds[1];
fds[0].fd = channel[0];
fds[0].events = POLLIN;
bool takeover = true;
bool polling = true;
while(polling) {
fds[0].revents = 0;
int rv = poll(fds, 1, delay.count());
if (rv) {
if (fds[0].revents & (POLLERR | POLLHUP)) {
std::cout << "Polling error occured"
<< std::endl;
takeover = false;
polling = false;
break;
}
Health status;
int count = read(fds[0].fd, &status,
sizeof(status));
if (count < sizeof(status)) {
std::cout << "Failed to read heartbeat data"
<< std::endl;
break;
}
switch(status) {
case Health::Ok:
std::cout << "Active process is healthy"
<< std::endl;
break;
case Health::ShutDown:
std::cout << "Shut down signalled"
<< std::endl;
takeover = false;
polling = false;
break;
default:
std::cout << "Unhealthy status reported"
<< std::endl;
polling = false;
break;
}
} else if (!rv) {
std::cout << "Timeout" << std::endl;
polling = false;
} else {
if (errno != EINTR) {
std::cout << "Error reading heartbeat data, retrying" << std::endl;
}
}
}
return takeover;
}
};
- 定义心跳逻辑后,我们将创建一些函数,以便可以在测试应用中使用它:
void Worker(Heartbeat& hb) {
for (int i = 0; i < 5; i++) {
hb.Report(Health::Ok);
std::cout << "Processing" << std::endl;
std::this_thread::sleep_for(100ms);
}
hb.Report(Health::Unhealthy);
}
int main() {
Heartbeat hb(200ms);
if (fork()) {
if (hb.Monitor()) {
std::cout << "Taking over" << std::endl;
Worker(hb);
}
} else {
Worker(hb);
}
}
- 接下来,我们添加一个
CMakeLists.txt
文件,其中包含程序的构建规则:
cmake_minimum_required(VERSION 3.5.1)
project(heartbeat)
add_executable(heartbeat heartbeat.cpp)
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)
SET(CMAKE_CXX_FLAGS "--std=c++ 14")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
- 现在您可以构建和运行应用了。
心跳机制需要某种通信通道来让一个组件向其他组件报告其状态。 在围绕多个处理单元构建的系统中,最佳选择是通过套接字进行基于网络的通信。 我们的应用在单个节点上运行,我们可以改用一种本地 IPC 机制。
我们将使用 POSIX 管道机制来传输心跳。 创建管道时,它提供两个用于通信的文件描述符-一个用于读取数据,另一个用于写入数据。
除了交通运输,我们还需要选择接手的时间间隔。 如果监视进程在此间隔内没有收到心跳消息,它应该将另一个组件视为不健康或出现故障,并执行一些接管操作。
我们首先定义应用可能的运行状况。 我们使用 C++ enum class
严格键入统计信息,如下所示:
enum class Health : uint8_t {
Ok,
Unhealthy,
ShutDown
};
我们的应用很简单,只有三种状态:Ok
、Unhealthy
和ShutDown
。 ShutDown
状态表示活动进程将正常关闭,不需要执行接管操作。
然后,我们定义了Heartbeat
类,它封装了所有消息交换、运行状况报告和监视功能。
它有两个数据字段,分别表示监视时间间隔和用于消息交换的 POSIX 管道:
int channel[2];
std::chrono::milliseconds delay;
构造函数创建管道,并在发生故障时抛出异常:
int rv = pipe(channel);
if (rv < 0) {
throw std::system_error(errno,
std::system_category(),
"Failed to open pipe");
运行状况报告方法是write
函数的简单包装。 它将状态(表示为无符号 8 位整数值)写入管道的write
文件描述符:
int rv = write(channel[1], &status, sizeof(status));
监测方法比较复杂。 它使用 POSIXpoll
函数等待一个或多个文件描述符中的数据。 在我们的示例中,我们只对来自一个文件描述符的数据感兴趣-管道的读取端。 我们用文件描述符和我们感兴趣的事件类型填充pol
使用的fds
结构:
struct pollfd fds[1];
fds[0].fd = channel[0];
fds[0].events = POLLIN | POLLERR | POLLHUP;
两个布尔标志控制轮询循环。 takeover
标志表示退出循环时是否应该执行接管操作,而polling
标志表示循环是否应该存在:
bool takeover = true;
bool polling = true;
在循环的每次迭代中,我们使用poll
函数轮询套接字中的新数据。 我们使用传入构造函数的监视间隔作为轮询超时:
int rv = poll(fds, 1, delay.count());
poll
函数的结果表示以下三种可能结果之一:
- 如果它大于零,我们就有新的数据可以从通信管道中读取。 我们从通信通道读取状态并对其进行分析。
- 如果状态为
Ok
,我们将其记入日志并进行下一次轮询。 - 如果状态为
ShutDown
,我们需要退出轮询循环,但也要防止takeover
操作。 为此,我们相应地设置布尔标志:
case Health::ShutDown:
std::cout << "Shut down signalled"
<< std::endl;
takeover = false;
polling = false;
对于任何其他健康状态,我们将中断循环,并将takeover
标志设置为true
:
std::cout << "Unhealthy status reported"
<< std::endl;
polling = false;
poll
在超时时返回零。 与Unhealthy
状态类似,我们需要中断循环并执行takeover
操作:
} else if (!rv) {
std::cout << "Timeout" << std::endl;
polling = false;
最后,如果poll
返回的值小于零,则表示出错。 系统调用失败的原因有几个,其中一个非常常见的原因是它被信号中断。 这不是真正的错误;我们只需要再次调用poll
。 对于所有其他情况,我们编写日志消息并保持轮询。
监视方法在监视循环运行时阻塞,它返回一个布尔值,让调用者知道是否应该执行接管操作:
bool Monitor() {
现在,让我们尝试在一个玩具示例中使用这个类。 我们将定义一个Worker
函数,该函数接受对Heartbeat
实例的引用,并表示要完成的工作:
void Worker(Heartbeat& hb) {
在内部循环的每次迭代中,Worker
报告其健康状态:
hb.Report(Health::Ok);
在某一时刻,它将其状态报告为Unhealthy
:
hb.Report(Health::Unhealthy);
在main
函数中,我们创建了一个轮询间隔为 200 毫秒的Heartbeat
类的实例:
Heartbeat hb(200ms);
然后,我们产生两个独立的进程。 父进程开始监视,如果需要接管,则运行Worker
方法:
if (hb.Monitor()) {
std::cout << "Taking over" << std::endl;
Worker(hb);
}
子对象只需运行Worker
方法。 让我们运行应用并检查它是如何工作的。 它会生成以下输出:
正如我们所看到的,Worker
方法报告它处理数据,并且监视器检测到它的状态为健康。 但是,在Worker
方法将其状态报告为Unhealthy
之后,监视器会立即检测到它,并再次重新运行工作器以继续处理。 此策略可用于构建更精细的运行状况监视和故障恢复逻辑,以便在您设计和开发的系统中实现高可用性。
在我们的示例中,我们使用了两个相同的组件,它们同时运行并相互监视。 但是,如果其中一个组件包含在特定条件下导致该组件发生故障的软件错误,则另一个相同的组件也很有可能也会出现此问题。 在安全关键型系统中,您可能需要开发两个完全不同的实现。 这种方法增加了成本和开发时间,但提高了系统的可靠性。
嵌入式应用的常见任务之一是与外部物理控件(如按钮或开关)交互。 虽然这样的物体只有两种状态-开和关-但检测按钮或开关改变状态的时刻并不像看起来那么简单。
当按下物理按钮时,需要一段时间才能牢固地建立联系。 在此期间,可能会触发虚假中断,就好像按钮在打开和关闭状态之间跳跃一样。 应用应该能够过滤掉虚假的转换,而不是对每个中断做出反应。 这称为去弹。
虽然它可以在硬件级别实现,但最常见的方法是通过软件来实现。 在本食谱中,我们将学习如何实现一个简单而通用的去抖动函数,该函数可以与任何类型的输入一起使用。
我们将创建一个应用,该应用定义一个带有测试输入的通用去抖动函数。 通过将测试输入替换为实际输入,此功能可用于任何实际目的。 遵循以下步骤:
- 在您的工作目录(即
~/test
)中,创建一个名为debounce
的子目录。 - 使用您喜欢的文本编辑器在
debounce
子目录中创建名为debounce.cpp
的文件。 - 让我们将 Includes 和一个名为
debounce
的函数添加到debounce.cpp
文件:
#include <iostream>
#include <chrono>
#include <thread>
using namespace std::chrono_literals;
bool debounce(std::chrono::milliseconds timeout, bool (*handler)(void)) {
bool prev = handler();
auto ts = std::chrono::steady_clock::now();
while (true) {
std::this_thread::sleep_for(1ms);
bool value = handler();
auto now = std::chrono::steady_clock::now();
if (value == prev) {
if (now - ts > timeout) {
break;
}
} else {
prev = value;
ts = now;
}
}
return prev;
}
- 然后,我们添加
main
函数,该函数显示如何使用它:
int main() {
bool result = debounce(10ms, []() {
return true;
});
std::cout << "Result: " << result << std::endl;
}
- 添加包含我们程序的构建规则的
CMakeLists.txt
文件:
cmake_minimum_required(VERSION 3.5.1)
project(debounce)
add_executable(debounce debounce.cpp)
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)
SET(CMAKE_CXX_FLAGS "--std=c++ 14")
set(CMAKE_CXX_COMPILER /usr/bin/arm-linux-gnueabi-g++)
- 现在您可以构建和运行应用了。
我们的目标是检测按钮何时在打开和关闭状态之间停止跳动。 我们假设,如果在特定时间间隔内所有连续读取按钮状态的尝试都返回相同的值(打开或关闭),我们就可以判断按钮是真正打开还是关闭。
我们使用此逻辑来实现debounce
函数。 由于我们希望使去抖动逻辑尽可能通用,因此该函数不应该知道如何读取按钮的状态。 这就是该函数接受两个参数的原因:
bool debounce(std::chrono::milliseconds timeout, bool (*handler)(void)) {
第一个参数timeout
定义了报告状态更改需要等待的特定时间间隔。 第二个参数handler
是一个函数或类似函数的对象,它知道如何读取按钮的状态。 它被定义为指向不带参数的布尔函数的指针。
函数的作用是:运行一个循环。 在每次迭代中,它调用处理程序来读取按钮的状态,并将其与前一个值进行比较。 如果值相等,则检查自最近一次状态更改以来的时间。 如果超过超时,我们将退出循环并返回:
auto now = std::chrono::steady_clock::now();
if (value == prev) {
if (now - ts > timeout) {
break;
}
如果值不相等,我们将重置最近状态更改的时间并继续等待:
} else {
prev = value;
ts = now;
}
为了最小化 CPU 负载并让其他进程执行一些工作,我们在两次读取之间添加了 1 毫秒的延迟。 如果该功能打算在不运行多任务操作系统的微控制器上使用,则不需要此延迟:
std::this_thread::sleep_for(1ms);
我们的main
函数包含一个debounce
函数的用法示例。 我们使用 C++ lambda 定义读取按钮的简单规则。 它始终返回true
:
bool result = debounce(10ms, []() {
return true;
});
我们将10ms
作为debounce
超时传递。 如果我们运行我们的程序,我们将看到以下输出:
debounce
函数工作 10 毫秒并返回true
,因为测试输入中没有虚假状态变化。 在实际输入的情况下,按钮状态可能需要更多时间才能稳定下来。 这种简单而有效的去抖动函数可以应用于各种实际输入。