Permalink
Browse files

Cores communicating

  • Loading branch information...
1 parent c0c3955 commit 829b08b276af73c6b7b736a3534c7f82e7785449 @Detegr committed Feb 25, 2013
Showing with 49 additions and 16 deletions.
  1. +23 −6 core/core.c
  2. +7 −3 core/event.c
  3. +1 −0 core/event.h
  4. +16 −4 core/peermanager.c
  5. +1 −0 core/peermanager.h
  6. +1 −3 core/pipemanager.c
View
29 core/core.c
@@ -322,17 +322,20 @@ void* connection_thread(void* args)
strncpy(p->addr, hbuf, IPV4_MAX);
p->port=port;
struct peer* pp;
+ /*
while((pp=peer_next()))
{
printf("%s:%u\no:%d\ni:%d\n", pp->addr, pp->port, pp->osock, pp->isock);
}
+ */
if((pp=peer_exists(p)))
{
peer_remove(p);
// Remove the newly allocated peer and check if
// we have oneway connection with the old one
if(check_peer_key(pp, hbuf, newconn, conf) == 0)
{
+ peer_updateset(pp);
#ifndef NDEBUG
printf("Oneway connection to bidirectional for %s:%u\n", hbuf, port);
#endif
@@ -359,7 +362,9 @@ void* connection_thread(void* args)
}
p->osock=socket(AF_INET, SOCK_STREAM, 0);
((struct sockaddr_in*)&addr)->sin_port=htons(p->port);
+#ifndef NDEBUG
printf("Connecting %s:%d\n", hbuf, p->port);
+#endif
if(connect(p->osock, &addr, addrlen))
{
#ifndef NDEBUG
@@ -370,7 +375,10 @@ void* connection_thread(void* args)
// Use special number for outgoing socket
// to tell that we currently have a oneway connection
p->osock=SOCKET_ONEWAY;
- } else printf("Connected successfully!\n");
+ }
+#ifndef NDEBUG
+ else printf("Connected successfully!\n");
+#endif
peer_addtoset(p);
}
}
@@ -413,11 +421,11 @@ void* read_thread(void* args)
size_t datalen;
unsigned char* data=aes_decrypt_with_key(readbuf, b, &deckey, &datalen);
evt_t* e=new_event_fromstr(data);
-#ifndef NDEBUG
- printf("Got %s\n", data);
-#endif
if(e)
{
+#ifndef NDEBUG
+ printf("Got %s\n", e->data);
+#endif
send_event(e);
event_free(e);
}
@@ -486,7 +494,13 @@ void send_to_all(unsigned char* data_to_enc, int len)
while(p=peer_next())
{
int fd;
- if(p->osock==SOCKET_ONEWAY) fd=p->isock;
+ if(p->osock==SOCKET_ONEWAY)
+ {
+#ifndef NDEBUG
+ printf("Sending to oneway connection!\n");
+#endif
+ fd=p->isock;
+ }
else fd=p->osock;
assert(fd >= 0);
assert(fd != SOCKET_ONEWAY);
@@ -516,7 +530,10 @@ void process_event(evt_t* e)
{
case Message:
{
- send_to_all(e->data, strnlen(e->data, EVENT_MAX-EVENT_LEN));
+ char buf[EVENT_MAX];
+ memset(buf, 0, EVENT_MAX);
+ char* end=stpncpy(stpncpy(buf, eventtype_str(e), EVENT_LEN), e->data, e->data_len);
+ send_to_all(buf, end-buf);
break;
}
case ListPeers:
View
10 core/event.c
@@ -7,6 +7,7 @@
void event_init(evt_t* evt, EventType t, const char* data)
{
evt->type=t;
+ evt->data_len=0;
if(data)
{
event_set(evt, data);
@@ -22,7 +23,8 @@ evt_t* new_event_fromstr(const char* str)
{
evt_t* ret=(evt_t*)malloc(sizeof(evt_t));
ret->type=i;
- ret->data=strndup(str+EVENT_LEN, EVENT_MAX-EVENT_LEN);
+ ret->data=strndup(str+EVENT_LEN, EVENT_MAX-EVENT_LEN-1);
+ ret->data_len=strnlen(ret->data, EVENT_MAX-EVENT_LEN-1);
ret->next=NULL;
return ret;
}
@@ -37,7 +39,8 @@ const char* eventtype_str(evt_t* evt)
int event_set(evt_t* evt, const char* data)
{
- evt->data = strndup(data, EVENT_MAX-EVENT_LEN);
+ evt->data = strndup(data, EVENT_MAX-EVENT_LEN-1);
+ evt->data_len = strnlen(evt->data, EVENT_MAX-EVENT_LEN-1);
return evt->data == NULL;
}
@@ -52,6 +55,7 @@ void event_free_s(evt_t* evt)
{
if(evt->data) free(evt->data);
if(evt->next) event_free(evt->next);
+ evt->data_len=0;
}
int event_send(evt_t* evt, int fd)
@@ -61,7 +65,7 @@ int event_send(evt_t* evt, int fd)
char* p=stpncpy(buf, eventtype_str(evt), EVENT_LEN);
if(evt->data)
{
- stpncpy(p, evt->data, strnlen(evt->data, EVENT_MAX-EVENT_LEN));
+ stpncpy(p, evt->data, evt->data_len);
}
if(send(fd, buf, strnlen(buf, EVENT_MAX), 0) < 0) return -1;
return 0;
View
1 core/event.h
@@ -16,6 +16,7 @@ typedef struct event
{
EventType type;
char* data;
+ int data_len;
struct event* next;
} evt_t;
View
20 core/peermanager.c
@@ -73,6 +73,15 @@ int write_max()
return ret;
}
+int peer_updateset(struct peer* p)
+{
+ pthread_mutex_lock(&m_lock);
+ FD_CLR(p->osock, &m_writeset);
+ FD_CLR(p->osock, &m_readset);
+ pthread_mutex_unlock(&m_lock);
+ return peer_addtoset(p);
+}
+
int peer_addtoset(struct peer* p)
{
if(m_initialized)
@@ -82,12 +91,12 @@ int peer_addtoset(struct peer* p)
}
pthread_mutex_lock(&m_lock);
assert(p->isock != p->osock);
- if(p->osock != -1)
+ if(p->osock >= 0)
{
FD_SET(p->osock, &m_writeset);
if(p->osock > write_max_int) write_max_int=p->osock;
}
- else if(p->isock != -1)
+ else if(p->isock >= 0)
{
FD_SET(p->isock, &m_writeset);
if(p->isock > write_max_int) write_max_int=p->isock;
@@ -98,8 +107,11 @@ int peer_addtoset(struct peer* p)
fprintf(stderr, "Peer with no sockets!\n");
return -1;
}
- FD_SET(p->isock, &m_readset);
- if(p->isock > read_max_int) read_max_int=p->isock;
+ if(p->isock >= 0)
+ {
+ FD_SET(p->isock, &m_readset);
+ if(p->isock > read_max_int) read_max_int=p->isock;
+ }
pthread_mutex_unlock(&m_lock);
return 0;
}
View
1 core/peermanager.h
@@ -10,6 +10,7 @@ struct peer* peer_new();
void peer_writeset(fd_set* set);
void peer_readset(fd_set* set);
int peer_addtoset(struct peer* p);
+int peer_updateset(struct peer* p);
void peer_removefromset(struct peer* p);
// Does not do any clever checking, just checks if the
// address and the port of p matches to another peer.
View
4 core/pipemanager.c
@@ -86,9 +86,7 @@ evt_t* poll_event(void)
if(strncmp(buf, eventtypes[j], EVENT_LEN) == 0)
{
cur = (evt_t*)malloc(sizeof(evt_t));
- cur->type=j;
- cur->data=strndup(buf+EVENT_LEN, EVENT_MAX-EVENT_LEN);
- cur->next=NULL;
+ event_init(cur, j, buf+EVENT_LEN);
if(!ret) ret=cur;
else
{

0 comments on commit 829b08b

Please sign in to comment.