Skip to content

Commit

Permalink
Merge branch 'head-body-sep'
Browse files Browse the repository at this point in the history
This introduces the concept of a *URI descriptor*, i.e. the database no longer
points to a head+body blob, but instead to a relatively small document
describing the cached contents, their metadata, data storage links, etc.

At the code level, whenever one fetches "content" from the cache, it is one of
these descriptors that is retrieved.  Additional methods are added to
retrieve particular data items, which are no longer addressed via URL (since
the descriptor may contain several versions) but via content storage
links (like those of IPFS).

This code implements a temporary, very simplistic, HTTP-only, single
version-only, IPFS storage-only approach of URI descriptors.  For the target
format, please check `doc/descriptor-*.json` files.

Fixes #8.
  • Loading branch information
ivilata committed Sep 5, 2018
2 parents 6987e1d + 3f863f8 commit 1dc9acf
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 50 deletions.
5 changes: 5 additions & 0 deletions src/cache/cache_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ string CacheClient::ipfs_add(const string& data, asio::yield_context yield)
return _ipfs_node->add(data, yield);
}

// Returns whatever the IPFS node's `cat` yields for `ipfs_id`.
// The coroutine context `yield` is handed over to the node unchanged.
string CacheClient::get_data(const string &ipfs_id, asio::yield_context yield)
{
    auto& node = *_ipfs_node;
    return node.cat(ipfs_id, yield);
}

CachedContent CacheClient::get_content(string url, asio::yield_context yield)
{
return ouinet::get_content(*_db, url, yield);
Expand Down
5 changes: 5 additions & 0 deletions src/cache/cache_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class CacheClient {

std::string ipfs_add(const std::string& content, boost::asio::yield_context);

// Gets the data stored in IPFS under `/ipfs/<ipfs_id>`.
//
// TODO: This should accept a generic storage URI instead.
std::string get_data(const std::string& ipfs_id, boost::asio::yield_context);

// Find the content previously stored by the injector under `url`.
// The content is returned in the parameter of the callback function.
//
Expand Down
22 changes: 18 additions & 4 deletions src/cache/cache_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,26 @@ void CacheInjector::insert_content_from_queue()
if (*wd) return;

sys::error_code ec;
Json json;

json["value"] = ipfs_id;
json["ts"] = boost::posix_time::to_iso_extended_string(ts) + 'Z';
if (!key.empty()) { // not a raw data insertion, store in database
Json json;

_db->update(move(key), json.dump(), yield[ec]);
json["value"] = ipfs_id;
json["ts"] = boost::posix_time::to_iso_extended_string(ts) + 'Z';

_db->update(move(key), json.dump(), yield[ec]);
}
cb(ec, ipfs_id);
});
});
}

// Inserts raw `data` by delegating to `insert_content` with an empty key;
// `cb` is forwarded and receives the result of that insertion.
//
// NOTE(review): the empty key appears to be what marks a "raw data
// insertion" that skips the database update in the insertion queue
// handler -- confirm against `insert_content_from_queue`.
void CacheInjector::put_data( const string& data
                            , function<void(sys::error_code, string)> cb)
{
    // `data` is a const reference, so it cannot be moved from: pass it as is
    // (wrapping it in `move()` would just be a misleading no-op cast).
    insert_content("", data, move(cb));
}

void CacheInjector::insert_content( string key
, const string& value
, function<void(sys::error_code, string)> cb)
Expand Down Expand Up @@ -110,6 +119,11 @@ string CacheInjector::insert_content( string key
return result.get();
}

// Looks up `ipfs_id` through the IPFS node's `cat` and returns its result.
// The coroutine context `yield` is passed straight through.
string CacheInjector::get_data(const string &ipfs_id, asio::yield_context yield)
{
    auto& ipfs = *_ipfs_node;
    return ipfs.cat(ipfs_id, yield);
}

