diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..30555a59 --- /dev/null +++ b/.gitignore @@ -0,0 +1,32 @@ +# Ignore build directories +BUILD/ +build/ + +runFile/ +# Ignore log files +*.log + +# Ignore object files and executables +*.o +*.a +*.so +*.exe +*.sh +# Ignore temporary files +*.tmp +*.swp +*.swo + +# Ignore IDE-specific files +.vscode/ +*.sublime-workspace +*.sublime-project + +# Ignore system-specific files +.DS_Store +Thumbs.db + +user_1.cpp +user_2.cpp +user_3.cpp +main_bus.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 00000000..9a6058a8 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,80 @@ +cmake_minimum_required(VERSION 3.10) +project(basicApi) + +# FetchContent for GoogleTest and GoogleMock +include(FetchContent) +FetchContent_Declare( + googletest + GIT_REPOSITORY https://github.com/google/googletest.git + GIT_TAG release-1.11.0 +) +FetchContent_MakeAvailable(googletest) + +FetchContent_Declare( + googlemock + GIT_REPOSITORY https://github.com/google/googlemock.git + GIT_TAG release-1.11.0 +) +FetchContent_MakeAvailable(googlemock) + +# Set C++ standard +set(CMAKE_CXX_STANDARD 17) + +# Add source files +set(SOURCES + src/Packet.cpp + src/communication.cpp + test/testCommunication.cpp +) + +# Include directories +include_directories(${gtest_SOURCE_DIR}/include ${gmock_SOURCE_DIR}/include ${CMAKE_SOURCE_DIR}/logger) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/src) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/sockets) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/test) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../googletest/googletest/include) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../googletest/googlemock/include) + +# Add the logger source files +set(LOGGER_SOURCES + ${CMAKE_SOURCE_DIR}/logger/logger.cpp +) + +# Add the test source files +set(TEST_SOURCES + ${CMAKE_SOURCE_DIR}/test/loggerTest.cpp +) + +# Find and link pthread +find_package(Threads REQUIRED) + +# Add GTest and GMock +add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/../googletest googletest-build) + +# Define LOG_LEVEL for compilation (default: INFO) +set(DEFAULT_LOG_LEVEL "logger::LogLevel::INFO") +set(LOG_LEVEL ${DEFAULT_LOG_LEVEL} CACHE STRING "Set the logging level (e.g., logger::LogLevel::ERROR, logger::LogLevel::INFO, logger::LogLevel::DEBUG)") +add_definitions(-DLOG_LEVEL=${LOG_LEVEL}) + +# Add the test executable +add_executable(testCommunication ${SOURCES}) + +# Create the test executable +add_executable(LoggerTests ${LOGGER_SOURCES} ${TEST_SOURCES}) + +# Link the GTest and GMock libraries +target_link_libraries(LoggerTests + gtest + gmock + pthread +) +target_link_libraries(testCommunication gtest gmock gtest_main gmock_main Threads::Threads) + +# Enable testing +enable_testing() + +# Ensure the test executable is built +add_test(NAME LoggerTests COMMAND LoggerTests) + +# Add tests +add_test(NAME testCommunication COMMAND testCommunication) diff --git a/communication/include/bus_manager.h b/communication/include/bus_manager.h new file mode 100644 index 00000000..a0e19194 --- /dev/null +++ b/communication/include/bus_manager.h @@ -0,0 +1,55 @@ +#pragma once +#include +#include +#include +#include "../include/server_connection.h" +#include +#include "packet.h" +#include "global_clock.h" + +// Manager class responsible for handling CAN BUS-like communication and collision management +class BusManager { + public: + // Returns the singleton instance of Manager + static BusManager *getInstance(std::vector idShouldConnect, + uint32_t limit); + + // Starts server connection + ErrorCode startConnection(); + + // Stops server connection + static void stopConnection(); + + // Receives a packet and checks for collisions before sending + void receiveData(Packet &p); + + // Destructor for cleaning up + ~BusManager(); + private: + ServerConnection server; // Handles communication with the server + static BusManager *instance; // Singleton instance + static std::mutex managerMutex; // Mutex for singleton + Packet *lastPacket; // Stores the last packet received + std::mutex lastPacketMutex; // Protects access to lastPacket + std::atomic stopFlag; // Indicates if the collision timer should stop + std::thread collisionTimerThread; // Thread for collision management + + // Private constructor to ensure singleton pattern + BusManager(std::vector idShouldConnect, uint32_t limit); + + // Sends packet to clients based on whether it's broadcast or unicast + ErrorCode sendToClients(const Packet &packet); + + // Starts the timer to check for packet collisions + void startCollisionTimer(); + + // Checks if the current packet collides with the last one + void checkCollision(Packet ¤tPacket); + + // Determines packet priority in case of collision, based on CAN BUS protocol + Packet *packetPriority(Packet &a, Packet &b); + + // Sends the last packet if necessary and clears it + void checkLastPacket(); + +}; \ No newline at end of file diff --git a/communication/include/client_connection.h b/communication/include/client_connection.h new file mode 100644 index 00000000..8a95eaea --- /dev/null +++ b/communication/include/client_connection.h @@ -0,0 +1,62 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include "message.h" +#include "../sockets/Isocket.h" +#include "../sockets/mock_socket.h" +#include "../sockets/real_socket.h" +#include +#include "error_code.h" +#include "../include/global_clock.h" + +#define PORT 8080 +#define IP "127.0.0.1" + +class ClientConnection +{ +private: + int clientSocket; + sockaddr_in servAddress; + std::atomic connected; + std::function passPacketCom; + ISocket* socketInterface; + std::thread receiveThread; + +public: + // Constructor + ClientConnection(std::function callback, ISocket* socketInterface = new RealSocket()); + + // Requesting a connection to the server + ErrorCode connectToServer(int id); + + // Sends the packet to the manager-sync + ErrorCode sendPacket(Packet &packet); + + // Waits for a message and forwards it to Communication + void receivePacket(); + + // Closes the connection + ErrorCode closeConnection(); + + // Setter for passPacketCom + void setCallback(std::function callback); + + // Setter for socketInterface + void setSocketInterface(ISocket* socketInterface); + + // For testing + int getClientSocket(); + + int isConnected(); + + bool isReceiveThreadRunning(); + + //Destructor + ~ClientConnection(); +}; + diff --git a/communication/include/communication.h b/communication/include/communication.h new file mode 100644 index 00000000..20c240d4 --- /dev/null +++ b/communication/include/communication.h @@ -0,0 +1,407 @@ +#ifndef __COMMUNICATION_H__ +#define __COMMUNICATION_H__ + +#include +#include +#include +#include +#include +#include +#include "client_connection.h" +#include "error_code.h" +#include "scheduler.h" +#include "message.h" +#include "packet.h" + +#define THRESHOLD_TO_BUSOFF 256 ///< Threshold for entering BusOff state +#define THRESHOLD_TO_PASSIVE 128 ///< Threshold for entering Passive state +#define BUSOFF_RECOVERY_TIME (128 * TICK_DURATION) ///< Time required to recover from BusOff state +#define MAX_RETRANSMISSIONS 10 ///< Maximum allowed retransmissions for a packet +#define MAX_SIMULTANEOUS_MESSAGES 5 ///< Maximum number of messages allowed to be sent simultaneously + +// Enumeration for different error states of the communication +enum class ErrorState { + Active, ///< The communication system is operating normally. + Passive, ///< The communication system has issues and is losing priority during collisions. + BusOff ///< The communication system is in BusOff state and cannot send or receive messages. +}; + +// Communication class to handle message transmission and reception +class Communication { + private: + ClientConnection client; ///< ClientConnection object for managing the connection + std::unordered_map receivedMessages; ///< Map to store received messages + void (*passData)(uint32_t, void *); ///< Callback function for passing data to the client + uint32_t id; ///< Unique identifier for the communication instance + static Communication *instance; ///< Static variable that holds an instance of the class + + // To limit simultaneous messages in asynchronous send + static std::vector> activeMessageFutures; ///< Vector to hold active message futures + static std::mutex messageThreadsMutex; ///< Mutex to synchronize access to the active thread vector + std::condition_variable messageThreadsCondition; ///< Condition variable to wait for available thread slots + + // For ensuring that only one send per tick occurs + std::atomic lastSendTick; ///< The last tick when a packet was sent + std::mutex sendMutex; ///< Mutex for synchronizing send operations + + // Error tracking variables + std::mutex mtx; ///< Mutex to protect access to error counts and state + int TEC; ///< Transmission Error Count + int REC; ///< Receive Error Count + ErrorState state; ///< Current system state + + static std::thread busOffRecoveryThread; ///< Thread for handling BusOff recovery + + // RTO (Retransmission Timeout) handling + Scheduler scheduler; ///< Scheduler for managing transmission timing + + // Packet reception and validation methods + + /** + * @brief Receives a packet and processes it based on its type. + * + * This method checks if the bus is in the BusOff state and exits if so. + * It then verifies the destination ID of the packet. + * If valid, it performs a CRC check for data integrity. + * If the CRC check fails, an error message is sent to the source. + * + * Upon successful validation, the method determines the packet type: + * - For data messages, it calls `handleDataPacket`. + * - For error messages, it calls `handleErrorPacket`. + * - For acknowledgment messages, it calls `handleAckPacket`. + * + * @param packet The packet object to be received and processed. + */ + void receivePacket(Packet &packet); + + /** + * @brief Checks the destination ID of the packet. + * + * This method determines whether the packet is intended for this communication node. + * It returns true if the packet is either a broadcast or if its destination ID matches + * the ID of the current communication node. + * + * @param packet The packet object containing destination information. + * @return Returns true if the packet is a broadcast or if the destination ID matches the node's ID. + */ + bool checkDestId(Packet &packet); + + /** + * @brief Validates the CRC of the packet. + * + * This method checks the integrity of the packet by validating its CRC (Cyclic Redundancy Check). + * It calls the packet's own method to perform the CRC validation. + * + * @param packet The packet object whose CRC is to be validated. + * @return Returns true if the CRC is valid, false otherwise. + */ + bool validCRC(Packet &packet); + + /** + * @brief Handles incoming data packets. + * + * This method processes data packets by adding them to the message queue. + * After processing the packet, it sends an acknowledgment (ACK) back to the source. + * + * @param packet The data packet to be handled. + */ + void handleDataPacket(Packet &packet); + + /** + * @brief Handles incoming error packets. + * + * This method processes error packets based on their destination ID. + * If the packet is intended for this node, it handles the transmission error. + * Otherwise, it handles the reception error. + * + * @param packet The error packet to be handled. + */ + void handleErrorPacket(Packet &packet); + + /** + * @brief Handles acknowledgment packets. + * + * This method processes acknowledgment packets by retrieving the + * received message ID from the packet's payload. It then notifies + * the scheduler of the received acknowledgment and handles the + * successful transmission. + * + * @param packet The acknowledgment packet to be handled. + */ + void handleAckPacket(Packet &packet); + + /** + * @brief Adds a received packet to the corresponding message. + * + * This method checks if a message with the given ID already exists + * in the received messages. If it does, the packet is added to that + * message. If not, a new message is created. When the message is + * complete, the complete data is passed to the next stage of + * processing, and the message is removed from the received messages. + * + * @param packet The packet to be added to the message. + */ + void addPacketToMessage(Packet &packet); + + // BusOff recovery methods + + /** + * @brief Initiates the recovery process from the BusOff state. + * + * This method checks if the communication system is in the BusOff state + * and if the bus off recovery thread is not already running. If both conditions + * are met, it starts a new thread to handle the recovery process via the + * `busOffTimer` method. + */ + void recoverFromBusOff(); + + /** + * @brief Handles the timer for recovering from the BusOff state. + * + * This method puts the thread to sleep for a defined recovery time + * (BUSOFF_RECOVERY_TIME) and then resets the state of the communication system. + */ + void busOffTimer(); + + /** + * @brief Cleans up the BusOff recovery thread. + * + * This method checks if the BusOff recovery thread is joinable. + * If it is, the thread is joined to ensure proper cleanup and resource management. + */ + static void cleanupBusOffRecoveryThread(); + + // State checking and error handling methods + + /** + * @brief Checks and updates the communication state based on error counters. + * + * This method evaluates the Transmission Error Counter (TEC) and Receive Error Counter (REC) to determine + * the current state of the communication system. If TEC exceeds the threshold for BusOff, the state is + * set to BusOff. If TEC or REC exceeds the threshold for Passive, the state is set to Passive. + * Otherwise, the state is set to Active. If the state is BusOff, the recovery process is initiated. + */ + void checkState(); + + /** + * @brief Handles the transmission error by updating the Transmission Error Counter (TEC). + * + * This method increments the Transmission Error Counter (TEC) by a defined value (8) + * to reflect a transmission error. It then calls `checkState` to reassess the + * communication state based on the updated error counter. + */ + void handleTransmitError(); + + /** + * @brief Handles acknowledgment errors by updating the Transmission Error Counter (TEC). + * + * This method increments the Transmission Error Counter (TEC) by 1 to reflect + * an acknowledgment error. It then calls `checkState` to reassess the communication + * state based on the updated error counter. + * + * @param packet The packet object associated with the acknowledgment error. + */ + void handleACKError(); + + /** + * @brief Handles reception errors by updating the Reception Error Counter (REC). + * + * This method increments the Reception Error Counter (REC) by 1 to reflect + * a reception error. It then calls `checkState` to reassess the communication + * state based on the updated error counter. + */ + void handleReceiveError(); + + /** + * @brief Handles successful transmissions by updating the Transmit Error Counter (TEC). + * + * This method decrements the Transmit Error Counter (TEC) by 1 to reflect + * a successful transmission, as long as the counter is greater than zero. + * It then calls `checkState` to reassess the communication state based on + * the updated error counter. + */ + void handleSuccessfulTransmission(); + + /** + * @brief Resets the communication state by clearing error counters and setting state to active. + * + * This method resets the Transmit Error Counter (TEC) and Receive Error Counter (REC) + * to zero, and sets the communication state to `Active`. This function ensures that the + * communication system starts fresh after experiencing issues, allowing normal operation + * to resume. + */ + void resetState(); + + //Management of packet transmission according to clock ticks. + + /** + * @brief Ensures that only one message is sent per clock tick. + * + * This method checks the current clock tick and compares it with the last tick + * when a message was sent. If the current tick is different, it updates the + * last sent tick. If it is the same, the function waits for the next tick + * before proceeding. + * + * The method uses a mutex to ensure thread safety during the tick check and + * update operations. + */ + void ensureSingleSendPerTick(); + + /** + * @brief Sends a packet with a retransmission timeout (RTO). + * + * This method attempts to send a packet, checking the system state and the + * number of retransmissions. If the system is in the BusOff state or the + * maximum retransmissions have been exceeded, it stops sending the packet. + * + * It ensures that only one packet is sent per tick and sets the packet's + * passive state based on the current system state. It waits for an acknowledgment + * (ACK) and handles retransmissions if needed. + * + * @param packet The packet object to be sent. + * @param retransmissions The number of retransmissions attempted (default is 0). + * @return True if the packet was sent successfully, false otherwise. + */ + bool sendPacketWithRTO(Packet &packet, int retransmissions = 0); + + /** + * @brief Sends a packet to the designated recipient. + * + * This method checks if the system is in the BusOff state. If it is, the function + * immediately returns false. It ensures that only one packet is sent per tick + * and sets the packet's passive state based on the current system state. + * + * The method attempts to send the packet using the client and returns true if + * the transmission is successful. + * + * @param packet The packet object to be sent. + * @return True if the packet was sent successfully, false otherwise. + */ + bool sendPacket(Packet &packet); + + /** + * @brief Waits for all active message threads to complete. + * + * This function manages the synchronization of active message threads in a thread-safe manner. + * It locks the mutex to ensure thread safety while iterating through the futures of active message threads. + * Each valid future is waited upon to ensure that all active messages have been processed before clearing the vector. + */ + static void waitForActiveMessages(); + + /** + * @brief Handles signals for the communication instance. + * + * This function is called when a signal is received. It performs cleanup operations, + * including closing the client connection and waiting for active message threads to complete. + * It exits the program with the provided signal number. + * + * @param signum The signal number that triggered this handler. + */ + static void signalHandler(int signum); + + /** + * @brief Sets the ID for the communication instance. + * + * This function updates the ID used for communication purposes. + * + * @param newId The new ID to be set for the instance. + */ + void setId(uint32_t newId); + + /** + * @brief Sets the callback function for passing data. + * + * This function allows setting a custom callback function that will be called to pass data. + * + * @param callback A pointer to the function that will handle data passing. + */ + void setPassDataCallback(void (*callback)(uint32_t, void *)); + + public: + //Cconstructor and destructor + /** + * @brief Constructs a Communication object. + * + * This constructor initializes the communication instance with the given ID and + * sets the callback for passing data. It also sets a signal handler for SIGINT + * and initializes the error counters and state. + * + * @param id The unique identifier for the communication instance. + * @param passDataCallback The callback function to pass data. + */ + Communication(uint32_t id, void (*passDataCallback)(uint32_t, void *)); + /** + * @brief Destroys the Communication object. + * + * This destructor performs cleanup operations, including waiting for active + * message threads and cleaning up any recovery threads. + */ + ~Communication(); + + //connection management + + /** + * @brief Starts the connection to the server. + * + * This function attempts to establish a connection with the server using the + * communication instance's ID. + * + * @return ErrorCode indicating the result of the operation. + */ + ErrorCode startConnection(); + + //Sending messages + + /** + * @brief Sends a message synchronously. + * + * This function sends a message to a specified destination ID with the provided + * data and size. It validates input parameters before sending and returns an + * error code based on the result of the sending operation. + * + * @param data Pointer to the data to send. + * @param dataSize Size of the data in bytes. + * @param destID The destination ID for the message. + * @param srcID The source ID of the message. + * @param messageType The type of message being sent (default is DATA_MESSAGE). + * @return ErrorCode indicating the result of the operation. + */ + ErrorCode sendMessage(void *data, size_t dataSize, uint32_t destID, + uint32_t srcID, + MessageType messageType = MessageType::DATA_MESSAGE); + + /** + * @brief Sends a message asynchronously. + * + * This function sends a message to a specified destination ID asynchronously, + * allowing the calling thread to continue without blocking. It accepts a callback + * function that will be called with the result of the sending operation. + * + * @param data Pointer to the data to send. + * @param dataSize Size of the data in bytes. + * @param destID The destination ID for the message. + * @param srcID The source ID of the message. + * @param sendCallback The callback function to call with the result of the send operation. + * @param messageType The type of message being sent. + */ + void sendMessageAsync(void *data, size_t dataSize, uint32_t destID, + uint32_t srcID, + std::function sendCallback, + MessageType messageType = MessageType::DATA_MESSAGE); + + /** + * @brief Retrieves the current state of the communication system. + * + * This method returns the current communication state, which may be one of the following: + * - Active + * - Passive + * - BusOff + * + * The function locks the mutex to ensure thread safety while accessing the state. + * + * @return ErrorState The current communication state. + */ + ErrorState getState(); +}; + +#endif // __COMMUNICATION_H__ diff --git a/communication/include/global_clock.h b/communication/include/global_clock.h new file mode 100644 index 00000000..34a1054c --- /dev/null +++ b/communication/include/global_clock.h @@ -0,0 +1,133 @@ +#ifndef __GLOBAL_CLOCK_H__ +#define __GLOBAL_CLOCK_H__ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../sockets/real_socket.h" + +#define SHM_NAME "/clock__shm" +#define TICK_DURATION std::chrono::milliseconds(50) // Define the tick duration +#define MAX_TICK std::numeric_limits::max() // Define the maximum value for the tick + +/** + * @struct SharedClock + * @brief A structure representing the shared state of the global clock. + * + * This structure is stored in shared memory, allowing multiple processes to access and modify + * the clock state. It contains: + * - `current_tick`: The current value of the clock tick. + * - `is_running`: A boolean flag indicating whether the clock is running. + * - `tick_mutex`: A mutex used to protect access to the tick count. + * - `tick_cond`: A condition variable used to notify waiting processes when a tick occurs. + */ +struct SharedClock { + std::atomic current_tick; ///< The current tick value. + std::atomic is_running; ///< Whether the clock is currently running. + pthread_mutex_t tick_mutex; ///< Mutex to synchronize access to the tick count. + pthread_cond_t tick_cond; ///< Condition variable to notify waiting threads on tick change. +}; +/** + * @class GlobalClock + * @brief A class that manages a global clock accessible across multiple processes using shared memory. + * + * The `GlobalClock` class is responsible for maintaining a synchronized global clock that increments + * ticks at regular intervals. This clock can be shared across multiple processes using shared memory, + * and synchronization is achieved using mutexes and condition variables to ensure that processes can + * wait for the next tick and query the current tick in a thread-safe manner. + * + * The clock can be started and stopped, and it ensures proper cleanup of shared resources when the + * process is terminated. + */ +class GlobalClock { + public: + /** + * @brief Starts the global clock. + * + * Initializes shared memory if not already initialized, sets the clock to running, + * and begins ticking at regular intervals in a separate thread. Only one process can start the clock. + */ + static void startClock(); + /** + * @brief Waits for the next tick. + * + * Blocks the calling thread until the next clock tick occurs, allowing synchronization + * with the global clock. Uses condition variables to wait for the clock tick to change. + */ + static void waitForNextTick(); + /** + * @brief Retrieves the current tick value. + * + * @return The current value of the clock tick, as an integer. + * + * Ensures the shared memory is initialized, and returns the current tick count stored in + * the shared memory. + */ + static int getCurrentTick(); + /** + * @brief Stops the global clock. + * + * Sets the `is_running` flag in the shared memory to `false`, stopping the clock and + * ensuring that the clock thread is properly joined before exiting. + */ + static void stopClock(); + + private: + /** + * @brief Initializes shared memory for the global clock if not already initialized for this process. + * + * Allocates shared memory and initializes the `SharedClock` structure if necessary, which contains the + * current tick, running state, mutex, and condition variable. + */ + static void initializeSharedMemory(); + /** + * @brief .Initializes the mutex and condition variables for shared clock synchronization. + * + * This function is only called by the first process that starts the clock. + * It sets the mutex and condition variable to be shared between processes + * using `PTHREAD_PROCESS_SHARED`. + */ + static void initializeClockSynchronization(); + /** + * @brief Releases process-specific resources. + * + * Unmaps the shared memory and closes the file descriptor associated with it. + * This method ensures that the process-specific resources are properly cleaned up. + */ + static void releaseProcessResources(); + /** + * @brief Releases system-wide shared resources. + * + * Unlinks the shared memory, effectively deleting it from the system, and closes the file + * descriptor. This should be called when no other process requires access to the clock. + */ + static void releaseSharedSystemResources(); + /** + * @brief Registers a cleanup routine. + * + * Registers a cleanup routine using `atexit()` to ensure that when the process exits, it + * properly releases its resources, whether those are process-specific or system-wide. + */ + static void registerCleanup(); + /** + * @brief The main function to run the clock. + * + * Increments the tick count in regular intervals (defined by `TICK_DURATION`) while the + * clock is running. Broadcasts the condition variable on each tick to wake up waiting processes. + */ + static void runClock(); + // Shared memory for the clock state, accessible across processes. + static SharedClock *shared_clock; + // File descriptor for the shared memory. + static int shared_memory_fd; + // Thread to manage the clock in the background. + static std::thread clock_thread; +}; +#endif // __GLOBAL_CLOCK_H__ \ No newline at end of file diff --git a/communication/include/message.h b/communication/include/message.h new file mode 100644 index 00000000..d3016794 --- /dev/null +++ b/communication/include/message.h @@ -0,0 +1,64 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include "packet.h" + +enum MessageType { + INITIAL_CONNECTION, // Represents initial connection of a component to the server + ERROR_MESSAGE, // Error message (highest priority) + ACK, // Acknowledgment message + OVERLOAD_MESSAGE, // Indicates network overload + REMOTE_TRANSMISSION_REQUEST, // Request for remote data (RTR) + DATA_MESSAGE // Regular data message (lowest priority) +}; + + + +class Message { +private: + std::vector packets; + uint32_t tps; + uint32_t messageID; + + uint32_t generateMessageID(MessageType messageType, uint16_t srcID); + +public: + // Default constructor + Message() = default; + + // Constructor for sending message + Message(uint32_t srcID, uint32_t destID, void* data, size_t size, MessageType messageType); + + // Constructor for receiving message + Message(uint32_t tps); + + // Add a packet to the received message + bool addPacket(const Packet &p); + + // Check if the message is complete + bool isComplete() const; + + // Get the complete data of the message + void* completeData() const; + + // Get the packets of the message + std::vector& getPackets(); + + // Extract the MessageType from the message ID + static MessageType getMessageTypeFromID(uint32_t messageID); + + // Returns a string representation of the MessageType enum value + static std::string getMessageTypeString(MessageType type); + + // Get the message ID + uint32_t getMessageID() const + { + return messageID; + } +}; \ No newline at end of file diff --git a/communication/include/packet.h b/communication/include/packet.h new file mode 100644 index 00000000..d8ce6120 --- /dev/null +++ b/communication/include/packet.h @@ -0,0 +1,74 @@ +#ifndef __PACKET_H__ +#define __PACKET_H__ + +#include +#include +#include +#include + +#define SIZE_PACKET 8 + +class Packet { +public: + // Default constructor for Packet. + Packet() = default; + + // Constructor for sending message + Packet(uint32_t id, uint32_t PSN, uint32_t TPS, uint32_t srcId, + uint32_t destId, int DLC, bool RTR, bool isBroadcast, + const uint8_t *payload); + + // Constructor for receiving message + Packet(uint32_t id); + + // Destructor + ~Packet() = default; + + // Overloaded operator> + bool operator>(const Packet &other) const; + + // Getters for accessing Header fields + uint32_t getId() const { return header.id; } + uint32_t getPSN() const { return header.PSN; } + uint32_t getTPS() const { return header.TPS; } + uint32_t getSrcId() const { return header.srcId; } + uint32_t getDestId() const { return header.destId; } + int getTimestamp() const { return header.timestamp; } + int getDLC() const { return header.DLC; } + uint16_t getCRC() const { return header.CRC; } + bool isRTR() const { return header.RTR; } + bool getIsPassive() const { return header.passive; } + bool getIsBroadcast() const { return header.isBroadcast; } + + const uint8_t *getPayload() const { return payload; } + + // Setters for modifying Header fields + void setIsPassive(bool p) { header.passive = p; } + void setTimestamp(int t) { header.timestamp = t; } + + // CRC calculation + uint16_t calculateCRC() const; + bool validateCRC() const; + + // Static function to convert raw data to hexadecimal + static std::string pointerToHex(const void* data, size_t size); +private: + // Header structure within Packet + struct Header { + uint32_t id; // Unique identifier for the message (4 bytes) + uint32_t PSN; // Packet Sequence Number (4 bytes) + uint32_t TPS; // Total Packet Sum (4 bytes) + uint32_t srcId; // Source node ID (4 bytes) + uint32_t destId; // Destination node ID (4 bytes) + int timestamp; // Timestamp marking the packet's send time (4 bytes) + int DLC; // Data Length Code (0-8 bytes) (4 bytes) + uint16_t CRC; // Cyclic Redundancy Check (2 bytes) + bool RTR; // Remote Transmission Request flag (1 byte) + bool passive; // Passive state flag (1 byte) + bool isBroadcast; // Broadcast flag (1 byte) + } header; + + uint8_t payload[SIZE_PACKET]; // Data payload (fixed size, up to SIZE_PACKET bytes) +}; + +#endif // __PACKET_H__ \ No newline at end of file diff --git a/communication/include/scheduler.h b/communication/include/scheduler.h new file mode 100644 index 00000000..9b1c72b5 --- /dev/null +++ b/communication/include/scheduler.h @@ -0,0 +1,81 @@ +#ifndef __SCHEDULER_H__ +#define __SCHEDULER_H__ + +#include +#include +#include +#include +#include +#include +#include +#include "global_clock.h" +#include "../sockets/real_socket.h" +// must be set +#define MAX_ACK_TIMEOUT \ + (20 * TICK_DURATION) ///< Maximum time to wait for ACK + +/** + * @class Scheduler + * @brief Manages retransmission timers and tracks acknowledgments for packets. + * + * This class provides functionality to manage retransmission timers for + * packets, handle acknowledgments, and clean up packet data once + * retransmissions are complete. + */ +class Scheduler +{ +public: + using Callback = std::function; + + Scheduler(); + ~Scheduler(); + + /** + * @brief Stops all active timers and waits for their completion. + * + * This method ensures that all active timers (threads) complete their + * execution before the program exits. + */ + void stopAllTimers(); + + /** + * @brief Starts a retransmission timer for a given packet ID. + * + * The function initiates a timer to wait for an ACK (Acknowledgment) for the specified packet. + * If the ACK is received within the MAX_ACK_TIMEOUT, it clears the packet data and sets the result + * of the ackPromise to `true`, indicating success. If the timeout occurs and no ACK is received, + * it triggers the provided callback function to retransmit the packet and increments the retry count. + * + * @param packetID The unique ID of the packet being transmitted. + * @param callback The callback function to call when retransmitting the packet after a timeout. + * @param ackPromise A shared promise that communicates whether the packet transmission was successful + * (i.e., ACK was received). + */ + void startRetransmissionTimer(int packetID, Callback callback, std::shared_ptr> ackPromise); + + /** + * @brief Receives an acknowledgment for a packet. + * + * @param packetID The ID of the packet that has been acknowledged. + */ + void receiveACK(int packetID); + + /** + * @brief Clears the data associated with a packet. + * + * @param packetID The ID of the packet whose data is to be cleared. + */ + void clearPacketData(int packetID); + +private: + std::unordered_map + ackReceived; ///< Map to track received acknowledgments + std::unordered_map + retryCounts; ///< Map to track retry counts for packets + std::mutex mutex; ///< Mutex for thread safety + std::condition_variable cv; ///< Condition variable for synchronization + std::vector> + futures; ///< Vector to store futures of active threads +}; + +#endif // __SCHEDULER_H__ diff --git a/communication/include/server_connection.h b/communication/include/server_connection.h new file mode 100644 index 00000000..0b8ba802 --- /dev/null +++ b/communication/include/server_connection.h @@ -0,0 +1,90 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "message.h" +#include "../sockets/Isocket.h" +#include "../sockets/real_socket.h" +#include "error_code.h" + +class ServerConnection +{ +private: + int serverSocket; + sockaddr_in address; + int port; + std::atomic running; + std::thread mainThread; + std::vector clientThreads; + std::vector sockets; + std::mutex socketMutex; + std::mutex threadMutex; + std::function receiveDataCallback; + std::map clientIDMap; + std::mutex IDMapMutex; + ISocket* socketInterface; + + // Starts listening for connection requests + void startThread(); + + // Implementation according to the CAN BUS + bool isValidId(uint32_t id); + + // Runs in a thread for each process - waits for a message and forwards it to the manager + void handleClient(int clientSocket); + + // Returns the sockets ID + int getClientSocketByID(uint32_t destID); + +public: + + // Constructor + ServerConnection(int port, std::function callback, ISocket* socketInterface = new RealSocket()); + + // Initializes the listening socket + ErrorCode startConnection(); + + // Closes the sockets and the threads + void stopServer(); + + // Sends the message to all connected processes - broadcast + ErrorCode sendBroadcast(const Packet &packet); + + // Sets the server's port number, throws an exception if the port is invalid. + void setPort(int port); + + // Sets the callback for receiving data, throws an exception if the callback is null. + void setReceiveDataCallback(std::function callback); + + // Sets the socket interface, throws an exception if the socketInterface is null. + void setSocketInterface(ISocket *socketInterface); + + // Sends the message to destination + ErrorCode sendDestination(const Packet &packet); + + // For testing + int getServerSocket(); + + int isRunning(); + + std::vector* getSockets(); + + std::mutex* getSocketMutex(); + + std::mutex* getIDMapMutex(); + + std::map* getClientIDMap(); + + void testHandleClient(int clientSocket); + + int testGetClientSocketByID(uint32_t destID); + + // Destructor + ~ServerConnection(); +}; diff --git a/communication/sockets/Isocket.h b/communication/sockets/Isocket.h new file mode 100644 index 00000000..2ecfdd85 --- /dev/null +++ b/communication/sockets/Isocket.h @@ -0,0 +1,21 @@ +#ifndef ISOCKET_H +#define ISOCKET_H + +#include +#include "../../logger/logger.h" + +class ISocket { +public: + virtual int socket(int domain, int type, int protocol) = 0; + virtual int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen) = 0; + virtual int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) = 0; + virtual int listen(int sockfd, int backlog) = 0; + virtual int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) = 0; + virtual int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) = 0; + virtual ssize_t send(int sockfd, const void *buf, size_t len, int flags) = 0; + virtual ssize_t recv(int sockfd, void *buf, size_t len, int flags) = 0; + virtual int close(int fd) = 0; + virtual ~ISocket() = default; +}; + +#endif \ No newline at end of file diff --git a/communication/sockets/mock_socket.h b/communication/sockets/mock_socket.h new file mode 100644 index 00000000..36bced05 --- /dev/null +++ b/communication/sockets/mock_socket.h @@ -0,0 +1,21 @@ + +#ifndef MOCKSOCKET_H +#define MOCKSOCKET_H + +#include "gmock/gmock.h" +#include "Isocket.h" + +class MockSocket : public ISocket { +public: + MOCK_METHOD(int, socket, (int domain, int type, int protocol), (override)); + MOCK_METHOD(int, setsockopt, (int sockfd, int level, int optname, const void *optval, socklen_t optlen), (override)); + MOCK_METHOD(int, bind, (int sockfd, const struct sockaddr *addr, socklen_t addrlen), (override)); + MOCK_METHOD(int, listen, (int sockfd, int backlog), (override)); + MOCK_METHOD(int, accept, (int sockfd, struct sockaddr *addr, socklen_t *addrlen), (override)); + MOCK_METHOD(int, connect, (int sockfd, const struct sockaddr *addr, socklen_t addrlen), (override)); + MOCK_METHOD(ssize_t, send, (int sockfd, const void *buf, size_t len, int flags), (override)); + MOCK_METHOD(ssize_t, recv, (int sockfd, void *buf, size_t len, int flags), (override)); + MOCK_METHOD(int, close, (int fd), (override)); +}; + +#endif \ No newline at end of file diff --git a/communication/sockets/real_socket.cpp b/communication/sockets/real_socket.cpp new file mode 100644 index 00000000..688f07f5 --- /dev/null +++ b/communication/sockets/real_socket.cpp @@ -0,0 +1,113 @@ +#include "real_socket.h" + +logger RealSocket::log = logger("communication"); + +RealSocket::RealSocket(){} + +int RealSocket::socket(int domain, int type, int protocol) +{ + int sockFd = ::socket(domain, type, protocol); + if (sockFd < 0) + { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "socket creation error: " + std::string(strerror(errno))); + } + RealSocket::log.logMessage(logger::LogLevel::INFO, "create a client socket: " + std::to_string(sockFd) + std::string(" ") + std::string(strerror(errno))); + return sockFd; +} + +int RealSocket::setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen) +{ + int sockopt = ::setsockopt(sockfd, level, optname, optval, optlen); + if (sockopt) + { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "setsockopt failed: " + std::string(strerror(errno))); + close(sockfd); + } + + RealSocket::log.logMessage(logger::LogLevel::INFO, "create a server socket: " + std::to_string(sockfd)); + return sockopt; +} + +int RealSocket::bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) +{ + int bindAns = ::bind(sockfd, addr, addrlen); + if (bindAns < 0) + { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Bind failed: " + std::string(strerror(errno))); + close(sockfd); + } + + return bindAns; +} + +int RealSocket::listen(int sockfd, int backlog) +{ + int listenAns = ::listen(sockfd, backlog); + if (listenAns < 0) + { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Listen failed: " + std::string(strerror(errno))); + close(sockfd); + } + + RealSocket::log.logMessage(logger::LogLevel::INFO, "server running on port " + std::to_string(8080)); + return listenAns; +} + +int RealSocket::accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) +{ + int newSocket = ::accept(sockfd, addr, addrlen); + if (newSocket < 0) + { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Accept failed: " + std::string(strerror(errno))); + } + + RealSocket::log.logMessage(logger::LogLevel::INFO, "connection succeed to client socket number: " + std::to_string(sockfd)); + return newSocket; +} + +int RealSocket::connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) +{ + int connectAns = ::connect(sockfd, addr, addrlen); + if (connectAns < 0) + { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "process", "server", "Connection Failed: " + std::string(strerror(errno))); + } + + RealSocket::log.logMessage(logger::LogLevel::INFO, "process", "server", "connection succeed: " + std::string(strerror(errno))); + return connectAns; +} + +ssize_t RealSocket::recv(int sockfd, void *buf, size_t len, int flags) +{ + int valread = ::recv(sockfd, buf, len, flags); + const Packet *p = static_cast(buf); + + if (valread < 0) + RealSocket::log.logMessage(logger::LogLevel::ERROR, std::to_string(p->getSrcId()), std::to_string(p->getDestId()), std::string(" Error occurred: in socket ") + std::to_string(sockfd) + std::string(" ") + std::string(strerror(errno))); + else if (valread == 0) + RealSocket::log.logMessage(logger::LogLevel::INFO, std::to_string(p->getSrcId()), std::to_string(p->getDestId()), std::string(" connection closed: in socket ") + std::to_string(sockfd) + std::string(" ") + std::string(strerror(errno))); + else + RealSocket::log.logMessage(logger::LogLevel::INFO, std::to_string(p->getSrcId()), std::to_string(p->getDestId()), std::string("received packet type: ") + Message::getMessageTypeString(Message::getMessageTypeFromID(p->getId())) +std::string(" number: ") + std::to_string(p->getPSN()) + std::string(", of messageId: ") + std::to_string(p->getId()) + std::string(" ") + std::string(strerror(errno)) + ( p->getDLC() ? " Data: " + Packet::pointerToHex(p->getPayload(),p->getDLC()):"")); + + return valread; +} + +ssize_t RealSocket::send(int sockfd, const void *buf, size_t len, int flags) +{ + int sendAns = ::send(sockfd, buf, len, flags); + const Packet *p = static_cast(buf); + if (sendAns <= 0) + RealSocket::log.logMessage(logger::LogLevel::ERROR, std::to_string(p->getSrcId()), std::to_string(p->getDestId()), "Error occurred while sending packet type: " + Message::getMessageTypeString(Message::getMessageTypeFromID(p->getId())) + ", number: " + std::to_string(p->getPSN()) + " ,of messageId: " + std::to_string(p->getId()) + ", error: " + std::string(strerror(errno))); + else + RealSocket::log.logMessage(logger::LogLevel::INFO, std::to_string(p->getSrcId()), std::to_string(p->getDestId()), "sending packet type: " + Message::getMessageTypeString(Message::getMessageTypeFromID(p->getId())) + ", number: " + std::to_string(p->getPSN()) + " ,of messageId: " + std::to_string(p->getId()) + (p->getDLC() ? ", Data: " + Packet::pointerToHex(p->getPayload(), p->getDLC()) : "")); + + return sendAns; +} + +int RealSocket::close(int fd) +{ + RealSocket::log.logMessage(logger::LogLevel::INFO, "close socket number: " + std::to_string(fd)); + RealSocket::log.cleanUp(); + shutdown(fd, SHUT_RDWR); + return ::close(fd); +} \ No newline at end of file diff --git a/communication/sockets/real_socket.h b/communication/sockets/real_socket.h new file mode 100644 index 00000000..9f89ca92 --- /dev/null +++ b/communication/sockets/real_socket.h @@ -0,0 +1,35 @@ +#ifndef REALSOCKET_H +#define REALSOCKET_H + +#include "Isocket.h" +#include +#include +#include "../include/packet.h" +#include "../include/message.h" + +class RealSocket : public ISocket +{ +public: + static logger log; + + RealSocket(); + + int socket(int domain, int type, int protocol) override; + + int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen) override; + + int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) override; + + int listen(int sockfd, int backlog) override; + + int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) override; + + int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) override; + + ssize_t recv(int sockfd, void *buf, size_t len, int flags) override; + + ssize_t send(int sockfd, const void *buf, size_t len, int flags) override; + + int close(int fd) override; +}; +#endif \ No newline at end of file diff --git a/communication/src/bus_manager.cpp b/communication/src/bus_manager.cpp new file mode 100644 index 00000000..55da6240 --- /dev/null +++ b/communication/src/bus_manager.cpp @@ -0,0 +1,164 @@ +#include "../include/bus_manager.h" + +// Initialize static instance to nullptr +BusManager *BusManager::instance = nullptr; +std::mutex BusManager::managerMutex; + +// Private constructor: initializes the server and starts the collision timer +BusManager::BusManager(std::vector idShouldConnect, uint32_t limit) + : server(8080, + std::bind(&BusManager::receiveData, this, std::placeholders::_1)), + lastPacket(nullptr), stopFlag(false) +{ + startCollisionTimer(); // Start collision management timer +} + +// Singleton getter: ensures only one instance of Manager +BusManager *BusManager::getInstance(std::vector idShouldConnect, + uint32_t limit) +{ + if (instance == nullptr) { + std::lock_guard lock(managerMutex); + if (instance == nullptr) { + instance = new BusManager(idShouldConnect, limit); + } + } + return instance; +} + +// Starts the server connection and listens for incoming requests +ErrorCode BusManager::startConnection() +{ + return server.startConnection(); +} + +// Receives a packet, checks for collisions, and sends it if valid +void BusManager::receiveData(Packet &p) +{ + checkCollision(p); // Handle packet collisions +} + +// Sends a packet either as broadcast or to a specific destination +ErrorCode BusManager::sendToClients(const Packet &packet) +{ + if (packet.getIsBroadcast()) { + return server.sendBroadcast(packet); // Broadcast message + } + return server.sendDestination(packet); // Send to specific client +} + +// Manages collisions based on the CAN BUS protocol +void BusManager::checkCollision(Packet ¤tPacket) +{ + std::lock_guard lock(lastPacketMutex); + if (lastPacket == nullptr) { + // No previous packet, store the current one + lastPacket = new Packet(currentPacket); + if (lastPacket == nullptr) { + } + } + else { + if (lastPacket->getTimestamp() == currentPacket.getTimestamp()) { + RealSocket::log.logMessage( + logger::LogLevel::INFO, "Handled collision between packet from SRC " + + std::to_string(lastPacket->getSrcId()) + + " to DST " + std::to_string(lastPacket->getDestId()) + + ", (packet number: " + std::to_string(lastPacket->getPSN()) + + " of messageId " + std::to_string(lastPacket->getId()) + + ") and packet from SRC " + std::to_string(currentPacket.getSrcId()) + + " to DST " + std::to_string(currentPacket.getDestId()) + + ", (packet number: " + std::to_string(currentPacket.getPSN()) + + " of messageId " + std::to_string(currentPacket.getId()) + + ")." + ); + + // Same timestamp indicates potential collision, check priority + Packet* prioritizedPacket = packetPriority(*lastPacket, currentPacket); + + // Log the result once and determine the lost packet + Packet* lostPacket = (prioritizedPacket == ¤tPacket) ? lastPacket : ¤tPacket; + + // Update the last packet if current has priority + if (prioritizedPacket == ¤tPacket) { + delete lastPacket; // Replace last packet + lastPacket = new Packet(currentPacket); + } + + RealSocket::log.logMessage( + logger::LogLevel::INFO, "Packet from SRC " + + std::to_string(prioritizedPacket->getSrcId()) + + " to DST " + std::to_string(prioritizedPacket->getDestId()) + + ", (packet number: " + std::to_string(prioritizedPacket->getPSN()) + + " of messageId " + std::to_string(prioritizedPacket->getId()) + + " ) won the collision. Packet from SRC " + std::to_string(lostPacket->getSrcId()) + + " to DST " + std::to_string(lostPacket->getDestId()) + + ", (packet number: " + std::to_string(lostPacket->getPSN()) + + " of messageId " + std::to_string(lostPacket->getId()) + ") was lost." + ); + } + } +} + +// Determines which packet has higher priority (CAN BUS logic) +Packet *BusManager::packetPriority(Packet &a, Packet &b) +{ + if (a.getIsPassive() && !b.getIsPassive()) { + return &b; // Non-passive packet takes priority + } + else if (!a.getIsPassive() && b.getIsPassive()) { + return &a; // Non-passive packet takes priority + } + else { + return (a > b) ? &a + : &b; // If both are the same type, compare based on ID + } +} + +// Starts the collision timer to periodically check for packet collisions +void BusManager::startCollisionTimer() +{ + stopFlag = false; + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Starting collision timer thread."); // לוג לפני התחלת ה-thread + + collisionTimerThread = std::thread([this]() { + while (!stopFlag) { + GlobalClock::waitForNextTick(); // מחכה לשעון + + if (!stopFlag) + checkLastPacket(); // Check and send last packet if necessary + } + + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Collision timer thread stopping."); // לוג כאשר ה-thread מפסיק + }); +} + + +// Checks the last packet and sends it if it hasn't been sent yet +void BusManager::checkLastPacket() +{ + std::lock_guard lock(lastPacketMutex); + if (lastPacket != nullptr) { + ErrorCode res = sendToClients(*lastPacket); + delete lastPacket; // Clear last packet after sending + lastPacket = nullptr; + } +} + +//shut down the server +void BusManager::stopConnection() +{ + if (instance) { + instance->server.stopServer(); // Stop server on interrupt + } +} + +// Destructor: ensures proper cleanup of threads and resources +BusManager::~BusManager() +{ + stopFlag = true; // Stop collision timer thread + if (collisionTimerThread.joinable()) { + collisionTimerThread.join(); + } + delete lastPacket; // Clean up lastPacket + instance = nullptr; +} \ No newline at end of file diff --git a/communication/src/client_connection.cpp b/communication/src/client_connection.cpp new file mode 100644 index 00000000..6990f7ad --- /dev/null +++ b/communication/src/client_connection.cpp @@ -0,0 +1,127 @@ +#include "../include/client_connection.h" + +// Constructor +ClientConnection::ClientConnection(std::function callback, ISocket* socketInterface): connected(false){ + setCallback(callback); + setSocketInterface(socketInterface); +} + +// Requesting a connection to the server +ErrorCode ClientConnection::connectToServer(int id) +{ + clientSocket = socketInterface->socket(AF_INET, SOCK_STREAM, 0); + if (clientSocket < 0) { + return ErrorCode::SOCKET_FAILED; + } + + servAddress.sin_family = AF_INET; + servAddress.sin_port = htons(PORT); + inet_pton(AF_INET, IP, &servAddress.sin_addr); + + int connectRes = socketInterface->connect(clientSocket, (struct sockaddr *)&servAddress, sizeof(servAddress)); + if (connectRes < 0) { + socketInterface->close(clientSocket); + return ErrorCode::CONNECTION_FAILED; + } + + Packet packet(id); + ssize_t bytesSent = socketInterface->send(clientSocket, &packet, sizeof(Packet), 0); + if (bytesSent < sizeof(Packet)) { + socketInterface->close(clientSocket); + return ErrorCode::SEND_FAILED; + } + + connected = true; + receiveThread = std::thread(&ClientConnection::receivePacket, this); + receiveThread.detach(); + + return ErrorCode::SUCCESS; +} + +// Sends the packet to the manager-sync +ErrorCode ClientConnection::sendPacket(Packet &packet) +{ + //If send executed before start + if (!connected) + return ErrorCode::CONNECTION_FAILED; + packet.setTimestamp(GlobalClock::getCurrentTick()); + ssize_t bytesSent = socketInterface->send(clientSocket, &packet, sizeof(Packet), 0); + if (bytesSent==0) { + closeConnection(); + return ErrorCode::CONNECTION_FAILED; + } + + if (bytesSent<0) + return ErrorCode::SEND_FAILED; + + return ErrorCode::SUCCESS; +} + +// Waits for a message and forwards it to Communication +void ClientConnection::receivePacket() +{ + while (connected) { + Packet packet; + int valread = socketInterface->recv(clientSocket, &packet, sizeof(Packet), 0); + if (valread==0) + break; + + if (valread<0) + continue; + + passPacketCom(packet); + } + + closeConnection(); +} + +// Closes the connection +ErrorCode ClientConnection::closeConnection() +{ + if (connected) { + int socketInterfaceRes = socketInterface->close(clientSocket); + if(socketInterfaceRes < 0) + return ErrorCode::CLOSE_FAILED; + connected = false; + } + return ErrorCode::SUCCESS; +} + +// Setter for passPacketCom +void ClientConnection::setCallback(std::function callback) { + if (!callback) + throw std::invalid_argument("Callback function cannot be null"); + + passPacketCom = callback; +} + +// Setter for socketInterface +void ClientConnection::setSocketInterface(ISocket* socketInterface) { + if (!socketInterface) + throw std::invalid_argument("Socket interface cannot be null"); + + this->socketInterface = socketInterface; +} + +// For testing +int ClientConnection::getClientSocket() +{ + return clientSocket; +} + +int ClientConnection::isConnected() +{ + return connected; +} + +bool ClientConnection::isReceiveThreadRunning() +{ + return false; +} + +//Destructor +ClientConnection::~ClientConnection() +{ + closeConnection(); + delete socketInterface; +} \ No newline at end of file diff --git a/communication/src/communication.cpp b/communication/src/communication.cpp new file mode 100644 index 00000000..f803c498 --- /dev/null +++ b/communication/src/communication.cpp @@ -0,0 +1,633 @@ +#include "../include/communication.h" + +// Initialize static member variables for the Communication class +Communication *Communication::instance = nullptr; +std::thread Communication::busOffRecoveryThread; +std::mutex Communication::messageThreadsMutex; +std::vector> Communication::activeMessageFutures; + +// Constructor +Communication::Communication(uint32_t id, void (*passDataCallback)(uint32_t, void *)) + : TEC(0), REC(0), state(ErrorState::Active), + client(std::bind(&Communication::receivePacket, this, + std::placeholders::_1)), + lastSendTick(GlobalClock::getCurrentTick()) +{ + setId(id); // Set the communication ID + setPassDataCallback(passDataCallback); // Set the callback for data passing + + instance = this; // Initialize the singleton instance + + // Set signal handler for SIGINT + auto signalResult = signal(SIGINT, Communication::signalHandler); + if (signalResult == SIG_ERR) + throw std::runtime_error("Failed to set signal handler for SIGINT"); +} + +// Destructor +Communication::~Communication() +{ + cleanupBusOffRecoveryThread(); // Clean up the bus off recovery thread + waitForActiveMessages(); // Wait for any active message threads to complete + instance->client.closeConnection(); // Close the client connection + instance = nullptr; // Clear the singleton instance +} + +// Sends the client to connect to server +ErrorCode Communication::startConnection() +{ + //Waiting for manager + //syncCommunication.isManagerRunning() + ErrorCode isConnected = client.connectToServer(id); + //Increases the shared memory and blocks the process - if not all are connected + //syncCommunication.registerProcess() + return isConnected; +} + +// Send message synchronously +ErrorCode Communication::sendMessage( + void *data, size_t dataSize, uint32_t destID, uint32_t srcID, + MessageType messageType /*= MessageType::DATA_MESSAGE*/) +{ + if (dataSize == 0) + return ErrorCode::INVALID_DATA_SIZE; // Check for valid data size + if (data == nullptr) + return ErrorCode::INVALID_DATA; // Check for valid data pointer + if (destID == 0 || srcID == 0) + return ErrorCode::INVALID_ID; // Check for valid IDs + if (!client.isConnected()) + return ErrorCode::CONNECTION_FAILED; // Check for connection status + + Message msg(srcID, destID, data, dataSize, + messageType); // Create a new message + + //Sending the message to logger + RealSocket::log.logMessage(logger::LogLevel::INFO, "Start sending " + + Message::getMessageTypeString(messageType) + + " message with ID: " + + std::to_string(msg.getMessageID()) + + " Complete message:" + +Packet::pointerToHex(data ,dataSize)); + + for (auto &packet : msg.getPackets()) { + bool success = + (messageType == MessageType::DATA_MESSAGE) + ? sendPacketWithRTO(packet) // Send with retransmission timeout + : sendPacket(packet); // Send packet directly + if (!success) { + if( messageType == MessageType::DATA_MESSAGE) + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Failed sending " + + Message::getMessageTypeString(messageType) + + " message with ID: " + + std::to_string(msg.getMessageID()) + "."); + else + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Failed sending messageID: " + std::to_string(msg.getMessageID())); + return ErrorCode::SEND_FAILED; // Return failure if sending fails + } + } + RealSocket::log.logMessage(logger::LogLevel::INFO, "Finished sending " + + Message::getMessageTypeString(messageType) + + " message with ID: " + + std::to_string(msg.getMessageID()) + + " successfully."); + + return ErrorCode::SUCCESS; // Return success if all packets are sent +} + +// Send message asynchronously +void Communication::sendMessageAsync( + void *data, size_t dataSize, uint32_t destID, uint32_t srcID, + std::function sendCallback, MessageType messageType) +{ + std::unique_lock lock(messageThreadsMutex); + // Wait until the number of active threads is below the maximum limit + while (activeMessageFutures.size() >= MAX_SIMULTANEOUS_MESSAGES) { + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Maximum simultaneous messages reached. Waiting for available thread..."); + messageThreadsCondition.wait(lock); // Wait for a thread to complete + } + // Print the number of active threads (i.e. messages being sent) + RealSocket::log.logMessage(logger::LogLevel::DEBUG, + "Number of messages being sent simultaneously: " + + std::to_string(activeMessageFutures.size() + 1)); // +1 to include the new thread + + // Create a new promise and future + std::promise promise; + std::future future = promise.get_future(); + activeMessageFutures.push_back(std::move(future)); // Store future + + // Create a new thread to handle message sending + std::thread([this, data, dataSize, destID, srcID, messageType, sendCallback, + promise = std::move(promise)]() mutable { + ErrorCode result = this->sendMessage(data, dataSize, destID, srcID, + messageType); // Send the message + sendCallback(result); // Call the callback after the message is sent + + // Notify that the thread work is done + promise.set_value(); // Set the promise value + + // Lock the mutex to modify the active thread list + std::unique_lock lock(this->messageThreadsMutex); + + // Remove the promise from the vector of active futures + auto it = std::remove_if( + activeMessageFutures.begin(), activeMessageFutures.end(), + [](std::future &future) { + return future.wait_for(std::chrono::seconds(0)) == + std::future_status:: + ready; // Check if the future is ready + }); + activeMessageFutures.erase( + it, activeMessageFutures.end()); // Remove invalid futures + // Notify one waiting thread that space is available + this->messageThreadsCondition.notify_one(); + }).detach(); // Detach the thread +} + +void Communication::receivePacket(Packet &packet) +{ + // Check if the bus is in BusOff state + // If the system is in BusOff state, exit the function and do not process the packet + if (getState() == ErrorState::BusOff) { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Cannot receive packet " + + std::to_string(packet.getPSN()) + + ",of message ID: " + + std::to_string(packet.getId()) + + ", system in BusOff state."); + return; + } + + // Check if the destination ID of the packet matches the expected destination + if (checkDestId(packet)) { + // Check CRC validity + // Validate the packet's CRC for data integrity + if (!validCRC(packet)) { + // Log the CRC error with packet ID and other information + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Invalid CRC detected for packet " + + std::to_string(packet.getPSN()) + + " of message ID: " + + std::to_string(packet.getId()) + "."); + // Retrieve the packet ID + uint32_t packetId = packet.getId(); + // Send an error message back to the source with the invalid packet ID + sendMessage(&packetId, sizeof(packetId), packet.getSrcId(), + packet.getDestId(), MessageType::ERROR_MESSAGE); + + RealSocket::log.logMessage(logger::LogLevel::INFO, "Sending ERROR message for packet " + + std::to_string(packet.getPSN()) + + "of message ID " + + std::to_string(packetId) + + " back to source ID " + + std::to_string(packet.getSrcId()) + + " (Broadcast)."); + + return; // Exit after sending the error message + } + + // Handle message based on type + // Determine the type of the message based on the packet ID + switch (Message::getMessageTypeFromID(packet.getId())) { + case DATA_MESSAGE: + // If the message is a data message, handle it accordingly + handleDataPacket(packet); + break; + case ERROR_MESSAGE: + // If the message is an error message, handle it accordingly + handleErrorPacket(packet); + break; + case ACK: + // If the message is an acknowledgment message, handle it accordingly + handleAckPacket(packet); + break; + default: + // If the message type is unrecognized, do nothing + break; + } + } +} + +bool Communication::checkDestId(Packet &p) +{ + // If the packet is a broadcast, it can be received by any node + //Or if the destination ID matches, the packet is intended for this node + return p.getIsBroadcast() || p.getDestId() == id; +} + +bool Communication::validCRC(Packet &packet) +{ + // Validate the CRC of the packet + // This checks the integrity of the packet to ensure it hasn't been corrupted + return packet.validateCRC(); +} + +void Communication::handleDataPacket(Packet &packet) +{ + RealSocket::log.logMessage(logger::LogLevel::INFO, "Handling data packet " + + std::to_string(packet.getPSN()) + + " of message ID " + + std::to_string(packet.getId()) + "."); + + // Add the incoming data packet to the message queue + addPacketToMessage(packet); + + // Retrieve the packet ID for acknowledgment + uint32_t packetId = packet.getId(); + + // Send an acknowledgment message back to the source of the packet + RealSocket::log.logMessage(logger::LogLevel::INFO, "Sending ACK for packet " + + std::to_string(packet.getPSN()) + + " of message ID " + + std::to_string(packetId) + + " back to source ID " + + std::to_string(packet.getSrcId()) + "."); + sendMessage(&packetId, sizeof(packetId), packet.getSrcId(), packet.getDestId(), MessageType::ACK); +} + +void Communication::handleErrorPacket(Packet &packet) +{ + RealSocket::log.logMessage(logger::LogLevel::INFO, "Handling error packet " + + std::to_string(packet.getPSN()) + + " of message ID " + + std::to_string(packet.getId()) + "."); + + // Check if the error packet is intended for this node + if (packet.getDestId() == id) { + // Handle the transmission error for this node + handleTransmitError(); + } + else { + // Handle the reception error for other nodes + handleReceiveError(); + } +} + +void Communication::handleAckPacket(Packet &packet) +{ + RealSocket::log.logMessage(logger::LogLevel::INFO, "Handling ACK packet " + + std::to_string(packet.getPSN()) + + " of message ID " + + std::to_string(packet.getId()) + "."); + + // Retrieve the message ID from the packet's payload + const uint32_t *receivedIdPtr = reinterpret_cast(packet.getPayload()); + uint32_t receivedId = *receivedIdPtr; + + // Notify the scheduler of the received acknowledgment + scheduler.receiveACK(receivedId); + + // Handle the successful transmission + handleSuccessfulTransmission(); +} + +void Communication::addPacketToMessage(Packet &packet) +{ + // Convert the packet ID to a string for lookup + std::string messageId = std::to_string(packet.getId()); + + // Check if the message with the given ID already exists + if (receivedMessages.find(messageId) != receivedMessages.end()) { + // If it exists, add the packet to the existing message + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Adding packet " + + std::to_string(packet.getPSN()) + + " to existing message ID " + + messageId + "."); + receivedMessages[messageId].addPacket(packet); + } + else { + // If it does not exist, create a new message and add the packet + RealSocket::log.logMessage(logger::LogLevel::INFO, "Creating a new message for receiving message ID " + + std::to_string(packet.getId()) + + " and adding packet " + + std::to_string(packet.getPSN()) + + " to it."); + Message msg(packet.getTPS()); + msg.addPacket(packet); + receivedMessages[messageId] = msg; // Store the new message + } + + // Check if the message is complete + if (receivedMessages[messageId].isComplete()) { + RealSocket::log.logMessage(logger::LogLevel::INFO, "Message ID " + + messageId + + " is complete. Processing complete data."); + // Retrieve complete data from the message + void *completeData = receivedMessages[messageId].completeData(); + // Pass the complete data for further processing + passData(packet.getSrcId(), completeData); + // Remove the message from the received messages + receivedMessages.erase(messageId); + } +} + +void Communication::recoverFromBusOff() +{ + // Check if the communication system is in the BusOff state + // and if the bus off recovery thread is not already running + if (getState() == ErrorState::BusOff && + (!busOffRecoveryThread.joinable())) { + RealSocket::log.logMessage(logger::LogLevel::INFO, "Recovering from BusOff state."); + // Start a new thread for the recovery process + busOffRecoveryThread = std::thread(&Communication::busOffTimer, this); + } +} + +void Communication::busOffTimer() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "BusOff recovery started, sleeping for " + std::to_string(BUSOFF_RECOVERY_TIME.count()) + " milliseconds."); + + // Put the thread to sleep for the defined BusOff recovery time + std::this_thread::sleep_for(BUSOFF_RECOVERY_TIME); + + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "BusOff recovery time elapsed. Resetting communication state."); + // Reset the state of the communication system after the sleep period + resetState(); +} + +void Communication::cleanupBusOffRecoveryThread() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Cleaning up BusOff recovery thread."); + // Check if the BusOff recovery thread is joinable + if (busOffRecoveryThread.joinable()) { + // Join the thread to clean up and ensure proper resource management + busOffRecoveryThread.join(); + } +} + +void Communication::checkState() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Checking communication state. TEC: " + std::to_string(TEC) + ", REC: " + std::to_string(REC)); + + // Lock the mutex to ensure thread safety while checking and updating the state + { + std::lock_guard lock(mtx); + + // Check if TEC exceeds the BusOff threshold + if (TEC >= THRESHOLD_TO_BUSOFF) { + RealSocket::log.logMessage(logger::LogLevel::INFO, "State changed to BusOff. TEC exceeded threshold: " + std::to_string(TEC)); + state = ErrorState::BusOff; // Set state to BusOff + } + // Check if TEC or REC exceeds the Passive threshold + else if (TEC >= THRESHOLD_TO_PASSIVE || REC >= THRESHOLD_TO_PASSIVE) { + RealSocket::log.logMessage(logger::LogLevel::INFO, "State changed to Passive. TEC or REC exceeded threshold."); + state = ErrorState::Passive; // Set state to Passive + } + else { + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "State remains Active."); + state = ErrorState::Active; // Set state to Active + } + } + + // If the state is BusOff, initiate recovery process + if (getState() == ErrorState::BusOff) { + recoverFromBusOff(); + } +} + +void Communication::handleTransmitError() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Handling transmission error. Incrementing TEC by 8."); + // Lock the mutex to ensure thread safety while updating the TEC + { + std::lock_guard lock(mtx); + TEC += 8; // Increment the Transmission Error Counter by 8 + } + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "TEC after increment: " + std::to_string(TEC)); + // Check the state based on the updated TEC + checkState(); +} + +void Communication::handleACKError() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Handling ACK error. Incrementing TEC by 1."); + // Lock the mutex to ensure thread safety while updating the TEC + { + std::lock_guard lock(mtx); + TEC += 1; // Increment the Transmission Error Counter by 1 + } + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "TEC after increment: " + std::to_string(TEC)); + // Check the state based on the updated TEC + checkState(); +} + +void Communication::handleReceiveError() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Handling reception error. Incrementing REC by 1."); + // Lock the mutex to ensure thread safety while updating the REC + { + std::lock_guard lock(mtx); + REC += 1; // Increment the Reception Error Counter by 1 + } + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "REC after increment: " + std::to_string(REC)); + // Check the state based on the updated REC + checkState(); +} + +void Communication::handleSuccessfulTransmission() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Handling successful transmission. Decrementing TEC by 1 if greater than zero."); + // Lock the mutex to ensure thread safety while updating the TEC + { + std::lock_guard lock(mtx); + if (TEC > 0) + TEC -= 1; // Decrement the Transmit Error Counter by 1 if greater than zero + } + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "TEC after decrement: " + std::to_string(TEC)); + // Check the state based on the updated TEC + checkState(); +} + +void Communication::resetState() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Resetting communication state. Resetting TEC, REC, and setting state to Active."); + // Lock the mutex to ensure thread safety while resetting the state + std::lock_guard lock(mtx); + TEC = 0; // Reset the Transmit Error Counter to zero + REC = 0; // Reset the Receive Error Counter to zero + state = ErrorState::Active; // Set the communication state to Active + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "State reset complete. TEC: 0, REC: 0, State: Active."); +} + +ErrorState Communication::getState() +{ + // Lock the mutex to ensure thread safety while accessing the state + std::lock_guard lock(mtx); + // Return the current state of the communication system + return state; // Return the state regardless of its value +} + +void Communication::ensureSingleSendPerTick() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Ensuring single send per tick."); + + // Lock the mutex to ensure thread safety while checking the tick + std::lock_guard lock(sendMutex); + + // Retrieve the current clock tick from the GlobalClock + int currentTick = GlobalClock::getCurrentTick(); + + // Check if the current tick is different from the last sent tick + if (currentTick != lastSendTick.load()) { + // Update the last sent tick to the current tick + lastSendTick.store(currentTick); + + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Message can be sent this tick."); + return; // Exit the function if a message can be sent + } + + // Wait for the next clock tick if the last tick is the same + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Waiting for next clock tick."); + GlobalClock::waitForNextTick(); + + // Update lastSendTick after waiting + lastSendTick.store(GlobalClock::getCurrentTick()); +} + +bool Communication::sendPacketWithRTO(Packet &packet, int retransmissions /* = 0 */) +{ + RealSocket::log.logMessage(logger::LogLevel::INFO, "Sending packet " + + std::to_string(packet.getPSN()) + + ", with RTO of message ID: " + + std::to_string(packet.getId()) + + ", Retransmissions: " + + std::to_string(retransmissions) + "."); + + // If the system is in BusOff state or retransmissions exceed the maximum limit, stop sending + if (getState() == ErrorState::BusOff || retransmissions > MAX_RETRANSMISSIONS) { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Cannot send packet " + + std::to_string(packet.getPSN()) + + ",of message ID: " + + std::to_string(packet.getId()) + + ", State: BusOff or retransmissions exceeded limit."); + scheduler.clearPacketData(packet.getId()); + return false; + } + + ensureSingleSendPerTick(); + + // Set the packet to passive mode if the system state is passive + packet.setIsPassive(getState() == ErrorState::Passive); + + // Attempt to send the packet using the client + ErrorCode res = client.sendPacket(packet); + + // If the packet is successfully sent, wait for the ACK + if (res == ErrorCode::SUCCESS) { + RealSocket::log.logMessage(logger::LogLevel::INFO, "Packet " + + std::to_string(packet.getPSN()) + + ", of message ID: " + std::to_string(packet.getId()) + + " sent. Waiting for ACK..."); + + // Create a promise to track the success of the transmission (ACK reception) + auto ackPromisePtr = std::make_shared>(); + auto ackFuture = ackPromisePtr->get_future(); // Future to wait for the ACK result + + // Start a retransmission timer for this packet + scheduler.startRetransmissionTimer( + packet.getId(), + [this, packet, ackPromisePtr](int retryCount) mutable { + // Handle errors if ACK was not received + this->handleACKError(); + + // Retry sending the packet and update the promise with the result + bool result = this->sendPacketWithRTO(packet, retryCount); + ackPromisePtr->set_value(result); // Set the result of retransmission + }, + ackPromisePtr); + + // Wait for the result of the future (ACK reception or timeout) + bool success = ackFuture.get(); + + RealSocket::log.logMessage(logger::LogLevel::INFO, + std::string(success ? "ACK received successfully" : "ACK reception failed") + + " for packet " + + std::to_string(packet.getPSN()) + + ", of message ID: " + + std::to_string(packet.getId()) + "."); + + return success; // Return true if successful, false otherwise + } + + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Packet " + + std::to_string(packet.getPSN()) + + ", of message ID: " + std::to_string(packet.getId()) + + " transmission failed."); + // Return false if the packet transmission failed + return false; +} + +bool Communication::sendPacket(Packet &packet) +{ + RealSocket::log.logMessage(logger::LogLevel::INFO, "Sending packet " + + std::to_string(packet.getPSN()) + + ", of message ID: " + + std::to_string(packet.getId()) + "."); + + // Check if the system is in BusOff state + if (getState() == ErrorState::BusOff) { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Cannot send packet " + + std::to_string(packet.getPSN()) + + ",of message ID: " + + std::to_string(packet.getId()) + + ", system in BusOff state."); + return false; // Return false immediately + } + + ensureSingleSendPerTick(); + + // Set the packet to passive mode if the system state is passive + packet.setIsPassive(getState() == ErrorState::Passive); + + // Attempt to send the packet using the client + ErrorCode res = client.sendPacket(packet); + + res == ErrorCode::SUCCESS ? + RealSocket::log.logMessage(logger::LogLevel::INFO, "Packet " + + std::to_string(packet.getPSN()) + + ", of message ID: " + + std::to_string(packet.getId()) + + " sent successfully."): + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Packet " + + std::to_string(packet.getPSN()) + + ", of message ID: " + + std::to_string(packet.getId()) + + " Failed to send."); + return res == ErrorCode::SUCCESS; // Return true if successful, false otherwise +} + +void Communication::waitForActiveMessages() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Waiting for active messages to complete..."); + + std::unique_lock lock(messageThreadsMutex); // Lock for thread safety + + // Wait for all active message threads to complete + for (auto &future : activeMessageFutures) { + if (future.valid()) { + future.wait(); // Wait for the future to finish if valid + } + } + + activeMessageFutures.clear(); // Clear the vector after all futures are completed + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "All active messages completed."); +} + +void Communication::signalHandler(int signum) +{ + if (instance) { + cleanupBusOffRecoveryThread(); // Clean up the recovery thread if it is running + waitForActiveMessages(); // Wait for any active message threads to complete + instance->client.closeConnection(); // Close the client connection + } + exit(signum); // Exit the program with the given signal number +} + +void Communication::setId(uint32_t newId) +{ + id = newId; // Set the new ID for the communication instance +} + +void Communication::setPassDataCallback(void (*callback)(uint32_t, void *)) +{ + if (callback == nullptr) + throw std::invalid_argument("Invalid callback function: passDataCallback cannot be null"); + passData = callback; // Set the callback function for passing data +} \ No newline at end of file diff --git a/communication/src/error_code.h b/communication/src/error_code.h new file mode 100644 index 00000000..961767c0 --- /dev/null +++ b/communication/src/error_code.h @@ -0,0 +1,46 @@ +#ifndef ERRORCODE_H +#define ERRORCODE_H + +enum class ErrorCode { + SUCCESS = 0, + SOCKET_FAILED = -1, + CONNECTION_FAILED = -2, + BIND_FAILED = -3, + LISTEN_FAILED = -4, + ACCEPT_FAILED = -5, + SEND_FAILED = -6, + RECEIVE_FAILED = -7, + CLOSE_FAILED = -8, + DISCONNECT_FAILED = -9, + INVALID_CLIENT_ID = -10, + CALLBACK_ERROR = -11, + SOCKET_INTERFACE_ERROR = -12, + INVALID_DATA_SIZE = -13, + INVALID_DATA = -14, + INVALID_ID = -15 +}; + +// Function to convert ErrorCode to string +inline const char* toString(ErrorCode error) { + switch (error) { + case ErrorCode::SUCCESS: return "SUCCESS"; + case ErrorCode::SOCKET_FAILED: return "SOCKET_FAILED"; + case ErrorCode::CONNECTION_FAILED: return "CONNECTION_FAILED"; + case ErrorCode::SEND_FAILED: return "SEND_FAILED"; + case ErrorCode::RECEIVE_FAILED: return "RECEIVE_FAILED"; + case ErrorCode::CLOSE_FAILED: return "CLOSE_FAILED"; + case ErrorCode::CALLBACK_ERROR: return "CALLBACK_ERROR"; + case ErrorCode::SOCKET_INTERFACE_ERROR: return "SOCKET_INTERFACE_ERROR"; + case ErrorCode::BIND_FAILED: return "BIND_FAILED"; + case ErrorCode::LISTEN_FAILED: return "LISTEN_FAILED"; + case ErrorCode::ACCEPT_FAILED: return "ACCEPT_FAILED"; + case ErrorCode::INVALID_CLIENT_ID: return "INVALID_CLIENT_ID"; + case ErrorCode::DISCONNECT_FAILED: return "DISCONNECT_FAILED"; + case ErrorCode::INVALID_DATA_SIZE: return "INVALID_DATA_SIZE"; + case ErrorCode::INVALID_DATA: return "INVALID_DATA"; + case ErrorCode::INVALID_ID: return "INVALID_ID"; + default: return "UNKNOWN_ERROR"; + } +} + +#endif // ERRORCODE_H diff --git a/communication/src/global_clock.cpp b/communication/src/global_clock.cpp new file mode 100644 index 00000000..e4dee8ba --- /dev/null +++ b/communication/src/global_clock.cpp @@ -0,0 +1,137 @@ +#include "../include/global_clock.h" + +SharedClock *GlobalClock::shared_clock = nullptr; +int GlobalClock::shared_memory_fd = -1; +std::thread GlobalClock::clock_thread; + +void GlobalClock::startClock() +{ + initializeSharedMemory(); // Initialize shared memory if not done yet + if (shared_clock->is_running.load()) { // Check if the clock is already running + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Clock is already running."); + return; // Exit if clock is running + } + initializeClockSynchronization(); // Initialize the mutex and condition variables for shared clock synchronization. + shared_clock->is_running.store(true); // Set the clock state to running + RealSocket::log.logMessage(logger::LogLevel::INFO, "Clock started."); + clock_thread = std::thread(runClock); // Run the clock in a new thread +} + +int GlobalClock::getCurrentTick() +{ + initializeSharedMemory(); // Ensure shared memory is initialized before accessing the tick + int currentTick = shared_clock->current_tick.load(); + return currentTick; // Return the current tick value +} + +void GlobalClock::waitForNextTick() +{ + initializeSharedMemory(); // Ensure shared memory is initialized + pthread_mutex_lock(&shared_clock->tick_mutex); // Lock the mutex to access the tick safely + pthread_cond_wait(&shared_clock->tick_cond, &shared_clock->tick_mutex); // Wait for tick condition signal + pthread_mutex_unlock(&shared_clock->tick_mutex); // Unlock the mutex after tick change +} + +void GlobalClock::stopClock() +{ + if (shared_clock->is_running.load()) { + RealSocket::log.logMessage(logger::LogLevel::INFO, "Stopping the clock..."); + shared_clock->is_running.store(false); // Stop the clock + if (clock_thread.joinable()) { + clock_thread.join(); // Wait for the clock thread to finish + } + RealSocket::log.logMessage(logger::LogLevel::INFO, "Clock stopped."); + } +} + +void GlobalClock::initializeSharedMemory() +{ + if (shared_clock == nullptr) { // Check if shared memory is already initialized + shared_memory_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666); // Open shared memory file descriptor + if (shared_memory_fd == -1) { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Error accessing shared memory for the clock."); + exit(1); // Exit if there's an error accessing shared memory + } + ftruncate(shared_memory_fd, sizeof(SharedClock)); // Set size of shared memory + shared_clock = static_cast(mmap(0, sizeof(SharedClock), PROT_READ | PROT_WRITE, MAP_SHARED, shared_memory_fd, 0)); // Map shared memory + if (shared_clock == MAP_FAILED) { + RealSocket::log.logMessage(logger::LogLevel::ERROR, "Error mapping shared memory for the clock."); + exit(1); // Exit if there's an error mapping shared memory + } + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Shared memory for the clock initialized."); + registerCleanup(); // Register cleanup function to be called at exit + } +} + +void GlobalClock::initializeClockSynchronization() +{ + // Initialize mutex and condition variables for shared clock synchronization + pthread_mutexattr_t mutex_attr; + pthread_condattr_t cond_attr; + + // Initialize the mutex attribute and set it to be shared between processes + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); // Set mutex to be shared between processes + pthread_mutex_init(&shared_clock->tick_mutex, &mutex_attr); // Initialize mutex in shared memory + + // Initialize the condition attribute and set it to be shared between processes + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); // Set condition variable to be shared between processes + pthread_cond_init(&shared_clock->tick_cond, &cond_attr); // Initialize condition variable in shared memory + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Clock synchronization initialized."); +} + +void GlobalClock::releaseProcessResources() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Releasing process-specific resources for the clock..."); + if (shared_clock != nullptr) { + munmap(shared_clock, sizeof(SharedClock)); // Unmap the shared memory from the process address space + close(shared_memory_fd); // Close the file descriptor for shared memory + shared_clock = nullptr; // Reset pointer to shared clock + shared_memory_fd = -1; // Reset shared memory file descriptor + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Process-specific resources for the clock released."); + } +} + +void GlobalClock::releaseSharedSystemResources() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Releasing system-wide shared resources for the clock..."); + if (shared_memory_fd != -1) { + shm_unlink(SHM_NAME); // Unlink shared memory from the system (delete it) + close(shared_memory_fd); // Close the file descriptor for shared memory + shared_memory_fd = -1; // Reset shared memory file descriptor + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "System-wide shared resources for the clock released."); + } +} + +void GlobalClock::registerCleanup() +{ + atexit([]() { // Register a cleanup function to be called when the process exits + if (shared_clock->is_running.load()) { + GlobalClock::releaseProcessResources(); // Release process-specific resources if the clock is running + } + else { + GlobalClock::releaseSharedSystemResources(); // Release system-wide resources if the clock is not running + } + }); +} + +void GlobalClock::runClock() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Clock thread started."); + while (shared_clock->is_running.load()) { + std::this_thread::sleep_for(TICK_DURATION); // Sleep for tick duration + pthread_mutex_lock(&shared_clock->tick_mutex); // Lock the mutex to safely increment the tick + // Increment the tick and check for overflow + if (shared_clock->current_tick.load() >= MAX_TICK) { + shared_clock->current_tick.store(0); // Reset to 0 if max is reached + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Tick overflow, resetting to 0."); + } + else { + shared_clock->current_tick++; // Increment the tick + } + pthread_cond_broadcast(&shared_clock->tick_cond); // Notify all waiting processes that the tick has changed + pthread_mutex_unlock(&shared_clock->tick_mutex); // Unlock the mutex after tick change + } + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Clock thread stopped."); +} diff --git a/communication/src/message.cpp b/communication/src/message.cpp new file mode 100644 index 00000000..5f79cdb4 --- /dev/null +++ b/communication/src/message.cpp @@ -0,0 +1,92 @@ +#include "../include/message.h" + +// Constructor for sending message +Message::Message(uint32_t srcID, uint32_t destID, void* data, size_t size, MessageType messageType) +{ + messageID = generateMessageID(messageType, srcID); + tps = (size + SIZE_PACKET-1) / SIZE_PACKET; // Calculate the number of packets needed + bool isBroadcast = (messageType == MessageType::ERROR_MESSAGE); + bool isRTR = (messageType == MessageType::REMOTE_TRANSMISSION_REQUEST); + for (uint32_t i = 0; i < tps; ++i) { + uint8_t packetData[SIZE_PACKET]; + size_t copySize = std::min(size - i * SIZE_PACKET, (size_t)SIZE_PACKET); // Determine how much data to copy for each packet + std::memcpy(packetData, (uint8_t*)data + i * SIZE_PACKET, copySize); + packets.emplace_back(messageID, i, tps, srcID, destID, copySize, isRTR, isBroadcast, packetData); + } +} + +uint32_t Message::generateMessageID(MessageType messageType, uint16_t srcID) { + uint32_t messageTypeID = static_cast(messageType); + uint32_t srcID32 = static_cast(srcID); + + auto now = std::chrono::system_clock::now().time_since_epoch(); + uint32_t timestamp = static_cast( + std::chrono::duration_cast(now).count() & 0xFF + ); + + return (messageTypeID << 24) | + (srcID32 << 8) | + timestamp; +} + +// Constructor for receiving message +Message::Message(uint32_t tps) +{ + this->tps = tps; +} + +// Add a packet to the received message +bool Message::addPacket(const Packet &p) +{ + if (!packets.empty()) { + if (p.getPSN() <= packets.back().getPSN()) { + return false; + } + } + packets.push_back(p); + return true; +} + +// Check if the message is complete +bool Message::isComplete() const +{ + return packets.size() == tps; +} + +// Get the complete data of the message +void *Message::completeData() const +{ + size_t totalSize = (tps - 1) * SIZE_PACKET + packets.back().getDLC(); + void *data = malloc(totalSize); + for (const auto &packet : packets) { + std::memcpy((uint8_t *)data + packet.getPSN() * SIZE_PACKET, packet.getPayload(), packet.getDLC()); + } + return data; +} + +// Get the packets of the message +std::vector &Message::getPackets() +{ + return packets; +} + +// Extract the MessageType from the message ID +MessageType Message::getMessageTypeFromID(uint32_t messageID) +{ + uint32_t messageTypeID = (messageID >> 24) & 0xFF; // Extract the message type + return static_cast(messageTypeID); // Cast to MessageType +} + +// Returns a string representation of the MessageType enum value +std::string Message::getMessageTypeString(MessageType type) +{ + switch (type) { + case MessageType::INITIAL_CONNECTION: return "INITIAL_CONNECTION"; + case MessageType::ERROR_MESSAGE: return "ERRORE"; + case MessageType::ACK: return "ACK"; + case MessageType::OVERLOAD_MESSAGE: return "OVERLOAD"; + case MessageType::REMOTE_TRANSMISSION_REQUEST: return "RTRT"; + case MessageType::DATA_MESSAGE: return "DATA"; + default: return "UNKNOWN"; + } +} \ No newline at end of file diff --git a/communication/src/packet.cpp b/communication/src/packet.cpp new file mode 100644 index 00000000..625c8d86 --- /dev/null +++ b/communication/src/packet.cpp @@ -0,0 +1,118 @@ +#include "../include/packet.h" + +// Constructor to initialize Packet for sending +Packet::Packet(uint32_t id, uint32_t PSN, uint32_t TPS, uint32_t srcId, + uint32_t destId, int DLC, bool RTR, bool isBroadcast, + const uint8_t* payload) +{ + header.id = id; + header.PSN = PSN; + header.TPS = TPS; + header.srcId = srcId; + header.destId = destId; + header.DLC = DLC; + header.RTR = RTR; + header.passive = false; // Default value + header.timestamp = 0; // Default value + header.isBroadcast = isBroadcast; + + // Ensure DLC does not exceed payload size + size_t copySize = std::min(static_cast(DLC), sizeof(this->payload)); + + // Copy payload into the payload array if DLC > 0 + if (DLC > 0 && payload) + { + std::memcpy(this->payload, payload, copySize); + } + else + { + std::memset(this->payload, 0, sizeof(this->payload)); // Zero out payload if no data + } + + // Calculate and set CRC after setting other header fields + header.CRC = calculateCRC(); +} + +// Constructor to initialize receiving Packet ID for init +Packet::Packet(uint32_t id) +{ + std::memset(&header, 0, sizeof(header)); // Initialize all fields to zero + header.srcId = id; +} + +// Overloaded operator> +bool Packet::operator>(const Packet &other) const +{ + return header.id < other.header.id; +} + +// CRC calculation +uint16_t Packet::calculateCRC() const +{ + const uint16_t polynomial = 0x4599; // Polynomial for CRC-15 CAN + uint16_t crc = 0x0000; // Initial CRC value for CAN Bus + + // Combine header fields for CRC calculation (without the CRC field itself) + crc ^= header.id; + crc = (crc << 1) ^ polynomial; + + crc ^= header.PSN; + crc = (crc << 1) ^ polynomial; + + crc ^= header.TPS; + crc = (crc << 1) ^ polynomial; + + crc ^= header.srcId; + crc = (crc << 1) ^ polynomial; + + crc ^= header.destId; + crc = (crc << 1) ^ polynomial; + + crc ^= header.DLC; + crc = (crc << 1) ^ polynomial; + + crc ^= header.RTR; + crc = (crc << 1) ^ polynomial; + + crc ^= header.isBroadcast; + crc = (crc << 1) ^ polynomial; + + // Process payload bytes for CRC calculation + for (int i = 0; i < header.DLC; ++i) + { + crc ^= payload[i] << 8; + for (int j = 0; j < 8; ++j) + { + if (crc & 0x8000) + { + crc = (crc << 1) ^ polynomial; + } + else + { + crc <<= 1; + } + } + } + + // Mask CRC to 15 bits as per the CAN protocol + return crc & 0x7FFF; +} + +// Validate CRC +bool Packet::validateCRC() const +{ + return calculateCRC() == header.CRC; +} + +// Static function to convert raw data to hexadecimal +std::string Packet::pointerToHex(const void* data, size_t size) +{ + std::ostringstream oss; // Stream for constructing the hexadecimal output + const uint8_t* byteData = static_cast(data); // Cast the void* to uint8_t* for byte-wise access + + for (size_t i = 0; i < size; ++i) { + oss << std::hex << std::setw(2) << std::setfill('0') << static_cast(byteData[i]); + } + + return oss.str(); // Return the hex string representation of the data +} \ No newline at end of file diff --git a/communication/src/scheduler.cpp b/communication/src/scheduler.cpp new file mode 100644 index 00000000..a69ff0a4 --- /dev/null +++ b/communication/src/scheduler.cpp @@ -0,0 +1,96 @@ +#include "../include/scheduler.h" + +Scheduler::Scheduler() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Scheduler created."); +} + +Scheduler::~Scheduler() +{ + stopAllTimers(); + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Scheduler destroyed and all timers stopped."); +} + +void Scheduler::stopAllTimers() +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Stopping all timers."); + for (auto &future : futures) + { + if (future.valid()) + future.wait(); // Wait for all threads to finish + } +} + +void Scheduler::startRetransmissionTimer(int packetID, Callback callback, std::shared_ptr> ackPromise) +{ + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Starting retransmission timer for packet of message ID: " + + std::to_string(packetID)); + + // Promise to manage the lifecycle of the thread itself + std::promise threadCompletionPromise; + + // Future object to track the thread's completion + std::future future = threadCompletionPromise.get_future(); + + // Store the future to ensure we can wait for the thread to finish later + futures.push_back(std::move(future)); + + // Start a new thread for handling retransmission and ACK wait + std::thread([this, packetID, callback, threadCompletionPromise = std::move(threadCompletionPromise), ackPromise]() mutable + { + int retryCount = 0; + { + std::unique_lock lock(mutex); + + // Wait for an ACK or timeout + if (cv.wait_for(lock, MAX_ACK_TIMEOUT, [this, packetID]() + { return ackReceived[packetID]; })) + { + // ACK received within the timeout period + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "ACK received for packet of message ID: " + + std::to_string(packetID)); + + clearPacketData(packetID); // Clear packet data + + // Set both promises to indicate success and thread completion + threadCompletionPromise.set_value(); + ackPromise->set_value(true); // ACK was received, set to true + return; // Exit the thread + } + else + { + // Timeout occurred, retransmit the packet + retryCounts[packetID]++; + retryCount = retryCounts[packetID]; + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Retransmitting packet of message ID: " + + std::to_string(packetID) + + ", retry count: " + std::to_string(retryCount)); + } + } + + // Call the callback function with the updated retry count + callback(retryCount); + + // Set the promise to indicate the thread has finished + threadCompletionPromise.set_value(); + }) + .detach(); // Detach the thread to allow it to run independently +} + +void Scheduler::receiveACK(int packetID) +{ + std::unique_lock lock(mutex); + ackReceived[packetID] = true; + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "ACK received for packet of message ID: " + + std::to_string(packetID) + + ". Notifying all waiting threads."); + cv.notify_all(); // Notify all waiting threads +} + +void Scheduler::clearPacketData(int packetID) +{ + ackReceived.erase(packetID); + retryCounts.erase(packetID); + RealSocket::log.logMessage(logger::LogLevel::DEBUG, "Cleared data for packet of message ID: " + + std::to_string(packetID)); +} diff --git a/communication/src/server_connection.cpp b/communication/src/server_connection.cpp new file mode 100644 index 00000000..1dc57bcd --- /dev/null +++ b/communication/src/server_connection.cpp @@ -0,0 +1,267 @@ +#include +#include +#include "../include/server_connection.h" + +// Constructor +ServerConnection::ServerConnection(int port, std::function callback, ISocket* socketInterface) { + setPort(port); + setReceiveDataCallback(callback); + setSocketInterface(socketInterface); + running = false; +} + +// Initializes the listening socket +ErrorCode ServerConnection::startConnection() +{ + // Create socket TCP + serverSocket = socketInterface->socket(AF_INET, SOCK_STREAM, 0); + if (serverSocket < 0) + return ErrorCode::SOCKET_FAILED; + + // Setting the socket to allow reuse of address and port + int opt = 1; + int setSockOptRes = socketInterface->setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); + if (setSockOptRes) { + socketInterface->close(serverSocket); + return ErrorCode::SOCKET_FAILED; + } + + address.sin_family = AF_INET; + address.sin_addr.s_addr = INADDR_ANY; + address.sin_port = htons(port); + + int bindRes = socketInterface->bind(serverSocket, (struct sockaddr *)&address, sizeof(address)); + if (bindRes < 0) { + socketInterface->close(serverSocket); + return ErrorCode::BIND_FAILED; + } + + int lisRes = socketInterface->listen(serverSocket, 5); + if (lisRes < 0) { + socketInterface->close(serverSocket); + return ErrorCode::LISTEN_FAILED; + } + + running = true; + mainThread = std::thread(&ServerConnection::startThread, this); + mainThread.detach(); + + return ErrorCode::SUCCESS; +} + +// Starts listening for connection requests +void ServerConnection::startThread() +{ + while (running) { + int clientSocket = socketInterface->accept(serverSocket, nullptr, nullptr); + if (!clientSocket) + continue; + + if(clientSocket<0){ + stopServer(); + return; + } + // Opens a new thread for handleClient - listening to messages from the process + { + std::lock_guard lock(threadMutex); + clientThreads.emplace_back(&ServerConnection::handleClient, this, clientSocket); + } + } +} + +// Closes the sockets and the threads +void ServerConnection::stopServer() +{ + if(!running) + return; + + running = false; + socketInterface->close(serverSocket); + { + std::lock_guard lock(socketMutex); + for (int sock : sockets) + socketInterface->close(sock); + sockets.clear(); + } + { + std::lock_guard lock(threadMutex); + for (auto &th : clientThreads) + if (th.joinable()) + th.join(); + } +} + +// Runs in a thread for each process - waits for a message and forwards it to the manager +void ServerConnection::handleClient(int clientSocket) +{ + Packet packet; + int valread = socketInterface->recv(clientSocket, &packet, sizeof(Packet), 0); + + //implement according to CAN bus + if (valread <= 0) + return; + + uint32_t clientID = packet.getSrcId(); + if(!isValidId(clientID)) + return; + + { + std::lock_guard lock(IDMapMutex); + clientIDMap[clientSocket] = clientID; + } + + { + std::lock_guard lock(socketMutex); + sockets.push_back(clientSocket); + } + + while (running) { + int valread = socketInterface->recv(clientSocket, &packet, sizeof(Packet), 0); + if (valread == 0) + break; + + if(valread < 0) + continue; + + receiveDataCallback(packet); + } + + { + // If the process is no longer connected + std::lock_guard lock(socketMutex); + auto it = std::find(sockets.begin(), sockets.end(), clientSocket); + socketInterface->close(*it); + if (it != sockets.end()) + sockets.erase(it); + } + { + std::lock_guard lock(IDMapMutex); + clientIDMap.erase(clientSocket); + } +} + +// Implementation according to the CAN BUS +bool ServerConnection::isValidId(uint32_t id) +{ + std::lock_guard lock(IDMapMutex); + return clientIDMap.find(id) == clientIDMap.end(); +} + +// Returns the sockets ID +int ServerConnection::getClientSocketByID(uint32_t destID) +{ + std::lock_guard lock(IDMapMutex); + for (const auto &client : clientIDMap) + if (client.second == destID) + return client.first; + + return -1; +} + +// Sends the message to destination +ErrorCode ServerConnection::sendDestination(const Packet &packet) +{ + int targetSocket = getClientSocketByID(packet.getDestId()); + if (targetSocket == -1) + return ErrorCode::INVALID_CLIENT_ID; + + ssize_t bytesSent = socketInterface->send(targetSocket, &packet, sizeof(Packet), 0); + if (!bytesSent) + return ErrorCode::SEND_FAILED; + + if (bytesSent<0){ + //closeConnection(); + return ErrorCode::CONNECTION_FAILED; + } + + return ErrorCode::SUCCESS; +} + +// Sends the message to all connected processes - broadcast +ErrorCode ServerConnection::sendBroadcast(const Packet &packet) +{ + std::lock_guard lock(socketMutex); + for (int sock : sockets) { + ssize_t bytesSent = socketInterface->send(sock, &packet, sizeof(Packet), 0); + if (bytesSent < sizeof(Packet)) + return ErrorCode::SEND_FAILED; + if (bytesSent<0){ + //closeConnection(); + return ErrorCode::CONNECTION_FAILED; + } + } + + return ErrorCode::SUCCESS; +} + +// Sets the server's port number, throws an exception if the port is invalid. +void ServerConnection::setPort(int port) { + if (port <= 0 || port > 65535) + throw std::invalid_argument("Invalid port number: Port must be between 1 and 65535."); + + this->port = port; +} + +// Sets the callback for receiving data, throws an exception if the callback is null. +void ServerConnection::setReceiveDataCallback(std::function callback) { + if (!callback) { + throw std::invalid_argument("Invalid callback function: callback cannot be null."); + } + this->receiveDataCallback = callback; +} + +// Sets the socket interface, throws an exception if the socketInterface is null. +void ServerConnection::setSocketInterface(ISocket* socketInterface) { + if (socketInterface == nullptr) { + throw std::invalid_argument("Invalid socket interface: socketInterface cannot be null."); + } + this->socketInterface = socketInterface; +} + +// For testing +int ServerConnection::getServerSocket() +{ + return serverSocket; +} + +int ServerConnection::isRunning() +{ + return running; +} + +std::vector* ServerConnection::getSockets() +{ + return &sockets; +} + +std::mutex* ServerConnection::getSocketMutex() +{ + return &socketMutex; +} + +std::mutex* ServerConnection::getIDMapMutex() +{ + return &IDMapMutex; +} + +std::map* ServerConnection::getClientIDMap() +{ + return &clientIDMap; +} + +void ServerConnection::testHandleClient(int clientSocket) +{ + handleClient(clientSocket); +} + +int ServerConnection::testGetClientSocketByID(uint32_t destID) +{ + return getClientSocketByID(destID); +} + +// Destructor +ServerConnection::~ServerConnection() +{ + stopServer(); + delete socketInterface; +} \ No newline at end of file diff --git a/test/testLogger.cpp b/test/testLogger.cpp new file mode 100644 index 00000000..2e3345e5 --- /dev/null +++ b/test/testLogger.cpp @@ -0,0 +1,51 @@ +#include +#include +#include "logger.h" + +using ::testing::Return; + +class MockLogger : public logger { +public: + MOCK_METHOD(void, logMessage, (LogLevel level, const std::string &message), (override)); +}; + +class LoggerTest : public ::testing::Test { +protected: + LoggerTest() { + logger::initializeLogFile(); + } + + void SetUp() override { + std::remove("test.log"); // Ensure the file does not exist before the test + } + + void TearDown() override { + std::remove("test.log"); // Clean up after the test + } +}; + +TEST_F(LoggerTest, InitializeLogFile) { + std::ifstream logFile(logger::getLogFileName()); + EXPECT_TRUE(logFile.good()) << "Log file was not created: " << logger::getLogFileName(); + logFile.close(); +} + +TEST_F(LoggerTest, LogMessage) { + logger::logMessage(logger::LogLevel::INFO, "Test INFO message"); + logger::logMessage(logger::LogLevel::ERROR, "Test ERROR message"); + + std::ifstream logFile(logger::getLogFileName()); + std::string content((std::istreambuf_iterator(logFile)), std::istreambuf_iterator()); + EXPECT_NE(content.find("Test INFO message"), std::string::npos); + EXPECT_NE(content.find("Test ERROR message"), std::string::npos); +} + +TEST_F(LoggerTest, LogLevelFiltering) { + logger::logMessage(logger::LogLevel::ERROR, "Test ERROR message"); + logger::logMessage(logger::LogLevel::INFO, "Test INFO message"); + + std::ifstream logFile(logger::getLogFileName()); + std::string content((std::istreambuf_iterator(logFile)), std::istreambuf_iterator()); + EXPECT_NE(content.find("Test ERROR message"), std::string::npos); + EXPECT_EQ(content.find("Test INFO message"), std::string::npos) << "INFO message should not be logged with ERROR level"; +} diff --git a/tests/client_test.cpp b/tests/client_test.cpp new file mode 100644 index 00000000..ba57e1c5 --- /dev/null +++ b/tests/client_test.cpp @@ -0,0 +1,203 @@ +#include +#include +#include "../communication/src/client.h" +#include "../communication/sockets/mock_socket.h" +#include "../communication/src/message.h" + +using ::testing::_; +using ::testing::Return; + +class ClientTest : public ::testing::Test { +protected: + MockSocket mockSocket; + Client* client; + + void SetUp() override { + ON_CALL(mockSocket, socket(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault(::testing::Return(1)); + ON_CALL(mockSocket, setsockopt(::testing::_, ::testing::_, ::testing::_, ::testing::NotNull(), ::testing::_)) + .WillByDefault(::testing::Return(0)); + ON_CALL(mockSocket, bind(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault(::testing::Return(0)); + ON_CALL(mockSocket, listen(::testing::_, ::testing::_)) + .WillByDefault(::testing::Return(0)); + ON_CALL(mockSocket, connect(_, _, _)) + .WillByDefault(::testing::Return(0)); + ON_CALL(mockSocket, close(_)) + .WillByDefault(::testing::Return(0)); + + ON_CALL(mockSocket, socket(_, _, _)) + .WillByDefault(Return(1)); + + ON_CALL(mockSocket, setsockopt(_, _, _, _, _)) + .WillByDefault(Return(0)); + + ON_CALL(mockSocket, connect(_, _, _)) + .WillByDefault(Return(0)); + + ON_CALL(mockSocket, send(_, _, _, _)) + .WillByDefault(Return(44)); // ודא ששליחה מחזירה הצלחה + + ON_CALL(mockSocket, close(_)) + .WillByDefault(Return(0)); // close מחזיר הצלחה + + EXPECT_CALL(mockSocket, close(_)).Times(1); // מצפה ש-close תוקרא פעם אחת + + + Packet packet; + ON_CALL(mockSocket, send(_, _, _, _)) + .WillByDefault(::testing::Return(sizeof(Packet))); + + client = new Client([](Packet &){}, &mockSocket); + } + + void TearDown() override { + EXPECT_CALL(mockSocket, close(_)).Times(1); + //delete client; + } +}; + +// Test Constructor +TEST_F(ClientTest, ConstructorInitializesCorrectly) { + EXPECT_FALSE(client->isConnected()); + EXPECT_GT(client->getClientSocket(), 0); +} + +// Test connection success +TEST_F(ClientTest, ConnectToServerSuccess) { + EXPECT_CALL(mockSocket, connect(_, _, _)).WillOnce(Return(0)); + ErrorCode result = client->connectToServer(1); + EXPECT_EQ(result, ErrorCode::SUCCESS); + EXPECT_TRUE(client->isConnected()); +} + +// Test connection failure (connect fails) +TEST_F(ClientTest, ConnectToServerFailureOnConnect) { + EXPECT_CALL(mockSocket, connect(_, _, _)).WillOnce(Return(-1)); + ErrorCode result = client->connectToServer(1); + EXPECT_EQ(result, ErrorCode::CONNECTION_FAILED); + EXPECT_FALSE(client->isConnected()); +} + +// Test connection failure (socket creation fails) +TEST_F(ClientTest, ConnectToServerSocketFailure) { + EXPECT_CALL(mockSocket, socket(_, _, _)).WillOnce(Return(-1)); + ErrorCode result = client->connectToServer(1); + EXPECT_EQ(result, ErrorCode::SOCKET_FAILED); + EXPECT_FALSE(client->isConnected()); +} + +// Test sendPacket success +TEST_F(ClientTest, SendPacketSuccess) { + Packet packet; + client->connectToServer(1); + ErrorCode result = client->sendPacket(packet); + EXPECT_EQ(result, ErrorCode::SUCCESS); +} + +// Test sendPacket failure - not connected +TEST_F(ClientTest, SendPacketFailureNotConnected) { + Packet packet; + ErrorCode result = client->sendPacket(packet); + EXPECT_EQ(result, ErrorCode::CONNECTION_FAILED); +} + +// Test sendPacket failure - socket send fails +TEST_F(ClientTest, SendPacketFailureOnSend) { + Packet packet; + client->connectToServer(1); + EXPECT_CALL(mockSocket, send(_, _, _, _)).WillOnce(Return(-1)); + ErrorCode result = client->sendPacket(packet); + EXPECT_EQ(result, ErrorCode::SEND_FAILED); +} + +// Test sendPacket partial send +TEST_F(ClientTest, SendPacketPartialSend) { + Packet packet; + client->connectToServer(1); + EXPECT_CALL(mockSocket, send(_, _, _, _)).WillOnce(Return(sizeof(Packet) - 1)); + ErrorCode result = client->sendPacket(packet); + EXPECT_EQ(result, ErrorCode::SEND_FAILED); +} + +// Test receivePacket success +TEST_F(ClientTest, ReceivePacketSuccess) { + Packet packet; + EXPECT_CALL(mockSocket, recv(_, _, _, _)).WillOnce(Return(sizeof(Packet))); + client->connectToServer(1); + std::thread receiveThread(&Client::receivePacket, client); + receiveThread.join(); + // If passPacketCom works, this could be tested, but for now we'll assume correct handling +} + +// Test receivePacket failure (recv fails) +TEST_F(ClientTest, ReceivePacketFailure) { + EXPECT_CALL(mockSocket, recv(_, _, _, _)).WillOnce(Return(-1)); + client->connectToServer(1); + std::thread receiveThread(&Client::receivePacket, client); + receiveThread.join(); + EXPECT_FALSE(client->isConnected()); +} + +// Test receivePacket no data (recv returns 0) +TEST_F(ClientTest, ReceivePacketNoData) { + EXPECT_CALL(mockSocket, recv(_, _, _, _)).WillOnce(Return(0)); + client->connectToServer(1); + std::thread receiveThread(&Client::receivePacket, client); + receiveThread.join(); + EXPECT_TRUE(client->isConnected()); // Should still be connected, no data means wait for more +} + +// Test closeConnection success +TEST_F(ClientTest, CloseConnectionSuccess) { + client->connectToServer(1); + ErrorCode result = client->closeConnection(); + EXPECT_EQ(result, ErrorCode::SUCCESS); + EXPECT_FALSE(client->isConnected()); +} + +// Test closeConnection failure +TEST_F(ClientTest, CloseConnectionFailure) { + EXPECT_CALL(mockSocket, close(_)).WillOnce(Return(-1)); + ErrorCode result = client->closeConnection(); + EXPECT_EQ(result, ErrorCode::CLOSE_FAILED); +} + +// Test setCallback throws on null function +TEST_F(ClientTest, SetCallbackThrowsOnNull) { + EXPECT_THROW(client->setCallback(nullptr), std::invalid_argument); +} + +// Test setSocketInterface throws on null +TEST_F(ClientTest, SetSocketInterfaceThrowsOnNull) { + EXPECT_THROW(client->setSocketInterface(nullptr), std::invalid_argument); +} + +// Test valid socket interface setting +TEST_F(ClientTest, SetSocketInterfaceSuccess) { + MockSocket anotherMockSocket; + EXPECT_NO_THROW(client->setSocketInterface(&anotherMockSocket)); +} + +// Test connection thread starts correctly +TEST_F(ClientTest, ConnectionStartsReceiveThread) { + client->connectToServer(1); + EXPECT_TRUE(client->isReceiveThreadRunning()); +} + +// Test destructor closes connection +TEST_F(ClientTest, DestructorClosesConnection) { + client->connectToServer(1); + delete client; // Should close connection in destructor + EXPECT_CALL(mockSocket, close(_)).Times(1); // Ensure close is called +} + +// Test connection failure after sending a partial packet +TEST_F(ClientTest, SendPacketPartialConnectionClose) { + Packet packet; + client->connectToServer(1); + EXPECT_CALL(mockSocket, send(_, _, _, _)).WillOnce(Return(sizeof(Packet) - 1)); + ErrorCode result = client->sendPacket(packet); + EXPECT_EQ(result, ErrorCode::SEND_FAILED); + EXPECT_FALSE(client->isConnected()); // Should close connection after failure +} diff --git a/tests/server_test.cpp b/tests/server_test.cpp new file mode 100644 index 00000000..61e7c5e5 --- /dev/null +++ b/tests/server_test.cpp @@ -0,0 +1,261 @@ +#include +#include +#include "../communication/src/server.h" +#include "../communication/sockets/mock_socket.h" +#include "../communication/src/message.h" + +using ::testing::_; +using ::testing::Return; + +class ServerTest : public ::testing::Test { +protected: + MockSocket* mockSocket; + Server* server; + int testPort = 8080; + Packet testPacket; + + void SetUp() override { + mockSocket = new MockSocket(); + server = new Server(testPort, [](Packet& packet) {}, mockSocket); + } + + void TearDown() override { + delete server; + } +}; + +// Test for successful startConnection +TEST_F(ServerTest, StartConnection_Success) { + EXPECT_CALL(*mockSocket, socket(AF_INET, SOCK_STREAM, 0)) + .WillOnce(Return(3)); // Return a valid socket fd + + EXPECT_CALL(*mockSocket, setsockopt(3, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, _, sizeof(int))) + .WillOnce(Return(0)); + + EXPECT_CALL(*mockSocket, bind(3, _, sizeof(sockaddr_in))) + .WillOnce(Return(0)); + + EXPECT_CALL(*mockSocket, listen(3, 5)) + .WillOnce(Return(0)); + + ErrorCode result = server->startConnection(); + EXPECT_EQ(result, ErrorCode::SUCCESS); +} + +// Test for socket creation failure +TEST_F(ServerTest, StartConnection_SocketFailed) { + EXPECT_CALL(*mockSocket, socket(AF_INET, SOCK_STREAM, 0)) + .WillOnce(Return(-1)); // Simulate socket creation failure + + ErrorCode result = server->startConnection(); + EXPECT_EQ(result, ErrorCode::SOCKET_FAILED); +} + +// Test for bind failure +TEST_F(ServerTest, StartConnection_BindFailed) { + EXPECT_CALL(*mockSocket, socket(AF_INET, SOCK_STREAM, 0)) + .WillOnce(Return(3)); // Return a valid socket fd + + EXPECT_CALL(*mockSocket, setsockopt(3, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, _, sizeof(int))) + .WillOnce(Return(0)); + + EXPECT_CALL(*mockSocket, bind(3, _, sizeof(sockaddr_in))) + .WillOnce(Return(-1)); // Simulate bind failure + + + EXPECT_CALL(*mockSocket, close(3)).Times(1); // Expect close call due to failure + + ErrorCode result = server->startConnection(); + EXPECT_EQ(result, ErrorCode::BIND_FAILED); +} + +// Test for listen failure +TEST_F(ServerTest, StartConnection_ListenFailed) { + EXPECT_CALL(*mockSocket, socket(AF_INET, SOCK_STREAM, 0)) + .WillOnce(Return(3)); // Return a valid socket fd + + EXPECT_CALL(*mockSocket, setsockopt(3, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, _, sizeof(int))) + .WillOnce(Return(0)); + + EXPECT_CALL(*mockSocket, bind(3, _, sizeof(sockaddr_in))) + .WillOnce(Return(0)); + + EXPECT_CALL(*mockSocket, listen(3, 5)) + .WillOnce(Return(-1)); // Simulate listen failure + + EXPECT_CALL(*mockSocket, close(3)).Times(1); // Expect close call due to failure + + ErrorCode result = server->startConnection(); + EXPECT_EQ(result, ErrorCode::LISTEN_FAILED); +} + +// Test for successful sendDestination +TEST_F(ServerTest, SendDestination_Success) { + int clientSocket = 5; + testPacket.header.DestID = 1; + + // Simulate successful send operation + EXPECT_CALL(*mockSocket, send(clientSocket, &testPacket, sizeof(Packet), 0)) + .WillOnce(Return(sizeof(Packet))); + + { + std::lock_guard lock(*server->getIDMapMutex()); + (*server->getClientIDMap())[clientSocket] = testPacket.header.DestID; + } + + ErrorCode result = server->sendDestination(testPacket); + EXPECT_EQ(result, ErrorCode::SUCCESS); +} + +// Test for sendDestination failure +TEST_F(ServerTest, SendDestination_Failed) { + int clientSocket = 5; + testPacket.header.DestID = 1; + + // Simulate send failure + EXPECT_CALL(*mockSocket, send(clientSocket, &testPacket, sizeof(Packet), 0)) + .WillOnce(Return(0)); + + { + std::lock_guard lock(*server->getIDMapMutex()); + (*server->getClientIDMap())[clientSocket] = testPacket.header.DestID; + } + + ErrorCode result = server->sendDestination(testPacket); + EXPECT_EQ(result, ErrorCode::SEND_FAILED); +} + +// Test for sendDestination failure +TEST_F(ServerTest, SendDestination_Connection_Failed) { + int clientSocket = 5; + testPacket.header.DestID = 1; + + // Simulate send failure + EXPECT_CALL(*mockSocket, send(clientSocket, &testPacket, sizeof(Packet), 0)) + .WillOnce(Return(-1)); + + { + std::lock_guard lock(*server->getIDMapMutex()); + (*server->getClientIDMap())[clientSocket] = testPacket.header.DestID; + } + + ErrorCode result = server->sendDestination(testPacket); + EXPECT_EQ(result, ErrorCode::CONNECTION_FAILED); +} + +// Test for sending message to an invalid client ID +TEST_F(ServerTest, SendDestination_InvalidClientID) { + testPacket.header.DestID = 999; // Invalid client ID + ErrorCode result = server->sendDestination(testPacket); + EXPECT_EQ(result, ErrorCode::INVALID_CLIENT_ID); // Ensure failure for invalid client ID +} + +// Test for send failure when sending to a client +TEST_F(ServerTest, SendDestination_SendFailed) { + testPacket.header.DestID = 1; // Valid client ID + int clientSocket = 3; + EXPECT_CALL(*mockSocket, send(clientSocket, _, sizeof(Packet), 0)) + .WillOnce(Return(0)); // Simulate send failure + + // EXPECT_CALL(*mockSocket, close(clientSocket)); // Close socket on failure + + { + std::lock_guard lock(*server->getIDMapMutex()); + (*server->getClientIDMap())[clientSocket] = testPacket.header.DestID; // Map client to ID + } + + ErrorCode result = server->sendDestination(testPacket); + EXPECT_EQ(result, ErrorCode::SEND_FAILED); // Ensure correct error code on send failure +} + +// Test for closing an invalid socket +TEST_F(ServerTest, CloseInvalidSocket) { + int invalidSocket = -1; + + EXPECT_CALL(*mockSocket, close(invalidSocket)) + .Times(0); // No close call since socket is invalid + + server->stopServer(); + EXPECT_EQ(server->testGetClientSocketByID(999), -1); // Ensure client does not exist +} + +// Test for successful broadcast +TEST_F(ServerTest, SendBroadcast_Success) { + int clientSocket1 = 3; + int clientSocket2 = 4; + + // Simulate successful broadcast to two clients + EXPECT_CALL(*mockSocket, send(clientSocket1, _, sizeof(Packet), 0)) + .WillOnce(Return(sizeof(Packet))); // Success for client 1 + + EXPECT_CALL(*mockSocket, send(clientSocket2, _, sizeof(Packet), 0)) + .WillOnce(Return(sizeof(Packet))); // Success for client 2 + + { + std::lock_guard lock(*server->getSocketMutex()); + (*server->getSockets()).push_back(clientSocket1); // Add client 1 + (*server->getSockets()).push_back(clientSocket2); // Add client 2 + } + + ErrorCode result = server->sendBroadcast(testPacket); + EXPECT_EQ(result, ErrorCode::SUCCESS); // Ensure broadcast was successful +} + +// Test for broadcast failure +TEST_F(ServerTest, SendBroadcast_SendFailed) { + int clientSocket1 = 3; + int clientSocket2 = 4; + + EXPECT_CALL(*mockSocket, send(clientSocket1, _, sizeof(Packet), 0)) + .WillOnce(Return(sizeof(Packet))); // Success for client 1 + + EXPECT_CALL(*mockSocket, send(clientSocket2, _, sizeof(Packet), 0)) + .WillOnce(Return(-1)); // Failure for client 2 + + //EXPECT_CALL(*mockSocket, close(clientSocket2)); // Close socket on failure + + { + std::lock_guard lock(*server->getSocketMutex()); + (*server->getSockets()).push_back(clientSocket1); // Add client 1 + (*server->getSockets()).push_back(clientSocket2); // Add client 2 + } + + ErrorCode result = server->sendBroadcast(testPacket); + EXPECT_EQ(result, ErrorCode::CONNECTION_FAILED); // Ensure broadcast failure is handled +} + +// Test for handling client disconnection during message reception +TEST_F(ServerTest, HandleClient_Disconnection) { + int clientSocket = 5; + + EXPECT_CALL(*mockSocket, recv(clientSocket, _, sizeof(Packet), 0)) + .WillOnce(Return(0)); // Simulate client disconnection + + //EXPECT_CALL(*mockSocket, close(clientSocket)); // Close socket for disconnected client + + server->testHandleClient(clientSocket); + std::vector* sockets = server->getSockets(); + + { + std::lock_guard lock(*server->getSocketMutex()); + EXPECT_EQ(std::find(sockets->begin(), sockets->end(), clientSocket), sockets->end()); + } +} + +// Test for receive failure from a client +TEST_F(ServerTest, HandleClient_ReceiveFailed) { + int clientSocket = 5; + + EXPECT_CALL(*mockSocket, recv(clientSocket, _, sizeof(Packet), 0)) + .WillOnce(Return(-1)); // Simulate receive failure + + //EXPECT_CALL(*mockSocket, close(clientSocket)); // Close socket on failure + + server->testHandleClient(clientSocket); + + std::vector* sockets = server->getSockets(); + { + std::lock_guard lock(*server->getSocketMutex()); + EXPECT_EQ(std::find(sockets->begin(), sockets->end(), clientSocket), sockets->end()); + } +} \ No newline at end of file