Skip to content

Commit

Permalink
Add socket worker thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
christoph2 committed Feb 24, 2020
1 parent c392d4a commit 7244cb1
Showing 1 changed file with 57 additions and 91 deletions.
148 changes: 57 additions & 91 deletions src/tl/eth/linuxeth.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,18 @@
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>

#if 0
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#endif
#include <pthread.h>

#define err_abort(code,text) do { \
fprintf (stderr, "%s at \"%s\":%d: %s\n", \
text, __FILE__, __LINE__, strerror (code)); \
abort (); \
} while (0)
#define errno_abort(text) do { \
fprintf (stderr, "%s at \"%s\":%d: %s\n", \
text, __FILE__, __LINE__, strerror (errno)); \
abort (); \
} while (0)

#define BUFFER_SIZE 1024

Expand All @@ -80,6 +77,9 @@ typedef struct tagXcpTl_ConnectionType {
} XcpTl_ConnectionType;


static pthread_t worker_thread_id;


unsigned char buf[XCP_COMM_BUFLEN];
int addrSize = sizeof(struct sockaddr_storage);

Expand All @@ -103,8 +103,8 @@ static bool Xcp_DisableSocketOption(int sock, int option);

static int Xcp_SetNonblocking(int fd);
static void Xcp_AddFd(int epoll_fd, int fd);
void lt_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd);

static void lt_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd);
static void * XcpTl_WorkerThread(void * param);