CachedContent CacheInjector::get_content(string url, asio::yield_context yield)
{
return ouinet::get_content(*_db, url, yield);
Expand Down
14 changes: 14 additions & 0 deletions src/cache/cache_injector.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ class CacheInjector {
// "https://ipfs.io/ipns/" + ipfs.id()
std::string id() const;

// Insert `data` into IPFS and pass the resulting IPFS ID to the callback.
//
// TODO: This should store into a variety of systems
// and pass a set of storage URIs to the callback.
//
// When testing or debugging, the content can be found here:
// "https://ipfs.io/ipfs/" + <IPFS ID>
void put_data(const std::string& data, OnInsert);

// Gets the data stored in IPFS under `/ipfs/<ipfs_id>`.
//
// TODO: This should accept a generic storage URI instead.
std::string get_data(const std::string& ipfs_id, boost::asio::yield_context);

// Insert `content` into IPFS and store its IPFS ID under the `url` in the
// database. The IPFS ID is also returned as a parameter to the callback
// function.
Expand Down
124 changes: 124 additions & 0 deletions src/cache/http_desc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Temporary, simplified URI descriptor format for a single HTTP response.
//
// See `doc/descriptor-*.json` for the target format.
#pragma once

#include <functional>
#include <iostream>
#include <sstream>
#include <string>

#include <boost/format.hpp>
#include <json.hpp>

#include "../namespaces.h"
#include "../or_throw.h"

namespace ouinet {

namespace descriptor {

// For the given HTTP request `rq` and response `rs`, seed body data to the `cache`
// (via `cache.put_data`), then create an HTTP descriptor for the URL and response,
// and pass it to the given callback.
//
// `cb` receives the error code reported by the data insertion plus the
// JSON-serialized descriptor.  If the insertion failed, the error is
// forwarded and the descriptor argument is the empty string.
//
// The descriptor is a JSON object with these keys:
//
//   - `url`: the request target,
//   - `head`: the serialized response head,
//   - `body_link`: the identifier that `cache.put_data` handed to its callback.
template<class Cache>
inline
void
http_create( Cache& cache
           , const http::request<http::string_body>& rq
           , const http::response<http::dynamic_body>& rs
           , std::function<void(sys::error_code, std::string)> cb) {

    // TODO: Do it more efficiently?
    cache.put_data(beast::buffers_to_string(rs.body().data()),
        // Only the URL is needed from the request, so capture just that
        // instead of a copy of the whole request (which includes its body).
        // The response head is copied since `rs` is only borrowed here.
        [ url = rq.target().to_string()
        , rsh = rs.base()
        , cb = std::move(cb)] (const sys::error_code& ec, auto ipfs_id) {
            if (ec) {
                std::cout << "!Data seeding failed: " << url
                          << " " << ec.message() << std::endl;
                cb(ec, "");
                return;
            }

            // Create the descriptor.
            // TODO: This is a *temporary format* with the bare minimum to test
            // head/body splitting of HTTP responses.
            std::stringstream rsh_ss;
            rsh_ss << rsh;

            nlohmann::json desc;
            desc["url"] = url;
            desc["head"] = rsh_ss.str();
            desc["body_link"] = ipfs_id;

            // `dump()` already yields a temporary, no `std::move` needed.
            cb(ec, desc.dump());
        });
}

// For the given HTTP descriptor serialized in `desc_data`,
// retrieve the head from the descriptor and the body data from the `cache`,
// assemble and return the HTTP response.
//
// The descriptor must be a JSON object with the string members `url`, `head`
// and `body_link`; the body is fetched with `cache.get_data(body_link, ...)`.
//
// Errors are reported through `or_throw` on `yield`:
//
//   - `asio::error::invalid_argument` if the descriptor cannot be parsed,
//     its head is not a complete HTTP response head, or the retrieved body
//     does not complete the message announced by the head;
//   - whatever error `cache.get_data` reports while fetching the body.
template<class Cache>
inline
http::response<http::dynamic_body>
http_parse( Cache& cache, const std::string& desc_data
          , asio::yield_context yield) {

    using Response = http::response<http::dynamic_body>;

    sys::error_code ec;
    std::string url, head, body_link, body;

    // Parse the JSON HTTP descriptor, extract useful info.
    // Both syntax errors and missing or mistyped members surface as exceptions.
    try {
        auto json = nlohmann::json::parse(desc_data);
        url = json["url"];
        head = json["head"];
        body_link = json["body_link"];
    } catch (const std::exception& e) {
        std::cerr << "WARNING: Malformed or invalid HTTP descriptor: " << e.what() << std::endl;
        std::cerr << "----------------" << std::endl;
        std::cerr << desc_data << std::endl;
        std::cerr << "----------------" << std::endl;
        ec = asio::error::invalid_argument;  // though ``bad_descriptor`` would rock
    }

    if (!ec)
        // Get the HTTP response body (stored independently).
        body = cache.get_data(body_link, yield[ec]);

    if (ec)
        return or_throw<Response>(yield, ec);

    // Build an HTTP response from the head in the descriptor and the retrieved body.
    http::response_parser<Response::body_type> parser;
    parser.eager(true);

    // - Parse the response head.
    parser.put(asio::buffer(head), ec);
    if (ec || !parser.is_header_done()) {
        std::cerr << "WARNING: Malformed or incomplete HTTP head in descriptor" << std::endl;
        std::cerr << "----------------" << std::endl;
        std::cerr << head << std::endl;
        std::cerr << "----------------" << std::endl;
        ec = asio::error::invalid_argument;
        return or_throw<Response>(yield, ec);
    }

    // - Add the response body (if needed).
    if (!body.empty())
        parser.put(asio::buffer(body), ec);

    // - Signal the end of input if the message is still open.
    //   A response without `Content-Length` (and not chunked) is delimited
    //   by the end of the stream, so the parser only completes once told
    //   that there is no more data; this used to be done for empty bodies
    //   only, which rejected every such response having a body.
    //   For a body shorter than announced, this yields an error instead.
    if (!ec && !parser.is_done())
        parser.put_eof(ec);

    if (ec || !parser.is_done()) {
        std::cerr
            << (boost::format
                ("WARNING: Incomplete HTTP body in cache (%1% out of %2% bytes) for %3%")
                % body.length() % parser.get()[http::field::content_length] % url)
            << std::endl;
        ec = asio::error::invalid_argument;
        return or_throw<Response>(yield, ec);
    }

    return parser.release();
}

} // ouinet::descriptor namespace

} // ouinet namespace
24 changes: 4 additions & 20 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <cstdlib> // for atexit()

#include "cache/cache_client.h"
#include "cache/http_desc.h"

#include "namespaces.h"
#include "fetch_http_page.h"
#include "client_front_end.h"
Expand Down Expand Up @@ -278,26 +280,8 @@ Client::State::fetch_stored( const Request& request
// an error should have been reported.
assert(!content.ts.is_not_a_date_time());

http::response_parser<Response::body_type> parser;
parser.eager(true);
parser.put(asio::buffer(content.data), ec);

assert(!ec && "Malformed cache entry");

if (!parser.is_done()) {
#ifndef NDEBUG
cerr << "------- WARNING: Unfinished message in cache --------" << endl;
assert(parser.is_header_done() && "Malformed response head did not cause error");
auto response = parser.get();
cerr << request << response.base() << "<" << response.body().size() << " bytes in body>" << endl;
cerr << "-----------------------------------------------------" << endl;
#endif
ec = asio::error::not_found;
}

if (ec) return or_throw<CacheEntry>(yield, ec);

return CacheEntry{content.ts, parser.release()};
auto res = descriptor::http_parse(*_ipfs_cache, content.data, yield[ec]);
return or_throw(yield, ec, CacheEntry{content.ts, res});
}

//------------------------------------------------------------------------------
Expand Down
40 changes: 14 additions & 26 deletions src/injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <cstdlib> // for atexit()

#include "cache/cache_injector.h"
#include "cache/http_desc.h"

#include "namespaces.h"
#include "util.h"
Expand Down Expand Up @@ -175,16 +176,17 @@ struct InjectorCacheControl {
{
if (!injector) return;

stringstream ss;
ss << rs;
auto key = rq.target().to_string();

injector->insert_content(key, ss.str(),
[key] (const sys::error_code& ec, auto) {
if (ec) {
cout << "!Insert failed: " << key
<< " " << ec.message() << endl;
}
descriptor::http_create(*injector, rq, rs,
[ key = rq.target().to_string()
, injector = injector.get()] (const sys::error_code& ec, string desc_data) {
if (ec) return;
injector->insert_content(key, desc_data,
[key] (const sys::error_code& ec, auto) {
if (ec) {
cout << "!Insert failed: " << key
<< " " << ec.message() << endl;
}
});
});
}

Expand All @@ -200,24 +202,10 @@ struct InjectorCacheControl {
sys::error_code ec;

auto content = injector->get_content(rq.target().to_string(), yield[ec]);

if (ec) return or_throw<CacheEntry>(yield, ec);

http::response_parser<Response::body_type> parser;
parser.eager(true);
parser.put(asio::buffer(content.data), ec);
assert(!ec && "Malformed cache entry");

if (!parser.is_done()) {
cerr << "------- WARNING: Unfinished message in cache --------" << endl;
assert(parser.is_header_done() && "Malformed response head did not cause error");
auto rp = parser.get();
cerr << rq << rp.base() << "<" << rp.body().size() << " bytes in body>" << endl;
cerr << "-----------------------------------------------------" << endl;
ec = asio::error::not_found;
}

return or_throw(yield, ec, CacheEntry{content.ts, parser.release()});
auto res = descriptor::http_parse(*injector, content.data, yield[ec]);
return or_throw(yield, ec, CacheEntry{content.ts, res});
}

private:
Expand Down

0 comments on commit 1dc9acf

Please sign in to comment.