Skip to content

Commit

Permalink
firechat
Browse files Browse the repository at this point in the history
(cherry picked from commit 31a4765)
  • Loading branch information
heavyrain2012 committed Nov 14, 2021
1 parent bc9da47 commit 8e3790d
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 15 deletions.
8 changes: 4 additions & 4 deletions conf/janus.transport.mqtt.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ general: {
#events = true # Whether to notify event handlers about transport events (default=true)
json = "indented" # Whether the JSON messages should be indented (default),
# plain (no indentation) or compact (no indentation and no spaces)

im_host = "imdev.wildfirechat.cn" # Wildfire IM server host
im_port = 80 # Wildfire IM server http port

url = "tcp://localhost:1883" # The connection URL of the MQTT broker: if you want
# to use SSL, make sure you type ssl:// instead of tcp://,
# and that you configure the SSL settings below
#mqtt_version = "3.1.1" # Protocol version. Available values: 3.1, 3.1.1 (default), 5.
#client_id = "guest" # Client identifier
#username = "guest" # Username to use to authenticate, if needed
#password = "guest" # Password to use to authenticate, if needed
client_id = "guest" # Client identifier
#keep_alive_interval = 20 # Keep connection for N seconds
#cleansession = 0 # Clean session flag
#max_inflight = 10 # Maximum number of inflight messages
Expand Down
165 changes: 154 additions & 11 deletions transports/janus_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@
*/

#include "transport.h"
#include <arpa/inet.h>
#include <assert.h>
#include <netdb.h> /* getprotobyname */
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <MQTTAsync.h>

Expand Down Expand Up @@ -60,6 +70,7 @@ void janus_mqtt_session_created(janus_transport_session *transport, guint64 sess
void janus_mqtt_session_over(janus_transport_session *transport, guint64 session_id, gboolean timeout, gboolean claimed);
void janus_mqtt_session_claimed(janus_transport_session *transport, guint64 session_id);
json_t *janus_mqtt_query_transport(json_t *request);
int getNodeHost(const char* host, int port, const char* client, char* content);

#define JANUS_MQTT_VERSION_3_1 "3.1"
#define JANUS_MQTT_VERSION_3_1_1 "3.1.1"
Expand Down Expand Up @@ -128,6 +139,8 @@ static struct janus_json_parameter configure_parameters[] = {
typedef struct janus_mqtt_context {
janus_transport_callbacks *gateway;
MQTTAsync client;
char *im_host;
int im_port;
struct {
int mqtt_version;
int keep_alive_interval;
Expand Down Expand Up @@ -267,6 +280,7 @@ void janus_mqtt_client_publish_status_failure_impl(int rc);
/* We only handle a single client */
static janus_mqtt_context *context_ = NULL;
static janus_transport_session *mqtt_session = NULL;
static char *g_mqtturl = NULL;

#ifdef MQTTVERSION_5
/* MQTT 5 specific statics and functions */
Expand Down Expand Up @@ -324,17 +338,36 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path
janus_config_category *config_status = janus_config_get_create(config, NULL, janus_config_type_category, "status");

/* Handle configuration */
janus_config_item *url_item = janus_config_get(config, config_general, janus_config_type_item, "url");
const char *url = g_strdup((url_item && url_item->value) ? url_item->value : "tcp://localhost:1883");
// janus_config_item *url_item = janus_config_get(config, config_general, janus_config_type_item, "url");
// const char *url = g_strdup((url_item && url_item->value) ? url_item->value : "tcp://localhost:1883");

janus_config_item *im_host_item = janus_config_get(config, config_general, janus_config_type_item, "im_host");
ctx->im_host = g_strdup((im_host_item && im_host_item->value) ? im_host_item->value : "localhost");

janus_config_item *im_port_item = janus_config_get(config, config_general, janus_config_type_item, "im_port");
ctx->im_port = (im_port_item && im_port_item->value) ? atoi(im_port_item->value) : 80;

janus_config_item *client_id_item = janus_config_get(config, config_general, janus_config_type_item, "client_id");
const char *client_id = g_strdup((client_id_item && client_id_item->value) ? client_id_item->value : "guest");

janus_config_item *username_item = janus_config_get(config, config_general, janus_config_type_item, "username");
ctx->connect.username = g_strdup((username_item && username_item->value) ? username_item->value : "guest");
// janus_config_item *username_item = janus_config_get(config, config_general, janus_config_type_item, "username");
ctx->connect.username = g_strdup((client_id_item && client_id_item->value) ? client_id_item->value : "guest");

// janus_config_item *password_item = janus_config_get(config, config_general, janus_config_type_item, "password");
ctx->connect.password = g_strdup((client_id_item && client_id_item->value) ? client_id_item->value : "guest");

char node_host[1024];
memset(node_host, 0, sizeof(node_host));
getNodeHost(ctx->im_host, ctx->im_port, ctx->connect.username, node_host);
char urlbuf[1024];
memset(urlbuf, 0, sizeof(urlbuf));
snprintf(urlbuf, 1024, "tcp://%s:1883", node_host);

if(g_mqtturl != NULL) {
g_free(g_mqtturl);
}
g_mqtturl = g_strdup(urlbuf);

janus_config_item *password_item = janus_config_get(config, config_general, janus_config_type_item, "password");
ctx->connect.password = g_strdup((password_item && password_item->value) ? password_item->value : "guest");

janus_config_item *json_item = janus_config_get(config, config_general, janus_config_type_item, "json");
if(json_item && json_item->value) {
Expand Down Expand Up @@ -372,7 +405,7 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path
}
}
if(ssl_item && ssl_item->value && janus_is_true(ssl_item->value)) {
if(strstr(url, "ssl://") != url)
if(strstr(g_mqtturl, "ssl://") != g_mqtturl)
JANUS_LOG(LOG_WARN, "SSL enabled, but MQTT url doesn't start with ssl://...\n");

ctx->ssl_enabled = TRUE;
Expand Down Expand Up @@ -403,7 +436,7 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path
ctx->verify_peer = (verify && verify->value && janus_is_true(verify->value)) ? TRUE : FALSE;
} else {
JANUS_LOG(LOG_INFO, "MQTT SSL support disabled\n");
if(strstr(url, "ssl://") == url)
if(strstr(g_mqtturl, "ssl://") == g_mqtturl)
JANUS_LOG(LOG_WARN, "SSL disabled, but MQTT url starts with ssl:// instead of tcp://...\n");
}

Expand Down Expand Up @@ -696,7 +729,7 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path

if(MQTTAsync_createWithOptions(
&ctx->client,
url,
g_mqtturl,
client_id,
MQTTCLIENT_PERSISTENCE_NONE,
NULL,
Expand Down Expand Up @@ -739,7 +772,6 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path
goto error;
}

g_free((char *)url);
g_free((char *)client_id);
janus_config_destroy(config);
return 0;
Expand All @@ -754,7 +786,6 @@ int janus_mqtt_init(janus_transport_callbacks *callback, const char *config_path
#endif
janus_transport_session_destroy(mqtt_session);
janus_mqtt_client_destroy_context(&ctx);
g_free((char *)url);
g_free((char *)client_id);
janus_config_destroy(config);

Expand Down Expand Up @@ -1238,6 +1269,19 @@ void janus_mqtt_client_connect_failure_impl(void *context, int rc) {

/* Notify handlers about this transport failure */
janus_mqtt_context *ctx = (janus_mqtt_context *)context;

char node_host[1024];
memset(node_host, 0, sizeof(node_host));
getNodeHost(ctx->im_host, ctx->im_port, ctx->connect.username, node_host);
char urlbuf[1024];
memset(urlbuf, 0, sizeof(urlbuf));
snprintf(urlbuf, 1024, "tcp://%s:1883", node_host);

if(strcmp(g_mqtturl, urlbuf) != 0) {
JANUS_LOG(LOG_ERR, "MQTT client address changed, need reboot\n");
exit(-1);
}

if(notify_events && ctx && ctx->gateway && ctx->gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "event", json_string("failed"));
Expand Down Expand Up @@ -1746,3 +1790,102 @@ void janus_mqtt_transaction_state_free(gpointer state_ptr) {
g_free(state);
}
#endif

int getNodeHost(const char* host, int port, const char* client, char* content) {
char buffer[BUFSIZ];
char result[1024];
enum CONSTEXPR { MAX_REQUEST_LEN = 1024};
char request[MAX_REQUEST_LEN];
struct protoent *protoent;
in_addr_t in_addr;
int request_len;
int socket_file_descriptor;
ssize_t nbytes_total, nbytes_last;
struct hostent *hostent;
struct sockaddr_in sockaddr_in;
unsigned short server_port = (unsigned short)port;



request_len = snprintf(request, MAX_REQUEST_LEN, "GET /api/node?id=%s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n", client, host);
if (request_len >= MAX_REQUEST_LEN) {
fprintf(stderr, "request length large: %d\n", request_len);
exit(EXIT_FAILURE);
}

/* Build the socket. */
protoent = getprotobyname("tcp");
if (protoent == NULL) {
perror("getprotobyname");
exit(EXIT_FAILURE);
}
socket_file_descriptor = socket(AF_INET, SOCK_STREAM, protoent->p_proto);
if (socket_file_descriptor == -1) {
perror("socket");
exit(EXIT_FAILURE);
}

/* Build the address. */
hostent = gethostbyname(host);
if (hostent == NULL) {
fprintf(stderr, "error: gethostbyname(\"%s\")\n", host);
exit(EXIT_FAILURE);
}
in_addr = inet_addr(inet_ntoa(*(struct in_addr*)*(hostent->h_addr_list)));
if (in_addr == (in_addr_t)-1) {
fprintf(stderr, "error: inet_addr(\"%s\")\n", *(hostent->h_addr_list));
exit(EXIT_FAILURE);
}
sockaddr_in.sin_addr.s_addr = in_addr;
sockaddr_in.sin_family = AF_INET;
sockaddr_in.sin_port = htons(server_port);

/* Actually connect. */
if (connect(socket_file_descriptor, (struct sockaddr*)&sockaddr_in, sizeof(sockaddr_in)) == -1) {
perror("connect");
exit(EXIT_FAILURE);
}

/* Send HTTP request. */
nbytes_total = 0;
while (nbytes_total < request_len) {
nbytes_last = write(socket_file_descriptor, request + nbytes_total, request_len - nbytes_total);
if (nbytes_last == -1) {
perror("write");
exit(EXIT_FAILURE);
}
nbytes_total += nbytes_last;
}

/* Read the response. */
fprintf(stderr, "debug: before first read\n");
memset(result, 0, sizeof(result));
int dataLen = 0;
while ((nbytes_total = read(socket_file_descriptor, buffer, BUFSIZ)) > 0) {
fprintf(stderr, "debug: after a read\n");
write(STDOUT_FILENO, buffer, nbytes_total);
memcpy(result+dataLen, buffer, nbytes_total);
dataLen += nbytes_total;
}
result[dataLen] = 0;
fprintf(stderr, "debug: after last read\n");
fprintf(stderr, "receive msg is %s\n", result);


for(int i = 0; i < dataLen -4; i++) {
if (result[i] == '\r' && result[i+1] == '\n' && result[i+2] == '\r' && result[i+3] == '\n') {
memcpy(content, result+i+4, dataLen - i - 4);
break;
}
}

fprintf(stderr, "receive content is %s\n", content);

if (nbytes_total == -1) {
perror("read");
exit(EXIT_FAILURE);
}

close(socket_file_descriptor);
return 0;
}

0 comments on commit 8e3790d

Please sign in to comment.