Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/atscppapi/src/AsyncHttpFetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void AsyncHttpFetch::run(shared_ptr<AsyncDispatchControllerBase> sender) {
snprintf(size_buf, sizeof(size_buf), "%zu", state_->request_body_.size());
state_->request_.getHeaders().set("Content-Length", size_buf);
}
request_str += headers.str();
request_str += headers.wireStr();
request_str += "\r\n";
request_str += state_->request_body_;

Expand Down
13 changes: 13 additions & 0 deletions lib/atscppapi/src/Headers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,12 +539,25 @@ std::string Headers::str() {
return oss.str();
}

std::string Headers::wireStr() {
string retval;
for (iterator iter = begin(), last = end(); iter != last; ++iter) {
HeaderField hf = *iter;
retval += hf.name().str();
retval += ": ";
retval += hf.values(", ");
retval += "\r\n";
}
return retval;
}

std::ostream& operator<<(std::ostream &os, atscppapi::Headers &obj) {
for(header_field_iterator it = obj.begin(); it != obj.end(); ++it) {
HeaderField hf = *it;
os << hf << std::endl;
}
return os;
}

} /* atscppapi namespace */

51 changes: 43 additions & 8 deletions lib/atscppapi/src/InterceptPlugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@
using namespace atscppapi;
using std::string;

namespace {

/** This contains info for the continuation callback to invoke the plugin */
struct PluginHandle {
InterceptPlugin *plugin_;
shared_ptr<Mutex> mutex_;
PluginHandle(InterceptPlugin *plugin, shared_ptr<Mutex> mutex) : plugin_(plugin), mutex_(mutex) { }
};

}

/**
* @private
*/
Expand Down Expand Up @@ -72,9 +83,12 @@ struct InterceptPlugin::State {
TSMBuffer hdr_buf_;
TSMLoc hdr_loc_;
int num_bytes_written_;
PluginHandle *plugin_handle_;
bool shut_down_;

State(TSCont cont) : cont_(cont), net_vc_(NULL), expected_body_size_(0), num_body_bytes_read_(0),
hdr_parsed_(false), hdr_buf_(NULL), hdr_loc_(NULL), num_bytes_written_(0) {
hdr_parsed_(false), hdr_buf_(NULL), hdr_loc_(NULL), num_bytes_written_(0),
plugin_handle_(NULL), shut_down_(false) {
http_parser_ = TSHttpParserCreate();
}

Expand All @@ -98,8 +112,9 @@ int handleEvents(TSCont cont, TSEvent event, void *edata);
InterceptPlugin::InterceptPlugin(Transaction &transaction, InterceptPlugin::Type type)
: TransactionPlugin(transaction) {
TSCont cont = TSContCreate(handleEvents, TSMutexCreate());
TSContDataSet(cont, this);
state_ = new State(cont);
state_->plugin_handle_ = new PluginHandle(this, TransactionPlugin::getMutex());
TSContDataSet(cont, state_->plugin_handle_);
TSHttpTxn txn = static_cast<TSHttpTxn>(transaction.getAtsHandle());
if (type == SERVER_INTERCEPT) {
TSHttpTxnServerIntercept(cont, txn);
Expand Down Expand Up @@ -130,7 +145,16 @@ InterceptPlugin::InterceptPlugin(Transaction &transaction, InterceptPlugin::Type
}

InterceptPlugin::~InterceptPlugin() {
TSContDestroy(state_->cont_);
if (!state_->shut_down_) {
// transaction is closing, but intercept hasn't finished. Indicate
// that plugin is dead (cleanup will be done by continuation
// callback)
state_->plugin_handle_->plugin_ = NULL;
}
else {
TSContDestroy(state_->cont_);
delete state_->plugin_handle_;
}
delete state_;
}

Expand Down Expand Up @@ -198,15 +222,13 @@ bool InterceptPlugin::doRead() {
// remaining data in this block is body; 'data' will be pointing to first byte of the body
num_body_bytes_in_block = endptr - data;
}
ScopedSharedMutexLock scopedLock(getMutex());
consume(string(startptr, data - startptr), InterceptPlugin::REQUEST_HEADER);
}
else {
num_body_bytes_in_block = data_len;
}
if (num_body_bytes_in_block) {
state_->num_body_bytes_read_ += num_body_bytes_in_block;
ScopedSharedMutexLock scopedLock(getMutex());
consume(string(data, num_body_bytes_in_block), InterceptPlugin::REQUEST_BODY);
}
consumed += data_len;
Expand All @@ -225,7 +247,6 @@ bool InterceptPlugin::doRead() {
LOG_ERROR("Read more data than specified in request");
// TODO: any further action required?
}
ScopedSharedMutexLock scopedLock(getMutex());
handleInputComplete();
}
else {
Expand Down Expand Up @@ -255,6 +276,10 @@ void InterceptPlugin::handleEvent(int abstract_event, void *edata) {
TSHttpHdrTypeSet(state_->hdr_buf_, state_->hdr_loc_, TS_HTTP_TYPE_REQUEST);
break;

case TS_EVENT_VCONN_WRITE_READY: // nothing to do
LOG_DEBUG("Got write ready");
break;

case TS_EVENT_VCONN_READ_READY:
LOG_DEBUG("Handling read ready");
if (doRead()) {
Expand All @@ -277,6 +302,7 @@ void InterceptPlugin::handleEvent(int abstract_event, void *edata) {
if (state_->net_vc_) {
TSVConnClose(state_->net_vc_);
}
state_->shut_down_ = true;
break;

default:
Expand All @@ -287,8 +313,17 @@ void InterceptPlugin::handleEvent(int abstract_event, void *edata) {
namespace {

int handleEvents(TSCont cont, TSEvent event, void *edata) {
InterceptPlugin *plugin = static_cast<InterceptPlugin *>(TSContDataGet(cont));
utils::internal::dispatchInterceptEvent(plugin, event, edata);
PluginHandle *plugin_handle = static_cast<PluginHandle *>(TSContDataGet(cont));
ScopedSharedMutexLock scopedSharedMutexLock(plugin_handle->mutex_);
if (plugin_handle->plugin_) {
utils::internal::dispatchInterceptEvent(plugin_handle->plugin_, event, edata);
}
else {
// plugin is dead; cleanup
LOG_ERROR("Received event %d after plugin died!", event);
TSContDestroy(cont);
delete plugin_handle;
}
return 0;
}

Expand Down
8 changes: 7 additions & 1 deletion lib/atscppapi/src/include/atscppapi/Headers.h
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,17 @@ class Headers: noncopyable {
HeaderField operator[](const std::string &key);

/**
* Get a string representing all the header fields.
* Get a human-readable/log-friendly string representing all the header fields.
* @return a string representation of all the header fields
*/
std::string str();

/**
* Get a string that can be put on the wire
* @return a string representation of all the header fields
*/
std::string wireStr();

friend std::ostream& operator<<(std::ostream &os, Headers &obj);

~Headers();
Expand Down
1 change: 1 addition & 0 deletions lib/atscppapi/src/include/utils_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class internal {
}

static void dispatchInterceptEvent(InterceptPlugin *plugin, TSEvent event, void *edata) {
ScopedSharedMutexLock scopedSharedMutexLock(plugin->getMutex());
plugin->handleEvent(static_cast<int>(event), edata);
}

Expand Down