static bool Xcp_EnableSocketOption(int sock, int option)
{
Expand Down Expand Up @@ -160,17 +160,15 @@ static void Xcp_AddFd(int epoll_fd, int fd)
void XcpTl_Init(void)
{
struct addrinfo hints, *addr_info;
char *Address = NULL;
char *Port = DEFAULT_PORT;
int serverSockets[FD_SETSIZE];
int boundSocketNum = -1;
char * address = NULL;
char * port = DEFAULT_PORT;
int sock;
int ret;
int idx;

printf("Tl_Init()\n");

XcpUtl_ZeroMem(&XcpTl_Connection, sizeof(XcpTl_ConnectionType));
XcpUtl_ZeroMem(serverSockets, FD_SETSIZE);
Xcp_PduOut.data = &Xcp_PduOutBuffer[0];
memset(&hints, 0, sizeof(hints));

Expand All @@ -180,82 +178,76 @@ void XcpTl_Init(void)
hints.ai_family = Xcp_Options.ipv6 ? PF_INET6: PF_INET;
hints.ai_socktype = XcpTl_Connection.socketType;
hints.ai_flags = AI_NUMERICHOST | AI_PASSIVE;
ret = getaddrinfo(Address, Port, &hints, &addr_info);

// printf("Port: %s tcp: %u family: %d\n", Port, XcpTl_Connection.socketType, hints.ai_family);
ret = getaddrinfo(address, port, &hints, &addr_info);

if (ret != 0) {
XcpHw_ErrorMsg("XcpTl_Init::getaddrinfo()", errno);
return;
}

serverSockets[0] = socket(addr_info->ai_family, addr_info->ai_socktype, addr_info->ai_protocol);
sock = socket(addr_info->ai_family, addr_info->ai_socktype, addr_info->ai_protocol);
printf("socket()\n");
if (serverSockets[0] == -1){
if (sock == -1){
XcpHw_ErrorMsg("XcpTl_Init::socket()", errno);
return;
}
if (bind(serverSockets[0], addr_info->ai_addr, addr_info->ai_addrlen) == -1) {
if (bind(sock, addr_info->ai_addr, addr_info->ai_addrlen) == -1) {
XcpHw_ErrorMsg("XcpTl_Init::bind()", errno);
return;
}
if (XcpTl_Connection.socketType == SOCK_STREAM) {
if (listen(serverSockets[0], 1) == -1) {
if (listen(sock, 1) == -1) {
XcpHw_ErrorMsg("XcpTl_Init::listen()", errno);
return;
}
}

printf("bound...\n");
boundSocketNum = idx;
XcpTl_Connection.boundSocket = serverSockets[boundSocketNum];
XcpTl_Connection.boundSocket = sock;
freeaddrinfo(addr_info);
if (boundSocketNum == -1) {
fprintf(stderr, "Fatal error: unable to serve on any address.\nPerhaps" \
" a server is already running on port %s / %s [%s]?\n",
DEFAULT_PORT, Xcp_Options.tcp ? "TCP" : "UDP", Xcp_Options.ipv6 ? "IPv6" : "IPv4"
);
exit(2);
}
if (!Xcp_EnableSocketOption(XcpTl_Connection.boundSocket, SO_REUSEADDR)) {
XcpHw_ErrorMsg("XcpTl_Init:setsockopt(SO_REUSEADDR)", errno);
}


epoll_fd = epoll_create(EVENT_TABLE_SIZE);
if (epoll_fd == -1) {
XcpHw_ErrorMsg("XcpTl_Init::epoll_create", errno);
exit(1);
}

Xcp_AddFd(epoll_fd, serverSockets[0]);
Xcp_AddFd(epoll_fd, sock);

while(1)
{
int ret = epoll_wait(epoll_fd, socket_events, MAX_EVENT_NUMBER, -1);
if(ret < 0)
{
ret = pthread_create(&worker_thread_id, NULL, &XcpTl_WorkerThread, NULL);
if (ret != 0) {
err_abort(ret, "Create worker thread");
}

}

static void * XcpTl_WorkerThread(void * param)
{
int status;

while(1) {
status = epoll_wait(epoll_fd, socket_events, MAX_EVENT_NUMBER, -1);
if (status < 0) {
printf("epoll failure!\n");
break;
}

lt_process(socket_events, ret, epoll_fd, serverSockets[0]);

lt_process(socket_events, status, epoll_fd, XcpTl_Connection.boundSocket);
}
return NULL;
}


void XcpTl_DeInit(void)
{
close(epoll_fd);
close(XcpTl_Connection.boundSocket);
}


void XcpTl_MainFunction(void)
{
if (XcpTl_FrameAvailable(0, 1000) > 0) {
XcpTl_RxHandler();
}
}

void *get_in_addr(struct sockaddr *sa)
Expand Down Expand Up @@ -297,10 +289,12 @@ void XcpTl_RxHandler(void)
}
recv_len = recv(XcpTl_Connection.connectedSocket, (char*)buf, XCP_COMM_BUFLEN, 0);
if (recv_len == -1) {
XcpHw_ErrorMsg("XcpTl_RxHandler::recv()", errno);
close(XcpTl_Connection.connectedSocket);
exit(1);
return;
if (errno != EAGAIN) {
XcpHw_ErrorMsg("XcpTl_RxHandler::recv()", errno);
close(XcpTl_Connection.connectedSocket);
exit(1);
return;
}
}
if (recv_len == 0) {
DBG_PRINT1("Client closed connection\n");
Expand Down Expand Up @@ -342,13 +336,13 @@ void XcpTl_RxHandler(void)
}
}

void lt_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd)
static void lt_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd)
{
char buf[BUFFER_SIZE];
int i;
int recv_len;
uint16_t dlc;
int from_len = sizeof(sockaddr_storage);
int from_len = sizeof(struct sockaddr_storage);
char hostname[NI_MAXHOST];

for(i = 0; i < number; i++) //number: number of events ready
Expand All @@ -362,6 +356,7 @@ void lt_process(struct epoll_event* events, int number, int epoll_fd, int listen
Xcp_AddFd(epoll_fd, connfd);
#endif
if (!XcpTl_Connection.connected) {
printf("Try connect\n");
XcpTl_Connection.connectedSocket = accept(XcpTl_Connection.boundSocket, (struct sockaddr *)&XcpTl_Connection.currentAddress, &from_len);
if (XcpTl_Connection.connectedSocket == -1) {
XcpHw_ErrorMsg("XcpTl_RxHandler::accept()", errno);
Expand All @@ -376,15 +371,17 @@ void lt_process(struct epoll_event* events, int number, int epoll_fd, int listen

} else if(events[i].events & EPOLLIN) { //Readable with client data
// This code is triggered as long as the data in the buffer has not been read.This is what LT mode is all about: repeating notifications until processing is complete
printf("lt mode: event trigger once!\n");
//printf("Recv\n");
//printf("lt mode: event trigger once!\n");
memset(buf, 0, BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if(ret <= 0) //After reading the data, remember to turn off fd
{
printf("disco()\n");
close(sockfd);
continue;
}
printf("get %d bytes of content: \n", ret);
// printf("get %d bytes of content: \n", ret);
recv_len = ret;
// hexdump(buf, ret);
if (recv_len > 0) {
Expand Down Expand Up @@ -419,36 +416,6 @@ void XcpTl_TxHandler(void)

}


int16_t XcpTl_FrameAvailable(uint32_t sec, uint32_t usec)
{
struct timeval timeout;
fd_set fds;
int16_t res;

timeout.tv_sec = sec;
timeout.tv_usec = usec;

FD_ZERO(&fds);
FD_SET(XcpTl_Connection.boundSocket, &fds);

// Return value:
// -1: error occurred
// 0: timed out
// > 0: data ready to be read
if (((XcpTl_Connection.socketType == SOCK_STREAM) && (!XcpTl_Connection.connected)) || (XcpTl_Connection.socketType == SOCK_DGRAM)) {
res = select(0, &fds, 0, 0, &timeout);
if (res == -1) {
XcpHw_ErrorMsg("XcpTl_FrameAvailable:select()", errno);
exit(2);
}
return res;
} else {
return 1;
}
}


void XcpTl_Send(uint8_t const * buf, uint16_t len)
{

Expand All @@ -467,7 +434,6 @@ void XcpTl_Send(uint8_t const * buf, uint16_t len)
}
}


void XcpTl_SaveConnection(void)
{
XcpUtl_MemCopy(&XcpTl_Connection.connectionAddress, &XcpTl_Connection.currentAddress, sizeof(struct sockaddr_storage));
Expand Down

0 comments on commit 7244cb1

Please sign in to comment.