Skip to content
Fetching contributors…
Cannot retrieve contributors at this time
2371 lines (2099 sloc) 76.8 KB
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/****************************************************************************
ICP.cc
****************************************************************************/
#include "ts/ink_platform.h"
#include "Main.h"
#include "P_EventSystem.h"
#include "P_Cache.h"
#include "P_Net.h"
#include "MgmtUtils.h"
#include "P_RecProcess.h"
#include "ICP.h"
#include "ICPProcessor.h"
#include "ICPlog.h"
#include "logging/Log.h"
#include "logging/LogAccessICP.h"
#include "BaseManager.h"
#include "HdrUtils.h"
extern CacheLookupHttpConfig global_cache_lookup_config;
HTTPHdr gclient_request;
//****************************************************************************
// File Overview:
// ==============
// ICP files
// ICP.h -- All ICP class definitions.
// ICPlog.h -- ICP log object for logging system
// ICP.cc -- Incoming/outgoing ICP request and ICP configuration
// data base management.
// ICPConfig.cc -- ICP interface to Traffic Server configuration
// management, member functions for ICPlog (object
// passed to logging system) along with
// miscellaneous support routines.
// ICPevents.h -- Event definitions specific to ICP.
// ICPProcessor.h -- ICP external interface for other subsystems.
// External subsystems only need to include this
// header to use ICP.
// ICPProcessor.cc -- ICP external interface implementation.
// ICPStats.cc -- ICP statistic callback registration.
//
//
// Class Overview:
// ===============
// ICPConfigData -- Manages global ICP data from the TS configuration
// manager.
// PeerConfigData -- Manages ICP peer data from the TS configuration
// manager.
// ICPConfigUpdateCont -- Used by
// ICPConfiguration::icp_config_change_callback()
// to retry callout after a delay in cases where
// we cannot acquire the configuration lock.
// ICPConfiguration -- Overall manager of ICP configuration from TS
// configuration. Acts as interface and uses
// ICPConfigData and PeerConfigData to implement
// actions. Also fields/processes TS configuration
// callouts for "icp.config" changes. ICP classes only
// see ICPConfiguration when dealing with TS
// configuration info.
//
// Peer (base class) -- abstract base class
// ParentSiblingPeer : Peer -- ICP object describing parent/sibling
// peer which is initialized from the
// TS configuration data.
// MultiCastPeer : Peer -- ICP object describing MultiCast peer.
// Object is initialized from the TS
// configuration data.
//
// BitMap -- Generic bit map management class
//
// ICPProcessor -- Central class which starts all periodic events
// and maintains ICP configuration database. Delegates
// incoming data processing to ICPHandlerCont and
// outgoing data processing to ICPRequestCont. Implements
// reconfiguration actions and query requests from the
// external interface.
//
// ICPRequestCont -- Implements the state machine which processes
// locally generated ICP queries. Generates message
// queries and processes query responses. Responses
// received via callout from ICPPeerReadCont.
//
// PeriodicCont (base class) -- abstract base class
// ICPPeriodicCont : PeriodicCont -- Periodic which looks for ICP
// configuration changes sent by the Traffic Server
// configuration manager, and initiates ICP reconfiguration
// in the event we have a valid configuration change via
// ICPProcessor::ReconfigureStateMachine().
//
// ICPHandlerCont : PeriodicCont -- Periodic which monitors incoming
// ICP sockets and starts processing of the incoming ICP data.
//
// ICPPeerReadCont -- Implements the incoming data state machine.
// Processes remote ICP query requests and passes query
// responses to ICPRequestCont via a callout.
// ICPlog -- Logging object which encapsulates ICP query info required
// by the new logging subsystem to produce squid access log
// data for ICP queries.
//
//****************************************************************************
//
// ICP is integrated into HTTP miss processing as follows.
//
// if (HTTP Traffic Server Miss) {
// if (proxy.config.icp.enabled) {
// Status = QueryICP(URL, &target_ip);
// if (Status == ICP_HIT)
// Issue Http Request to (target_ip, proxy_port);
// }
// if (proxy.config.http.parent_proxy_routing_enable) {
// Issue Http Request to (proxy.config.http.parent_proxy_hostname,
// proxy.config.http.parent_proxy_port)
// }
// else
// Issue Http Request to Origin Server
// }
//
//****************************************************************************
// VC++ 5.0 is rather picky
typedef int (ICPPeerReadCont::*ICPPeerReadContHandler)(int, void *);
typedef int (ICPPeriodicCont::*ICPPeriodicContHandler)(int, void *);
typedef int (ICPHandlerCont::*ICPHandlerContHandler)(int, void *);
typedef int (ICPRequestCont::*ICPRequestContHandler)(int, void *);
// Plugin freshness function
PluginFreshnessCalcFunc pluginFreshnessCalcFunc = (PluginFreshnessCalcFunc)NULL;
//---------------------------------------
// Class ICPHandlerCont member functions
// Deal with incoming ICP data
//---------------------------------------
// Static data declarations
// Allocator *ICPHandlerCont::IncomingICPDataBuf;
int64_t ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex;
static ClassAllocator<ICPPeerReadCont::PeerReadData> PeerReadDataAllocator("PeerReadDataAllocator");
static ClassAllocator<ICPPeerReadCont> ICPPeerReadContAllocator("ICPPeerReadContAllocator");
static Action *default_action = NULL;
ICPHandlerCont::ICPHandlerCont(ICPProcessor *icpP) : PeriodicCont(icpP)
{
}
// do nothing continuation handler
int
ICPHandlerCont::TossEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
return EVENT_DONE;
}
int
ICPHandlerCont::PeriodicEvent(int event, Event * /* e ATS_UNUSED */)
{
int n_peer, valid_peers;
Peer *P;
// Periodic handler which initiates incoming message processing
// on the defined peers.
valid_peers = _ICPpr->GetRecvPeers();
// get peer info from the completionEvent token.
switch (event) {
case EVENT_POLL:
case EVENT_INTERVAL: {
// start read I/Os on peers which don't have outstanding I/Os
for (n_peer = 0; n_peer < valid_peers; ++n_peer) {
P = _ICPpr->GetNthRecvPeer(n_peer, _ICPpr->GetLastRecvPeerBias());
if (!P || (P && !P->IsOnline()))
continue;
if (P->shouldStartRead()) {
P->startingRead();
///////////////////////////////////////////
// Setup state machine
///////////////////////////////////////////
ICPPeerReadCont *s = ICPPeerReadContAllocator.alloc();
int local_lookup = _ICPpr->GetConfig()->globalConfig()->ICPLocalCacheLookup();
s->init(_ICPpr, P, local_lookup);
RECORD_ICP_STATE_CHANGE(s, event, ICPPeerReadCont::READ_ACTIVE);
///////////////////////////////////////////
// Start processing
///////////////////////////////////////////
s->handleEvent(EVENT_INTERVAL, (Event *)0);
}
}
break;
}
default: {
ink_release_assert(!"unexpected event");
break;
}
} // End of switch
return EVENT_CONT;
}
//***************************************************************************
// Nested Class PeerReadData member functions
// Used by ICPPeerReadCont to encapsulate the data required by
// PeerReadStateMachine
//***************************************************************************
ICPPeerReadCont::PeerReadData::PeerReadData()
{
init();
}
void
ICPPeerReadCont::PeerReadData::init()
{
_start_time = 0;
_mycont = 0;
_peer = 0;
_next_state = READ_ACTIVE;
_cache_lookup_local = 0;
_buf = 0;
_rICPmsg = 0;
_rICPmsg_len = 0;
_cachelookupURL.clear();
_queryResult = 0;
_ICPReqCont = 0;
_bytesReceived = 0;
#ifdef DEBUG_ICP
_nhistory = 0;
#endif
memset((void *)&_sender, 0, sizeof(_sender));
}
ICPPeerReadCont::PeerReadData::~PeerReadData()
{
reset(1);
}
void
ICPPeerReadCont::PeerReadData::reset(int full_reset)
{
if (full_reset) {
_peer = 0;
_buf = 0;
}
if (_rICPmsg) {
_rICPmsg = 0;
_rICPmsg_len = 0;
}
if (_cachelookupURL.valid()) {
_cachelookupURL.destroy();
}
}
//***************************************************************************
//------------------------------------------------------------------------
// ICPPeerReadCont -- ICP incoming message processing state machine
//------------------------------------------------------------------------
ICPPeerReadCont::ICPPeerReadCont()
: Continuation(0),
_object_vc(NULL),
_object_read(NULL),
_cache_req_hdr_heap_handle(NULL),
_cache_resp_hdr_heap_handle(NULL),
_ICPpr(NULL),
_state(NULL),
_start_time(0),
_recursion_depth(0)
{
}
void
ICPPeerReadCont::init(ICPProcessor *ICPpr, Peer *p, int lookup_local)
{
PeerReadData *s = PeerReadDataAllocator.alloc();
s->init();
s->_start_time = Thread::get_hrtime();
s->_peer = p;
s->_next_state = READ_ACTIVE;
s->_cache_lookup_local = lookup_local;
SET_HANDLER((ICPPeerReadContHandler)&ICPPeerReadCont::ICPPeerReadEvent);
_ICPpr = ICPpr;
_state = s;
_recursion_depth = -1;
_object_vc = NULL;
_object_read = NULL;
_cache_req_hdr_heap_handle = NULL;
_cache_resp_hdr_heap_handle = NULL;
mutex = new_ProxyMutex();
}
ICPPeerReadCont::~ICPPeerReadCont()
{
reset(1); // Full reset
}
void
ICPPeerReadCont::reset(int full_reset)
{
mutex = 0;
if (this->_state) {
this->_state->reset(full_reset);
PeerReadDataAllocator.free(this->_state);
}
if (_cache_req_hdr_heap_handle) {
ats_free(_cache_req_hdr_heap_handle);
_cache_req_hdr_heap_handle = NULL;
}
if (_cache_resp_hdr_heap_handle) {
ats_free(_cache_resp_hdr_heap_handle);
_cache_resp_hdr_heap_handle = NULL;
}
}
int
ICPPeerReadCont::ICPPeerReadEvent(int event, Event *e)
{
switch (event) {
case EVENT_INTERVAL:
case EVENT_IMMEDIATE: {
break;
}
case NET_EVENT_DATAGRAM_WRITE_COMPLETE:
case NET_EVENT_DATAGRAM_READ_COMPLETE:
case NET_EVENT_DATAGRAM_READ_ERROR:
case NET_EVENT_DATAGRAM_WRITE_ERROR: {
ink_assert((event != NET_EVENT_DATAGRAM_READ_COMPLETE) || (_state->_next_state == READ_DATA_DONE));
ink_assert((event != NET_EVENT_DATAGRAM_WRITE_COMPLETE) || (_state->_next_state == WRITE_DONE));
ink_release_assert(this == (ICPPeerReadCont *)completionUtil::getHandle(e));
break;
}
case CACHE_EVENT_LOOKUP_FAILED:
case CACHE_EVENT_LOOKUP: {
ink_assert(_state->_next_state == AWAITING_CACHE_LOOKUP_RESPONSE);
break;
}
default: {
ink_release_assert(!"unexpected event");
}
} // End of switch
// Front end to PeerReadStateMachine(), invoked by Event subsystem.
if (PeerReadStateMachine(_state, e) == EVENT_CONT) {
eventProcessor.schedule_in(this, RETRY_INTERVAL, ET_ICP);
return EVENT_DONE;
} else if (_state->_next_state == READ_PROCESSING_COMPLETE) {
_state->_peer->cancelRead();
this->reset(1); // Full reset
ICPPeerReadContAllocator.free(this);
return EVENT_DONE;
} else {
return EVENT_DONE;
}
}
int
ICPPeerReadCont::StaleCheck(int event, Event * /* e ATS_UNUSED */)
{
ip_port_text_buffer ipb;
ink_release_assert(mutex->thread_holding == this_ethread());
Debug("icp-stale", "Stale check res=%d for id=%d, [%s] from [%s]", event, _state->_rICPmsg->h.requestno,
_state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)));
switch (event) {
case ICP_STALE_OBJECT: {
_state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
break;
}
case ICP_FRESH_OBJECT: {
_state->_queryResult = CACHE_EVENT_LOOKUP;
break;
}
default: {
Debug("icp-stale", "ICPPeerReadCont::StaleCheck: Invalid Event %d", event);
_state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
break;
}
}
_object_vc->do_io(VIO::CLOSE);
_object_vc = 0;
SET_HANDLER((ICPPeerReadContHandler)&ICPPeerReadCont::ICPPeerReadEvent);
return handleEvent(_state->_queryResult, 0);
}
int
ICPPeerReadCont::ICPPeerQueryEvent(int event, Event *e)
{
ip_port_text_buffer ipb;
Debug("icp", "Remote Query lookup res=%d for id=%d, [%s] from [%s]", event, _state->_rICPmsg->h.requestno,
_state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)));
if (pluginFreshnessCalcFunc) {
switch (event) {
case CACHE_EVENT_OPEN_READ: {
_object_vc = (CacheVConnection *)e;
SET_HANDLER((ICPPeerReadContHandler)&ICPPeerReadCont::StaleCheck);
_object_vc->get_http_info(&_object_read);
(*pluginFreshnessCalcFunc)((void *)this);
return EVENT_DONE;
}
case CACHE_EVENT_OPEN_READ_FAILED: {
event = CACHE_EVENT_LOOKUP_FAILED;
break;
}
default:
break;
}
}
// Process result
_state->_queryResult = event;
SET_HANDLER((ICPPeerReadContHandler)&ICPPeerReadCont::ICPPeerReadEvent);
return handleEvent(event, e);
}
int
ICPPeerReadCont::ICPPeerQueryCont(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
ip_port_text_buffer ipb;
Action *a;
// Perform lookup()/open_read() on behalf of PeerReadStateMachine()
((char *)_state->_rICPmsg)[MAX_ICP_MSGSIZE - 1] = 0; // null terminate
_state->_cachelookupURL.create(NULL);
const char *qurl = (const char *)_state->_rICPmsg->un.query.URL;
_state->_cachelookupURL.parse(qurl, strlen(qurl));
Debug("icp", "Remote Query for id=%d, [%s] from [%s]", _state->_rICPmsg->h.requestno, _state->_rICPmsg->un.query.URL,
ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)));
SET_HANDLER((ICPPeerReadContHandler)&ICPPeerReadCont::ICPPeerQueryEvent);
if (_state->_rICPmsg->un.query.URL && *_state->_rICPmsg->un.query.URL) {
HttpCacheKey key;
ICPConfigData *cfg = _ICPpr->GetConfig()->globalConfig();
Cache::generate_key(&key, &_state->_cachelookupURL, cfg->ICPCacheGeneration());
_state->_queryResult = ~CACHE_EVENT_LOOKUP_FAILED;
_start_time = Thread::get_hrtime();
if (pluginFreshnessCalcFunc && cfg->ICPStaleLookup()) {
//////////////////////////////////////////////////////////////
// Note: _cache_lookup_local is ignored in this case, since
// cache clustering is not used with stale lookup.
//////////////////////////////////////////////////////////////
a = cacheProcessor.open_read(this, &key, false, &gclient_request, &global_cache_lookup_config, (time_t)0);
} else {
a = cacheProcessor.lookup(this, &key, false, _state->_cache_lookup_local);
}
if (!a) {
a = ACTION_IO_ERROR;
}
if (a == ACTION_RESULT_DONE) {
return EVENT_DONE; // callback complete
} else if (a == ACTION_IO_ERROR) {
handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
return EVENT_DONE; // callback complete
} else {
return EVENT_CONT; // callback pending
}
} else {
// Null URL, return failed lookup
handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
return EVENT_DONE; // callback done
}
}
struct AutoReference {
AutoReference(int *cnt)
{
_cnt = cnt;
(*_cnt)++;
}
~AutoReference() { (*_cnt)--; }
int *_cnt;
};
int
ICPPeerReadCont::PeerReadStateMachine(PeerReadData *s, Event *e)
{
AutoReference l(&_recursion_depth);
ip_port_text_buffer ipb; // scratch buffer for diagnostic messages.
//-----------------------------------------------------------
// State machine to process ICP data received on UDP socket
//-----------------------------------------------------------
MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
if (!lock.is_locked()) {
// we didn't get the lock, so we don't need to unlock it
// coverity[missing_unlock]
return EVENT_CONT; // try again later
}
while (1) { // loop forever
switch (s->_next_state) {
case READ_ACTIVE: {
ink_release_assert(_recursion_depth == 0);
if (!_ICPpr->Lock())
return EVENT_CONT; // unable to get lock, try again later
bool valid_peer = (_ICPpr->IdToPeer(s->_peer->GetPeerID()) == s->_peer.get());
if (valid_peer && _ICPpr->AllowICPQueries() && _ICPpr->GetConfig()->globalConfig()->ICPconfigured()) {
// Note pending incoming ICP request or response
_ICPpr->IncPendingQuery();
_ICPpr->Unlock();
s->_next_state = READ_DATA;
RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA);
break; // move to next_state
} else {
_ICPpr->Unlock();
// ICP NOT enabled, do nothing
s->_next_state = READ_PROCESSING_COMPLETE;
RECORD_ICP_STATE_CHANGE(s, 0, READ_PROCESSING_COMPLETE);
return EVENT_DONE;
}
}
ink_release_assert(0); // Should never happen
case READ_DATA: {
ink_release_assert(_recursion_depth == 0);
// Assumption of one outstanding read per peer...
// Setup read from FD
ink_assert(!s->_peer->buf);
Ptr<IOBufferBlock> buf = s->_peer->buf = new_IOBufferBlock();
buf->alloc(ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex);
s->_peer->fromaddrlen = sizeof(s->_peer->fromaddr);
buf->fill(sizeof(ICPMsg_t)); // reserve space for decoding
char *be = buf->buf_end() - 1;
be[0] = 0; // null terminate buffer
s->_next_state = READ_DATA_DONE;
RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA_DONE);
ink_assert(s->_peer->readAction == NULL);
Action *a =
s->_peer->RecvFrom_re(this, this, buf.get(), buf->write_avail() - 1, &s->_peer->fromaddr.sa, &s->_peer->fromaddrlen);
if (!a) {
a = ACTION_IO_ERROR;
}
if (a == ACTION_RESULT_DONE) {
// we will have been called back already and our state updated
// appropriately.
// move to next state
ink_assert(s->_next_state == PROCESS_READ_DATA);
break;
} else if (a == ACTION_IO_ERROR) {
// actually, this *could* be taken care of by the main handler, but
// error processing makes more sense at this point. Therefore,
// the main handler ignores the errors.
//
// No data, terminate read loop.
//
ICP_INCREMENT_DYN_STAT(no_data_read_stat);
s->_peer->buf = NULL; // release reference
s->_next_state = READ_NOT_ACTIVE_EXIT;
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
// move to next state
break;
} else {
s->_peer->readAction = a;
return EVENT_DONE;
}
}
ink_release_assert(0); // Should never happen
case READ_DATA_DONE: {
// Convert ICP message from network to host format
if (s->_peer->readAction != NULL) {
ink_assert(s->_peer->readAction == e);
s->_peer->readAction = NULL;
}
s->_bytesReceived = completionUtil::getBytesTransferred(e);
if (s->_bytesReceived >= 0) {
s->_next_state = PROCESS_READ_DATA;
RECORD_ICP_STATE_CHANGE(s, 0, PROCESS_READ_DATA);
} else {
ICP_INCREMENT_DYN_STAT(no_data_read_stat);
s->_peer->buf = NULL; // release reference
s->_next_state = READ_NOT_ACTIVE_EXIT;
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
}
if (_recursion_depth > 0) {
return EVENT_DONE;
} else {
break;
}
}
ink_release_assert(0); // Should never happen
case PROCESS_READ_DATA:
case ADD_PEER: {
ink_release_assert(_recursion_depth == 0);
Ptr<IOBufferBlock> bufblock = s->_peer->buf;
char *buf = bufblock->start();
if (s->_next_state == PROCESS_READ_DATA) {
ICPRequestCont::NetToHostICPMsg((ICPMsg_t *)(buf + sizeof(ICPMsg_t)), (ICPMsg_t *)buf);
// adjust buffer pointers to point to decoded message.
bufblock->reset();
bufblock->fill(s->_bytesReceived);
// Validate message length for sanity
if (s->_bytesReceived < ((ICPMsg_t *)buf)->h.msglen) {
//
// Short read, terminate
//
ICP_INCREMENT_DYN_STAT(short_read_stat);
s->_peer->buf = NULL;
s->_next_state = READ_NOT_ACTIVE;
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
break; // move to next_state
}
}
// Validate receiver and convert the received sockaddr
// to internal sockaddr format.
IpEndpoint from;
if (!s->_peer->ExtToIntRecvSockAddr(&s->_peer->fromaddr.sa, &from.sa)) {
int status;
ICPConfigData *cfg = _ICPpr->GetConfig()->globalConfig();
ICPMsg_t *ICPmsg = (ICPMsg_t *)buf;
if ((cfg->ICPconfigured() == ICP_MODE_RECEIVE_ONLY) && cfg->ICPReplyToUnknownPeer() &&
((ICPmsg->h.version == ICP_VERSION_2) || (ICPmsg->h.version == ICP_VERSION_3)) && (ICPmsg->h.opcode == ICP_OP_QUERY)) {
//
// Add the unknown Peer to our database to
// allow us to resolve the lookup request.
//
if (!_ICPpr->GetConfig()->Lock()) {
s->_next_state = ADD_PEER;
RECORD_ICP_STATE_CHANGE(s, 0, ADD_PEER);
return EVENT_CONT;
}
if (!_ICPpr->GetFreePeers() || !_ICPpr->GetFreeSendPeers()) {
RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP Peer limit exceeded");
_ICPpr->GetConfig()->Unlock();
goto invalid_message;
}
int icp_reply_port = cfg->ICPDefaultReplyPort();
if (!icp_reply_port) {
icp_reply_port = ntohs(ats_ip_port_cast(&s->_peer->fromaddr));
}
PeerConfigData *Pcfg = new PeerConfigData(PeerConfigData::CTYPE_SIBLING, IpAddr(s->_peer->fromaddr), 0, icp_reply_port);
ParentSiblingPeer *P = new ParentSiblingPeer(PEER_SIBLING, Pcfg, _ICPpr, true);
status = _ICPpr->AddPeer(P);
ink_release_assert(status);
status = _ICPpr->AddPeerToSendList(P);
ink_release_assert(status);
P->GetChan()->setRemote(P->GetIP());
// coverity[uninit_use_in_call]
Note("ICP Peer added ip=%s", ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
from = s->_peer->fromaddr;
} else {
invalid_message:
//
// Sender does not exist in ICP configuration, terminate
//
ICP_INCREMENT_DYN_STAT(invalid_sender_stat);
Debug("icp", "Received msg from invalid sender [%s]", ats_ip_nptop(&s->_peer->fromaddr, ipb, sizeof(ipb)));
s->_peer->buf = NULL;
s->_next_state = READ_NOT_ACTIVE;
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
break; // move to next_state
}
}
// we hand off the decoded buffer from the Peer to the PeerReadData
s->_sender = from;
s->_rICPmsg_len = s->_bytesReceived;
ink_assert(!s->_buf);
s->_buf = s->_peer->buf;
s->_rICPmsg = (ICPMsg_t *)s->_buf->start();
s->_peer->buf = NULL;
//
// Handle only ICP_VERSION_2/3 messages. Reject all others.
//
if ((s->_rICPmsg->h.version != ICP_VERSION_2) && (s->_rICPmsg->h.version != ICP_VERSION_3)) {
ICP_INCREMENT_DYN_STAT(read_not_v2_icp_stat);
Debug("icp", "Received (v=%d) !v2 && !v3 msg from sender [%s]", (uint32_t)s->_rICPmsg->h.version,
ats_ip_nptop(&from, ipb, sizeof(ipb)));
s->_rICPmsg = NULL;
s->_buf = NULL;
s->_next_state = READ_NOT_ACTIVE;
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
break; // move to next_state
}
//
// If this is a query message, redirect to
// the query specific handlers.
//
if (s->_rICPmsg->h.opcode == ICP_OP_QUERY) {
ICP_INCREMENT_DYN_STAT(icp_remote_query_requests_stat);
ink_assert(!s->_mycont);
s->_next_state = AWAITING_CACHE_LOOKUP_RESPONSE;
RECORD_ICP_STATE_CHANGE(s, 0, AWAITING_CACHE_LOOKUP_RESPONSE);
if (ICPPeerQueryCont(0, (Event *)0) == EVENT_DONE) {
break; // Callback complete
} else {
return EVENT_DONE; // Callback pending
}
} else {
// We have a response message for an ICP query.
Debug("icp", "Response for Id=%d, from [%s]", s->_rICPmsg->h.requestno, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
ICP_INCREMENT_DYN_STAT(icp_remote_responses_stat);
s->_next_state = GET_ICP_REQUEST;
RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
break; // move to next_state
}
}
ink_release_assert(0); // Should never happen
case AWAITING_CACHE_LOOKUP_RESPONSE: {
int status = 0;
void *data = s->_rICPmsg->un.query.URL;
int datalen = strlen((const char *)data) + 1;
if (s->_queryResult == CACHE_EVENT_LOOKUP) {
// Use the received ICP data buffer for the response message
Debug("icp", "Sending ICP_OP_HIT for id=%d, [%.*s] to [%s]", s->_rICPmsg->h.requestno, datalen, (const char *)data,
ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
ICP_INCREMENT_DYN_STAT(icp_cache_lookup_success_stat);
status = ICPRequestCont::BuildICPMsg(ICP_OP_HIT, s->_rICPmsg->h.requestno, 0 /* optflags */, 0 /* optdata */,
0 /* shostid */, data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
} else if (s->_queryResult == CACHE_EVENT_LOOKUP_FAILED) {
// Use the received ICP data buffer for response message
Debug("icp", "Sending ICP_OP_MISS for id=%d, [%.*s] to [%s]", s->_rICPmsg->h.requestno, datalen, (const char *)data,
ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
ICP_INCREMENT_DYN_STAT(icp_cache_lookup_fail_stat);
status = ICPRequestCont::BuildICPMsg(ICP_OP_MISS, s->_rICPmsg->h.requestno, 0 /* optflags */, 0 /* optdata */,
0 /* shostid */, data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
} else {
Warning("Bad cache lookup event: %d", s->_queryResult);
ink_release_assert(!"Invalid cache lookup event");
}
ink_assert(status == 0);
// Make system log entry for ICP query
ICPlog logentry(s);
LogAccessICP accessor(&logentry);
Log::access(&accessor);
s->_next_state = SEND_REPLY;
RECORD_ICP_STATE_CHANGE(s, 0, SEND_REPLY);
if (_recursion_depth > 0) {
return EVENT_DONE;
} else {
break;
}
}
ink_release_assert(0); // Should never happen
case SEND_REPLY: {
ink_release_assert(_recursion_depth == 0);
//
// Send the query response back to the sender
//
s->_next_state = WRITE_DONE;
RECORD_ICP_STATE_CHANGE(s, 0, WRITE_DONE);
ink_assert(s->_peer->writeAction == NULL);
Action *a = s->_peer->SendMsg_re(this, this, &s->_mhdr, &s->_sender.sa);
if (!a) {
a = ACTION_IO_ERROR;
}
if (a == ACTION_RESULT_DONE) {
// we have been called back already and our state updated
// appropriately
break;
} else if (a == ACTION_IO_ERROR) {
// Partial write.
ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
// coverity[uninit_use_in_call]
Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s", ntohs(s->_rICPmsg->h.msglen), -1,
ats_ip_ntop(&s->_sender, ipb, sizeof(ipb)));
s->_next_state = READ_NOT_ACTIVE;
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
break;
} else {
s->_peer->writeAction = a;
return EVENT_DONE;
}
}
ink_release_assert(0); // Should never happen
case WRITE_DONE: {
s->_peer->writeAction = NULL;
int len = completionUtil::getBytesTransferred(e);
if (len == (int)ntohs(s->_rICPmsg->h.msglen)) {
ICP_INCREMENT_DYN_STAT(query_response_write_stat);
s->_peer->LogSendMsg(s->_rICPmsg, &s->_sender.sa); // log query reply
} else {
// Partial write.
ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
// coverity[uninit_use_in_call]
Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s", ntohs(s->_rICPmsg->h.msglen), len,
ats_ip_ntop(&s->_sender, ipb, sizeof(ipb)));
}
// Processing complete, perform completion actions
s->_next_state = READ_NOT_ACTIVE;
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
Debug("icp", "state->READ_NOT_ACTIVE");
if (_recursion_depth > 0) {
return EVENT_DONE;
} else {
break; // move to next_state
}
}
ink_release_assert(0); // Should never happen
case GET_ICP_REQUEST: {
ink_release_assert(_recursion_depth == 0);
ink_assert(s->_rICPmsg && s->_rICPmsg_len); // Sanity check
// Get ICP request associated with response message
s->_ICPReqCont = ICPRequestCont::FindICPRequest(s->_rICPmsg->h.requestno);
if (s->_ICPReqCont) {
s->_next_state = GET_ICP_REQUEST_MUTEX;
RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST_MUTEX);
break; // move to next_state
}
//
// No ICP request for response message, log as "response
// for non-existent ICP request" and terminate processing
//
Debug("icp", "No ICP Request for Id=%d", s->_rICPmsg->h.requestno);
ICP_INCREMENT_DYN_STAT(no_icp_request_for_response_stat);
Peer *p = _ICPpr->FindPeer(s->_sender);
p->LogRecvMsg(s->_rICPmsg, 0);
s->_next_state = READ_NOT_ACTIVE;
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
break; // move to next_state
}
ink_release_assert(0); // Should never happen
case GET_ICP_REQUEST_MUTEX: {
ink_release_assert(_recursion_depth == 0);
ink_assert(s->_ICPReqCont);
Ptr<ProxyMutex> ICPReqContMutex(s->_ICPReqCont->mutex);
EThread *ethread = this_ethread();
ink_hrtime request_start_time;
if (!MUTEX_TAKE_TRY_LOCK(ICPReqContMutex, ethread)) {
ICP_INCREMENT_DYN_STAT(icp_response_request_nolock_stat);
//
// Unable to get ICP request mutex, delay and move back
// to the GET_ICP_REQUEST state. We need to do this
// since the ICP request may be deallocated by the active
// continuation.
//
s->_ICPReqCont = (ICPRequestCont *)0;
s->_next_state = GET_ICP_REQUEST;
RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
return EVENT_CONT;
}
// Log as "response for ICP request"
Peer *p = _ICPpr->FindPeer(s->_sender);
p->LogRecvMsg(s->_rICPmsg, 1);
// Process the ICP response for the given ICP request
ICPRequestCont::ICPRequestEventArgs_t args;
args.rICPmsg = s->_rICPmsg;
args.rICPmsg_len = s->_rICPmsg_len;
args.peer = p;
if (!s->_ICPReqCont->GetActionPtr()->cancelled) {
request_start_time = s->_ICPReqCont->GetRequestStartTime();
Debug("icp", "Passing Reply for ICP Id=%d", s->_rICPmsg->h.requestno);
s->_ICPReqCont->handleEvent((int)ICP_RESPONSE_MESSAGE, (void *)&args);
} else {
request_start_time = 0;
delete s->_ICPReqCont;
Debug("icp", "User cancelled ICP request Id=%d", s->_rICPmsg->h.requestno);
}
// Note: s->_ICPReqCont is deallocated at this point.
s->_ICPReqCont = 0;
MUTEX_UNTAKE_LOCK(ICPReqContMutex, ethread);
if (request_start_time) {
ICP_SUM_DYN_STAT(total_icp_response_time_stat, (Thread::get_hrtime() - request_start_time));
}
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
s->_next_state = READ_NOT_ACTIVE;
break; // move to next_state
}
ink_release_assert(0); // Should never happen
case READ_NOT_ACTIVE:
case READ_NOT_ACTIVE_EXIT: {
ink_release_assert(_recursion_depth == 0);
if (!_ICPpr->Lock())
return EVENT_CONT; // unable to get lock, try again later
// Note incoming ICP request or response completion
_ICPpr->DecPendingQuery();
_ICPpr->Unlock();
s->_buf = 0;
if (s->_next_state == READ_NOT_ACTIVE_EXIT) {
s->_next_state = READ_PROCESSING_COMPLETE;
return EVENT_DONE;
} else {
// Last read was valid, see if any more read data before exiting
s->reset();
s->_start_time = Thread::get_hrtime();
s->_next_state = READ_ACTIVE;
RECORD_ICP_STATE_CHANGE(s, 0, READ_ACTIVE);
break; // restart
}
}
ink_release_assert(0); // Should never happen
case READ_PROCESSING_COMPLETE:
default:
ink_release_assert(0); // Should never happen
} // End of switch
} // End of while(1)
}
//------------------------------------------------------------------------
// Class ICPRequestCont member functions
// Implements the state machine which processes locally generated
// ICP queries.
//------------------------------------------------------------------------
ClassAllocator<ICPRequestCont> ICPRequestCont_allocator("ICPRequestCont_allocator");
ICPRequestCont::ICPRequestCont(ICPProcessor *pr, Continuation *c, URL *u)
: Continuation(0),
_cont(c),
_url(u),
_start_time(0),
_ICPpr(pr),
_timeout(0),
npending_actions(0),
pendingActions(NULL),
_sequence_number(0),
_expected_replies(0),
_expected_replies_list(MAX_DEFINED_PEERS),
_received_replies(0),
_next_state(ICP_START)
{
memset((void *)&_ret_sockaddr, 0, sizeof(_ret_sockaddr));
_ret_status = ICP_LOOKUP_FAILED;
_act.cancelled = false;
_act = c;
memset((void *)&_ICPmsg, 0, sizeof(_ICPmsg));
memset((void *)&_sendMsgHdr, 0, sizeof(_sendMsgHdr));
memset((void *)&_sendMsgIOV, 0, sizeof(_sendMsgIOV[MSG_IOVECS]));
if (c)
this->mutex = c->mutex;
}
ICPRequestCont::~ICPRequestCont()
{
_act = NULL;
this->mutex = NULL;
if (_timeout) {
_timeout->cancel(this);
_timeout = 0;
}
RemoveICPRequest(_sequence_number);
if (_ICPmsg.h.opcode == ICP_OP_QUERY) {
if (_ICPmsg.un.query.URL) {
ats_free(_ICPmsg.un.query.URL);
}
}
if (pendingActions) {
delete pendingActions;
pendingActions = 0;
}
}
void
ICPRequestCont::remove_from_pendingActions(Action *a)
{
if (!pendingActions) {
npending_actions--;
return;
}
for (intptr_t i = 0; i < pendingActions->length(); i++) {
if ((*pendingActions)[i] == a) {
for (intptr_t j = i; j < pendingActions->length() - 1; j++)
(*pendingActions)[j] = (*pendingActions)[j + 1];
pendingActions->set_length(pendingActions->length() - 1);
npending_actions--;
return;
}
}
npending_actions--; // completed inline
}
void
ICPRequestCont::remove_all_pendingActions()
{
int active_pendingActions = 0;
if (!pendingActions) {
return;
}
for (intptr_t i = 0; i < pendingActions->length(); i++) {
if ((*pendingActions)[i] && ((*pendingActions)[i] != ACTION_IO_ERROR)) {
((*pendingActions)[i])->cancel();
(*pendingActions)[i] = 0;
npending_actions--;
active_pendingActions++;
} else {
(*pendingActions)[i] = 0;
}
}
pendingActions->set_length(pendingActions->length() - active_pendingActions);
}
int
ICPRequestCont::ICPRequestEvent(int event, Event *e)
{
// Note: Passed parameter 'e' is not an Event *
// if event == ICP_RESPONSE_MESSAGE
ink_assert(event == NET_EVENT_DATAGRAM_WRITE_COMPLETE || event == NET_EVENT_DATAGRAM_WRITE_ERROR || event == EVENT_IMMEDIATE ||
event == EVENT_INTERVAL || event == ICP_RESPONSE_MESSAGE);
// handle reentrant callback
if ((event == NET_EVENT_DATAGRAM_WRITE_COMPLETE) || (event == NET_EVENT_DATAGRAM_WRITE_ERROR)) {
ink_assert(npending_actions > 0);
remove_from_pendingActions((Action *)e);
return EVENT_DONE;
}
// Start of user ICP query request processing. We start here after
// the reschedule in ICPProcessor::ICPQuery().
switch (_next_state) {
case ICP_START:
case ICP_OFF_TERMINATE:
case ICP_QUEUE_REQUEST:
case ICP_AWAITING_RESPONSE:
case ICP_DEQUEUE_REQUEST:
case ICP_POST_COMPLETION:
case ICP_REQUEST_NOT_ACTIVE: {
if (ICPStateMachine(event, (void *)e) == EVENT_CONT) {
//
// Unable to acquire lock, reschedule continuation
//
eventProcessor.schedule_in(this, HRTIME_MSECONDS(RETRY_INTERVAL), ET_ICP);
return EVENT_CONT;
} else if (_next_state == ICP_DONE) {
//
// ICP request processing complete.
//
delete this;
break;
} else {
break;
}
}
ink_release_assert(0); // should never happen
case ICP_DONE:
default:
ink_release_assert(0); // should never happen
} // End of switch
return EVENT_DONE;
}
int
ICPRequestCont::NopICPRequestEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
delete this;
return EVENT_DONE;
}
int
ICPRequestCont::ICPStateMachine(int event, void *d)
{
//*******************************************
// ICP message processing state machine
//*******************************************
ICPConfiguration *ICPcf = _ICPpr->GetConfig();
ip_port_text_buffer ipb;
while (1) { // loop forever
switch (_next_state) {
case ICP_START: {
// User may have cancelled request, if so abort request.
if (_act.cancelled) {
_next_state = ICP_DONE;
return EVENT_DONE;
}
if (!_ICPpr->Lock())
return EVENT_CONT; // Unable to get lock, try again later
if (_ICPpr->AllowICPQueries() && (ICPcf->globalConfig()->ICPconfigured() == ICP_MODE_SEND_RECEIVE)) {
// Reject NULL pointer or "localhost" URLs
if (_url->valid()) {
int host_len;
const char *host = _url->host_get(&host_len);
if (ptr_len_casecmp(host, host_len, "127.0.0.1") == 0 || ptr_len_casecmp(host, host_len, "localhost") == 0) {
_ICPpr->Unlock();
// NULL pointer or "localhost" URL, terminate request
_next_state = ICP_OFF_TERMINATE;
Debug("icp", "[ICP_START] NULL/localhost URL ignored Id=%d", _sequence_number);
break; // move to next_state
}
}
// Note pending ICP request
_ICPpr->IncPendingQuery();
_ICPpr->Unlock();
// Build the ICP query message
char *urlstr = _url->string_get(NULL);
int urlstr_len = strlen(urlstr) + 1;
int status = BuildICPMsg(ICP_OP_QUERY, _sequence_number = ICPReqSeqNumber(), 0 /* optflags */, 0 /* optdata */,
0 /* shostid */, (void *)urlstr, urlstr_len, &_sendMsgHdr, _sendMsgIOV, &_ICPmsg);
// urlstr memory freed in destructor
ink_assert(status == 0);
Debug("icp", "[ICP_START] ICP_OP_QUERY for [%s], Id=%d", urlstr, _sequence_number);
_next_state = ICP_QUEUE_REQUEST;
break; // move to next_state
} else {
ICP_INCREMENT_DYN_STAT(icp_start_icpoff_stat);
_ICPpr->Unlock();
// ICP NOT enabled, terminate request
_next_state = ICP_OFF_TERMINATE;
break; // move to next_state
}
}
ink_release_assert(0); // should never happen
case ICP_OFF_TERMINATE: {
if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
return EVENT_CONT; // unable to get lock, delay and retry
}
Debug("icp", "[ICP_OFF_TERMINATE] Id=%d", _sequence_number);
// ICP NOT enabled, post completion on request
if (!_act.cancelled) {
_cont->handleEvent(_ret_status, (void *)&_ret_sockaddr);
}
MUTEX_UNTAKE_LOCK(mutex, this_ethread());
_next_state = ICP_DONE;
return EVENT_DONE;
}
ink_release_assert(0); // should never happen
case ICP_QUEUE_REQUEST: {
// Place ICP request on the pending request queue
int ret = AddICPRequest(_sequence_number, this);
ink_assert(ret == 0);
// Generate ICP requests to peers
int bias = _ICPpr->GetStartingSendPeerBias();
int SendPeers = _ICPpr->GetSendPeers();
npending_actions = 0;
while (SendPeers > 0) {
Peer *P = _ICPpr->GetNthSendPeer(SendPeers, bias);
if (!P->IsOnline()) {
SendPeers--;
continue;
}
//
// Send query request to Peers
//
// because of reentrancy, we have to do this first, just
// in case we get called back immediately.
int was_expected = P->ExpectedReplies(&_expected_replies_list);
_expected_replies += was_expected;
npending_actions++;
Action *a = P->SendMsg_re(this, P, &_sendMsgHdr, NULL);
if (!a) {
a = ACTION_IO_ERROR;
}
if (a != ACTION_IO_ERROR) {
if (a != ACTION_RESULT_DONE) {
if (!pendingActions) {
pendingActions = new DynArray<Action *>(&default_action);
}
(*pendingActions)(npending_actions) = a;
}
P->LogSendMsg(&_ICPmsg, NULL); // log as send query
Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d send query to [%s]", _sequence_number,
ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
} else {
_expected_replies_list.ClearBit(P->GetPeerID());
_expected_replies -= was_expected;
// Partial or failed write.
ICP_INCREMENT_DYN_STAT(send_query_partial_write_stat);
// coverity[uninit_use_in_call]
Debug("icp_warn", "ICP query send, res=%d, ip=%s", ntohs(_ICPmsg.h.msglen), ats_ip_ntop(P->GetIP(), ipb, sizeof(ipb)));
}
SendPeers--;
}
Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d expected replies=%d", _sequence_number, _expected_replies);
if (!_expected_replies) {
//
// Nothing to wait for, terminate ICP processing
//
ICP_INCREMENT_DYN_STAT(icp_queries_no_expected_replies_stat);
_next_state = ICP_DEQUEUE_REQUEST;
break; // move to next_state
}
ICP_SUM_DYN_STAT(total_udp_send_queries_stat, _expected_replies);
//
// Setup ICP request response timeout
//
int tval = _ICPpr->GetConfig()->globalConfig()->ICPqueryTimeout();
_timeout = eventProcessor.schedule_in(this, HRTIME_SECONDS(tval), ET_ICP);
_next_state = ICP_AWAITING_RESPONSE;
return EVENT_DONE;
}
ink_release_assert(0); // should never happen
case ICP_AWAITING_RESPONSE: {
Debug("icp", "[ICP_AWAITING_RESPONSE] Id=%d", _sequence_number);
ink_assert(d);
ICPRequestEventArgs_t dummyArgs;
ICPRequestEventArgs_t *args = 0;
if (event == ICP_RESPONSE_MESSAGE) {
args = (ICPRequestEventArgs_t *)d;
} else if (event == EVENT_INTERVAL) {
memset((void *)&dummyArgs, 0, sizeof(dummyArgs));
args = &dummyArgs;
} else {
ink_release_assert(0); // should never happen
}
// Process ICP response
if (ICPResponseMessage(event, args->rICPmsg, args->peer) == EVENT_DONE) {
// ICP Request processing is complete, do completion actions
_next_state = ICP_DEQUEUE_REQUEST;
break; // move to next_state
} else {
// Continue to wait for additional replies
return EVENT_DONE;
}
}
ink_release_assert(0); // should never happen
case ICP_DEQUEUE_REQUEST: {
// Remove ICP request from active queue
int ret = RemoveICPRequest(_sequence_number);
Debug("icp", "[ICP_DEQUEUE_REQUEST] Id=%d", _sequence_number);
ink_assert(ret == 0);
//_sequence_number = 0; // moved to REQUEST_NOT_ACTIVE
_next_state = ICP_POST_COMPLETION;
break; // move to next_state
}
ink_release_assert(0); // should never happen
case ICP_POST_COMPLETION: {
if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
return EVENT_CONT; // unable to get lock, delay and retry
}
Debug("icp", "[ICP_POST_COMPLETION] Id=%d", _sequence_number);
// Post completion on the ICP request.
if (!_act.cancelled) {
_cont->handleEvent(_ret_status, (void *)&_ret_sockaddr);
}
MUTEX_UNTAKE_LOCK(mutex, this_ethread());
ICP_SUM_DYN_STAT(total_icp_request_time_stat, (Thread::get_hrtime() - _start_time));
_next_state = ICP_WAIT_SEND_COMPLETE;
break; // move to next_state
}
ink_release_assert(0); // should never happen
case ICP_WAIT_SEND_COMPLETE: {
// wait for all the sends to complete.
if (npending_actions > 0) {
Debug("icp", "[ICP_WAIT_SEND_COMPLETE] Id=%d active=%d", _sequence_number, npending_actions);
} else {
_next_state = ICP_REQUEST_NOT_ACTIVE;
// move to next state
break;
}
} break;
ink_release_assert(0); // should never happen
case ICP_REQUEST_NOT_ACTIVE: {
Debug("icp", "[ICP_REQUEST_NOT_ACTIVE] Id=%d", _sequence_number);
_sequence_number = 0;
if (!_ICPpr->Lock())
return EVENT_CONT; // Unable to get lock, try again later
// Note pending ICP request completion
_ICPpr->DecPendingQuery();
_ICPpr->Unlock();
_next_state = ICP_DONE;
return EVENT_DONE;
}
ink_release_assert(0); // should never happen
case ICP_DONE:
default:
ink_release_assert(0); // should never happen
} // End of switch
} // End of while(1)
}
int
ICPRequestCont::ICPResponseMessage(int event, ICPMsg_t *m, Peer *peer)
{
ip_port_text_buffer ipb, ipb2;
if (event == EVENT_INTERVAL) {
_timeout = 0;
remove_all_pendingActions();
// ICP request response timeout, if we received a response from
// any parent, return it to resolve the miss.
if (_received_replies) {
int NumParentPeers = _ICPpr->GetParentPeers();
if (NumParentPeers > 0) {
int n;
Peer *pp;
for (n = 0; n < NumParentPeers; n++) {
pp = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
if (pp && !_expected_replies_list.IsBitSet(pp->GetPeerID()) && pp->isUp()) {
ats_ip_copy(&_ret_sockaddr.sa, pp->GetIP());
_ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer *>(pp)->GetProxyPort());
_ret_status = ICP_LOOKUP_FOUND;
Debug("icp", "ICP timeout using parent Id=%d from [%s] return [%s]", _sequence_number,
ats_ip_nptop(pp->GetIP(), ipb, sizeof(ipb)), ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2)));
return EVENT_DONE;
}
}
}
}
// Timeout received on ICP request, return ICP_LOOKUP_FAILED
Debug("icp", "ICP Response timeout for Id=%d", _sequence_number);
return EVENT_DONE;
} else {
// We have received a response to our ICP query request.
// See if this response resolves the ICP query.
//
ink_assert(m->h.requestno == _sequence_number);
switch (m->h.opcode) {
case ICP_OP_HIT:
case ICP_OP_HIT_OBJ: {
// Kill timeout event
_timeout->cancel(this);
_timeout = 0;
ICP_INCREMENT_DYN_STAT(icp_query_hits_stat);
++_received_replies;
ats_ip_copy(&_ret_sockaddr, peer->GetIP());
_ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer *>(peer)->GetProxyPort());
_ret_status = ICP_LOOKUP_FOUND;
Debug("icp", "ICP Response HIT for Id=%d from [%s] return [%s]", _sequence_number,
ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb)), ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2)));
return EVENT_DONE;
}
case ICP_OP_MISS:
case ICP_OP_ERR:
case ICP_OP_MISS_NOFETCH:
case ICP_OP_DENIED: {
Debug("icp", "ICP MISS response for Id=%d from [%s]", _sequence_number, ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb)));
// "received_replies" is only for Peers who we expect a reply
// from (Peers which are in the expected_replies_list).
int Id = peer->GetPeerID();
if (_expected_replies_list.IsBitSet(Id)) {
// Clear bit to note receipt of reply
_expected_replies_list.ClearBit(Id);
++_received_replies;
}
if (_received_replies < _expected_replies)
return EVENT_CONT; // wait for more responses
// Kill timeout event
_timeout->cancel(this);
_timeout = 0;
ICP_INCREMENT_DYN_STAT(icp_query_misses_stat);
//
// All responders have returned ICP_OP_MISS.
// If parents exists, select one to resolve the request.
//
if (_ICPpr->GetParentPeers() > 0) {
// In cases where multiple parents exist, we use
// a round robin scheme.
Peer *p = NULL;
// try to find an UP parent, if none, return ICP_LOOKUP_FAILED
{
int i;
for (i = 0; i < _ICPpr->GetParentPeers(); i++) {
p = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
// find an UP parent
if (p->isUp())
break;
}
// if no parent is selected, then return ICP_LOOKUP_FAILED
if (i >= _ICPpr->GetParentPeers()) {
Debug("icp", "None of the %d ICP parent(s) is up", _ICPpr->GetParentPeers());
p = NULL;
}
}
if (p) {
ats_ip_copy(&_ret_sockaddr, p->GetIP());
_ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer *>(p)->GetProxyPort());
_ret_status = ICP_LOOKUP_FOUND;
Debug("icp", "ICP ALL MISS(1) for Id=%d return [%s]", _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb)));
return EVENT_DONE;
}
}
Debug("icp", "ICP ALL MISS(2) for Id=%d return [%s]", _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb)));
return EVENT_DONE;
}
default: {
ICP_INCREMENT_DYN_STAT(invalid_icp_query_response_stat);
// coverity[uninit_use_in_call]
Warning("Invalid ICP response, op=%d reqno=%d ip=%s", m->h.opcode, m->h.requestno,
ats_ip_ntop(peer->GetIP(), ipb, sizeof(ipb)));
return EVENT_CONT; // wait for more responses
}
} // End of switch
}
}
//------------------------------------------------
// Class ICPRequestCont static member functions
//------------------------------------------------
// Static member function
void
ICPRequestCont::NetToHostICPMsg(ICPMsg_t *in, ICPMsg_t *out)
{
out->h.opcode = in->h.opcode;
out->h.version = in->h.version;
out->h.msglen = ntohs(in->h.msglen);
out->h.requestno = ntohl(in->h.requestno);
out->h.optionflags = ntohl(in->h.optionflags);
out->h.optiondata = ntohl(in->h.optiondata);
out->h.shostid = ntohl(in->h.shostid);
switch (in->h.opcode) {
case ICP_OP_QUERY: {
memcpy((char *)&out->un.query.rhostid, (char *)((char *)(&in->h.shostid) + sizeof(in->h.shostid)),
sizeof(out->un.query.rhostid));
out->un.query.rhostid = ntohl(out->un.query.rhostid);
out->un.query.URL = (char *)((char *)(&in->h.shostid) + sizeof(in->h.shostid) + sizeof(out->un.query.rhostid));
break;
}
case ICP_OP_HIT: {
out->un.hit.URL = (char *)((char *)(&in->h.shostid) + sizeof(in->h.shostid));
break;
}
case ICP_OP_MISS: {
out->un.miss.URL = (char *)((char *)(&in->h.shostid) + sizeof(in->h.shostid));
break;
}
case ICP_OP_HIT_OBJ: {
out->un.hitobj.URL = (char *)((char *)(&in->h.shostid) + sizeof(in->h.shostid));
// strlen() is bounded since buffer in null terminated.
out->un.hitobj.p_objsize = (char *)(out->un.hitobj.URL + strlen(out->un.hitobj.URL));
memcpy((char *)&out->un.hitobj.objsize, out->un.hitobj.p_objsize, sizeof(out->un.hitobj.objsize));
out->un.hitobj.objsize = ntohs(out->un.hitobj.objsize);
out->un.hitobj.data = (char *)(out->un.hitobj.p_objsize + sizeof(out->un.hitobj.objsize));
break;
}
default:
break;
}
}
int
ICPRequestCont::BuildICPMsg(ICPopcode_t op, unsigned int seqno, int optflags, int optdata, int shostid, void *data, int datalen,
struct msghdr *mhdr, struct iovec *iov, ICPMsg_t *icpmsg)
{
// Build ICP message for transmission in network byte order.
if (op == ICP_OP_QUERY) {
icpmsg->un.query.rhostid = htonl(0);
icpmsg->un.query.URL = (char *)data;
mhdr->msg_iov = iov;
mhdr->msg_iovlen = 3;
iov[0].iov_base = (caddr_t)icpmsg;
iov[0].iov_len = sizeof(ICPMsgHdr_t);
iov[1].iov_base = (caddr_t)&icpmsg->un.query.rhostid;
iov[1].iov_len = sizeof(icpmsg->un.query.rhostid);
iov[2].iov_base = (caddr_t)data;
iov[2].iov_len = datalen;
icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len);
} else if (op == ICP_OP_HIT) {
icpmsg->un.hit.URL = (char *)data;
mhdr->msg_iov = iov;
mhdr->msg_iovlen = 2;
iov[0].iov_base = (caddr_t)icpmsg;
iov[0].iov_len = sizeof(ICPMsgHdr_t);
iov[1].iov_base = (caddr_t)data;
iov[1].iov_len = datalen;
icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
} else if (op == ICP_OP_MISS) {
icpmsg->un.miss.URL = (char *)data;
mhdr->msg_iov = iov;
mhdr->msg_iovlen = 2;
iov[0].iov_base = (caddr_t)icpmsg;
iov[0].iov_len = sizeof(ICPMsgHdr_t);
iov[1].iov_base = (caddr_t)data;
iov[1].iov_len = datalen;
icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
} else {
ink_release_assert(0);
return 1; // failed
}
mhdr->msg_name = (caddr_t)0;
mhdr->msg_namelen = 0;
// TODO: The following is just awkward
#if !defined(linux) && !defined(freebsd) && !defined(darwin) && !defined(solaris) && !defined(openbsd)
mhdr->msg_accrights = (caddr_t)0;
mhdr->msg_accrightslen = 0;
#elif !defined(solaris)
mhdr->msg_control = 0;
mhdr->msg_controllen = 0;
mhdr->msg_flags = 0;
#endif
icpmsg->h.opcode = op;
icpmsg->h.version = ICP_VERSION_2;
icpmsg->h.requestno = htonl(seqno);
icpmsg->h.optionflags = htonl(optflags);
icpmsg->h.optiondata = htonl(optdata);
icpmsg->h.shostid = htonl(shostid);
return 0; // Success
}
// Static ICPRequestCont data declarations
unsigned int ICPRequestCont::ICPRequestSeqno = 1;
Queue<ICPRequestCont> ICPRequestQueue[ICPRequestCont::ICP_REQUEST_HASH_SIZE];
// Static member function
unsigned int
ICPRequestCont::ICPReqSeqNumber()
{
// Generate ICP request sequence numbers. This must be unique.
unsigned int res = 0;
do {
res = (unsigned int)ink_atomic_increment((int *)&ICPRequestSeqno, 1);
} while (!res);
return res;
}
// Static member function
inline int
ICPRequestCont::ICPRequestHash(unsigned int seqno)
{
// ICPRequestQueue hash
return seqno % ICP_REQUEST_HASH_SIZE;
}
// Static member function
int
ICPRequestCont::AddICPRequest(unsigned int seqno, ICPRequestCont *r)
{
// Add ICP request to ICP outstanding queue (ICPRequestQueue).
// return: 0 - success
ICPRequestQueue[ICPRequestHash(seqno)].enqueue(r);
return 0; // Success
}
// Static member function
ICPRequestCont *
ICPRequestCont::FindICPRequest(unsigned int seqno)
{
// Find ICP request on outstanding queue with the given sequence number
int hash = ICPRequestHash(seqno);
ICPRequestCont *r;
for (r = (ICPRequestCont *)ICPRequestQueue[hash].head; r; r = (ICPRequestCont *)r->link.next) {
if (r->_sequence_number == seqno)
return r;
}
return (ICPRequestCont *)0; // Not found
}
// Static member function
int
ICPRequestCont::RemoveICPRequest(unsigned int seqno)
{
// Remove ICP request from outstanding queue with the given
// sequence number
// Return: 0 - success; 1 - not found
if (!seqno) {
return 1; // Not found
}
int hash = ICPRequestHash(seqno);
ICPRequestCont *r;
for (r = (ICPRequestCont *)ICPRequestQueue[hash].head; r; r = (ICPRequestCont *)r->link.next) {
if (r->_sequence_number == seqno) {
ICPRequestQueue[hash].remove(r);
return 0;
}
}
return 1; // Not found
}
//------------------------------------------------------------------------
// Class ICPProcessor member functions
// Central class which initializes the ICP world.
// Delegates incoming message processing to ICPHandlerCont
// and outgoing message processing to ICPRequestCont.
// Manages the ICP configuration database derived from TS
// configuration info.
//------------------------------------------------------------------------
// Static data declarations for ICPProcessor
void
initialize_thread_for_icp(EThread *e)
{
(void)e;
}
ICPProcessor icpProcessorInternal;
ICPProcessorExt icpProcessor(&icpProcessorInternal);
ICPProcessor::ICPProcessor()
: _l(0),
_Initialized(0),
_AllowIcpQueries(0),
_PendingIcpQueries(0),
_ICPConfig(0),
_ICPPeriodic(0),
_ICPHandler(0),
_mcastCB_handler(NULL),
_PeriodicEvent(0),
_ICPHandlerEvent(0),
_nPeerList(-1),
_LocalPeer(0),
_curSendPeer(0),
_nSendPeerList(-1),
_curRecvPeer(0),
_nRecvPeerList(-1),
_curParentPeer(0),
_nParentPeerList(-1),
_ValidPollData(0),
_last_recv_peer_bias(0)
{
memset((void *)_PeerList, 0, sizeof(_PeerList[PEER_LIST_SIZE]));
memset((void *)_SendPeerList, 0, sizeof(_SendPeerList[SEND_PEER_LIST_SIZE]));
memset((void *)_RecvPeerList, 0, sizeof(_RecvPeerList[RECV_PEER_LIST_SIZE]));
memset((void *)_ParentPeerList, 0, sizeof(_ParentPeerList[PARENT_PEER_LIST_SIZE]));
memset((void *)_PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
}
ICPProcessor::~ICPProcessor()
{
if (_ICPPeriodic) {
MUTEX_TAKE_LOCK(_ICPPeriodic->mutex, this_ethread());
_PeriodicEvent->cancel();
Mutex_unlock(_ICPPeriodic->mutex, this_ethread());
}
if (_ICPHandler) {
MUTEX_TAKE_LOCK(_ICPHandler->mutex, this_ethread());
_ICPHandlerEvent->cancel();
Mutex_unlock(_ICPHandler->mutex, this_ethread());
}
}
void
ICPProcessor::start()
{
//*****************************************************
// Perform initialization actions for ICPProcessor
// (called at system startup)
//*****************************************************
if (_Initialized) // Do only once
return;
//
// Setup ICPProcessor lock, required since ICPProcessor is instantiated
// as static object.
//
_l = new AtomicLock();
//
// Setup custom allocators
//
// replaced with generic IOBufferBlock allocator
ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex = iobuffer_size_to_index(MAX_ICP_MSGSIZE, MAX_BUFFER_SIZE_INDEX);
//
// Setup ICP stats callbacks
//
InitICPStatCallbacks();
//
// Create ICP configuration objects
//
_ICPConfig = new ICPConfiguration();
_mcastCB_handler = new ICPHandlerCont(this);
SET_CONTINUATION_HANDLER(_mcastCB_handler, (ICPHandlerContHandler)&ICPHandlerCont::TossEvent);
//
// Build ICP peer list and setup listen sockets
//
if (_ICPConfig->globalConfig()->ICPconfigured()) {
if (BuildPeerList() == 0) {
if (SetupListenSockets() == 0) {
_AllowIcpQueries = 1; // allow receipt of queries
}
}
}
DumpICPConfig();
//
// Start ICP configuration monitor (periodic continuation)
//
_ICPPeriodic = new ICPPeriodicCont(this);
SET_CONTINUATION_HANDLER(_ICPPeriodic, (ICPPeriodicContHandler)&ICPPeriodicCont::PeriodicEvent);
_PeriodicEvent = eventProcessor.schedule_every(_ICPPeriodic, HRTIME_MSECONDS(ICPPeriodicCont::PERIODIC_INTERVAL), ET_ICP);
//
// Start ICP receive handler continuation
//
_ICPHandler = new ICPHandlerCont(this);
SET_CONTINUATION_HANDLER(_ICPHandler, (ICPHandlerContHandler)&ICPHandlerCont::PeriodicEvent);
_ICPHandlerEvent = eventProcessor.schedule_every(_ICPHandler, HRTIME_MSECONDS(ICPHandlerCont::ICP_HANDLER_INTERVAL), ET_ICP);
//
// Stale lookup data initializations
//
if (!gclient_request.valid()) {
gclient_request.create(HTTP_TYPE_REQUEST);
}
_Initialized = 1;
}
Action *
ICPProcessor::ICPQuery(Continuation *c, URL *url)
{
//**************************************
// HTTP state machine interface to ICP
//**************************************
// Build continuation to process ICP request
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex.get();
ICPRequestCont *rc = new (ICPRequestCont_allocator.alloc()) ICPRequestCont(this, c, url);
ICP_INCREMENT_DYN_STAT(icp_query_requests_stat);
rc->SetRequestStartTime();
SET_CONTINUATION_HANDLER(rc, (ICPRequestContHandler)&ICPRequestCont::ICPRequestEvent);
eventProcessor.schedule_imm(rc, ET_ICP);
return rc->GetActionPtr();
}
int
ICPProcessor::BuildPeerList()
{
// Returns 0 on Success
//
//---------------------------------------------------------------------
// We always place all allocated Peer elements onto PeerList[],
// which is used to track allocated elements and validate (ip, port)
// uniqueness in the ICP configuration.
//
// All MultiCastPeer(s) link the underlying ParentSiblingPeer structures
// using a singly linked list off the MultiCastPeer.
//
// Peer elements placed onto SendPeerList[] are elements which are
// the target of ICP queries.
// In the case where MultiCasting is used, a pseudo peer element
// (MultiCastPeer) is placed onto the SendPeerList[] to act as a place
// holder for the underlying Peers.
//
// RecvPeerList[] is the list of Peer(s) we perform reads on for
// ICP messages. In the case of MultiCast, the pseudo MultiCast peer
// element (MultiCastPeer) is placed on this list. Since we currently
// funnel all unicast receives through the local peer UDP socket,
// only the local peer and any pseudo MultiCastPeer structures reside
// on this list.
//
// Parent (PEER_PARENT) Peer elements are also added to ParentPeerList
// which is used to select a parent in the case where all ICP queries
// have returned ICP_MISS.
//---------------------------------------------------------------------
//
PeerConfigData *Pcfg;
Peer *P;
Peer *mcP;
int index;
int status;
PeerType_t type;
//
// From the working copy of the ICP configuration data, build the
// internal Peer data structures for ICP processing.
// First, establish the Local Peer descriptor before processing
// parents and siblings.
//
Pcfg = _ICPConfig->indexToPeerConfigData(0);
ink_strlcpy(Pcfg->_hostname, "localhost", sizeof(Pcfg->_hostname));
Pcfg->_ctype = PeerConfigData::CTYPE_LOCAL;
// Get IP address for given interface
IpEndpoint tmp_ip;
if (!mgmt_getAddrForIntr(GetConfig()->globalConfig()->ICPinterface(), &tmp_ip.sa)) {
Pcfg->_ip_addr._family = AF_UNSPEC;
// No IP address for given interface
RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP interface [%s] has no IP address", GetConfig()->globalConfig()->ICPinterface());
} else {
Pcfg->_my_ip_addr = Pcfg->_ip_addr = tmp_ip;
}
Pcfg->_proxy_port = 0;
Pcfg->_icp_port = GetConfig()->globalConfig()->ICPport();
Pcfg->_mc_member = 0;
Pcfg->_mc_ip_addr._family = AF_UNSPEC;
Pcfg->_mc_ttl = 0;
//***************************************************
// Descriptor for local host, add to PeerList and
// RecvPeerList
//***************************************************
P = new ParentSiblingPeer(PEER_LOCAL, Pcfg, this);
status = AddPeer(P);
ink_release_assert(status);
status = AddPeerToRecvList(P);
ink_release_assert(status);
_LocalPeer = P;
for (index = 1; index < MAX_DEFINED_PEERS; ++index) {
Pcfg = _ICPConfig->indexToPeerConfigData(index);
type = PeerConfigData::CTypeToPeerType_t(Pcfg->GetCType());
//
// Ignore parent and sibling entries corresponding to "localhost".
// This is possible in a cluster configuration where parents and
// siblings are cluster members. Note that in a cluster
// configuration, "icp.config" is shared by all nodes.
//
if (Pcfg->GetIPAddr() == _LocalPeer->GetIP())
continue; // ignore
if ((type == PEER_PARENT) || (type == PEER_SIBLING)) {
if (Pcfg->MultiCastMember()) {
mcP = FindPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort());
if (!mcP) {
//*********************************
// Create multicast peer structure
//*********************************
mcP = new MultiCastPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort(), Pcfg->GetMultiCastTTL(), this);
status = AddPeer(mcP);
ink_assert(status);
status = AddPeerToSendList(mcP);
ink_assert(status);
status = AddPeerToRecvList(mcP);
ink_assert(status);
}
//*****************************
// Add child to MultiCast peer
//*****************************
P = new ParentSiblingPeer(type, Pcfg, this);
status = AddPeer(P);
ink_assert(status);
status = ((MultiCastPeer *)mcP)->AddMultiCastChild(P);
ink_assert(status);
} else {
//*****************************
// Add parent/sibling peer
//*****************************
P = new ParentSiblingPeer(type, Pcfg, this);
status = AddPeer(P);
ink_assert(status);
status = AddPeerToSendList(P);
ink_assert(status);
}
//****************************************
// Also, add parent peers to parent list.
//****************************************
if (type == PEER_PARENT) {
status = AddPeerToParentList(P);
ink_assert(status);
}
}
}
return 0; // Success
}
void
ICPProcessor::FreePeerList()
{
// Deallocate all Peer structures
int index;
for (index = 0; index < (_nPeerList + 1); ++index) {
if (_PeerList[index]) {
_PeerList[index] = 0;
}
}
// Reset all control data
_nPeerList = -1;
_LocalPeer = (Peer *)0;
_curSendPeer = 0;
_nSendPeerList = -1;
_curRecvPeer = 0;
_nRecvPeerList = -1;
_curParentPeer = 0;
_nParentPeerList = -1;
_ValidPollData = 0;
_last_recv_peer_bias = 0;
for (index = 0; index < PEER_LIST_SIZE; index++) {
_PeerList[index] = 0;
}
for (index = 0; index < SEND_PEER_LIST_SIZE; index++) {
_SendPeerList[index] = 0;
}
for (index = 0; index < RECV_PEER_LIST_SIZE; index++) {
_RecvPeerList[index] = 0;
}
for (index = 0; index < PARENT_PEER_LIST_SIZE; index++) {
_ParentPeerList[index] = 0;
}
memset((void *)_PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
}
int
ICPProcessor::SetupListenSockets()
{
int allow_null_configuration;
if ((_ICPConfig->globalConfig()->ICPconfigured() == ICP_MODE_RECEIVE_ONLY) &&
_ICPConfig->globalConfig()->ICPReplyToUnknownPeer()) {
allow_null_configuration = 1;
} else {
allow_null_configuration = 0;
}
// Returns 0 on Success.
//
// Perform some basic sanity checks on the ICP configuration.
//
if (!_LocalPeer) {
RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined local Peer");
return 1; // Failed
}
if (GetSendPeers() == 0) {
if (!allow_null_configuration) {
RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined send Peer(s)");
return 1; // Failed
}
}
if (GetRecvPeers() == 0) {
if (!allow_null_configuration) {
RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined receive Peer(s)");
return 1; // Failed
}
}
//
// Establish the required sockets for elements on the PeerList[].
//
Peer *P;
int status;
int index;
ip_port_text_buffer ipb, ipb2;
for (index = 0; index < (_nPeerList + 1); ++index) {
if ((P = _PeerList[index].get())) {
if ((P->GetType() == PEER_PARENT) || (P->GetType() == PEER_SIBLING)) {
ParentSiblingPeer *pPS = (ParentSiblingPeer *)P;
pPS->GetChan()->setRemote(pPS->GetIP());
} else if (P->GetType() == PEER_MULTICAST) {
MultiCastPeer *pMC = (MultiCastPeer *)P;
ink_assert(_mcastCB_handler != NULL);
status = pMC->GetSendChan()->setup_mc_send(pMC->GetIP(), _LocalPeer->GetIP(), NON_BLOCKING, pMC->GetTTL(),
DISABLE_MC_LOOPBACK, _mcastCB_handler);
if (status) {
// coverity[uninit_use_in_call]
RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC send setup failed, res=%d, ip=%s bind_ip=%s", status,
ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)), ats_ip_nptop(_LocalPeer->GetIP(), ipb2, sizeof(ipb2)));
return 1; // Failed
}
status = pMC->GetRecvChan()->setup_mc_receive(pMC->GetIP(), _LocalPeer->GetIP(), NON_BLOCKING, pMC->GetSendChan(),
_mcastCB_handler);
if (status) {
// coverity[uninit_use_in_call]
RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC recv setup failed, res=%d, ip=%s", status,
ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)));
return 1; // Failed
}
}
}
}
//
// Setup the socket for the local host.
// We funnel all unicast sends and receives through
// the local peer UDP socket.
//
ParentSiblingPeer *pPS = (ParentSiblingPeer *)(GetLocalPeer());
NetVCOptions options;
options.local_ip.assign(pPS->GetIP());
options.local_port = pPS->GetICPPort();
options.ip_proto = NetVCOptions::USE_UDP;
options.addr_binding = NetVCOptions::INTF_ADDR;
status = pPS->GetChan()->open(options);
if (status) {
// coverity[uninit_use_in_call] ?
RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP bind_connect failed, res=%d, ip=%s", status,
ats_ip_nptop(pPS->GetIP(), ipb, sizeof(ipb)));
return 1; // Failed
}
return 0; // Success
}
void
ICPProcessor::ShutdownListenSockets()
{
//
// Close all open sockets for elements on the PeerList[]
//
ink_assert(!PendingQuery());
Peer *P;
for (int index = 0; index < (_nPeerList + 1); ++index) {
if ((P = IdToPeer(index))) {
if (P->GetType() == PEER_LOCAL) {
ParentSiblingPeer *pPS = (ParentSiblingPeer *)P;
(void)pPS->GetChan()->close();
} else if (P->GetType() == PEER_MULTICAST) {
MultiCastPeer *pMC = (MultiCastPeer *)P;
(void)pMC->GetSendChan()->close();
(void)pMC->GetRecvChan()->close();
}
}
}
}
int
ICPProcessor::Reconfigure(int /* global_config_changed ATS_UNUSED */, int /* peer_config_changed ATS_UNUSED */)
{
// Returns 0 on Success
//
// At this point, ICP requests processing is disabled and
// no pending ICP requests exist.
//
ink_assert(_ICPConfig->HaveLock());
ink_assert(!AllowICPQueries());
ink_assert(!PendingQuery());
//
// Shutdown and deallocate all structures associated with the
// current configuration.
//
ShutdownListenSockets();
FreePeerList();
//
// Copy the new configuration into the working copy and
// rebuild all associated structures.
//
_ICPConfig->UpdateGlobalConfig();
_ICPConfig->UpdatePeerConfig();
int status = -1;
if (_ICPConfig->globalConfig()->ICPconfigured()) {
if ((status = BuildPeerList()) == 0) {
status = SetupListenSockets();
}
DumpICPConfig();
}
return status;
}
ICPProcessor::ReconfigState_t
ICPProcessor::ReconfigureStateMachine(ReconfigState_t s, int gconfig_changed, int pconfig_changed)
{
//*****************************************************************
// State machine which performs the ICP reconfiguration actions.
// Defined states are as follows:
// 1) (RC_RECONFIG) disable ICP, reconfigure if no request pending,
// else delay and retry. Reconfigure and if success move to
// RC_ENABLE_ICP else RC_DONE.
// 2) (RC_ENABLE_ICP) enable ICP, free ICP configuration lock.
// 3) (RC_DONE) free ICP configuration lock.
//*****************************************************************
ink_assert(_ICPConfig->HaveLock());
int reconfig_status;
while (1) {
switch (s) {
case RC_RECONFIG: {
if (!Lock())
return RC_RECONFIG; // Unable to get lock, try again
if (PendingQuery()) {
DisableICPQueries(); // disable ICP processing
Unlock();
CancelPendingReads();
return RC_RECONFIG; // Pending requests, delay and retry
} else {
DisableICPQueries(); // disable ICP processing
Unlock();
// No pending ICP queries, perform reconfiguration
reconfig_status = Reconfigure(gconfig_changed, pconfig_changed);
if (reconfig_status == 0) {
s = RC_ENABLE_ICP; // reconfig OK, enable ICP
} else {
s = RC_DONE; // reconfig failed, do not enable ICP
}
break; // move to next state
}
}
case RC_ENABLE_ICP: {
if (!Lock())
return RC_ENABLE_ICP; // Unable to get lock, try again
EnableICPQueries(); // Enable ICP processing
Unlock();
s = RC_DONE;
break; // move to next state
}
case RC_DONE: {
// Release configuration lock
_ICPConfig->Unlock();
return RC_DONE; // Reconfiguration complete
}
default: {
ink_release_assert(0); // Should never happen
}
} // End of switch
} // End of while
return RC_DONE;
}
void
ICPProcessor::CancelPendingReads()
{
// Cancel pending ICP read by sending a bogus message to
// the local ICP port.
ICPRequestCont *r = new (ICPRequestCont_allocator.alloc()) ICPRequestCont(this, NULL, NULL);
SET_CONTINUATION_HANDLER(r, (ICPRequestContHandler)&ICPRequestCont::NopICPRequestEvent);
r->mutex = new_ProxyMutex();
// TODO: Check return value?
ICPRequestCont::BuildICPMsg(ICP_OP_HIT, 0, 0 /* optflags */, 0 /* optdata */, 0 /* shostid */, (void *)0, 0, &r->_sendMsgHdr,
r->_sendMsgIOV, &r->_ICPmsg);
r->_sendMsgHdr.msg_iovlen = 1;
r->_ICPmsg.h.version = ~r->_ICPmsg.h.version; // bogus message
Peer *lp = GetLocalPeer();
IpEndpoint local_endpoint;
ats_ip_copy(&local_endpoint.sa, lp->GetIP());
r->_sendMsgHdr.msg_name = (caddr_t)&local_endpoint;
r->_sendMsgHdr.msg_namelen = sizeof(local_endpoint);
udpNet.sendmsg_re(r, r, lp->GetSendFD(), &r->_sendMsgHdr);
}
Peer *
ICPProcessor::GenericFindListPeer(IpAddr const &ip, uint16_t port, int validListItems, Ptr<Peer> *List)
{
Peer *P;
port = htons(port);
for (int n = 0; n < validListItems; ++n) {
if ((P = List[n].get())) {
if ((P->GetIP() == ip) && ((port == 0) || (ats_ip_port_cast(P->GetIP()) == port)))
return P;
}
}
return NULL;
}
Peer *
ICPProcessor::FindPeer(IpAddr const &ip, uint16_t port)
{
// Find (Peer *) with the given (ip,port) on the global list (PeerList)
return GenericFindListPeer(ip, port, (_nPeerList + 1), _PeerList);
}
Peer *
ICPProcessor::FindSendListPeer(IpAddr const &ip, uint16_t port)
{
// Find (Peer *) with the given (ip,port) on the
// scheduler list (SendPeerList)
return GenericFindListPeer(ip, port, (_nSendPeerList + 1), _SendPeerList);
}
Peer *
ICPProcessor::FindRecvListPeer(IpAddr const &ip, uint16_t port)
{
// Find (Peer *) with the given (ip,port) on the
// receive list (RecvPeerList)
return GenericFindListPeer(ip, port, (_nRecvPeerList + 1), _RecvPeerList);
}
int
ICPProcessor::AddPeer(Peer *P)
{
// Add (Peer *) to the global list (PeerList). Make sure (ip,port) is
// unique.
// Returns 1 - added; 0 - Not added
//
// Make sure no duplicate exists
//
if (FindPeer(P->GetIP())) {
ip_port_text_buffer x;
// coverity[uninit_use_in_call]
RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "bad icp.config, multiple peer definitions for ip=%s",
ats_ip_nptop(P->GetIP(), x, sizeof(x)));
return 0; // Not added
} else {
// Valid entry
if (_nPeerList + 1 < PEER_LIST_SIZE) {
_nPeerList++;
_PeerList[_nPeerList] = P;
P->SetPeerID(_nPeerList);
return 1; // Added
} else {
return 0; // Not added
}
}
}
int
ICPProcessor::AddPeerToRecvList(Peer *P)
{
// Add (Peer *) to the listen list (RecvPeerList).
// Make sure (ip,port) is unique.
// Returns 1 - added; 0 - Not added
// Assert that no duplicate exists
ink_assert(FindRecvListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0);
if (_nRecvPeerList + 1 < RECV_PEER_LIST_SIZE) {
_nRecvPeerList++;
_RecvPeerList[_nRecvPeerList] = P;
return 1; // Added
} else {
return 0; // Not added
}
}
int
ICPProcessor::AddPeerToSendList(Peer *P)
{
// Add (Peer *) to the scheduler list (SendPeerList).
// Make sure (ip,port) is unique.
// Returns 1 - added; 0 - Not added
// Assert that no duplicate exists
ink_assert(FindSendListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0);
if (_nSendPeerList + 1 < SEND_PEER_LIST_SIZE) {
_nSendPeerList++;
_SendPeerList[_nSendPeerList] = P;
return 1; // Added
} else {
return 0; // Not added
}
}
int
ICPProcessor::AddPeerToParentList(Peer *P)
{
// Add (Peer *) to the parent list (ParentPeerList).
// Returns 1 - added; 0 - Not added
if (_nParentPeerList + 1 < PARENT_PEER_LIST_SIZE) {
_nParentPeerList++;
_ParentPeerList[_nParentPeerList] = P;
return 1; // Added
} else {
return 0; // Not added
}
}
// End of ICP.cc
Something went wrong with that request. Please try again.