diff --git a/include/upipe-modules/upipe_http_source.h b/include/upipe-modules/upipe_http_source.h index ddb61a8d5..eaff68d63 100644 --- a/include/upipe-modules/upipe_http_source.h +++ b/include/upipe-modules/upipe_http_source.h @@ -38,6 +38,61 @@ extern "C" { #define UPIPE_HTTP_SRC_SIGNATURE UBASE_FOURCC('h','t','t','p') +/** @This extends upipe_command with specific commands for http source. */ +enum upipe_http_src_command { + UPIPE_HTTP_SRC_SENTINEL = UPIPE_CONTROL_LOCAL, + + /** returns the reading position of the current http request, in octets + * (uint64_t *) */ + UPIPE_HTTP_SRC_GET_POSITION, + /** asks to get at the given position (uint64_t), using Range header */ + UPIPE_HTTP_SRC_SET_POSITION, + /** asks to get at the given position (uint64_t), the given size + * (uint64_t), using Range header */ + UPIPE_HTTP_SRC_SET_RANGE, +}; + +/** @This returns the reading position of the current http request. + * + * @param upipe description structure of the pipe + * @param position_p filled in with the reading position, in octets + * @return an error code + */ +static inline int upipe_http_src_get_position(struct upipe *upipe, + uint64_t *position_p) +{ + return upipe_control(upipe, UPIPE_HTTP_SRC_GET_POSITION, + UPIPE_HTTP_SRC_SIGNATURE, position_p); +} + +/** @This request the given position using Range header + * + * @param upipe description structure of the pipe + * @param position new reading position, in octets (between 0 and the size) + * @return an error code + */ +static inline int upipe_http_src_set_position(struct upipe *upipe, + uint64_t position) +{ + return upipe_control(upipe, UPIPE_HTTP_SRC_SET_POSITION, + UPIPE_HTTP_SRC_SIGNATURE, position); +} + +/** @This request the given range + * + * @param upipe description structure of the pipe + * @param offset range starts at offset, in octets + * @param length octets to read from offset, in octets + * @return an error code + */ +static inline int upipe_http_src_set_range(struct upipe *upipe, + uint64_t offset, + uint64_t length) +{ + return upipe_control(upipe, UPIPE_HTTP_SRC_SET_RANGE, + UPIPE_HTTP_SRC_SIGNATURE, offset, length); +} + /** @This returns the management structure for all http sources. * * @return pointer to manager diff --git a/lib/upipe-modules/upipe_http_source.c b/lib/upipe-modules/upipe_http_source.c index f735b9020..2cbfb1793 100644 --- a/lib/upipe-modules/upipe_http_source.c +++ b/lib/upipe-modules/upipe_http_source.c @@ -27,6 +27,7 @@ * @short Upipe source module for http GET requests */ +#include #include #include #include @@ -70,15 +71,28 @@ #define UBUF_DEFAULT_SIZE 4096 #define MAX_URL_SIZE 2048 +#define HTTP_VERSION "HTTP/1.0" #define USER_AGENT "upipe_http_src" -static const char get_request_format[] = - "GET %s HTTP/1.0\n" - "User-Agent: %s\n" - "\n"; + +struct http_range { + uint64_t offset; + uint64_t length; +}; + +#define HTTP_RANGE(Offset, Length) \ + (struct http_range){ .offset = Offset, .length = Length } /** @hidden */ static int upipe_http_src_check(struct upipe *upipe, struct uref *flow_format); +struct header { + const char *value; + size_t len; +}; + +#define HEADER(Value, Len) \ + (struct header){ .value = Value, .len = Len } + /** @internal @This is the private context of a http source pipe. */ struct upipe_http_src { /** refcount management structure */ @@ -116,11 +130,28 @@ struct upipe_http_src { struct upump *upump; /** read size */ unsigned int output_size; + /** write watcher */ + struct upump *upump_write; /** socket descriptor */ int fd; + /** a request is pending */ + bool request_pending; /** http url */ char *url; + /** host */ + char *host; + /** path part of the url */ + const char *path; + + struct header header_field; + + /** header location for 302 location */ + char *location; + + /** range */ + struct http_range range; + uint64_t position; /** http parser*/ http_parser parser; @@ -152,6 +183,7 @@ UPIPE_HELPER_UCLOCK(upipe_http_src, uclock, uclock_request, upipe_http_src_check UPIPE_HELPER_UPUMP_MGR(upipe_http_src, upump_mgr) UPIPE_HELPER_UPUMP(upipe_http_src, upump, upump_mgr) UPIPE_HELPER_OUTPUT_SIZE(upipe_http_src, output_size) +UPIPE_HELPER_UPUMP(upipe_http_src, upump_write, upump_mgr) /** @internal @This allocates a http source pipe. * @@ -167,21 +199,110 @@ static struct upipe *upipe_http_src_alloc(struct upipe_mgr *mgr, { struct upipe *upipe = upipe_http_src_alloc_void(mgr, uprobe, signature, args); - struct upipe_http_src *upipe_http_src = upipe_http_src_from_upipe(upipe); upipe_http_src_init_urefcount(upipe); upipe_http_src_init_uref_mgr(upipe); upipe_http_src_init_ubuf_mgr(upipe); upipe_http_src_init_output(upipe); upipe_http_src_init_upump_mgr(upipe); upipe_http_src_init_upump(upipe); + upipe_http_src_init_upump_write(upipe); upipe_http_src_init_uclock(upipe); upipe_http_src_init_output_size(upipe, UBUF_DEFAULT_SIZE); + + struct upipe_http_src *upipe_http_src = upipe_http_src_from_upipe(upipe); upipe_http_src->fd = -1; + upipe_http_src->request_pending = false; upipe_http_src->url = NULL; + upipe_http_src->range = HTTP_RANGE(0, -1); + upipe_http_src->position = 0; + upipe_http_src->path = NULL; + upipe_http_src->host = NULL; + upipe_http_src->location = NULL; + upipe_http_src->header_field = HEADER(NULL, 0); + upipe_throw_ready(upipe); return upipe; } +/** @This frees a upipe. + * + * @param upipe description structure of the pipe + */ +struct part { + const char *at; + size_t len; +}; + +#define PART(At, Len) (struct part){ .at = At, .len = Len } +#define PART_NULL PART(NULL, 0) +#define PART_IS_NULL(Part) ((Part).at == NULL) + +static struct part part_make(const char *value, size_t len) +{ + return (struct part){ .at = value, .len = len}; +} + +static struct part part_while(const struct part *value, + const char *contains) +{ + struct part part = PART_NULL; + + for (size_t i = 0; i < value->len && value->at[i] != '\0'; i++) { + size_t j; + for (j = 0; contains[j]; j++) + if (contains[j] == value->at[i]) + break; + + if (!contains[j]) + return part; + + part.at = value->at; + part.len++; + } + + return part; +} + +static struct part part_remove_while(const struct part *value, + const char *contains) +{ + struct part remove = part_while(value, contains); + return PART(value->at + remove.len, value->len + remove.len); +} + +static struct part part_until(const struct part *value, + const char *except) +{ + struct part part = PART_NULL; + + for (size_t i = 0; i < value->len && value->at[i] != '\0'; i++) { + for (size_t j = 0; except[j]; j++) + if (except[j] == value->at[i]) + return part; + + part.at = value->at; + part.len++; + } + + return part; +} + +static struct part part_name(const struct part *value) +{ + struct part cleaned = part_remove_while(value, " "); + struct part pair = part_until(&cleaned, ";"); + if (PART_IS_NULL(pair)) + return pair; + return part_until(&pair, "="); +} + +static int part_cmp(const struct part *a, const struct part *b) +{ + if (a->len != b->len) + return a->len > b->len ? 1 : -1; + return strncmp(a->at, b->at, a->len); +} + /** @internal @This retrieves the upipe_http_src structure from parser * @param parser http parser structure * @return pointer to upipe_http_src private structure @@ -191,6 +312,66 @@ static inline struct upipe_http_src *upipe_http_src_from_parser(http_parser *par return container_of(parser, struct upipe_http_src, parser); } +static int upipe_http_src_header_field(http_parser *parser, + const char *at, + size_t len) +{ + struct upipe_http_src *upipe_http_src = upipe_http_src_from_parser(parser); + upipe_http_src->header_field = HEADER(at, len); + return 0; +} + +static int upipe_http_src_header_value(http_parser *parser, + const char *at, + size_t len) +{ + struct upipe_http_src *upipe_http_src = upipe_http_src_from_parser(parser); + struct upipe *upipe = upipe_http_src_to_upipe(upipe_http_src); + + struct header field = upipe_http_src->header_field; + upipe_http_src->header_field = HEADER(NULL, 0); + assert(field.value != NULL); + + upipe_verbose_va(upipe, "%.*s: %.*s", field.len, field.value, len, at); + if (!strncasecmp("Location", field.value, field.len)) { + upipe_http_src->location = strndup(at, len); + } + return 0; +} + +static int upipe_http_src_status_cb(http_parser *parser) +{ + struct upipe_http_src *upipe_http_src = upipe_http_src_from_parser(parser); + struct upipe *upipe = upipe_http_src_to_upipe(upipe_http_src); + + upipe_dbg_va(upipe, "reply http code %i", parser->status_code); + + switch (parser->status_code) { + /* success */ + case 200: + /* found */ + case 302: + break; + default: + return -1; + } + return 0; +} + +static int upipe_http_src_message_complete(http_parser *parser) +{ + struct upipe_http_src *upipe_http_src = upipe_http_src_from_parser(parser); + struct upipe *upipe = upipe_http_src_to_upipe(upipe_http_src); + + switch (parser->status_code) { + case 302: + upipe_notice_va(upipe, "redirect to %s", upipe_http_src->location); + upipe_set_uri(upipe, upipe_http_src->location); + return 0; + } + return 0; +} + /** @internal @This is called by http_parser when receiving fragments of body * @param parser http parser structure * @param at data buffer @@ -229,6 +410,7 @@ static int upipe_http_src_body_cb(http_parser *parser, const char *at, size_t le uref_clock_set_cr_sys(uref, systime); upipe_use(upipe); upipe_http_src_output(upipe, uref, &upipe_http_src->upump); + upipe_http_src->position += len; upipe_release(upipe); /* everything's fine, return 0 to http_parser */ @@ -283,11 +465,126 @@ static void upipe_http_src_worker(struct upump *upump) } /* parse response */ - http_parser_execute(&upipe_http_src->parser, - &upipe_http_src->parser_settings, buffer, len); + size_t parsed_len = + http_parser_execute(&upipe_http_src->parser, + &upipe_http_src->parser_settings, + buffer, len); + if (parsed_len != len) + upipe_warn(upipe, "http request execution failed"); free(buffer); } +static int request_add(char **req_p, size_t *len, const char *fmt, ...) +{ + va_list args; + + if (!*req_p) + return -1; + + va_start(args, fmt); + int ret = vsnprintf(*req_p, *len, fmt, args); + va_end(args); + + if (ret < 0 || (size_t)ret >= *len) { + *req_p = NULL; + return -1; + } + + *len -= ret; + *req_p += ret; + return 0; +} + +/** @internal @This builds and sends a GET request + * + * @param upipe description structure of the pipe + * @return an error code + */ +static int upipe_http_src_send_request(struct upipe *upipe) +{ + struct upipe_http_src *upipe_http_src = upipe_http_src_from_upipe(upipe); + const char *url = upipe_http_src->path; + char req_buffer[4096 + strlen(url)]; + char *req = req_buffer; + size_t req_buffer_len = sizeof (req_buffer); + size_t req_len = req_buffer_len; + + /* build get request */ + upipe_dbg_va(upipe, "GET %s", url); + request_add(&req, &req_len, "GET %s %s\r\n", url, HTTP_VERSION); + upipe_verbose_va(upipe, "User-Agent: %s", USER_AGENT); + request_add(&req, &req_len, "User-Agent: %s\r\n", USER_AGENT); + if (upipe_http_src->host) { + upipe_verbose_va(upipe, "Host: %s", upipe_http_src->host); + request_add(&req, &req_len, "Host: %s\r\n", upipe_http_src->host); + } + upipe_http_src->position = 0; + if (upipe_http_src->range.offset || + upipe_http_src->range.length != (uint64_t)-1) { + + if (upipe_http_src->range.offset) { + upipe_verbose_va(upipe, "range offset: %"PRIu64, + upipe_http_src->range.offset); + request_add(&req, &req_len, "Range: bytes=%"PRIu64"-", + upipe_http_src->range.offset); + upipe_http_src->position = upipe_http_src->range.offset; + } + else + request_add(&req, &req_len, "Range: bytes=0-"); + + if (upipe_http_src->range.length != (uint64_t)-1) { + upipe_verbose_va(upipe, "range length: %"PRIu64, + upipe_http_src->range.length); + request_add(&req, &req_len, "%"PRIu64, + upipe_http_src->range.offset + + upipe_http_src->range.length); + } + + request_add(&req, &req_len, "\r\n"); + } + request_add(&req, &req_len, "\r\n"); + + if (unlikely(req == NULL)) { + upipe_err_va(upipe, "request is too long: %s", req_buffer); + return UBASE_ERR_ALLOC; + } + + int ret = send(upipe_http_src->fd, req_buffer, req_buffer_len - req_len, 0); + if (ret < 0) { + switch(errno) { + case EINTR: + case EAGAIN: +#if EAGAIN != EWOULDBLOCK + case EWOULDBLOCK: +#endif + /* try again later */ + return UBASE_ERR_EXTERNAL; + + case EBADF: + case EINVAL: + default: + upipe_err_va(upipe, "error sending request (%s)", strerror(errno)); + return UBASE_ERR_EXTERNAL; + } + } + + return UBASE_ERR_NONE; +} + +static void upipe_http_src_worker_write(struct upump *upump) +{ + struct upipe *upipe = upump_get_opaque(upump, struct upipe *); + struct upipe_http_src *upipe_http_src = upipe_http_src_from_upipe(upipe); + + if (unlikely(upipe_http_src_send_request(upipe)) != UBASE_ERR_NONE) { + close(upipe_http_src->fd); + upipe_http_src->fd = -1; + } + + upipe_http_src->request_pending = false; + upipe_http_src_set_upump_write(upipe, NULL); +} + /** @internal @This checks if the pump may be allocated. * * @param upipe description structure of the pipe @@ -326,17 +623,33 @@ static int upipe_http_src_check(struct upipe *upipe, struct uref *flow_format) != NULL) return UBASE_ERR_NONE; - if (upipe_http_src->fd != -1 && upipe_http_src->upump == NULL) { - struct upump *upump; - upump = upump_alloc_fd_read(upipe_http_src->upump_mgr, - upipe_http_src_worker, upipe, - upipe_http_src->fd); - if (unlikely(upump == NULL)) { - upipe_throw_fatal(upipe, UBASE_ERR_UPUMP); - return UBASE_ERR_UPUMP; + if (upipe_http_src->fd != -1) { + if (upipe_http_src->upump == NULL) { + struct upump *upump; + upump = upump_alloc_fd_read(upipe_http_src->upump_mgr, + upipe_http_src_worker, upipe, + upipe_http_src->fd); + if (unlikely(upump == NULL)) { + upipe_throw_fatal(upipe, UBASE_ERR_UPUMP); + return UBASE_ERR_UPUMP; + } + upipe_http_src_set_upump(upipe, upump); + upump_start(upump); + } + + if (upipe_http_src->upump_write == NULL && + upipe_http_src->request_pending) { + struct upump *upump = + upump_alloc_fd_write(upipe_http_src->upump_mgr, + upipe_http_src_worker_write, + upipe, upipe_http_src->fd); + if (unlikely(upump == NULL)) { + upipe_throw_fatal(upipe, UBASE_ERR_UPUMP); + return UBASE_ERR_UPUMP; + } + upipe_http_src_set_upump_write(upipe, upump); + upump_start(upump); } - upipe_http_src_set_upump(upipe, upump); - upump_start(upump); } return UBASE_ERR_NONE; } @@ -361,16 +674,17 @@ static int upipe_http_src_get_uri(struct upipe *upipe, const char **url_p) * @param url relative or absolute url of the http * @return socket fd or -1 in case of error */ -static int upipe_http_src_open_url(struct upipe *upipe, const char *url) +static int upipe_http_src_open_url(struct upipe *upipe) { struct upipe_http_src *upipe_http_src = upipe_http_src_from_upipe(upipe); http_parser *parser = &upipe_http_src->parser; http_parser_settings *settings = &upipe_http_src->parser_settings; struct http_parser_url parsed_url; + const char *url = upipe_http_src->url; struct addrinfo *info = NULL, *res; struct addrinfo hints; - char *host, *service; + char *service; int ret, fd = -1; /* check url size */ @@ -382,12 +696,13 @@ static int upipe_http_src_open_url(struct upipe *upipe, const char *url) /* init parser and settings */ http_parser_init(parser, HTTP_RESPONSE); settings->on_message_begin = NULL; - settings->on_url = NULL; - settings->on_header_field = NULL; - settings->on_header_value = NULL; - settings->on_headers_complete = NULL; - settings->on_body = upipe_http_src_body_cb; - settings->on_message_complete = NULL; + settings->on_url = NULL; + settings->on_header_field = upipe_http_src_header_field; + settings->on_header_value = upipe_http_src_header_value; + settings->on_headers_complete = NULL; + settings->on_body = upipe_http_src_body_cb; + settings->on_message_complete = upipe_http_src_message_complete; + settings->on_status_complete = upipe_http_src_status_cb; /* parse url */ ret = http_parser_parse_url(url, strlen(url), 0, &parsed_url); @@ -397,8 +712,8 @@ static int upipe_http_src_open_url(struct upipe *upipe, const char *url) if (unlikely(!(parsed_url.field_set & (1 << UF_HOST)))) { return -1; } - host = strndup(url + parsed_url.field_data[UF_HOST].off, - parsed_url.field_data[UF_HOST].len); + upipe_http_src->host = strndup(url + parsed_url.field_data[UF_HOST].off, + parsed_url.field_data[UF_HOST].len); if (parsed_url.field_set & (1 <path = url + parsed_url.field_data[UF_PATH].off; /* get socket information */ memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = 0; - ret = getaddrinfo(host, service, &hints, &info); + ret = getaddrinfo(upipe_http_src->host, service, &hints, &info); if (unlikely(ret)) { upipe_err_va(upipe, "%s", gai_strerror(ret)); - free(host); free(service); return -1; } - free(host); free(service); /* connect to first working ressource */ @@ -444,44 +758,6 @@ static int upipe_http_src_open_url(struct upipe *upipe, const char *url) return fd; } -/** @internal @This builds and sends a GET request - * - * @param upipe description structure of the pipe - * @return an error code - */ -static int upipe_http_src_send_request(struct upipe *upipe) -{ - struct upipe_http_src *upipe_http_src = upipe_http_src_from_upipe(upipe); - const char *url = upipe_http_src->url; - int len, ret; - char req[strlen(get_request_format)+strlen(url)+strlen(USER_AGENT)+1]; - - /* build get request */ - len = snprintf(req, sizeof(req), get_request_format, url, USER_AGENT); - - ret = send(upipe_http_src->fd, req, len, 0); - - if (ret < 0) { - switch(errno) { - case EINTR: - case EAGAIN: -#if EAGAIN != EWOULDBLOCK - case EWOULDBLOCK: -#endif - /* try again later */ - return UBASE_ERR_EXTERNAL; - - case EBADF: - case EINVAL: - default: - upipe_err_va(upipe, "error sending request (%s)", strerror(errno)); - return UBASE_ERR_EXTERNAL; - } - } - - return UBASE_ERR_NONE; -} - /** @internal @This asks to open the given http. * * @param upipe description structure of the pipe @@ -498,38 +774,58 @@ static int upipe_http_src_set_uri(struct upipe *upipe, const char *url) close(upipe_http_src->fd); upipe_http_src->fd = -1; } + upipe_http_src->path = NULL; + free(upipe_http_src->host); + upipe_http_src->host = NULL; free(upipe_http_src->url); upipe_http_src->url = NULL; upipe_http_src_set_upump(upipe, NULL); + upipe_http_src->request_pending = false; + upipe_http_src_set_upump_write(upipe, NULL); if (unlikely(url == NULL)) return UBASE_ERR_NONE; + upipe_http_src->url = strdup(url); + if (unlikely(upipe_http_src->url == NULL)) + return UBASE_ERR_ALLOC; + /* now call real code */ - upipe_http_src->fd = upipe_http_src_open_url(upipe ,url); + upipe_http_src->fd = upipe_http_src_open_url(upipe); if (unlikely(upipe_http_src->fd < 0)) { upipe_err_va(upipe, "can't open url %s", url); return UBASE_ERR_EXTERNAL; } - /* keep url in memory */ - upipe_http_src->url = strdup(url); - if (unlikely(upipe_http_src->url == NULL)) { - close(upipe_http_src->fd); - upipe_http_src->fd = -1; - upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); - return UBASE_ERR_ALLOC; - } upipe_notice_va(upipe, "opening url %s", upipe_http_src->url); + upipe_http_src->request_pending = true; + return UBASE_ERR_NONE; +} - int err; - if (unlikely((err = upipe_http_src_send_request(upipe)) != - UBASE_ERR_NONE)) { - /* FIXME: build write pump */ - close(upipe_http_src->fd); - upipe_http_src->fd = -1; - return err; - } +static int _upipe_http_src_get_position(struct upipe *upipe, + uint64_t *position_p) +{ + struct upipe_http_src *upipe_http_src = upipe_http_src_from_upipe(upipe); + + if (position_p) + *position_p = upipe_http_src->position; + return UBASE_ERR_NONE; +} + +static int _upipe_http_src_set_position(struct upipe *upipe, + uint64_t offset) +{ + struct upipe_http_src *upipe_http_src = upipe_http_src_from_upipe(upipe); + upipe_http_src->range = HTTP_RANGE(offset, 0); + return UBASE_ERR_NONE; +} + +static int _upipe_http_src_set_range(struct upipe *upipe, + uint64_t offset, + uint64_t length) +{ + struct upipe_http_src *upipe_http_src = upipe_http_src_from_upipe(upipe); + upipe_http_src->range = HTTP_RANGE(offset, length); return UBASE_ERR_NONE; } @@ -582,6 +878,25 @@ static int _upipe_http_src_control(struct upipe *upipe, const char *uri = va_arg(args, const char *); return upipe_http_src_set_uri(upipe, uri); } + + case UPIPE_HTTP_SRC_GET_POSITION: { + UBASE_SIGNATURE_CHECK(args, UPIPE_HTTP_SRC_SIGNATURE) + uint64_t *position_p = va_arg(args, uint64_t *); + return _upipe_http_src_get_position(upipe, position_p); + } + case UPIPE_HTTP_SRC_SET_POSITION: { + UBASE_SIGNATURE_CHECK(args, UPIPE_HTTP_SRC_SIGNATURE) + uint64_t offset = va_arg(args, uint64_t); + return _upipe_http_src_set_position(upipe, offset); + } + + case UPIPE_HTTP_SRC_SET_RANGE: { + UBASE_SIGNATURE_CHECK(args, UPIPE_HTTP_SRC_SIGNATURE) + uint64_t offset = va_arg(args, uint64_t); + int64_t length = va_arg(args, uint64_t); + return _upipe_http_src_set_range(upipe, offset, length); + } + default: return UBASE_ERR_UNHANDLED; } @@ -617,9 +932,11 @@ static void upipe_http_src_free(struct upipe *upipe) } upipe_throw_dead(upipe); + free(upipe_http_src->host); free(upipe_http_src->url); upipe_http_src_clean_output_size(upipe); upipe_http_src_clean_uclock(upipe); + upipe_http_src_clean_upump_write(upipe); upipe_http_src_clean_upump(upipe); upipe_http_src_clean_upump_mgr(upipe); upipe_http_src_clean_output(upipe);