Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

8089 lines (7264 sloc) 291.612 kB
// -*- mode: C; c-basic-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
// vim: set softtabstop=4 shiftwidth=4 tabstop=4 expandtab:
/*************************************************************************
* Copyright 2009-2013 Eucalyptus Systems, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*
* Please contact Eucalyptus Systems, Inc., 6755 Hollister Ave., Goleta
* CA 93117, USA or visit http://www.eucalyptus.com/licenses/ if you need
* additional information or have any questions.
*
* This file may incorporate work covered under the following copyright
* and permission notice:
*
* Software License Agreement (BSD License)
*
* Copyright (c) 2008, Regents of the University of California
* All rights reserved.
*
* Redistribution and use of this software in source and binary forms,
* with or without modification, are permitted provided that the
* following conditions are met:
*
* Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE. USERS OF THIS SOFTWARE ACKNOWLEDGE
* THE POSSIBLE PRESENCE OF OTHER OPEN SOURCE LICENSED MATERIAL,
* COPYRIGHTED MATERIAL OR PATENTED MATERIAL IN THIS SOFTWARE,
* AND IF ANY SUCH MATERIAL IS DISCOVERED THE PARTY DISCOVERING
* IT MAY INFORM DR. RICH WOLSKI AT THE UNIVERSITY OF CALIFORNIA,
* SANTA BARBARA WHO WILL THEN ASCERTAIN THE MOST APPROPRIATE REMEDY,
* WHICH IN THE REGENTS' DISCRETION MAY INCLUDE, WITHOUT LIMITATION,
* REPLACEMENT OF THE CODE SO IDENTIFIED, LICENSING OF THE CODE SO
* IDENTIFIED, OR WITHDRAWAL OF THE CODE CAPABILITY TO THE EXTENT
* NEEDED TO COMPLY WITH ANY SUCH LICENSES OR RIGHTS.
************************************************************************/
//!
//! @file cluster/handlers.c
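//! Cluster controller (CC) request handlers (e.g. instance bundling), plus ncClientCall(), which forks a child process to invoke a node controller (NC) operation and passes results back to the parent over a pipe.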
//!
/*----------------------------------------------------------------------------*\
| |
| INCLUDES |
| |
\*----------------------------------------------------------------------------*/
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <semaphore.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <math.h>
#include <assert.h>
#include <json/json.h>
#include <eucalyptus.h>
#include "axis2_skel_EucalyptusCC.h"
#include <misc.h>
#include <data.h>
#include <ipc.h>
#include <objectstorage.h>
#include <http.h>
#include <euca_gni.h>
#include <storage-windows.h>
#include <fault.h>
#include <euca_string.h>
#include <euca_network.h>
#include <euca_auth.h>
#include <euca_axis.h>
#include <axutil_error.h>
#include <ebs_utils.h>
#include <dev_handler.h>
#include "server-marshal.h"
#include "handlers.h"
#include "client-marshal.h"
#include "config-cc.h"
#include "handlers-state.h"
#include <stats.h>
#include <message_stats.h>
#include <message_sensor.h>
#include <service_sensor.h>
/*----------------------------------------------------------------------------*\
| |
| DEFINES |
| |
\*----------------------------------------------------------------------------*/
// NOTE(review): the code using these constants lies outside the visible part of this
// file; the descriptions below are inferred from the names and should be confirmed.
// Name of the default/privileged Eucalyptus account ("eucalyptus").
#define SUPERUSER "eucalyptus"
// Upper bound on sensor resources tracked by the CC; tied to MAXINSTANCES_PER_CC
// (presumably one sensor resource per instance -- confirm).
#define MAX_SENSOR_RESOURCES MAXINSTANCES_PER_CC
// Presumably extra seconds of slack added to polling deadlines.
#define POLL_INTERVAL_SAFETY_MARGIN_SEC 3
// Presumably the smallest polling interval, in seconds, the CC will use.
#define POLL_INTERVAL_MINIMUM_SEC 6
// Presumably the stats collection interval in seconds (cf. initialize_stats_system(interval_sec)).
#define STATS_INTERVAL_SEC 60
/*----------------------------------------------------------------------------*\
| |
| TYPEDEFS |
| |
\*----------------------------------------------------------------------------*/
/*----------------------------------------------------------------------------*\
| |
| ENUMERATIONS |
| |
\*----------------------------------------------------------------------------*/
/*----------------------------------------------------------------------------*\
| |
| STRUCTURES |
| |
\*----------------------------------------------------------------------------*/
/*----------------------------------------------------------------------------*\
| |
| EXTERNAL VARIABLES |
| |
\*----------------------------------------------------------------------------*/
/* Should preferably be handled in header file */
/*----------------------------------------------------------------------------*\
| |
| GLOBAL VARIABLES |
| |
\*----------------------------------------------------------------------------*/
//! @{
//! @name local globals
//! Per-process initialization flags (0 = not done yet).
//! NOTE(review): the code that sets/tests these flags is outside the visible part of
//! this file; the flag-to-subsystem pairing below is inferred from the names -- confirm.
int config_init = 0;            // presumably: configuration has been loaded
int local_init = 0;             // presumably: process-local state has been set up
int thread_init = 0;            // presumably: background thread(s) have been started
int sensor_initd = 0;           // presumably: sensor subsystem has been initialized
int init = 0;                   // presumably: overall initialization has completed
int stats_initd = 0;            // presumably: stats subsystem (initialize_stats_system()) has been initialized
//! @}
//! @{
//! @name shared (between CC processes) globals
// CC configuration; handlers in this file read e.g. config->services, config->eucahome and config->use_wssec.
ccConfig *config = NULL;
ccInstanceCache *instanceCache = NULL; // canonical source for latest information about instances
// NOTE(review): network state objects; their use is not visible in this chunk.
euca_network *gpEucaNet = NULL;
globalNetworkInfo *globalnetworkinfo = NULL;
// Handlers take a private copy of *resourceCache while holding the RESCACHE lock.
ccResourceCache *resourceCache = NULL; // canonical source for latest information about resources
ccResourceCache *resourceCacheStage = NULL; // clone of resourceCache used for aggregating replies from NCs (via child procs)
sensorResourceCache *ccSensorResourceCache = NULL; // canonical source for latest sensor data, both local and from NCs
char *message_stats_shared_mem = NULL; //Reference to the shared memory region
// Zeroed (under the stats lock) by initialize_stats_system().
char message_stats_cache[MESSAGE_STATS_MEMORY_REGION_SIZE]; //The proc local holder for cached copies of message_stats_shared_mem to avoid realloc for each cache copy.
json_object *stats_cache_json = NULL; //! Pointer to parsed stats from the cache
//! @}
//! @{
//! @name shared (between CC processes) semaphores
// Indexed by lock id (e.g. RESCACHE, STATSCACHE as passed to sem_mywait()/sem_mypost()).
sem_t *locks[ENDLOCK];
// NOTE(review): presumably a per-process record of which locks this process currently holds -- confirm.
int mylocks[ENDLOCK];
//! @}
#ifndef NO_COMP
// Component name of this process; passed to the stats sensors by initialize_stats_system().
const char *euca_this_component_name = "cc";
// Name of the client component ("clc").
const char *euca_client_component_name = "clc";
#endif /* ! NO_COMP */
// Printable names of the scheduling policies; the array is sized SCHEDLAST, presumably
// the terminator of the scheduling-policy enum, with entries in enum order -- confirm.
char *SCHEDPOLICIES[SCHEDLAST] = {
"GREEDY",
"ROUNDROBIN",
"POWERSAVE",
"USER",
};
/*----------------------------------------------------------------------------*\
| |
| STATIC VARIABLES |
| |
\*----------------------------------------------------------------------------*/
/*----------------------------------------------------------------------------*\
| |
| STATIC PROTOTYPES |
| |
\*----------------------------------------------------------------------------*/
// Forward declarations of file-local (static) helpers defined in this file.
// Resource cache helpers.
static void reconfigure_resourceCache(ccResource * res, int numHosts);
static void refresh_resourceCache(ccResourceCache * updatedResourceCache, boolean do_purge_unconfigured);
// Instance migration helpers.
static int schedule_instance_migration(ncInstance * instance, char **includeNodes, char **excludeNodes, int includeNodeCount, int excludeNodeCount, int inresid, int *outresid,
ccResourceCache * resourceCacheLocal, char **replyString);
static int migration_handler(ccInstance * myInstance, char *host, char *src, char *dst, migration_states migration_state, char **node, char **instance, char **action);
// Updates outbound NC request metadata from the CC configuration (called by ncClientCall() in its forked child).
static int populateOutboundMeta(ncMetadata * pMeta);
// Stats-system setup and the callbacks it hands to the message/service sensors and init_stats().
static int initialize_stats_system(int interval_sec);
static json_object **message_stats_getter();
static void message_stats_setter();
static char *stats_service_check_call();
static char *stats_service_state_call();
static void lock_stats();
static void unlock_stats();
/*----------------------------------------------------------------------------*\
| |
| MACROS |
| |
\*----------------------------------------------------------------------------*/
/*----------------------------------------------------------------------------*\
| |
| IMPLEMENTATION |
| |
\*----------------------------------------------------------------------------*/
//!
//! Bootstraps the cluster controller by calling initialize() without any request
//! metadata. An initialization failure is only reported as a warning; the
//! "component started" message is logged in every case.
//!
//! @note this routine runs immediately when the process is started
//!
void doInitCC(void)
{
    int initRc = initialize(NULL, FALSE);

    if (initRc != 0) {
        LOGWARN("could not initialize\n");
    }
    LOGINFO("component started\n");
}
//! Runs a check on service and returns result in string form
//! for the stats sensor
static char *stats_service_check_call()
{
LOGTRACE("Invoking CC check function for internal stats\n");
int clcTimer = FALSE;
if (ccCheckState(clcTimer) != EUCA_OK) {
return SERVICE_CHECK_FAILED_MSG;
} else {
return SERVICE_CHECK_OK_MSG;
}
}
//! Service-state callback for the stats sensor: reports the CC state as a string.
//!
//! @return a copy of the state string from ccGetStateString(), or a copy of
//!         "UNKNOWN" if the state could not be read. Both results come from
//!         euca_strdup(), so ownership is the same on every path; previously the
//!         failure path returned a string literal while the success path returned
//!         heap memory, so a caller could either leak or free a literal.
//!         NOTE(review): presumably NULL if euca_strdup() cannot allocate -- confirm.
static char *stats_service_state_call()
{
    char state[32] = "";

    LOGTRACE("Getting CC service state for internal stats\n");
    if (ccGetStateString(state, sizeof(state)) != EUCA_OK) {
        return euca_strdup("UNKNOWN");
    }
    return euca_strdup(state);
}
//! Lock callback given to the stats system: waits on the STATSCACHE lock via
//! sem_mywait(), so the stats code does not need to know which kind of lock
//! the CC uses.
static void lock_stats()
{
    const int lockId = STATSCACHE;

    sem_mywait(lockId);
}
//! Unlock callback given to the stats system: releases the STATSCACHE lock via
//! sem_mypost(); the counterpart of lock_stats().
static void unlock_stats()
{
    const int lockId = STATSCACHE;

    sem_mypost(lockId);
}
//! Provides CC-specific initializations for the stats system of
//! internal service sensors (state sensors, message statistics, etc)
//! @returns EUCA_OK on success, or error code on failure
static int initialize_stats_system(int interval_sec)
{
LOGDEBUG("Initializing stats subsystem for CC\n");
int ret = EUCA_OK;
int stats_ttl = interval_sec + 1;
lock_stats();
{
//Zero the cache location
bzero(message_stats_cache, MESSAGE_STATS_MEMORY_REGION_SIZE);
//Init the message sensor with component-specific data
ret = initialize_message_sensor(euca_this_component_name, interval_sec, stats_ttl, message_stats_getter, message_stats_setter);
if (ret != EUCA_OK) {
LOGERROR("Error initializing internal message sensor: %d\n", ret);
goto cleanup;
} else {
json_object **tmp = message_stats_getter();
const char *tmp_out = json_object_to_json_string(*tmp);
LOGINFO("Initialized internal message stats: %s\n", tmp_out);
}
//Init the service state sensor with component-specific data
ret = initialize_service_state_sensor(euca_this_component_name, interval_sec, stats_ttl, stats_service_state_call, stats_service_check_call);
if (ret != EUCA_OK) {
LOGERROR("Error initializing internal service state sensor: %d\n", ret);
goto cleanup;
}
ret = init_stats(config->eucahome, euca_this_component_name, lock_stats, unlock_stats);
if (ret != EUCA_OK) {
LOGERROR("Could not initialize CC stats system: %d\n", ret);
goto cleanup;
}
}
if (!ret) {
LOGINFO("Stats subsystem initialized\n");
} else {
LOGERROR("Stat subsystem init failed: %d\n", ret);
}
cleanup:
unlock_stats();
return ret;
}
//!
//! Forwards a bundle-instance request to the node controller (NC) that hosts the instance.
//!
//! Choosing the object storage URL sent to the NC:
//!   - use the first URI of the first configured "objectstorage" service, searching the
//!     first 16 entries of config->services;
//!   - only if none is configured, use the caller-supplied @p objectStorageURL.
//! The URL is copied with a bounded snprintf(). The former
//! strncpy(dst, src, strlen(src) + 1) could overflow the EUCA_MAX_PATH buffer, and
//! dereferenced a NULL objectStorageURL.
//!
//! If the instance is found in the instance cache, only its NC is contacted. Otherwise
//! every NC in a snapshot of the resource cache is tried in order until one accepts.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure
//! @param[in] instanceId identifier of the instance to bundle (required)
//! @param[in] bucketName forwarded unchanged to ncBundleInstance
//! @param[in] filePrefix forwarded unchanged to ncBundleInstance
//! @param[in] objectStorageURL fallback object storage URL, used only when no "objectstorage" service is configured
//! @param[in] userPublicKey forwarded unchanged to ncBundleInstance
//! @param[in] S3Policy forwarded unchanged to ncBundleInstance
//! @param[in] S3PolicySig forwarded unchanged to ncBundleInstance
//! @param[in] architecture forwarded unchanged to ncBundleInstance
//!
//! @return 0 if an NC accepted the request (or no NC was attempted). 1 if any of:
//!         - the CC is not initialized or not enabled;
//!         - instanceId is NULL;
//!         - no object storage URL is available;
//!         - every attempted NC call failed.
//!
int doBundleInstance(ncMetadata * pMeta, char *instanceId, char *bucketName, char *filePrefix, char *objectStorageURL, char *userPublicKey, char *S3Policy, char *S3PolicySig,
                     char *architecture)
{
    int i = 0;
    int j = 0;
    int rc = 0;
    int start = 0;
    int stop = 0;
    int ret = 0;
    int timeout = 0;
    int found = 0;
    int done = 0;
    char theObjectStorageURL[EUCA_MAX_PATH] = "";
    ccInstance *myInstance = NULL;
    time_t op_start = time(NULL);
    ccResourceCache resourceCacheLocal;

    rc = initialize(pMeta, FALSE);
    if (rc || ccIsEnabled()) {
        return (1);
    }

    LOGINFO("[%s] bundling requested\n", SP(instanceId));
    LOGDEBUG("invoked: userId=%s, instanceId=%s, bucketName=%s, filePrefix=%s, objectStorageURL=%s, userPublicKey=%s, S3Policy=%s, S3PolicySig=%s, architecture=%s\n",
             SP(pMeta ? pMeta->userId : "UNSET"), SP(instanceId), SP(bucketName), SP(filePrefix), SP(objectStorageURL), SP(userPublicKey), SP(S3Policy), SP(S3PolicySig),
             SP(architecture));
    if (!instanceId) {
        LOGERROR("bad input params\n");
        return (1);
    }

    // Prefer the internal object storage endpoint known to the CC configuration.
    for (i = 0; (i < 16) && !found; i++) {
        if (!strcmp(config->services[i].type, "objectstorage")) {
            snprintf(theObjectStorageURL, sizeof(theObjectStorageURL), "%s", config->services[i].uris[0]);
            found = 1;
        }
    }

    if (!found) {
        if (objectStorageURL == NULL) {
            LOGERROR("[%s] no object storage URL available for bundling\n", instanceId);
            return (1);
        }
        snprintf(theObjectStorageURL, sizeof(theObjectStorageURL), "%s", objectStorageURL);
    }

    // Work on a private copy so the NC calls below run without holding RESCACHE.
    sem_mywait(RESCACHE);
    memcpy(&resourceCacheLocal, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);

    rc = find_instanceCacheId(instanceId, &myInstance);
    if (!rc) {
        // found the instance in the cache
        if (myInstance) {
            start = myInstance->ncHostIdx;
            stop = start + 1;
            EUCA_FREE(myInstance);
        }
    } else {
        start = 0;
        stop = resourceCacheLocal.numResources;
    }

    done = 0;
    for (j = start; (j < stop) && !done; j++) {
        timeout = ncGetTimeout(op_start, OP_TIMEOUT, stop - start, j);
        rc = ncClientCall(pMeta, timeout, resourceCacheLocal.resources[j].lockidx, resourceCacheLocal.resources[j].ncURL, "ncBundleInstance",
                          instanceId, bucketName, filePrefix, theObjectStorageURL, userPublicKey, S3Policy, S3PolicySig, architecture);
        if (rc) {
            ret = 1;
        } else {
            ret = 0;
            done++;
        }
    }

    LOGTRACE("done\n");
    shawn();
    return (ret);
}
//!
//! Asks the node controller (NC) hosting @p instanceId to restart the instance after bundling.
//!
//! If the instance is found in the instance cache, only that instance's NC is contacted.
//! Otherwise every NC in a snapshot of the resource cache is tried in order until one
//! call succeeds.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure
//! @param[in] instanceId identifier of the instance to restart (required)
//!
//! @return 0 if an NC accepted the request (or no NC was attempted). 1 if any of:
//!         - the CC is not initialized or not enabled;
//!         - instanceId is NULL;
//!         - every attempted NC call failed.
//!
int doBundleRestartInstance(ncMetadata * pMeta, char *instanceId)
{
    ccInstance *cached = NULL;
    ccResourceCache snapshot;
    time_t opStart = time(NULL);
    int first = 0;
    int last = 0;
    int idx = 0;
    int callTimeout = 0;
    int result = 0;

    if ((initialize(pMeta, FALSE) != 0) || ccIsEnabled()) {
        return (1);
    }

    LOGINFO("[%s] bundling instance restart\n", SP(instanceId));
    LOGDEBUG("invoked: instanceId=%s userId=%s\n", SP(instanceId), SP(pMeta ? pMeta->userId : "UNSET"));
    if (instanceId == NULL) {
        LOGERROR("bad input params\n");
        return (1);
    }

    // Copy the resource cache under RESCACHE so the NC calls below run without the lock.
    sem_mywait(RESCACHE);
    memcpy(&snapshot, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);

    if (find_instanceCacheId(instanceId, &cached) != 0) {
        // Instance not in the cache: try every known NC.
        first = 0;
        last = snapshot.numResources;
    } else if (cached != NULL) {
        // Instance cached: only its own NC needs to be asked.
        first = cached->ncHostIdx;
        last = first + 1;
        EUCA_FREE(cached);
    }

    for (idx = first; idx < last; idx++) {
        callTimeout = ncGetTimeout(opStart, OP_TIMEOUT, (last - first), idx);
        if (ncClientCall(pMeta, callTimeout, snapshot.resources[idx].lockidx, snapshot.resources[idx].ncURL, "ncBundleRestartInstance", instanceId) == 0) {
            result = 0;
            break;
        }
        result = 1;
    }

    LOGTRACE("done\n");
    shawn();
    return (result);
}
//!
//! Asks the node controller (NC) hosting @p instanceId to cancel its bundle task.
//!
//! If the instance is found in the instance cache, only that instance's NC is contacted.
//! Otherwise every NC in a snapshot of the resource cache is tried in order until one
//! accepts the request.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure
//! @param[in] instanceId identifier of the instance whose bundle task is cancelled (required)
//!
//! @return 0 if an NC accepted the request (or no NC was attempted). 1 if any of:
//!         - the CC is not initialized or not enabled;
//!         - instanceId is NULL;
//!         - every attempted NC call failed.
//!
int doCancelBundleTask(ncMetadata * pMeta, char *instanceId)
{
    ccResourceCache nodes;
    ccInstance *found = NULL;
    time_t begin = time(NULL);
    int lo = 0;
    int hi = 0;
    int k = 0;
    int failed = 0;
    int accepted = 0;

    if (initialize(pMeta, FALSE) || ccIsEnabled()) {
        return (1);
    }

    LOGINFO("[%s] bundle task cancelled\n", SP(instanceId));
    LOGDEBUG("invoked: instanceId=%s userId=%s\n", SP(instanceId), SP(pMeta ? pMeta->userId : "UNSET"));
    if (instanceId == NULL) {
        LOGERROR("bad input params\n");
        return (1);
    }

    sem_mywait(RESCACHE);
    memcpy(&nodes, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);

    if (!find_instanceCacheId(instanceId, &found)) {
        if (found) {
            lo = found->ncHostIdx;
            hi = lo + 1;
            EUCA_FREE(found);
        }
    } else {
        // not cached: candidate range is every NC in the snapshot (lo stays 0)
        hi = nodes.numResources;
    }

    k = lo;
    while ((k < hi) && !accepted) {
        int rc = ncClientCall(pMeta, ncGetTimeout(begin, OP_TIMEOUT, hi - lo, k), nodes.resources[k].lockidx, nodes.resources[k].ncURL, "ncCancelBundleTask", instanceId);

        failed = (rc != 0) ? 1 : 0;
        accepted = !failed;
        k++;
    }

    LOGTRACE("done\n");
    shawn();
    return (failed);
}
//! Remove cluster and storage services from other partitions so NCs only get globals and cluster-local services.
//!
//! The metadata is modified in place. Each removed entry has its strings cleared and is
//! swapped to the end of the array, and servicesLen is decremented, so the relative
//! order of the remaining services may change.
//!
//! Fixes over the previous version:
//!   - The entry swapped into slot i is now examined too. Previously the index advanced
//!     past it, so a foreign cluster/storage service could survive filtering.
//!   - The self-swap for the last slot is skipped, because memcpy() of an object onto
//!     itself (overlapping regions) is undefined behavior.
//!
//! @param[in,out] meta             metadata whose services list is filtered; no-op if NULL
//! @param[in]     filter_partition partition whose cluster/storage services are kept; no-op if NULL
void filter_services(ncMetadata * meta, char *filter_partition)
{
    int i = 0;
    int j = 0;
    int last = 0;
    serviceInfoType tmp;
    int copySize = sizeof(serviceInfoType);

    if ((meta == NULL) || (filter_partition == NULL)) {
        return;
    }

    while (i < meta->servicesLen) {
        //Only filter cluster controllers and storage controllers.
        if ((!strcmp(meta->services[i].type, "cluster") || !strcmp(meta->services[i].type, "storage")) && strcmp(meta->services[i].partition, filter_partition)) {
            LOGTRACE("Filtering out service: %s , %s\n", meta->services[i].name, meta->services[i].partition);

            // Clear the entry's strings and URIs.
            meta->services[i].name[0] = '\0';
            meta->services[i].partition[0] = '\0';
            meta->services[i].type[0] = '\0';
            for (j = 0; j < meta->services[i].urisLen; j++) {
                meta->services[i].uris[j][0] = '\0';
            }
            meta->services[i].urisLen = 0;

            // Move the cleared entry past the end of the live range.
            last = meta->servicesLen - 1;
            if (i != last) {
                memcpy(&tmp, &(meta->services[i]), copySize);
                memcpy(&(meta->services[i]), &(meta->services[last]), copySize);
                memcpy(&(meta->services[last]), &tmp, copySize);
            }
            meta->servicesLen--;
            // Do not advance i: slot i now holds the former last entry, which still has to be checked.
        } else {
            i++;
        }
    }
}
//!
//!
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure
//! @param[in] timeout
//! @param[in] ncLock
//! @param[in] ncURL
//! @param[in] ncOp
//! @param[in] ...
//!
//! @return
//!
//! @pre
//!
//! @note
//!
int ncClientCall(ncMetadata * pMeta, int timeout, int ncLock, char *ncURL, char *ncOp, ...)
{
#define WRITE_REPLY_STRING \
{ \
if (timeout) { \
int __len = 0; \
if (localmeta->replyString) { \
__len = strlen(localmeta->replyString); \
} \
int __bytes = write(filedes[1], &__len, sizeof(int)); \
if (__len > 0) { \
__bytes += write(filedes[1], localmeta->replyString, (sizeof(char) * __len)); \
} \
LOGTRACE("child process wrote %d bytes (len=%d)\n", __bytes, __len); \
} \
}
#define READ_REPLY_STRING \
{ \
if (timeout) { \
int __len = 0; \
rbytes = timeread(filedes[0], &__len, sizeof(int), timeout); \
LOGTRACE("parent process read %d bytes (len=%d)\n", rbytes, __len); \
if (rbytes <= 0) { \
killwait(pid); \
opFail = 1; \
} else if (__len > 0) { \
pMeta->replyString = EUCA_ALLOC(__len, sizeof(char)); \
if (pMeta->replyString == NULL) { \
LOGFATAL("out of memory! ncOps=%s\n", ncOp); \
unlock_exit(1); \
} \
rbytes = timeread(filedes[0], pMeta->replyString, __len, timeout); \
if (rbytes <= 0) { \
killwait(pid); \
opFail = 1; \
} \
} \
} \
}
int i = 0;
int pid = 0;
int rc = 0;
int ret = 0;
int status = 0;
int opFail = 0;
int len = 0;
int rbytes = 0;
int filedes[2] = { 0 };
va_list al = { {0} };
LOGTRACE("invoked: ncOps=%s ncURL=%s timeout=%d\n", ncOp, ncURL, timeout); // these are common
if ((rc = pipe(filedes)) != 0) {
LOGERROR("cannot create pipe ncOps=%s\n", ncOp);
return (1);
}
va_start(al, ncOp);
// grab the lock
sem_mywait(ncLock);
if ((pid = fork()) == 0) {
ncStub *ncs;
ncMetadata *localmeta = NULL;
LOGTRACE("forked to service NC invocation: %s\n", ncOp);
localmeta = EUCA_ZALLOC(1, sizeof(ncMetadata));
if (!localmeta) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
memcpy(localmeta, pMeta, sizeof(ncMetadata));
if (pMeta->correlationId) {
localmeta->correlationId = strdup(pMeta->correlationId);
} else {
localmeta->correlationId = strdup("unset");
}
if (pMeta->userId) {
localmeta->userId = strdup(pMeta->userId);
} else {
localmeta->userId = strdup("eucalyptus");
}
//TODO: zhill, change this to only be invoked on DescribeInstances and/or DescribeResources?
//Update meta from config
if (populateOutboundMeta(localmeta)) {
LOGERROR("Failed to update output service metadata\n");
}
//Don't need to filter, CC should only have received.
//filter_services(localmeta, config->ccStatus.serviceId.partition);
close(filedes[0]);
ncs = ncStubCreate(ncURL, NULL, NULL);
if (config->use_wssec) {
rc = InitWSSEC(ncs->env, ncs->stub, config->policyFile);
}
LOGTRACE("\tncOps=%s ppid=%d client calling '%s'\n", ncOp, getppid(), ncOp);
if (!strcmp(ncOp, "ncGetConsoleOutput")) {
// args: char *instId
char *instId = va_arg(al, char *);
char **consoleOutput = va_arg(al, char **);
rc = ncGetConsoleOutputStub(ncs, localmeta, instId, consoleOutput);
if (timeout && consoleOutput) {
if (!rc && *consoleOutput) {
len = strlen(*consoleOutput) + 1;
rc = write(filedes[1], &len, sizeof(int));
rc = write(filedes[1], *consoleOutput, sizeof(char) * len);
rc = 0;
} else {
len = 0;
rc = write(filedes[1], &len, sizeof(int));
rc = 1;
}
}
} else if (!strcmp(ncOp, "ncAttachVolume")) {
char *instanceId = va_arg(al, char *);
char *volumeId = va_arg(al, char *);
char *remoteDev = va_arg(al, char *);
char *localDev = va_arg(al, char *);
rc = ncAttachVolumeStub(ncs, localmeta, instanceId, volumeId, remoteDev, localDev);
} else if (!strcmp(ncOp, "ncDetachVolume")) {
char *instanceId = va_arg(al, char *);
char *volumeId = va_arg(al, char *);
char *remoteDev = va_arg(al, char *);
char *localDev = va_arg(al, char *);
int force = va_arg(al, int);
rc = ncDetachVolumeStub(ncs, localmeta, instanceId, volumeId, remoteDev, localDev, force);
} else if (!strcmp(ncOp, "ncCreateImage")) {
char *instanceId = va_arg(al, char *);
char *volumeId = va_arg(al, char *);
char *remoteDev = va_arg(al, char *);
rc = ncCreateImageStub(ncs, localmeta, instanceId, volumeId, remoteDev);
} else if (!strcmp(ncOp, "ncPowerDown")) {
rc = ncPowerDownStub(ncs, localmeta);
} else if (!strcmp(ncOp, "ncAssignAddress")) {
char *instanceId = va_arg(al, char *);
char *publicIp = va_arg(al, char *);
rc = ncAssignAddressStub(ncs, localmeta, instanceId, publicIp);
} else if (!strcmp(ncOp, "ncBroadcastNetworkInfo")) {
char *networkInfo = va_arg(al, char *);
rc = ncBroadcastNetworkInfoStub(ncs, localmeta, networkInfo);
} else if (!strcmp(ncOp, "ncRebootInstance")) {
char *instId = va_arg(al, char *);
rc = ncRebootInstanceStub(ncs, localmeta, instId);
} else if (!strcmp(ncOp, "ncTerminateInstance")) {
char *instId = va_arg(al, char *);
int force = va_arg(al, int);
int *shutdownState = va_arg(al, int *);
int *previousState = va_arg(al, int *);
rc = ncTerminateInstanceStub(ncs, localmeta, instId, force, shutdownState, previousState);
if (timeout) {
if (!rc) {
len = 2;
rc = write(filedes[1], &len, sizeof(int));
rc = write(filedes[1], shutdownState, sizeof(int));
rc = write(filedes[1], previousState, sizeof(int));
rc = 0;
} else {
len = 0;
rc = write(filedes[1], &len, sizeof(int));
rc = 1;
}
}
} else if (!strcmp(ncOp, "ncStartNetwork")) { //! @TODO remove this NC call logic, since it is not used any more
char *uuid = va_arg(al, char *);
char **peers = va_arg(al, char **);
int peersLen = va_arg(al, int);
int port = va_arg(al, int);
int vlan = va_arg(al, int);
char **outStatus = va_arg(al, char **);
rc = ncStartNetworkStub(ncs, localmeta, uuid, peers, peersLen, port, vlan, outStatus);
if (timeout && outStatus) {
if (!rc && *outStatus) {
len = strlen(*outStatus) + 1;
rc = write(filedes[1], &len, sizeof(int));
rc = write(filedes[1], *outStatus, sizeof(char) * len);
rc = 0;
} else {
len = 0;
rc = write(filedes[1], &len, sizeof(int));
rc = 1;
}
}
if (outStatus)
EUCA_FREE(*outStatus);
} else if (!strcmp(ncOp, "ncRunInstance")) {
char *uuid = va_arg(al, char *);
char *instId = va_arg(al, char *);
char *reservationId = va_arg(al, char *);
virtualMachine *ncvm = va_arg(al, virtualMachine *);
char *imageId = va_arg(al, char *);
char *imageURL = va_arg(al, char *);
char *kernelId = va_arg(al, char *);
char *kernelURL = va_arg(al, char *);
char *ramdiskId = va_arg(al, char *);
char *ramdiskURL = va_arg(al, char *);
char *ownerId = va_arg(al, char *);
char *accountId = va_arg(al, char *);
char *keyName = va_arg(al, char *);
netConfig *ncnet = va_arg(al, netConfig *);
char *userData = va_arg(al, char *);
char *credential = va_arg(al, char *);
char *launchIndex = va_arg(al, char *);
char *platform = va_arg(al, char *);
int expiryTime = va_arg(al, int);
char **netNames = va_arg(al, char **);
int netNamesLen = va_arg(al, int);
char *rootDirective = va_arg(al, char *);
char **netIds = va_arg(al, char **);
int netIdsLen = va_arg(al, int);
ncInstance **outInst = va_arg(al, ncInstance **);
rc = ncRunInstanceStub(ncs, localmeta, uuid, instId, reservationId, ncvm, imageId, imageURL, kernelId, kernelURL, ramdiskId, ramdiskURL,
ownerId, accountId, keyName, ncnet, userData, credential, launchIndex, platform, expiryTime, netNames, netNamesLen, rootDirective, netIds,
netIdsLen, outInst);
if (timeout && outInst) {
if (!rc && *outInst) {
len = sizeof(ncInstance);
rc = write(filedes[1], &len, sizeof(int));
rc = write(filedes[1], *outInst, sizeof(ncInstance));
rc = 0;
} else {
len = 0;
rc = write(filedes[1], &len, sizeof(int));
rc = 1;
}
}
if (outInst)
EUCA_FREE(*outInst);
} else if (!strcmp(ncOp, "ncDescribeInstances")) {
char **instIds = va_arg(al, char **);
int instIdsLen = va_arg(al, int);
ncInstance ***ncOutInsts = va_arg(al, ncInstance ***);
int *ncOutInstsLen = va_arg(al, int *);
rc = ncDescribeInstancesStub(ncs, localmeta, instIds, instIdsLen, ncOutInsts, ncOutInstsLen);
if (timeout && ncOutInsts && ncOutInstsLen) {
if (!rc) {
len = *ncOutInstsLen;
rc = write(filedes[1], &len, sizeof(int));
for (i = 0; i < len; i++) {
ncInstance *inst;
inst = (*ncOutInsts)[i];
rc = write(filedes[1], inst, sizeof(ncInstance));
}
rc = 0;
} else {
len = 0;
rc = write(filedes[1], &len, sizeof(int));
rc = 1;
}
}
if (ncOutInsts) {
if (ncOutInstsLen) {
for (i = 0; i < (*ncOutInstsLen); i++) {
EUCA_FREE((*ncOutInsts)[i]);
}
}
EUCA_FREE(*ncOutInsts);
}
} else if (!strcmp(ncOp, "ncDescribeResource")) {
char *resourceType = va_arg(al, char *);
ncResource **outRes = va_arg(al, ncResource **);
char **errMsg = va_arg(al, char **);
LOGTRACE("\tcalling ncDescribeResourceStub with resourceType=%s outRes=%lx errMsg=%lx\n", resourceType, (unsigned long)outRes, (unsigned long)errMsg);
rc = ncDescribeResourceStub(ncs, localmeta, resourceType, outRes);
LOGTRACE("\tcalled ncDescribeResourceStub, rc = %d, timeout = %d\n", rc, timeout);
if (timeout && outRes) {
if (!rc && *outRes) {
len = sizeof(ncResource);
rc = write(filedes[1], &rc, sizeof(int)); //NOTE: we write back rc as well
rc = write(filedes[1], &len, sizeof(int));
rc = write(filedes[1], *outRes, sizeof(ncResource));
rc = 0;
} else {
(*errMsg) = (char *)axutil_error_get_message(ncs->env->error);
LOGTRACE("\terrMsg = %s\n", *errMsg);
if (*errMsg && (len = strnlen(*errMsg, 1024 - 1))) {
len += 1;
rc = write(filedes[1], &rc, sizeof(int)); //NOTE: we write back rc as well
rc = write(filedes[1], &len, sizeof(int));
rc = write(filedes[1], *errMsg, sizeof(char) * len);
} else {
len = 0;
rc = write(filedes[1], &rc, sizeof(int)); //NOTE: we write back rc as well
rc = write(filedes[1], &len, sizeof(int));
}
rc = 1;
}
}
if (outRes)
EUCA_FREE(*outRes);
} else if (!strcmp(ncOp, "ncDescribeSensors")) {
int history_size = va_arg(al, int);
long long collection_interval_time_ms = va_arg(al, long long);
char **instIds = va_arg(al, char **);
int instIdsLen = va_arg(al, int);
char **sensorIds = va_arg(al, char **);
int sensorIdsLen = va_arg(al, int);
sensorResource ***srs = va_arg(al, sensorResource ***);
int *srsLen = va_arg(al, int *);
rc = ncDescribeSensorsStub(ncs, localmeta, history_size, collection_interval_time_ms, instIds, instIdsLen, sensorIds, sensorIdsLen, srs, srsLen);
if (timeout && srs && srsLen) {
if (!rc) {
len = *srsLen;
rc = write(filedes[1], &len, sizeof(int));
for (i = 0; i < len; i++) {
sensorResource *sr;
sr = (*srs)[i];
rc = write(filedes[1], sr, sizeof(sensorResource));
}
rc = 0;
} else {
len = 0;
rc = write(filedes[1], &len, sizeof(int));
rc = 1;
}
}
if (srs) {
if (srsLen) {
for (i = 0; i < (*srsLen); i++) {
EUCA_FREE((*srs)[i]);
}
}
EUCA_FREE(*srs);
}
} else if (!strcmp(ncOp, "ncBundleInstance")) {
char *instanceId = va_arg(al, char *);
char *bucketName = va_arg(al, char *);
char *filePrefix = va_arg(al, char *);
char *objectStorageURL = va_arg(al, char *);
char *userPublicKey = va_arg(al, char *);
char *S3Policy = va_arg(al, char *);
char *S3PolicySig = va_arg(al, char *);
char *architecture = va_arg(al, char *);
rc = ncBundleInstanceStub(ncs, localmeta, instanceId, bucketName, filePrefix, objectStorageURL, userPublicKey, S3Policy, S3PolicySig, architecture);
} else if (!strcmp(ncOp, "ncBundleRestartInstance")) {
char *instanceId = va_arg(al, char *);
rc = ncBundleRestartInstanceStub(ncs, localmeta, instanceId);
} else if (!strcmp(ncOp, "ncCancelBundleTask")) {
char *instanceId = va_arg(al, char *);
rc = ncCancelBundleTaskStub(ncs, localmeta, instanceId);
} else if (!strcmp(ncOp, "ncModifyNode")) {
char *stateName = va_arg(al, char *);
rc = ncModifyNodeStub(ncs, localmeta, stateName);
} else if (!strcmp(ncOp, "ncMigrateInstances")) {
ncInstance **instances = va_arg(al, ncInstance **);
int instancesLen = va_arg(al, int);
char *action = va_arg(al, char *);
char *credentials = va_arg(al, char *);
rc = ncMigrateInstancesStub(ncs, localmeta, instances, instancesLen, action, credentials);
WRITE_REPLY_STRING;
} else if (!strcmp(ncOp, "ncStartInstance")) {
char *instanceId = va_arg(al, char *);
rc = ncStartInstanceStub(ncs, localmeta, instanceId);
WRITE_REPLY_STRING;
} else if (!strcmp(ncOp, "ncStopInstance")) {
char *instanceId = va_arg(al, char *);
rc = ncStopInstanceStub(ncs, localmeta, instanceId);
WRITE_REPLY_STRING;
} else {
LOGWARN("\tncOps=%s ppid=%d operation '%s' not found\n", ncOp, getppid(), ncOp);
rc = 1;
}
LOGTRACE("\tncOps=%s ppid=%d done calling '%s' with exit code '%d'\n", ncOp, getppid(), ncOp, rc);
if (localmeta->replyString != NULL) {
LOGDEBUG("NC replied to '%s' with '%s'\n", ncOp, localmeta->replyString);
}
if (rc) {
ret = 1;
} else {
ret = 0;
}
close(filedes[1]);
// Free our local meta data structure and associated memory
EUCA_FREE(localmeta->replyString);
EUCA_FREE(localmeta->correlationId);
EUCA_FREE(localmeta->userId);
EUCA_FREE(localmeta);
// ditch our stub
if (ncs != NULL) {
ncStubDestroy(ncs);
ncs = NULL;
}
exit(ret);
} else {
// returns for each client call
close(filedes[1]);
if (!strcmp(ncOp, "ncGetConsoleOutput")) {
char *instId = NULL;
char **outConsoleOutput = NULL;
instId = va_arg(al, char *);
outConsoleOutput = va_arg(al, char **);
if (outConsoleOutput) {
*outConsoleOutput = NULL;
}
if (timeout && outConsoleOutput) {
rbytes = timeread(filedes[0], &len, sizeof(int), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
} else {
*outConsoleOutput = EUCA_ALLOC(len, sizeof(char));
if (!*outConsoleOutput) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
rbytes = timeread(filedes[0], *outConsoleOutput, len, timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
}
}
}
} else if (!strcmp(ncOp, "ncTerminateInstance")) {
char *instId = NULL;
int force = 0;
int *shutdownState = NULL;
int *previousState = NULL;
instId = va_arg(al, char *);
force = va_arg(al, int);
shutdownState = va_arg(al, int *);
previousState = va_arg(al, int *);
if (shutdownState && previousState) {
*shutdownState = *previousState = 0;
}
if (timeout && shutdownState && previousState) {
rbytes = timeread(filedes[0], &len, sizeof(int), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
} else {
rbytes = timeread(filedes[0], shutdownState, sizeof(int), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
}
rbytes = timeread(filedes[0], previousState, sizeof(int), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
}
}
}
} else if (!strcmp(ncOp, "ncStartNetwork")) { //! @TODO remove this NC call logic, since it is not used any more
char *uuid = NULL;
char **peers = NULL;
int peersLen = 0;
int port = 0;
int vlan = 0;
char **outStatus = NULL;
uuid = va_arg(al, char *);
peers = va_arg(al, char **);
peersLen = va_arg(al, int);
port = va_arg(al, int);
vlan = va_arg(al, int);
outStatus = va_arg(al, char **);
if (outStatus) {
*outStatus = NULL;
}
if (timeout && outStatus) {
*outStatus = NULL;
rbytes = timeread(filedes[0], &len, sizeof(int), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
} else {
*outStatus = EUCA_ALLOC(len, sizeof(char));
if (!*outStatus) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
rbytes = timeread(filedes[0], *outStatus, len, timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
}
}
}
} else if (!strcmp(ncOp, "ncRunInstance")) {
char *uuid = NULL;
char *instId = NULL;
char *reservationId = NULL;
virtualMachine *ncvm = NULL;
char *imageId = NULL;
char *imageURL = NULL;
char *kernelId = NULL;
char *kernelURL = NULL;
char *ramdiskId = NULL;
char *ramdiskURL = NULL;
char *ownerId = NULL;
char *accountId = NULL;
char *keyName = NULL;
netConfig *ncnet = NULL;
char *userData = NULL;
char *credential = NULL;
char *launchIndex = NULL;
char *platform = NULL;
int expiryTime = 0;
char **netNames = NULL;
int netNamesLen = 0;
char *rootDirective = NULL;
char **netIds = NULL;
int netIdsLen = 0;
ncInstance **outInst = NULL;
uuid = va_arg(al, char *);
instId = va_arg(al, char *);
reservationId = va_arg(al, char *);
ncvm = va_arg(al, virtualMachine *);
imageId = va_arg(al, char *);
imageURL = va_arg(al, char *);
kernelId = va_arg(al, char *);
kernelURL = va_arg(al, char *);
ramdiskId = va_arg(al, char *);
ramdiskURL = va_arg(al, char *);
ownerId = va_arg(al, char *);
accountId = va_arg(al, char *);
keyName = va_arg(al, char *);
ncnet = va_arg(al, netConfig *);
userData = va_arg(al, char *);
credential = va_arg(al, char *);
launchIndex = va_arg(al, char *);
platform = va_arg(al, char *);
expiryTime = va_arg(al, int);
netNames = va_arg(al, char **);
netNamesLen = va_arg(al, int);
rootDirective = va_arg(al, char *);
netIds = va_arg(al, char **);
netIdsLen = va_arg(al, int);
outInst = va_arg(al, ncInstance **);
if (outInst) {
*outInst = NULL;
}
if (timeout && outInst) {
rbytes = timeread(filedes[0], &len, sizeof(int), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
} else {
*outInst = EUCA_ZALLOC(1, sizeof(ncInstance));
if (!*outInst) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
rbytes = timeread(filedes[0], *outInst, sizeof(ncInstance), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
}
}
}
} else if (!strcmp(ncOp, "ncDescribeInstances")) {
char **instIds = NULL;
int instIdsLen = 0;
ncInstance ***ncOutInsts = NULL;
int *ncOutInstsLen = NULL;
instIds = va_arg(al, char **);
instIdsLen = va_arg(al, int);
ncOutInsts = va_arg(al, ncInstance ***);
ncOutInstsLen = va_arg(al, int *);
if (ncOutInstsLen && ncOutInsts) {
*ncOutInstsLen = 0;
*ncOutInsts = NULL;
}
if (timeout && ncOutInsts && ncOutInstsLen) {
rbytes = timeread(filedes[0], &len, sizeof(int), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
} else {
*ncOutInsts = EUCA_ZALLOC(len, sizeof(ncInstance *));
if (!*ncOutInsts) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
*ncOutInstsLen = len;
for (i = 0; i < len; i++) {
ncInstance *inst;
inst = EUCA_ZALLOC(1, sizeof(ncInstance));
if (!inst) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
rbytes = timeread(filedes[0], inst, sizeof(ncInstance), timeout);
(*ncOutInsts)[i] = inst;
}
}
}
} else if (!strcmp(ncOp, "ncDescribeResource")) {
char *resourceType = NULL;
char **errMsg = NULL;
ncResource **outRes = NULL;
resourceType = va_arg(al, char *);
outRes = va_arg(al, ncResource **);
errMsg = va_arg(al, char **);
if (outRes) {
*outRes = NULL;
}
if (timeout && outRes) {
// first int we read back is the 'rc', then the 'len'
rbytes = timeread(filedes[0], &opFail, sizeof(int), timeout);
if (rbytes <= 0 || (rbytes = timeread(filedes[0], &len, sizeof(int), timeout)) <= 0) {
killwait(pid);
opFail = 1;
} else if (opFail && len) {
*errMsg = EUCA_ZALLOC(len, sizeof(char));
if (*errMsg == NULL) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
rbytes = timeread(filedes[0], *errMsg, len * sizeof(char), timeout);
if (rbytes <= 0 || opFail) {
kill(pid, SIGKILL);
opFail = 1;
}
} else {
*outRes = EUCA_ZALLOC(1, sizeof(ncResource));
if (*outRes == NULL) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
rbytes = timeread(filedes[0], *outRes, sizeof(ncResource), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
}
}
}
} else if (!strcmp(ncOp, "ncDescribeSensors")) {
int history_size = 0;
long long collection_interval_time_ms = 0L;
char **instIds = NULL;
int instIdsLen = 0;
char **sensorIds = NULL;
int sensorIdsLen = 0;
sensorResource ***srs = NULL;
int *srsLen = NULL;
history_size = va_arg(al, int);
collection_interval_time_ms = va_arg(al, long long);
instIds = va_arg(al, char **);
instIdsLen = va_arg(al, int);
sensorIds = va_arg(al, char **);
sensorIdsLen = va_arg(al, int);
srs = va_arg(al, sensorResource ***);
srsLen = va_arg(al, int *);
if (srs && srsLen) {
*srs = NULL;
*srsLen = 0;
}
if (timeout && srs && srsLen) {
rbytes = timeread(filedes[0], &len, sizeof(int), timeout);
if (rbytes <= 0) {
killwait(pid);
opFail = 1;
} else {
*srs = EUCA_ZALLOC(len, sizeof(sensorResource *));
if (*srs == NULL) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
*srsLen = len;
for (i = 0; i < len; i++) {
sensorResource *sr;
sr = EUCA_ZALLOC(1, sizeof(sensorResource));
if (sr == NULL) {
LOGFATAL("out of memory! ncOps=%s\n", ncOp);
unlock_exit(1);
}
rbytes = timeread(filedes[0], sr, sizeof(sensorResource), timeout);
(*srs)[i] = sr;
}
}
}
} else if (!strcmp(ncOp, "ncMigrateInstances")) {
READ_REPLY_STRING;
} else if (!strcmp(ncOp, "ncStartInstance")) {
READ_REPLY_STRING;
} else if (!strcmp(ncOp, "ncStopInstance")) {
READ_REPLY_STRING;
} else {
// nothing to do in default case (succ/fail encoded in exit code)
}
close(filedes[0]);
if (timeout) {
rc = timewait(pid, &status, timeout);
if (WIFEXITED(status)) {
rc = WEXITSTATUS(status);
} else {
int sig = -1;
int dump = 0;
if (WIFSIGNALED(status)) {
sig = WTERMSIG(status);
dump = WCOREDUMP(status);
}
if (sig == SIGTERM || sig == SIGKILL) { // our killwait() first tries SIGTERM and then SIGKILL
LOGDEBUG("child process %d handling '%s' was terminated with %d\n", pid, ncOp, sig);
} else {
LOGERROR("BUG: child process %d handling '%s' was terminated with %d (core=%d)\n", pid, ncOp, sig, dump);
}
rc = 1;
}
} else {
rc = 0;
}
}
LOGTRACE("done ncOps=%s clientrc=%d opFail=%d\n", ncOp, rc, opFail);
if (rc || opFail) {
ret = 1;
} else {
ret = 0;
}
// release the lock
sem_mypost(ncLock);
va_end(al);
return (ret);
#undef WRITE_REPLY_STRING
#undef READ_REPLY_STRING
}
//!
//! Computes the timeout for one NC call in a sequence of calls that share one time budget.
//!
//! The time still left in the overall budget (op_max seconds counted from op_start) is split
//! evenly among the calls not yet made (numCalls - idx, never less than 1). That share is then
//! capped at OP_TIMEOUT_PERNODE and raised to at least OP_TIMEOUT_MIN.
//!
//! @param[in] op_start time at which the overall operation started
//! @param[in] op_max   total time budget for the whole operation, in seconds
//! @param[in] numCalls total number of calls sharing the budget
//! @param[in] idx      index of the call about to be made
//!
//! @return the timeout (in seconds) to use for the next call
//!
int ncGetTimeout(time_t op_start, time_t op_max, int numCalls, int idx)
{
    time_t remainingTime = 0;
    time_t share = 0;
    int callsLeft = numCalls - idx;

    // never divide by zero or by a negative count, even if idx overshoots numCalls
    if (callsLeft < 1) {
        callsLeft = 1;
    }

    remainingTime = op_max - (time(NULL) - op_start);
    share = remainingTime / callsLeft;

    return (maxint(minint(share, OP_TIMEOUT_PERNODE), OP_TIMEOUT_MIN));
}
//!
//! Attaches a volume to an instance by asking the hosting node controller (NC) to do it.
//!
//! If the instance is found in the instance cache, only the node recorded there is asked.
//! Otherwise every known node is asked in turn until one of them reports success.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure
//! @param[in] volumeId the volume identifier string (vol-XXXXXXXX)
//! @param[in] instanceId the instance identifier to attach the volume to
//! @param[in] remoteDev the remote device string forwarded to the NC
//! @param[in] localDev the local device name forwarded to the NC
//!
//! @return 0 when a node reported success (or when no node had to be contacted);
//!         1 when initialize() fails, ccIsEnabled() returns non-zero, an input is NULL,
//!         or every contacted node failed
//!
int doAttachVolume(ncMetadata * pMeta, char *volumeId, char *instanceId, char *remoteDev, char *localDev)
{
    int rc = 0;
    int result = 0;
    int node = 0;
    int firstNode = 0;
    int lastNode = 0;
    int callTimeout = 0;
    ccInstance *cachedInstance = NULL;
    ccResourceCache nodesSnapshot;
    time_t opStart = time(NULL);

    if ((initialize(pMeta, FALSE) != 0) || ccIsEnabled()) {
        return (1);
    }

    LOGINFO("[%s][%s] attaching volume\n", SP(instanceId), SP(volumeId));
    LOGDEBUG("invoked: userId=%s, volumeId=%s, instanceId=%s, remoteDev=%s, localDev=%s\n", SP(pMeta ? pMeta->userId : "UNSET"), SP(volumeId), SP(instanceId), SP(remoteDev),
             SP(localDev));

    if (volumeId == NULL || instanceId == NULL || remoteDev == NULL || localDev == NULL) {
        LOGERROR("bad input params\n");
        return (1);
    }

    // take a private copy of the node list; RESCACHE is released before any NC call is made
    sem_mywait(RESCACHE);
    memcpy(&nodesSnapshot, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);

    if (find_instanceCacheId(instanceId, &cachedInstance) != 0) {
        // instance unknown to the cache: try every node
        lastNode = nodesSnapshot.numResources;
    } else if (cachedInstance != NULL) {
        // instance cached: only its own node needs to be asked
        firstNode = cachedInstance->ncHostIdx;
        lastNode = firstNode + 1;
        EUCA_FREE(cachedInstance);
    }

    for (node = firstNode; node < lastNode; node++) {
        callTimeout = ncGetTimeout(opStart, OP_TIMEOUT, lastNode - firstNode, node);
        callTimeout = maxint(callTimeout, ATTACH_VOL_TIMEOUT_SECONDS);
        rc = ncClientCall(pMeta, callTimeout, nodesSnapshot.resources[node].lockidx, nodesSnapshot.resources[node].ncURL, "ncAttachVolume",
                          instanceId, volumeId, remoteDev, localDev);
        if (rc == 0) {
            // first successful node wins
            result = 0;
            break;
        }
        result = 1;
    }

    LOGTRACE("done\n");
    shawn();
    return (result);
}
//!
//! Detaches a volume from an instance by asking the hosting node controller (NC) to do it.
//!
//! If the instance is found in the instance cache, only the node recorded there is asked.
//! Otherwise every known node is asked in turn until one of them reports success.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure
//! @param[in] volumeId the volume identifier string (vol-XXXXXXXX)
//! @param[in] instanceId the instance identifier the volume is attached to
//! @param[in] remoteDev the remote device string forwarded to the NC
//! @param[in] localDev the local device name forwarded to the NC
//! @param[in] force force flag forwarded to the NC's ncDetachVolume call
//!
//! @return 0 when a node reported success (or when no node had to be contacted);
//!         1 when initialize() fails, ccIsEnabled() returns non-zero, an input is NULL,
//!         or every contacted node failed
//!
//! @note The node loop stops at the first successful reply, matching doAttachVolume().
//!       Without that guard a later failing node would overwrite the result of an
//!       earlier successful one, and the detach would be reported as failed.
//!
int doDetachVolume(ncMetadata * pMeta, char *volumeId, char *instanceId, char *remoteDev, char *localDev, int force)
{
    int i, rc, start = 0, stop = 0, ret = 0, done = 0, timeout;
    ccInstance *myInstance;
    time_t op_start;
    ccResourceCache resourceCacheLocal;

    i = 0;
    myInstance = NULL;
    op_start = time(NULL);

    rc = initialize(pMeta, FALSE);
    if (rc || ccIsEnabled()) {
        return (1);
    }

    LOGINFO("[%s][%s] detaching volume\n", SP(instanceId), SP(volumeId));
    LOGDEBUG("invoked: volumeId=%s, instanceId=%s, remoteDev=%s, localDev=%s, force=%d\n", SP(volumeId), SP(instanceId), SP(remoteDev), SP(localDev), force);
    if (!volumeId || !instanceId || !remoteDev || !localDev) {
        LOGERROR("bad input params\n");
        return (1);
    }

    sem_mywait(RESCACHE);
    memcpy(&resourceCacheLocal, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);

    rc = find_instanceCacheId(instanceId, &myInstance);
    if (!rc) {
        // found the instance in the cache
        if (myInstance) {
            start = myInstance->ncHostIdx;
            stop = start + 1;
            EUCA_FREE(myInstance);
        }
    } else {
        start = 0;
        stop = resourceCacheLocal.numResources;
    }

    // stop at the first node that acknowledges the detach
    for (i = start; i < stop && !done; i++) {
        timeout = ncGetTimeout(op_start, OP_TIMEOUT, stop - start, i);
        timeout = maxint(timeout, DETACH_VOL_TIMEOUT_SECONDS);
        rc = ncClientCall(pMeta, timeout, resourceCacheLocal.resources[i].lockidx, resourceCacheLocal.resources[i].ncURL, "ncDetachVolume",
                          instanceId, volumeId, remoteDev, localDev, force);
        if (rc) {
            ret = 1;
        } else {
            ret = 0;
            done++;
        }
    }

    LOGTRACE("done\n");
    shawn();
    return (ret);
}
//!
//! Network configuration request handler.
//!
//! This implementation does nothing with its arguments and always reports success.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure (unused)
//! @param[in] accountId (unused)
//! @param[in] type (unused)
//! @param[in] namedLen (unused)
//! @param[in] sourceNames (unused)
//! @param[in] userNames (unused)
//! @param[in] netLen (unused)
//! @param[in] sourceNets (unused)
//! @param[in] destName (unused)
//! @param[in] destUserName (unused)
//! @param[in] protocol (unused)
//! @param[in] minPort (unused)
//! @param[in] maxPort (unused)
//!
//! @return always 0
//!
int doConfigureNetwork(ncMetadata * pMeta, char *accountId, char *type, int namedLen, char **sourceNames, char **userNames, int netLen,
                       char **sourceNets, char *destName, char *destUserName, char *protocol, int minPort, int maxPort)
{
    // every argument is ignored by this implementation
    (void)pMeta;
    (void)accountId;
    (void)type;
    (void)namedLen;
    (void)sourceNames;
    (void)userNames;
    (void)netLen;
    (void)sourceNets;
    (void)destName;
    (void)destUserName;
    (void)protocol;
    (void)minPort;
    (void)maxPort;
    return 0;
}
//!
//! Handles a broadcast of the global network information (GNI) to this cluster controller.
//!
//! networkInfo is base64-decoded. The resulting XML is written to a temporary file and
//! parsed with gni_populate(). From the parsed view this function:
//!   - replaces gpEucaNet->sMacPrefix with this cluster's macPrefix when that value is
//!     non-empty and differs from the current one (under the NETCONFIG semaphore);
//!   - for every GNI instance that has both a public and a private IP, calls
//!     doAssignAddress() when the instance cache holds an instance with that private IP
//!     but a different public IP.
//! The decoded XML is also written to EUCANETD_GNI_FILE. The original (still encoded)
//! string is copied into globalnetworkinfo->networkInfo, and
//! config->kick_broadcast_network_info is set to 1 (under the GLOBALNETWORKINFO semaphore).
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure
//! @param[in] networkInfo base64-encoded global network information XML
//!
//! @return 1 if initialize() fails, ccIsEnabled() returns non-zero, or networkInfo is NULL;
//!         0 otherwise. Decode, temp-file, parse and GNI-file write failures are only logged
//!         or ignored; they are not reflected in the return value.
//!
//! @note NOTE(review): the return value of gni_populate() is not checked before the parsed
//!       data is used; presumably a failed parse leaves gni with no instances — confirm.
//!
int doBroadcastNetworkInfo(ncMetadata * pMeta, char *networkInfo)
{
    // NOTE(review): used below as an snprintf format with config->eucahome, so EUCALYPTUS_STATE_DIR
    // presumably contains a %s placeholder — confirm
#define EUCANETD_GNI_FILE EUCALYPTUS_STATE_DIR "/global_network_info.xml"
    int i = 0;
    int rc = 0;
    char *xmlbuf = NULL;
    char xmlfile[EUCA_MAX_PATH] = "";
    globalNetworkInfo *gni = NULL;
    gni_hostname_info *host_info = NULL;
    gni_cluster *myself = NULL;
    rc = initialize(pMeta, FALSE);
    if (rc || ccIsEnabled()) {
        return (1);
    }
    LOGDEBUG("invoked: networkInfo=%.16s\n", SP(networkInfo));
    if (!networkInfo) {
        LOGDEBUG("bad input params\n");
        return (1);
    }
    // init the XML
    xmlbuf = base64_dec((unsigned char *)networkInfo, strlen(networkInfo));
    if (xmlbuf) {
        LOGEXTREME("%s\n", xmlbuf);
        // mkstemp-style template; the final TRUE passed to str2file() presumably asks it to
        // substitute the XXXXXX suffix into xmlfile — confirm against str2file()
        snprintf(xmlfile, EUCA_MAX_PATH, "/tmp/euca-global-net-XXXXXX");
        if (str2file(xmlbuf, xmlfile, O_CREAT | O_EXCL | O_RDWR, 0644, TRUE) == EUCA_OK) {
            LOGDEBUG("created and populated tmpfile '%s'\n", xmlfile);
            gni = gni_init();
            host_info = gni_init_hostname_info();
            if (gni && host_info) {
                // decode/read/parse the globalnetworkinfo, assign any incorrect public/private IP mappings based on global view
                rc = gni_populate(gni,host_info,xmlfile);
                LOGDEBUG("done with gni_populate()\n");
                // do any CC actions based on contents of new network view
                // reset macprefix
                if ((rc = gni_find_self_cluster(gni, &myself)) != 0) {
                    LOGWARN("failed to find local host IP in list of enabled clusters, skipping macPrefix update\n");
                } else {
                    sem_mywait(NETCONFIG);
                    {
                        // only overwrite the prefix when the GNI supplies a non-empty, different value
                        if (myself && strlen(myself->macPrefix) && strcmp(gpEucaNet->sMacPrefix, myself->macPrefix)) {
                            LOGDEBUG("reset local cluster macPrefix from '%s' to '%s'\n", gpEucaNet->sMacPrefix, myself->macPrefix);
                            snprintf(gpEucaNet->sMacPrefix, ENET_MACPREFIX_LEN, "%s", myself->macPrefix);
                        }
                    }
                    sem_mypost(NETCONFIG);
                }
                LOGTRACE("gni->max_instances == %d\n", gni->max_instances);
                for (i = 0; i < gni->max_instances; i++) {
                    char *strptra = NULL, *strptrb = NULL;
                    ccInstance *myInstance = NULL;
                    // strptra = public IP, strptrb = private IP, both as dotted strings allocated by hex2dot()
                    strptra = hex2dot(gni->instances[i].publicIp);
                    strptrb = hex2dot(gni->instances[i].privateIp);
                    if (gni->instances[i].publicIp && gni->instances[i].privateIp) {
                        LOGDEBUG("found instance in broadcast network info: %s (%s/%s)\n", gni->instances[i].name, SP(strptra), SP(strptrb));
                        // here, we should decide if we need to send the mapping, or not
                        rc = find_instanceCacheIP(strptrb, &myInstance);
                        if (myInstance && !strcmp(myInstance->ccnet.privateIp, strptrb)) {
                            if (!strcmp(myInstance->ccnet.publicIp, strptra)) {
                                LOGTRACE("instance '%s' cached pub/priv IP mappings match input pub/priv IP (publicIp=%s privateIp=%s)\n", myInstance->instanceId,
                                         myInstance->ccnet.publicIp, myInstance->ccnet.privateIp);
                            } else {
                                // the GNI is treated as ground truth: push its public IP for this private IP
                                LOGTRACE("instance '%s' cached pub/priv IP mappings do not match input pub/priv IP, updating ground-truth (cached_publicIp=%s input_publicIp=%s)\n",
                                         myInstance->instanceId, myInstance->ccnet.publicIp, strptra);
                                rc = doAssignAddress(pMeta, NULL, strptra, strptrb);
                            }
                        }
                        if (myInstance) {
                            EUCA_FREE(myInstance);
                        }
                        LOGDEBUG("instance '%s' has assigned address: (%s -> %s) rc: %d\n", gni->instances[i].name, strptra, strptrb, rc);
                    } else {
                        LOGDEBUG("instance does not have either public or private IP set (id=%s pub=%s priv=%s)\n", gni->instances[i].name, SP(strptra), SP(strptrb));
                    }
                    EUCA_FREE(strptra);
                    EUCA_FREE(strptrb);
                }
            }
            // Free up gni and host_info memory
            // NOTE(review): reached even when only one of gni/host_info was allocated, so
            // gni_free()/gni_hostnames_free() presumably accept NULL — confirm
            rc = gni_free(gni);
            rc = gni_hostnames_free(host_info);
            unlink(xmlfile);
        }
        // persist the decoded GNI document; this happens even if the temp-file step above failed
        snprintf(xmlfile, EUCA_MAX_PATH, EUCANETD_GNI_FILE, config->eucahome);
        if (str2file(xmlbuf, xmlfile, O_CREAT | O_TRUNC | O_WRONLY, 0600, FALSE) != EUCA_OK) {
            LOGDEBUG("Failed to populate GNI file '%s'\n", xmlfile);
        }
        EUCA_FREE(xmlbuf);
    }
    sem_mywait(GLOBALNETWORKINFO);
    // populate globalnetworkinfo (the still-encoded input string, truncated to MAX_NETWORK_INFO_LEN)
    snprintf(globalnetworkinfo->networkInfo, MAX_NETWORK_INFO_LEN, "%s", networkInfo);
    config->kick_broadcast_network_info = 1;
    sem_mypost(GLOBALNETWORKINFO);
    LOGTRACE("done.\n");
    return (0);
#undef EUCANETD_GNI_FILE
}
//!
//! Maps a public IP address (src) onto the instance that owns a private IP address (dst).
//!
//! The request is only honoured if the instance cache holds an instance with private IP
//! dst. In that case the public IP is recorded in the instance cache and pushed to the
//! hosting node controller (NC) through ncAssignAddress.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure; it is only
//!                  read for logging after a NULL check
//! @param[in] uuid  request identifier (only logged)
//! @param[in] src   public IP address to assign
//! @param[in] dst   private IP address of the target instance
//!
//! @return 0 on success or when the instance-cache update fails (that failure is only logged);
//!         1 when initialize() fails, ccIsEnabled() returns non-zero, the input is invalid,
//!         no cached instance owns dst, or the NC could not be updated
//!
int doAssignAddress(ncMetadata * pMeta, char *uuid, char *src, char *dst)
{
    int rc = 0;
    int ret = 0;
    ccInstance *myInstance = NULL;
    ccResourceCache resourceCacheLocal = { {{{0}}} };

    rc = initialize(pMeta, FALSE);
    if (rc || ccIsEnabled()) {
        return (1);
    }

    // pMeta is NULL-checked elsewhere in this file, so never dereference it blindly for logging
    LOGINFO("[%.36s] assigning address %s to %s\n", SP(pMeta ? pMeta->correlationId : "UNSET"), SP(src), SP(dst));
    LOGDEBUG("invoked: correlationId=%s, src=%s, dst=%s, uuid=%s\n", SP(pMeta ? pMeta->correlationId : "UNSET"), SP(src), SP(dst), SP(uuid));
    if (!src || !dst || !strcmp(src, "0.0.0.0")) {
        LOGDEBUG("bad input params\n");
        return (1);
    }

    set_dirty_instanceCache();

    sem_mywait(RESCACHE);
    {
        memcpy(&resourceCacheLocal, resourceCache, sizeof(ccResourceCache));
    }
    sem_mypost(RESCACHE);

    // only act on a private IP that belongs to an instance in this cluster's cache
    ret = 1;
    if ((rc = find_instanceCacheIP(dst, &myInstance)) == 0) {
        ret = 0;
        EUCA_FREE(myInstance);
    } else {
        LOGDEBUG("skipping %s->%s mapping, as this clusters does not own the instance (%s)\n", src, dst, dst);
    }

    if (!ret && strcmp(dst, "0.0.0.0")) {
        // everything worked, update instance cache
        if ((rc = map_instanceCache(privIpCmp, dst, pubIpSet, src)) != 0) {
            LOGERROR("map_instanceCache() failed to assign %s->%s\n", dst, src);
        } else {
            if ((rc = find_instanceCacheIP(src, &myInstance)) == 0) {
                // found the instance in the cache; only touch it once it is known to be non-NULL
                if (myInstance) {
                    LOGDEBUG("found instance (%s) in cache with IP (%s)\n", myInstance->instanceId, myInstance->ccnet.publicIp);
                    rc = ncClientCall(pMeta, OP_TIMEOUT, resourceCacheLocal.resources[myInstance->ncHostIdx].lockidx,
                                      resourceCacheLocal.resources[myInstance->ncHostIdx].ncURL, "ncAssignAddress", myInstance->instanceId, myInstance->ccnet.publicIp);
                    if (rc) {
                        LOGERROR("could not sync public IP %s with NC\n", src);
                        ret = 1;
                    } else {
                        ret = 0;
                    }
                    EUCA_FREE(myInstance);
                }
            }
        }
    }

    LOGTRACE("done\n");
    shawn();
    return (ret);
}
//!
//! Removes the public IP address src from the instance that currently holds it.
//!
//! If the instance cache holds an instance with public IP src, the hosting node controller
//! (NC) is told to set the instance's public IP to 0.0.0.0. The instance cache entry is then
//! updated the same way.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure; it is only
//!                  read for logging after a NULL check
//! @param[in] src   public IP address to remove
//! @param[in] dst   private IP address (validated and logged only)
//!
//! @return 0 on success or when no cached instance holds src; 1 when initialize() fails,
//!         ccIsEnabled() returns non-zero, the input is invalid, or the NC could not be updated.
//!         A failed instance-cache update is only logged.
//!
int doUnassignAddress(ncMetadata * pMeta, char *src, char *dst)
{
    int rc = 0;
    int ret = 0;
    ccInstance *myInstance = NULL;
    ccResourceCache resourceCacheLocal = { {{{0}}} };

    rc = initialize(pMeta, FALSE);
    if (rc || ccIsEnabled()) {
        return (1);
    }

    LOGINFO("unassigning address %s\n", SP(src));
    // pMeta may be NULL: guard correlationId the same way userId is guarded
    LOGDEBUG("invoked: correlationId=%s, userId=%s, src=%s, dst=%s\n", SP(pMeta ? pMeta->correlationId : "UNSET"), SP(pMeta ? pMeta->userId : "UNSET"), SP(src), SP(dst));
    if (!src || !dst || !strcmp(src, "0.0.0.0")) {
        LOGDEBUG("bad input params\n");
        return (1);
    }

    set_dirty_instanceCache();

    sem_mywait(RESCACHE);
    {
        memcpy(&resourceCacheLocal, resourceCache, sizeof(ccResourceCache));
    }
    sem_mypost(RESCACHE);

    ret = 0;
    if ((rc = find_instanceCacheIP(src, &myInstance)) == 0) {
        // found the instance in the cache; only touch it once it is known to be non-NULL
        if (myInstance) {
            LOGDEBUG("found instance %s in cache with IP %s\n", myInstance->instanceId, myInstance->ccnet.publicIp);
            rc = ncClientCall(pMeta, OP_TIMEOUT, resourceCacheLocal.resources[myInstance->ncHostIdx].lockidx,
                              resourceCacheLocal.resources[myInstance->ncHostIdx].ncURL, "ncAssignAddress", myInstance->instanceId, "0.0.0.0");
            if (rc) {
                LOGERROR("could not sync IP with NC\n");
                ret = 1;
            }

            // refresh instance cache
            if ((rc = map_instanceCache(pubIpCmp, src, pubIpSet, "0.0.0.0")) != 0) {
                LOGERROR("map_instanceCache() failed to assign %s->%s\n", dst, src);
            }
        }
        EUCA_FREE(myInstance);
    }

    LOGTRACE("done\n");
    shawn();
    return (ret);
}
//!
//! Network stop request handler.
//!
//! This implementation does nothing with its arguments and always reports success.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure (unused)
//! @param[in] accountId (unused)
//! @param[in] netName (unused)
//! @param[in] vlan (unused)
//!
//! @return always 0
//!
int doStopNetwork(ncMetadata * pMeta, char *accountId, char *netName, int vlan)
{
    // every argument is ignored by this implementation
    (void)pMeta;
    (void)accountId;
    (void)netName;
    (void)vlan;
    return 0;
}
//!
//! Network description request handler.
//!
//! This implementation does nothing with its arguments and always reports success.
//!
//! @param[in] pMeta  a pointer to the node controller (NC) metadata structure (unused)
//! @param[in] ccs    (unused; presumably the list of cluster controllers)
//! @param[in] ccsLen number of entries in ccs (unused)
//!
//! @return always 0
//!
int doDescribeNetworks(ncMetadata * pMeta, char **ccs, int ccsLen)
{
    // every argument is ignored by this implementation
    (void)pMeta;
    (void)ccs;
    (void)ccsLen;
    return (0);
}
//!
//! Network start request handler.
//!
//! This implementation does nothing with its arguments and always reports success.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure (unused)
//! @param[in] accountId (unused)
//! @param[in] uuid (unused)
//! @param[in] groupId (unused)
//! @param[in] netName (unused)
//! @param[in] vlan (unused)
//! @param[in] vmsubdomain the internal subdomain name for instances (unused)
//! @param[in] nameservers comma separated list of name servers for instances (unused)
//! @param[in] ccs (unused)
//! @param[in] ccsLen number of entries in ccs (unused)
//!
//! @return always 0
//!
int doStartNetwork(ncMetadata * pMeta, char *accountId, char *uuid, char *groupId, char *netName, int vlan, char *vmsubdomain, char *nameservers, char **ccs, int ccsLen)
{
    // every argument is ignored by this implementation
    (void)pMeta;
    (void)accountId;
    (void)uuid;
    (void)groupId;
    (void)netName;
    (void)vlan;
    (void)vmsubdomain;
    (void)nameservers;
    (void)ccs;
    (void)ccsLen;
    return (0);
}
//!
//! Counts how many instances of one VM type fit into a node's resource pools.
//!
//! This is the closed form of repeatedly subtracting one instance's requirements from the
//! pools while all three stay non-negative. If any pool is negative nothing fits; otherwise
//! the result is the smallest of the per-pool quotients.
//!
//! @param[in] mempool  memory pool of the node
//! @param[in] diskpool disk pool of the node
//! @param[in] corepool core pool of the node
//! @param[in] memNeed  memory needed by one instance (must be > 0)
//! @param[in] diskNeed disk needed by one instance (must be > 0)
//! @param[in] coreNeed cores needed by one instance (must be > 0)
//!
//! @return number of instances that fit (>= 0)
//!
static int cc_count_vm_slots(int mempool, int diskpool, int corepool, int memNeed, int diskNeed, int coreNeed)
{
    int slots = 0;

    if (mempool < 0 || diskpool < 0 || corepool < 0) {
        return (0);
    }

    slots = mempool / memNeed;
    if ((diskpool / diskNeed) < slots) {
        slots = diskpool / diskNeed;
    }
    if ((corepool / coreNeed) < slots) {
        slots = corepool / coreNeed;
    }
    return (slots);
}

//!
//! Reports the cluster's capacity per VM type, together with a copy of the cached node list.
//!
//! For every VM type j, (*outTypesAvail)[j] is the number of instances of that type that fit
//! into each node's currently available memory/disk/cores, summed over all nodes.
//! (*outTypesMax)[j] is the same count computed from each node's maximum capacity. Nodes whose
//! ncState is STOPPED contribute nothing.
//!
//! @param[in]  pMeta         a pointer to the node controller (NC) metadata structure
//! @param[in]  ccvms         pointer to an array of vmLen VM type descriptions
//! @param[in]  vmLen         number of VM types (must be >= 0)
//! @param[out] outTypesMax   array of vmLen counts allocated here (ownership passes to the caller)
//! @param[out] outTypesAvail array of vmLen counts allocated here (ownership passes to the caller)
//! @param[out] outTypesLen   set to vmLen on success, 0 on VM-type validation failure
//! @param[out] outNodes      copy of the cached node list allocated here (ownership passes to the caller)
//! @param[out] outNodesLen   number of entries in *outNodes
//!
//! @return 0 on success; 1 when initialize() fails, ccIsEnabled() returns non-zero, an output
//!         pointer is NULL, vmLen is negative, ccvms is missing, or a VM type has a
//!         non-positive mem/disk/cores value
//!
int doDescribeResources(ncMetadata * pMeta, virtualMachine ** ccvms, int vmLen, int **outTypesMax, int **outTypesAvail, int *outTypesLen, ccResource ** outNodes, int *outNodesLen)
{
    int i = 0;
    int j = 0;
    int rc = 0;
    ccResource *res = NULL;
    ccResourceCache resourceCacheLocal;

    LOGDEBUG("invoked: userId=%s, vmLen=%d\n", SP(pMeta ? pMeta->userId : "UNSET"), vmLen);

    rc = initialize(pMeta, FALSE);
    if (rc || ccIsEnabled()) {
        return (1);
    }

    if (outTypesMax == NULL || outTypesAvail == NULL || outTypesLen == NULL || outNodes == NULL || outNodesLen == NULL) {
        // input error
        return (1);
    }

    *outTypesMax = NULL;
    *outTypesAvail = NULL;

    // a negative length would reach the allocator as a huge size, and a missing VM list would be dereferenced
    if (vmLen < 0 || (vmLen > 0 && (ccvms == NULL || *ccvms == NULL))) {
        LOGERROR("input error\n");
        *outTypesLen = 0;
        return (1);
    }

    *outTypesMax = EUCA_ZALLOC(vmLen, sizeof(int));
    *outTypesAvail = EUCA_ZALLOC(vmLen, sizeof(int));
    // a zero-length allocation may legitimately return NULL; only treat NULL as OOM when memory was requested
    if ((vmLen > 0) && (*outTypesMax == NULL || *outTypesAvail == NULL)) {
        LOGERROR("out of memory\n");
        unlock_exit(1);
    }
    *outTypesLen = vmLen;

    // every VM type must need a positive amount of each resource (also keeps the slot division safe)
    for (i = 0; i < vmLen; i++) {
        if ((*ccvms)[i].mem <= 0 || (*ccvms)[i].cores <= 0 || (*ccvms)[i].disk <= 0) {
            LOGERROR("input error\n");
            EUCA_FREE(*outTypesAvail);
            EUCA_FREE(*outTypesMax);
            *outTypesLen = 0;
            return (1);
        }
    }

    sem_mywait(RESCACHE);
    memcpy(&resourceCacheLocal, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);

    *outNodes = EUCA_ZALLOC(resourceCacheLocal.numResources, sizeof(ccResource));
    if (*outNodes == NULL && resourceCacheLocal.numResources > 0) {
        LOGFATAL("out of memory!\n");
        unlock_exit(1);
    }
    if (*outNodes != NULL) {
        memcpy(*outNodes, resourceCacheLocal.resources, sizeof(ccResource) * resourceCacheLocal.numResources);
    }
    *outNodesLen = resourceCacheLocal.numResources;

    for (i = 0; i < resourceCacheLocal.numResources; i++) {
        res = &(resourceCacheLocal.resources[i]);
        if (res->ncState == STOPPED) {
            // a stopped node offers no capacity at all
            continue;
        }
        for (j = 0; j < vmLen; j++) {
            (*outTypesAvail)[j] += cc_count_vm_slots(res->availMemory, res->availDisk, res->availCores, (*ccvms)[j].mem, (*ccvms)[j].disk, (*ccvms)[j].cores);
            (*outTypesMax)[j] += cc_count_vm_slots(res->maxMemory, res->maxDisk, res->maxCores, (*ccvms)[j].mem, (*ccvms)[j].disk, (*ccvms)[j].cores);
        }
    }

    if (vmLen >= 5) {
        LOGDEBUG("resources summary ({avail/max}): %s{%d/%d} %s{%d/%d} %s{%d/%d} %s{%d/%d} %s{%d/%d}\n", (*ccvms)[0].name,
                 (*outTypesAvail)[0], (*outTypesMax)[0], (*ccvms)[1].name, (*outTypesAvail)[1], (*outTypesMax)[1], (*ccvms)[2].name,
                 (*outTypesAvail)[2], (*outTypesMax)[2], (*ccvms)[3].name, (*outTypesAvail)[3], (*outTypesMax)[3], (*ccvms)[4].name, (*outTypesAvail)[4], (*outTypesMax)[4]);
    }

    LOGTRACE("done\n");
    shawn();
    return (0);
}
//!
//! Moves a node resource entry to a new state and records the transition.
//!
//! When the state actually changes, the old state is saved in lastState, stateChange is set to
//! the current time, and idleStart is reset to 0. Re-applying the current state changes nothing.
//!
//! @param[in,out] in       resource entry to update
//! @param[in]     newstate state to move to
//!
//! @return 1 if in is NULL, 0 otherwise
//!
int changeState(ccResource * in, int newstate)
{
    if (!in) {
        return (1);
    }

    if (in->state != newstate) {
        in->lastState = in->state;
        in->state = newstate;
        in->stateChange = time(NULL);
        in->idleStart = 0;
    }

    return (0);
}
//!
//! Pushes the current global network information to every node controller (NC).
//!
//! The string stored in globalnetworkinfo->networkInfo is copied. Then, for each node in a
//! snapshot of resourceCache, a child process is forked that calls ncBroadcastNetworkInfo on
//! that node; ncClientCall() is invoked with a timeout of 0. Forking is gated by the
//! REFRESHLOCK semaphore. The parent then waits up to 120 seconds for each child.
//!
//! @param[in] pMeta   a pointer to the node controller (NC) metadata structure
//! @param[in] timeout only logged by this function (values <= 0 are logged as 1)
//! @param[in] dolock  only logged by this function
//!
//! @return always 0; per-node failures are only logged
//!
//! @note If fork() fails for a node, that node is skipped and its REFRESHLOCK slot is released
//!       immediately. Otherwise the slot would leak, and the -1 pid would later make the wait
//!       loop reap an arbitrary child.
//!
int broadcast_network_info(ncMetadata * pMeta, int timeout, int dolock)
{
    int i = 0;
    int rc = 0;
    int pid = 0;
    int *pids = NULL;
    int status = 0;
    char *networkInfo = NULL;

    if (timeout <= 0)
        timeout = 1;

    LOGDEBUG("invoked: timeout=%d, dolock=%d\n", timeout, dolock);

    // critical NC call section
    sem_mywait(RESCACHE);
    memcpy(resourceCacheStage, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);

    sem_close(locks[REFRESHLOCK]);
    locks[REFRESHLOCK] = sem_open("/eucalyptusCCrefreshLock", O_CREAT, 0644, config->ncFanout);

    pids = EUCA_ZALLOC(resourceCacheStage->numResources, sizeof(int));
    // a zero-length allocation may legitimately return NULL
    if (!pids && resourceCacheStage->numResources > 0) {
        LOGFATAL("out of memory!\n");
        unlock_exit(1);
    }

    // here is where we will convert the globalnetworkinfo into the broadcast string
    sem_mywait(GLOBALNETWORKINFO);
    networkInfo = strdup(globalnetworkinfo->networkInfo);
    sem_mypost(GLOBALNETWORKINFO);
    if (!networkInfo) {
        LOGFATAL("out of memory!\n");
        unlock_exit(1);
    }

    for (i = 0; i < resourceCacheStage->numResources; i++) {
        sem_mywait(REFRESHLOCK);
        pid = fork();
        if (pid == 0) {
            // child: do the broadcast
            rc = ncClientCall(pMeta, 0, resourceCacheStage->resources[i].lockidx, resourceCacheStage->resources[i].ncURL, "ncBroadcastNetworkInfo", networkInfo);
            if (rc != 0) {
                LOGERROR("bad return from ncBroadcastNetworkInfo(%s) (%d)\n", resourceCacheStage->resources[i].hostname, rc);
            }
            sem_mypost(REFRESHLOCK);
            exit(0);
        } else if (pid < 0) {
            // fork failed: give the fan-out slot back and leave pids[i] at 0 so the wait loop skips it
            LOGERROR("failed to fork broadcast process for node %s, skipping it\n", resourceCacheStage->resources[i].hostname);
            sem_mypost(REFRESHLOCK);
        } else {
            pids[i] = pid;
        }
    }

    // free the broadcast string
    EUCA_FREE(networkInfo);

    for (i = 0; i < resourceCacheStage->numResources; i++) {
        if (pids[i] <= 0) {
            // no child was started for this node
            continue;
        }

        rc = timewait(pids[i], &status, 120);
        if (!rc) {
            // timed out, really bad failure (reset REFRESHLOCK semaphore)
            sem_close(locks[REFRESHLOCK]);
            locks[REFRESHLOCK] = sem_open("/eucalyptusCCrefreshLock", O_CREAT, 0644, config->ncFanout);
            rc = 1;
        } else if (rc > 0) {
            // process exited, and wait picked it up.
            if (WIFEXITED(status)) {
                rc = WEXITSTATUS(status);
            } else {
                rc = 1;
            }
        } else {
            // process no longer exists, and someone else reaped it
            rc = 0;
        }
        if (rc) {
            LOGWARN("error waiting for child pid '%d', exit code '%d'\n", pids[i], rc);
        }
    }

    EUCA_FREE(pids);
    LOGTRACE("done\n");
    return (0);
}
//!
//! Refreshes the cached capacity and state of every node controller (NC).
//!
//! A snapshot of resourceCache is copied into resourceCacheStage. Then, for each node, a child
//! process is forked (gated by the REFRESHLOCK semaphore). Each child:
//!   - calls ncDescribeResource on its node, unless the node is asleep or has running > 0;
//!   - updates the node's resourceCacheStage entry from the reply;
//!   - tries to discover the node's MAC address if it is not known yet.
//! The parent waits up to 120 seconds for each child. It then merges resourceCacheStage back
//! with refresh_resourceCache().
//!
//! @param[in] pMeta   a pointer to the node controller (NC) metadata structure
//! @param[in] timeout time budget handed to ncGetTimeout() for each NC call (values <= 0 become 1)
//! @param[in] dolock  only logged by this function
//!
//! @return always 0; per-node failures are recorded in the cache entry and/or logged
//!
//! @note NOTE(review): children write into resourceCacheStage and the parent reads it after they
//!       exit, so resourceCacheStage is presumably in shared memory — confirm.
//! @note If fork() fails for a node, that node is skipped this round and its REFRESHLOCK slot is
//!       released immediately. Otherwise the slot would leak, and the -1 pid would later make the
//!       wait loop reap an arbitrary child.
//!
int refresh_resources(ncMetadata * pMeta, int timeout, int dolock)
{
    int i, rc, nctimeout, pid, *pids = NULL;
    int status;
    time_t op_start;
    ncResource *ncResDst = NULL;

    if (timeout <= 0)
        timeout = 1;

    op_start = time(NULL);
    LOGDEBUG("invoked: timeout=%d, dolock=%d\n", timeout, dolock);

    // critical NC call section
    sem_mywait(RESCACHE);
    memcpy(resourceCacheStage, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);

    sem_close(locks[REFRESHLOCK]);
    locks[REFRESHLOCK] = sem_open("/eucalyptusCCrefreshLock", O_CREAT, 0644, config->ncFanout);

    pids = EUCA_ZALLOC(resourceCacheStage->numResources, sizeof(int));
    // a zero-length allocation may legitimately return NULL
    if (!pids && resourceCacheStage->numResources > 0) {
        LOGFATAL("out of memory!\n");
        unlock_exit(1);
    }

    for (i = 0; i < resourceCacheStage->numResources; i++) {
        sem_mywait(REFRESHLOCK);
        pid = fork();
        if (pid < 0) {
            // fork failed: give the fan-out slot back and leave pids[i] at 0 so the wait loop skips it
            LOGERROR("failed to fork refresh process for node %s, skipping it this round\n", resourceCacheStage->resources[i].hostname);
            sem_mypost(REFRESHLOCK);
        } else if (pid == 0) {
            // child: refresh this node's entry in resourceCacheStage
            ncResDst = NULL;
            if (resourceCacheStage->resources[i].state != RESASLEEP && resourceCacheStage->resources[i].running == 0) {
                nctimeout = ncGetTimeout(op_start, timeout, 1, 1);
                char *errMsg = NULL;
                rc = ncClientCall(pMeta, nctimeout, resourceCacheStage->resources[i].lockidx, resourceCacheStage->resources[i].ncURL,
                                  "ncDescribeResource", NULL, &ncResDst, &errMsg);
                // treat a "successful" call without a reply structure as a failure instead of dereferencing NULL
                if (rc != 0 || ncResDst == NULL) {
                    powerUp(&(resourceCacheStage->resources[i]));
                    if (resourceCacheStage->resources[i].state == RESWAKING && ((time(NULL) - resourceCacheStage->resources[i].stateChange) < config->wakeThresh)) {
                        LOGDEBUG("resource still waking up (%ld more seconds until marked as down)\n",
                                 config->wakeThresh - (time(NULL) - resourceCacheStage->resources[i].stateChange));
                    } else {
                        LOGERROR("bad return from ncDescribeResource(%s) (%d)\n", resourceCacheStage->resources[i].hostname, rc);
                        resourceCacheStage->resources[i].maxMemory = 0;
                        resourceCacheStage->resources[i].availMemory = 0;
                        resourceCacheStage->resources[i].maxDisk = 0;
                        resourceCacheStage->resources[i].availDisk = 0;
                        resourceCacheStage->resources[i].maxCores = 0;
                        resourceCacheStage->resources[i].availCores = 0;
                        changeState(&(resourceCacheStage->resources[i]), RESDOWN);
                        resourceCacheStage->resources[i].ncState = NOTREADY;
                        resourceCacheStage->resources[i].migrationCapable = FALSE;
                        euca_strncpy(resourceCacheStage->resources[i].nodeMessage, SP(errMsg), 1024);
                        LOGERROR("error message from ncDescribeResource: %s\n", resourceCacheStage->resources[i].nodeMessage);
                    }
                } else {
                    LOGDEBUG("received data from node=%s status=%s mem=%d/%d disk=%d/%d cores=%d/%d migrationCapable=%s\n",
                             resourceCacheStage->resources[i].hostname,
                             ncResDst->nodeStatus,
                             ncResDst->memorySizeAvailable, ncResDst->memorySizeMax,
                             ncResDst->diskSizeAvailable, ncResDst->diskSizeMax, ncResDst->numberOfCoresAvailable, ncResDst->numberOfCoresMax,
                             (ncResDst->migrationCapable == TRUE) ? "TRUE" : "FALSE");
                    resourceCacheStage->resources[i].maxMemory = ncResDst->memorySizeMax;
                    resourceCacheStage->resources[i].availMemory = ncResDst->memorySizeAvailable;
                    resourceCacheStage->resources[i].maxDisk = ncResDst->diskSizeMax;
                    resourceCacheStage->resources[i].availDisk = ncResDst->diskSizeAvailable;
                    resourceCacheStage->resources[i].maxCores = ncResDst->numberOfCoresMax;
                    resourceCacheStage->resources[i].availCores = ncResDst->numberOfCoresAvailable;
                    if (!strcmp(ncResDst->nodeStatus, "enabled")) {
                        resourceCacheStage->resources[i].ncState = ENABLED;
                    } else if (!strcmp(ncResDst->nodeStatus, "disabled")) {
                        resourceCacheStage->resources[i].ncState = STOPPED;
                    }
                    resourceCacheStage->resources[i].migrationCapable = ncResDst->migrationCapable;
                    euca_strncpy(resourceCacheStage->resources[i].nodeStatus, ncResDst->nodeStatus, 24);
                    // a successful reply clears any previous error message
                    strcpy(resourceCacheStage->resources[i].nodeMessage, "");
                    // set iqn, if set
                    if (strlen(ncResDst->iqn)) {
                        snprintf(resourceCacheStage->resources[i].iqn, 128, "%s", ncResDst->iqn);
                    }
                    if (strlen(ncResDst->hypervisor)) {
                        euca_strncpy(resourceCacheStage->resources[i].hypervisor, ncResDst->hypervisor, 16);
                    }
                    changeState(&(resourceCacheStage->resources[i]), RESUP);
                }
                if (errMsg != NULL) {
                    EUCA_FREE(errMsg);
                }
            } else {
                LOGDEBUG("resource asleep/running instances (%d), skipping resource update\n", resourceCacheStage->resources[i].running);
            }

            // try to discover the mac address of the resource
            if (resourceCacheStage->resources[i].mac[0] == '\0' && resourceCacheStage->resources[i].ip[0] != '\0') {
                char *mac;
                rc = IP2MAC(resourceCacheStage->resources[i].ip, &mac);
                if (!rc) {
                    euca_strncpy(resourceCacheStage->resources[i].mac, mac, 24);
                    EUCA_FREE(mac);
                    LOGDEBUG("discovered MAC '%s' for host %s(%s)\n", resourceCacheStage->resources[i].mac,
                             resourceCacheStage->resources[i].hostname, resourceCacheStage->resources[i].ip);
                }
            }

            EUCA_FREE(ncResDst);
            sem_mypost(REFRESHLOCK);
            exit(0);
        } else {
            pids[i] = pid;
        }
    }

    for (i = 0; i < resourceCacheStage->numResources; i++) {
        if (pids[i] <= 0) {
            // no child was started for this node
            continue;
        }

        rc = timewait(pids[i], &status, 120);
        if (!rc) {
            // timed out, really bad failure (reset REFRESHLOCK semaphore)
            sem_close(locks[REFRESHLOCK]);
            locks[REFRESHLOCK] = sem_open("/eucalyptusCCrefreshLock", O_CREAT, 0644, config->ncFanout);
            rc = 1;
        } else if (rc > 0) {
            // process exited, and wait picked it up.
            if (WIFEXITED(status)) {
                rc = WEXITSTATUS(status);
            } else {
                rc = 1;
            }
        } else {
            // process no longer exists, and someone else reaped it
            rc = 0;
        }
        if (rc) {
            LOGWARN("error waiting for child pid '%d', exit code '%d'\n", pids[i], rc);
        }
    }

    // resourceCacheStage[] entries were updated based on replies from NC,
    // so merge them into the canonical location: resourceCache[] (no
    // need to try removing hosts, since instanceCache membership
    // does not change as part of the update)
    refresh_resourceCache(resourceCacheStage, FALSE);
    EUCA_FREE(pids);
    LOGTRACE("done\n");
    return (0);
}
//!
//! Drive the CC side of the live-migration state machine for one instance report.
//!
//! Only reports coming from the migration destination can produce an action. Based on the state the
//! destination reports and the state last recorded for the instance in the instance cache, the
//! handler either asks the source to commit, asks the destination or the source to roll back, or
//! does nothing. Reports from the source are only logged; reports from any other host are logged
//! as errors.
//!
//! @param[in]  myInstance      instance as just reported by node @p host
//! @param[in]  host            hostname of the node that sent the report
//! @param[in]  src             source node of the migration
//! @param[in]  dst             destination node of the migration
//! @param[in]  migration_state migration state reported by @p host
//! @param[out] node            when an action is chosen, a strdup()'d copy of the node to send it to
//! @param[out] instance        when an action is chosen, a strdup()'d copy of the instance ID
//! @param[out] action          when an action is chosen, a strdup()'d "commit" or "rollback"
//!
//! @return 0 unless a cache lookup was performed and find_instanceCacheId() failed, in which case
//!         its non-zero result is returned
//!
//! @note The three output pointers are only modified when an action is chosen; their previous
//!       values are released with EUCA_FREE() before being replaced.
//!
static int migration_handler(ccInstance * myInstance, char *host, char *src, char *dst, migration_states migration_state, char **node, char **instance, char **action)
{
    int rc = 0;
    char *target = NULL;               // node that must receive the request (either src or dst)
    const char *request = NULL;        // "commit" or "rollback"; NULL means no action
    ccInstance *srcInstance = NULL;

    LOGDEBUG("invoked\n");

    if (!strcmp(host, dst)) {
        int extantReady = ((migration_state == MIGRATION_READY) && !strcmp(myInstance->state, "Extant"));
        int tearingDown = (((migration_state == MIGRATION_CLEANING) || (migration_state == MIGRATION_READY)) && !strcmp(myInstance->state, "Teardown"));

        if (!extantReady && !tearingDown) {
            LOGDEBUG("[%s] ignoring update from destination node %s during migration: %s(%s)\n", myInstance->instanceId, host, myInstance->state,
                     migration_state_names[myInstance->migration_state]);
        } else {
            LOGDEBUG("[%s] destination node %s reports %s(%s), checking source node %s\n", myInstance->instanceId, host, myInstance->state,
                     migration_state_names[myInstance->migration_state], src);
            rc = find_instanceCacheId(myInstance->instanceId, &srcInstance);
            if (rc) {
                if (extantReady) {
                    LOGERROR("[%s] could not find migration source node %s in the instance cache\n", myInstance->instanceId, src);
                } else {
                    LOGDEBUG("[%s] could not find migration source node %s in the instance cache\n", myInstance->instanceId, src);
                }
            } else if (extantReady) {
                // destination is Extant(ready): decide based on what the source last reported
                switch (srcInstance->migration_state) {
                case MIGRATION_READY:
                    LOGINFO("[%s] source node %s last reported %s(%s), destination node %s reports %s(%s), preparing to commit migration\n", myInstance->instanceId, src,
                            srcInstance->state, migration_state_names[srcInstance->migration_state], dst, myInstance->state, migration_state_names[myInstance->migration_state]);
                    target = src;
                    request = "commit";
                    break;
                case MIGRATION_IN_PROGRESS:
                    LOGDEBUG("[%s] source node %s last reported migration to %s in progress\n", myInstance->instanceId, src, dst);
                    break;
                case NOT_MIGRATING:
                    LOGINFO("[%s] source node %s last reported %s(%s), destination node %s reports %s(%s), preparing to roll back destination node\n",
                            myInstance->instanceId, src, srcInstance->state, migration_state_names[srcInstance->migration_state], dst, myInstance->state,
                            migration_state_names[myInstance->migration_state]);
                    target = dst;
                    request = "rollback";
                    break;
                default:
                    LOGDEBUG("[%s] source node %s last reported %s(%s), has not yet reported ready to commit migration to %s\n",
                             myInstance->instanceId, src, srcInstance->state, migration_state_names[srcInstance->migration_state], dst);
                    break;
                }
            } else if ((srcInstance->migration_state == MIGRATION_READY) && !strcmp(srcInstance->state, "Extant")) {
                // Destination is tearing down.
                // TO-DO: State matrix says also to roll back if source
                // is Extant(preparing), but that's causing some odd
                // (though benign?) effects right now. So only rolling
                // back if Extant(ready).
                // Part of the fix for this should be to cache (locally)
                // the scheduled destination node for the migration so
                // we can pass it when requesting a rollback to a source
                // that has not yet reported ready during a polling
                // cycle.
                LOGINFO("[%s] source node %s last reported %s(%s), destination node %s reports %s(%s), preparing to roll back source node\n",
                        myInstance->instanceId, src, srcInstance->state, migration_state_names[srcInstance->migration_state], dst, myInstance->state,
                        migration_state_names[myInstance->migration_state]);
                target = src;
                request = "rollback";
            } else {
                LOGDEBUG("[%s] source node %s last reported %s(%s), destination node %s reports %s(%s), doing nothing\n",
                         myInstance->instanceId, src, srcInstance->state, migration_state_names[srcInstance->migration_state], dst, myInstance->state,
                         migration_state_names[myInstance->migration_state]);
            }
            EUCA_FREE(srcInstance);
        }
    } else if (!strcmp(host, src)) {
        LOGDEBUG("[%s] received state %s(%s) from source node %s\n", myInstance->instanceId, myInstance->state, migration_state_names[migration_state], host);
    } else {
        LOGERROR("[%s] received status from a migrating node that's neither the source (%s) nor the destination (%s): %s\n", myInstance->instanceId, src, dst, host);
    }

    // publish the chosen action (if any), replacing whatever the caller held before
    if (request != NULL) {
        EUCA_FREE(*node);
        EUCA_FREE(*instance);
        EUCA_FREE(*action);
        *node = strdup(target);
        *instance = strdup(myInstance->instanceId);
        *action = strdup(request);
    }

    LOGDEBUG("done\n");
    return (rc);
}
//!
//! Poll every node controller (NC) in the resource cache for its instances and merge the replies
//! into the instance cache.
//!
//! One child process is forked per resource. Concurrency is throttled by the REFRESHLOCK semaphore,
//! which is opened with an initial count of config->ncFanout. For a node in RESUP state, each child:
//! - calls ncDescribeInstances;
//! - tracks how long the node has been idle and calls powerDown() past config->idleThresh;
//! - converts every reported instance into the CC representation;
//! - runs migration_handler() for migrating instances;
//! - pushes a public IP to the NC when the CC has one and the NC does not;
//! - stores the result with refresh_instanceCache().
//!
//! @param[in] pMeta   a pointer to the node controller (NC) metadata structure
//! @param[in] timeout overall time budget handed to ncGetTimeout() to derive per-call NC timeouts
//! @param[in] dolock  only logged; not otherwise used here
//!
//! @return always 0; per-node failures are logged
//!
//! @note If fork() fails for a node, that node is skipped for this cycle. The REFRESHLOCK slot taken
//!       for it is released so later forks are not starved, and the wait loop does not call
//!       timewait() with an invalid pid.
//!
int refresh_instances(ncMetadata * pMeta, int timeout, int dolock)
{
    ccInstance *myInstance = NULL;
    int i, ncOutInstsLen, rc, pid, nctimeout, *pids = NULL, status;
    time_t op_start;
    char *migration_host = NULL;
    char *migration_instance = NULL;
    char *migration_action = NULL;
    ncInstance **ncOutInsts = NULL;
    op_start = time(NULL);
    LOGDEBUG("invoked: timeout=%d, dolock=%d\n", timeout, dolock);
    set_clean_instanceCache();
    // critical NC call section
    sem_mywait(RESCACHE);
    memcpy(resourceCacheStage, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);
    // NOTE(review): per POSIX, sem_open() with O_CREAT on an existing name does not reset its
    // value, so this close/reopen may not restore the ncFanout count -- confirm intent.
    sem_close(locks[REFRESHLOCK]);
    locks[REFRESHLOCK] = sem_open("/eucalyptusCCrefreshLock", O_CREAT, 0644, config->ncFanout);
    pids = EUCA_ZALLOC(resourceCacheStage->numResources, sizeof(int));
    if (!pids) {
        LOGFATAL("out of memory!\n");
        unlock_exit(1);
    }
    invalidate_instanceCache();
    for (i = 0; i < resourceCacheStage->numResources; i++) {
        sem_mywait(REFRESHLOCK);
        pid = fork();
        if (pid < 0) {
            // no worker could be started: give the fanout slot back and skip this node this cycle
            LOGERROR("failed to fork instance refresh worker for %s\n", resourceCacheStage->resources[i].hostname);
            sem_mypost(REFRESHLOCK);
            pids[i] = -1;
        } else if (!pid) {
            if (resourceCacheStage->resources[i].state == RESUP) {
                int j;
                nctimeout = ncGetTimeout(op_start, timeout, 1, 1);
                rc = ncClientCall(pMeta, nctimeout, resourceCacheStage->resources[i].lockidx, resourceCacheStage->resources[i].ncURL,
                                  "ncDescribeInstances", NULL, 0, &ncOutInsts, &ncOutInstsLen);
                if (!rc) {
                    // if idle, power down
                    if (ncOutInstsLen == 0) {
                        LOGDEBUG("node %s idle since %ld: (%ld/%d) seconds\n", resourceCacheStage->resources[i].hostname,
                                 resourceCacheStage->resources[i].idleStart, time(NULL) - resourceCacheStage->resources[i].idleStart, config->idleThresh);
                        if (!resourceCacheStage->resources[i].idleStart) {
                            resourceCacheStage->resources[i].idleStart = time(NULL);
                        } else if ((time(NULL) - resourceCacheStage->resources[i].idleStart) > config->idleThresh) {
                            // call powerdown
                            if (powerDown(pMeta, &(resourceCacheStage->resources[i]))) {
                                LOGWARN("powerDown for %s failed\n", resourceCacheStage->resources[i].hostname);
                            }
                        }
                    } else {
                        resourceCacheStage->resources[i].idleStart = 0;
                    }
                    // populate instanceCache
                    for (j = 0; j < ncOutInstsLen; j++) {
                        myInstance = NULL;
                        // add it
                        LOGDEBUG("describing instance %s, %s, %d\n", ncOutInsts[j]->instanceId, ncOutInsts[j]->stateName, j);
                        // grab instance from cache, if available. otherwise, start from scratch
                        rc = find_instanceCacheId(ncOutInsts[j]->instanceId, &myInstance);
                        if (rc || !myInstance) {
                            myInstance = EUCA_ZALLOC(1, sizeof(ccInstance));
                            if (!myInstance) {
                                LOGFATAL("out of memory!\n");
                                unlock_exit(1);
                            }
                        }
                        // update CC instance with instance state from NC
                        rc = ncInstance_to_ccInstance(myInstance, ncOutInsts[j]);
                        // migration-related logic
                        if (ncOutInsts[j]->migration_state != NOT_MIGRATING) {
                            rc = migration_handler(myInstance,
                                                   resourceCacheStage->resources[i].hostname,
                                                   ncOutInsts[j]->migration_src,
                                                   ncOutInsts[j]->migration_dst, ncOutInsts[j]->migration_state, &migration_host, &migration_instance, &migration_action);
                            // For now just ignore updates from destination while migrating.
                            if (!strcmp(resourceCacheStage->resources[i].hostname, ncOutInsts[j]->migration_dst)) {
                                LOGTRACE("[%s] ignoring update from destination node %s during migration (host=%s, instance=%s, action=%s)\n",
                                         myInstance->instanceId, ncOutInsts[j]->migration_dst, SP(migration_host), SP(migration_instance), SP(migration_action));
                                EUCA_FREE(myInstance);
                                continue;
                            }
                        }
                        // instance info that the CC maintains
                        myInstance->ncHostIdx = i;
                        // Is this redundant?
                        myInstance->migration_state = ncOutInsts[j]->migration_state;
                        euca_strncpy(myInstance->serviceTag, resourceCacheStage->resources[i].ncURL, 384);
                        {
                            char *ip = NULL;
                            if (!strcmp(myInstance->ccnet.privateIp, "0.0.0.0")) {
                                if ((rc = MAC2IP(myInstance->ccnet.privateMac, &ip)) == 0) {
                                    euca_strncpy(myInstance->ccnet.privateIp, ip, INET_ADDR_LEN);
                                }
                            }
                            EUCA_FREE(ip);
                        }
                        if ((myInstance->ccnet.publicIp[0] != '\0' && strcmp(myInstance->ccnet.publicIp, "0.0.0.0"))
                            && (myInstance->ncnet.publicIp[0] == '\0' || !strcmp(myInstance->ncnet.publicIp, "0.0.0.0"))) {
                            // CC has network info, NC does not
                            LOGDEBUG("sending ncAssignAddress to sync NC\n");
                            rc = ncClientCall(pMeta, nctimeout, resourceCacheStage->resources[i].lockidx, resourceCacheStage->resources[i].ncURL,
                                              "ncAssignAddress", myInstance->instanceId, myInstance->ccnet.publicIp);
                            if (rc) {
                                // problem, but will retry next time
                                LOGWARN("could not send AssignAddress to NC\n");
                            }
                        }
                        refresh_instanceCache(myInstance->instanceId, myInstance);
                        LOGDEBUG("storing instance state: %s/%s/%s/%s\n", myInstance->instanceId, myInstance->state, myInstance->ccnet.publicIp, myInstance->ccnet.privateIp);
                        print_ccInstance("refresh_instances(): ", myInstance);
                        sensor_set_resource_alias(myInstance->instanceId, myInstance->ncnet.privateIp);
                        EUCA_FREE(myInstance);
                    }
                }
                if (ncOutInsts) {
                    for (j = 0; j < ncOutInstsLen; j++) {
                        free_instance(&(ncOutInsts[j]));
                    }
                    EUCA_FREE(ncOutInsts);
                }
            }
            sem_mypost(REFRESHLOCK);
            // act on the (last) migration request chosen by migration_handler(), if any
            if (migration_host) {
                if (!strcmp(migration_action, "commit")) {
                    LOGDEBUG("[%s] notifying source %s to commit migration\n", migration_instance, migration_host);
                    // Note: Really only need to specify the instance here.
                    doMigrateInstances(pMeta, migration_host, migration_instance, NULL, 0, 0, "commit");
                } else if (!strcmp(migration_action, "rollback")) {
                    LOGDEBUG("[%s] notifying node %s to roll back migration\n", migration_instance, migration_host);
                    doMigrateInstances(pMeta, migration_host, migration_instance, NULL, 0, 0, "rollback");
                } else {
                    LOGWARN("unexpected migration action '%s' for node %s -- doing nothing\n", migration_action, migration_host);
                }
                EUCA_FREE(migration_host);
            }
            EUCA_FREE(migration_instance);
            EUCA_FREE(migration_action);
            exit(0);
        } else {
            pids[i] = pid;
        }
    }
    for (i = 0; i < resourceCacheStage->numResources; i++) {
        if (pids[i] <= 0) {
            // no worker was started for this resource (fork failed); nothing to reap
            continue;
        }
        rc = timewait(pids[i], &status, 120);
        if (!rc) {
            // timed out, really bad failure (reset REFRESHLOCK semaphore)
            sem_close(locks[REFRESHLOCK]);
            locks[REFRESHLOCK] = sem_open("/eucalyptusCCrefreshLock", O_CREAT, 0644, config->ncFanout);
            rc = 1;
        } else if (rc > 0) {
            // process exited, and wait picked it up.
            if (WIFEXITED(status)) {
                rc = WEXITSTATUS(status);
            } else {
                rc = 1;
            }
        } else {
            // process no longer exists, and someone else reaped it
            rc = 0;
        }
        if (rc) {
            LOGWARN("error waiting for child pid '%d', exit code '%d'\n", pids[i], rc);
        }
    }
    invalidate_instanceCache(); // purge old instances from cache
    // update canonical array of resources with latest changes
    // to resourceCacheStage (.idleStart may have changed) and
    // remove any unconfigured hosts if they have no instances
    refresh_resourceCache(resourceCacheStage, TRUE);
    EUCA_FREE(pids);
    LOGTRACE("done\n");
    return (0);
}
//!
//! Poll every node controller (NC) in the resource cache for sensor data and merge it into the
//! local sensor store.
//!
//! One child process is forked per resource. Concurrency is throttled by the REFRESHLOCK semaphore,
//! which is opened with an initial count of config->ncFanout. Each child calls ncDescribeSensors on
//! nodes in RESUP state and hands the result to sensor_merge_records().
//!
//! @param[in] pMeta   a pointer to the node controller (NC) metadata structure
//! @param[in] timeout overall time budget handed to ncGetTimeout() to derive per-call NC timeouts
//! @param[in] dolock  only logged; not otherwise used here
//!
//! @return 1 if the sensor subsystem is not configured yet (no config, history size < 1 or zero
//!         collection interval), 0 otherwise
//!
//! @note If fork() fails for a node, that node is skipped for this cycle and its REFRESHLOCK slot is
//!       released so later forks are not starved.
//!
int refresh_sensors(ncMetadata * pMeta, int timeout, int dolock)
{
    time_t op_start = time(NULL);
    LOGDEBUG("invoked: timeout=%d, dolock=%d\n", timeout, dolock);
    int history_size;
    long long collection_interval_time_ms;
    if ((sensor_get_config(&history_size, &collection_interval_time_ms) != 0) || history_size < 1 || collection_interval_time_ms == 0)
        return (1);                    // sensor system not configured yet
    // critical NC call section
    sem_mywait(RESCACHE);
    memcpy(resourceCacheStage, resourceCache, sizeof(ccResourceCache));
    sem_mypost(RESCACHE);
    // NOTE(review): per POSIX, sem_open() with O_CREAT on an existing name does not reset its
    // value, so this close/reopen may not restore the ncFanout count -- confirm intent.
    sem_close(locks[REFRESHLOCK]);
    locks[REFRESHLOCK] = sem_open("/eucalyptusCCrefreshLock", O_CREAT, 0644, config->ncFanout);
    int *pids = EUCA_ZALLOC(resourceCacheStage->numResources, sizeof(int));
    if (!pids) {
        LOGFATAL("out of memory!\n");
        unlock_exit(1);
    }
    for (int i = 0; i < resourceCacheStage->numResources; i++) {
        sem_mywait(REFRESHLOCK);
        pid_t pid = fork();
        if (pid < 0) {
            // no worker could be started: give the fanout slot back and skip this node this cycle
            LOGERROR("failed to fork sensor refresh worker for %s\n", resourceCacheStage->resources[i].hostname);
            sem_mypost(REFRESHLOCK);
            pids[i] = -1;
        } else if (!pid) {
            if (resourceCacheStage->resources[i].state == RESUP) {
                int nctimeout = ncGetTimeout(op_start, timeout, 1, 1);
                sensorResource **srs = NULL;
                int srsLen = 0;
                int rc = ncClientCall(pMeta, nctimeout, resourceCacheStage->resources[i].lockidx, resourceCacheStage->resources[i].ncURL,
                                      "ncDescribeSensors", history_size, collection_interval_time_ms,
                                      NULL, 0, NULL, 0, &srs, &srsLen);
                if (!rc) {
                    // update our cache
                    if (sensor_merge_records(srs, srsLen, TRUE) != EUCA_OK) {
                        LOGWARN("failed to store all sensor data due to lack of space\n");
                    }
                    for (int j = 0; j < srsLen; j++) {
                        EUCA_FREE(srs[j]);
                    }
                    // release the array itself even when it holds no records
                    EUCA_FREE(srs);
                }
            }
            sem_mypost(REFRESHLOCK);
            exit(0);
        } else {
            pids[i] = pid;
        }
    }
    for (int i = 0; i < resourceCacheStage->numResources; i++) {
        if (pids[i] <= 0) {
            // no worker was started for this resource (fork failed); nothing to reap
            continue;
        }
        int status;
        int rc = timewait(pids[i], &status, 120);
        if (!rc) {
            // timed out, really bad failure (reset REFRESHLOCK semaphore)
            sem_close(locks[REFRESHLOCK]);
            locks[REFRESHLOCK] = sem_open("/eucalyptusCCrefreshLock", O_CREAT, 0644, config->ncFanout);
            rc = 1;
        } else if (rc > 0) {
            // process exited, and wait picked it up.
            if (WIFEXITED(status)) {
                rc = WEXITSTATUS(status);
            } else {
                rc = 1;
            }
        } else {
            // process no longer exists, and someone else reaped it
            rc = 0;
        }
        if (rc) {
            LOGWARN("error waiting for child pid '%d', exit code '%d'\n", pids[i], rc);
        }
    }
    EUCA_FREE(pids);
    LOGTRACE("done\n");
    return (0);
}
//!
//! Return a snapshot copy of every valid entry in the instance cache.
//!
//! Migration states are collapsed before being reported upstream: MIGRATION_READY is reported as
//! MIGRATION_PREPARING and MIGRATION_CLEANING as MIGRATION_IN_PROGRESS.
//!
//! @param[in]  pMeta       a pointer to the node controller (NC) metadata structure
//! @param[in]  instIds     currently unused (no filtering by instance ID is performed)
//! @param[in]  instIdsLen  only logged; not otherwise used
//! @param[out] outInsts    newly allocated array of copied instances, or NULL when there are none
//! @param[out] outInstsLen number of entries actually copied into *outInsts
//!
//! @return 1 if initialization fails or ccIsEnabled() reports non-zero, 0 otherwise
//!
//! @note If the cache holds more valid entries than numInsts claims, only the first numInsts are
//!       reported. If it holds fewer, only the entries actually found are reported, never
//!       zero-filled padding.
//!
int doDescribeInstances(ncMetadata * pMeta, char **instIds, int instIdsLen, ccInstance ** outInsts, int *outInstsLen)
{
    int i, rc, count;
    LOGDEBUG("invoked: userId=%s, instIdsLen=%d\n", SP(pMeta ? pMeta->userId : "UNSET"), instIdsLen);
    rc = initialize(pMeta, FALSE);
    if (rc || ccIsEnabled()) {
        return (1);
    }
    *outInsts = NULL;
    *outInstsLen = 0;
    sem_mywait(INSTCACHE);
    count = 0;
    if (instanceCache->numInsts) {
        *outInsts = EUCA_ZALLOC(instanceCache->numInsts, sizeof(ccInstance));
        if (!*outInsts) {
            LOGFATAL("out of memory!\n");
            unlock_exit(1);
        }
        for (i = 0; i < MAXINSTANCES_PER_CC; i++) {
            if (instanceCache->cacheState[i] != INSTVALID) {
                continue;
            }
            if (count >= instanceCache->numInsts) {
                // output buffer is full: stop rather than wrapping around and overwriting entries
                LOGWARN("found more instances than reported by numInsts, will only report a subset of instances\n");
                break;
            }
            memcpy(&((*outInsts)[count]), &(instanceCache->instances[i]), sizeof(ccInstance));
            // We only report a subset of possible migration statuses upstream to the CLC.
            if ((*outInsts)[count].migration_state == MIGRATION_READY) {
                (*outInsts)[count].migration_state = MIGRATION_PREPARING;
            } else if ((*outInsts)[count].migration_state == MIGRATION_CLEANING) {
                (*outInsts)[count].migration_state = MIGRATION_IN_PROGRESS;
            }
            count++;
        }
        // report what was actually copied, not what numInsts claimed
        *outInstsLen = count;
        if (count == 0) {
            EUCA_FREE(*outInsts);
        }
    }
    sem_mypost(INSTCACHE);
    for (i = 0; i < (*outInstsLen); i++) {
        LOGDEBUG("instances summary: instanceId=%s, state=%s, migration_state=%s, publicIp=%s, privateIp=%s\n",
                 (*outInsts)[i].instanceId,
                 (*outInsts)[i].state, migration_state_names[(*outInsts)[i].migration_state], (*outInsts)[i].ccnet.publicIp, (*outInsts)[i].ccnet.privateIp);
    }
    LOGTRACE("done\n");
    shawn();
    return (0);
}
//!
//! Wake a sleeping node so it can accept work.
//!
//! @param[in] res resource (node) to wake
//!
//! @return always 0; no power management is performed here
//!
//! @todo Implement in EUCANETD
//!
int powerUp(ccResource * res)
{
    (void)res;                         // unused until power management is implemented
    return 0;
}
//!
//! Put an idle node to sleep.
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure
//! @param[in] node  resource (node) to power down
//!
//! @return always 0; no power management is performed here
//!
int powerDown(ncMetadata * pMeta, ccResource * node)
{
    (void)pMeta;                       // both parameters are unused until power
    (void)node;                        // management is implemented
    return 0;
}
//!
//! Log a network configuration at DEBUG level.
//!
//! @param[in] prestr prefix printed before the fields (e.g. the caller's name)
//! @param[in] in     network configuration to print (VLAN, network index, MAC, public and private IP)
//!
void print_netConfig(char *prestr, netConfig * in)
{
    netConfig *cfg = in;

    LOGDEBUG("%s: vlan:%d networkIndex:%d privateMac:%s publicIp:%s privateIp:%s\n",
             prestr,
             cfg->vlan, cfg->networkIndex,
             cfg->privateMac, cfg->publicIp, cfg->privateIp);
}
//!
//! Copy an instance record as reported by a node controller into the CC's instance representation.
//!
//! @param[out] dst CC instance to fill in (fields not listed below are left untouched)
//! @param[in]  src NC instance to copy from
//!
//! @return always 0
//!
//! @note At most 64 security-group names are copied. volumesSize is recomputed as the number of
//!       leading volume slots with a non-empty volumeId.
//!
int ncInstance_to_ccInstance(ccInstance * dst, ncInstance * src)
{
    int g = 0;
    int nvol = 0;

    // identity and ownership
    euca_strncpy(dst->uuid, src->uuid, 48);
    euca_strncpy(dst->instanceId, src->instanceId, 16);
    euca_strncpy(dst->reservationId, src->reservationId, 16);
    euca_strncpy(dst->accountId, src->accountId, 48);
    euca_strncpy(dst->ownerId, src->ownerId, 48);

    // images (the NC calls the machine image "imageId", the CC calls it "amiId")
    euca_strncpy(dst->amiId, src->imageId, 16);
    euca_strncpy(dst->kernelId, src->kernelId, 16);
    euca_strncpy(dst->ramdiskId, src->ramdiskId, 16);

    // launch parameters
    euca_strncpy(dst->keyName, src->keyName, 1024);
    euca_strncpy(dst->launchIndex, src->launchIndex, 64);
    euca_strncpy(dst->platform, src->platform, 64);
    euca_strncpy(dst->userData, src->userData, 16384);

    // runtime, bundling and image-creation state
    euca_strncpy(dst->guestStateName, src->guestStateName, 64);
    euca_strncpy(dst->bundleTaskStateName, src->bundleTaskStateName, 64);
    euca_strncpy(dst->createImageTaskStateName, src->createImageTaskStateName, 64);
    euca_strncpy(dst->state, src->stateName, 16);
    dst->bundleTaskProgress = src->bundleTaskProgress;
    dst->ts = src->launchTime;

    // migration
    euca_strncpy(dst->migration_src, src->migration_src, HOSTNAME_LEN);
    euca_strncpy(dst->migration_dst, src->migration_dst, HOSTNAME_LEN);
    dst->migration_state = src->migration_state;

    // network as seen by the NC
    memcpy(&(dst->ncnet), &(src->ncnet), sizeof(netConfig));

    // security groups (bounded by the 64-entry destination table)
    for (g = 0; (g < src->groupNamesSize) && (g < 64); g++) {
        snprintf(dst->groupNames[g], 64, "%s", src->groupNames[g]);
    }

    // volumes: copy the whole table, then count the leading populated slots
    memcpy(dst->volumes, src->volumes, sizeof(ncVolume) * EUCA_MAX_VOLUMES);
    while ((nvol < EUCA_MAX_VOLUMES) && (dst->volumes[nvol].volumeId[0] != '\0')) {
        nvol++;
    }
    dst->volumesSize = nvol;

    // VM type and I/O counters
    memcpy(&(dst->ccvm), &(src->params), sizeof(virtualMachine));
    dst->blkbytes = src->blkbytes;
    dst->netbytes = src->netbytes;

    return (0);
}
//!
//! Copy a CC instance record into the node controller's instance representation.
//!
//! This is the inverse of ncInstance_to_ccInstance(), including the bundle-task progress counter.
//!
//! @param[out] dst NC instance to fill in (fields not listed below are left untouched)
//! @param[in]  src CC instance to copy from
//!
//! @return always 0
//!
//! @note userId is filled from the CC's ownerId (see the TODO below). All 64 group-name slots are
//!       copied; groupNamesSize is not set here.
//!
int ccInstance_to_ncInstance(ncInstance * dst, ccInstance * src)
{
    int i;
    euca_strncpy(dst->uuid, src->uuid, 48);
    euca_strncpy(dst->instanceId, src->instanceId, 16);
    euca_strncpy(dst->reservationId, src->reservationId, 16);
    euca_strncpy(dst->accountId, src->accountId, 48);
    euca_strncpy(dst->userId, src->ownerId, 48);    //! @TODO: is this right?
    euca_strncpy(dst->ownerId, src->ownerId, 48);
    euca_strncpy(dst->imageId, src->amiId, 16);
    euca_strncpy(dst->kernelId, src->kernelId, 16);
    euca_strncpy(dst->ramdiskId, src->ramdiskId, 16);
    euca_strncpy(dst->keyName, src->keyName, 1024);
    euca_strncpy(dst->launchIndex, src->launchIndex, 64);
    euca_strncpy(dst->platform, src->platform, 64);
    euca_strncpy(dst->guestStateName, src->guestStateName, 64);
    euca_strncpy(dst->bundleTaskStateName, src->bundleTaskStateName, 64);
    // keep the progress counter in step with ncInstance_to_ccInstance(), which copies it the other way
    dst->bundleTaskProgress = src->bundleTaskProgress;
    euca_strncpy(dst->createImageTaskStateName, src->createImageTaskStateName, 64);
    euca_strncpy(dst->userData, src->userData, 16384);
    euca_strncpy(dst->stateName, src->state, 16);
    euca_strncpy(dst->migration_src, src->migration_src, HOSTNAME_LEN);
    euca_strncpy(dst->migration_dst, src->migration_dst, HOSTNAME_LEN);
    dst->launchTime = src->ts;
    dst->migration_state = src->migration_state;
    memcpy(&(dst->ncnet), &(src->ncnet), sizeof(netConfig));
    for (i = 0; i < 64; i++) {
        snprintf(dst->groupNames[i], 64, "%s", src->groupNames[i]);
    }
    // volumes are copied wholesale; the NC side keeps no separate count here
    memcpy(dst->volumes, src->volumes, sizeof(ncVolume) * EUCA_MAX_VOLUMES);
    memcpy(&(dst->params), &(src->ccvm), sizeof(virtualMachine));
    dst->blkbytes = src->blkbytes;
    dst->netbytes = src->netbytes;
    return (0);
}
//!
//! Choose a resource (node) to run a new instance on.
//!
//! An explicit @p targetNode always takes precedence. Otherwise config->schedPolicy selects the
//! scheduler: ROUNDROBIN and USER have their own schedulers, while GREEDY, POWERSAVE and any
//! unrecognised policy all use the greedy scheduler.
//!
//! @param[in]  vm         VM type (memory, disk and cores) the instance needs
//! @param[in]  amiId      machine image ID (used by the USER scheduler only)
//! @param[in]  kernelId   kernel image ID (used by the USER scheduler only)
//! @param[in]  ramdiskId  ramdisk image ID (used by the USER scheduler only)
//! @param[in]  instId     instance ID (used by the USER scheduler only)
//! @param[in]  userData   instance user data (used by the USER scheduler only)
//! @param[in]  platform   instance platform (used by the USER scheduler only)
//! @param[in]  targetNode if non-NULL, hostname of the node the instance must run on
//! @param[out] outresid   resource-cache index of the chosen node
//!
//! @return whatever the selected scheduler returns (0 on success)
//!
int schedule_instance(virtualMachine * vm, char *amiId, char *kernelId, char *ramdiskId, char *instId, char *userData, char *platform, char *targetNode, int *outresid)
{
    if (targetNode != NULL) {
        return (schedule_instance_explicit(vm, targetNode, outresid, FALSE));
    }

    if (config->schedPolicy == SCHEDROUNDROBIN) {
        return (schedule_instance_roundrobin(vm, outresid));
    }

    if (config->schedPolicy == SCHEDUSER) {
        return (schedule_instance_user(vm, amiId, kernelId, ramdiskId, instId, userData, platform, outresid));
    }

    // SCHEDGREEDY, SCHEDPOWERSAVE and anything unrecognised
    return (schedule_instance_greedy(vm, outresid));
}
//!
//! Round-robin scheduler: starting at config->schedState, pick the next resource that is not DOWN,
//! is ENABLED and has enough free memory, disk and cores for @p vm.
//!
//! @param[in]  vm       VM type (memory, disk and cores) the instance needs
//! @param[out] outresid resource-cache index of the chosen node (0 when nothing was found)
//!
//! @return 0 on success; 1 if no resource qualifies or the resource cache is empty
//!
//! @note On success config->schedState is advanced to the index after the chosen node (wrapping),
//!       so successive calls rotate through the nodes. A stale config->schedState that lies outside
//!       the current resource list (e.g. after nodes were removed) is treated as 0. Without that,
//!       the wrap-around check would never see the start index again and the loop would not terminate.
//!
int schedule_instance_roundrobin(virtualMachine * vm, int *outresid)
{
    int i, done, start, found, resid = 0;
    ccResource *res;
    *outresid = 0;
    LOGDEBUG("scheduler using ROUNDROBIN policy to find next resource\n");
    if (resourceCache->numResources <= 0) {
        // nothing to schedule on, and resources[] holds no valid entries
        return (1);
    }
    // find the best 'resource' on which to run the instance
    done = found = 0;
    start = config->schedState;
    if ((start < 0) || (start >= resourceCache->numResources)) {
        // stale state: restart from the first resource so the loop below is guaranteed to end
        start = 0;
    }
    i = start;
    LOGDEBUG("scheduler state starting at resource %d\n", start);
    while (!done) {
        int mem, disk, cores;
        res = &(resourceCache->resources[i]);
        if ((res->state != RESDOWN) && (res->ncState == ENABLED)) {
            mem = res->availMemory - vm->mem;
            disk = res->availDisk - vm->disk;
            cores = res->availCores - vm->cores;
            if (mem >= 0 && disk >= 0 && cores >= 0) {
                resid = i;
                found = 1;
                done++;
            }
        }
        i++;
        if (i >= resourceCache->numResources) {
            i = 0;
        }
        if (i == start) {
            done++;
        }
    }
    if (!found) {
        // didn't find a resource
        return (1);
    }
    *outresid = resid;
    config->schedState = i;
    LOGDEBUG("scheduler state finishing at resource %d\n", config->schedState);
    return (0);
}
//!
//! Pick a destination node for migrating @p instance away from the resource at index @p inresid.
//!
//! With exactly one inclusion entry, that node is scheduled explicitly as a migration, so an
//! enabled or STOPPED node is accepted. It must also be migration capable. Otherwise the
//! round-robin scheduler is called repeatedly until it yields a node that:
//! - is not the source;
//! - is migration capable;
//! - is not in the exclusion list;
//! - when an inclusion list is given, is in it.
//!
//! The search stops as soon as the round-robin scheduler fails or cycles back to its first pick.
//!
//! @param[in]  instance           instance to migrate
//! @param[in]  includeNodes       hosts to be included as possible migration destinations
//! @param[in]  excludeNodes       hosts to be excluded as migration destinations
//! @param[in]  includeNodeCount   number of host entries in destination-inclusion list
//! @param[in]  excludeNodeCount   number of host entries in destination-exclusion list
//! @param[in]  inresid            resource-cache index of migration source node
//! @param[out] outresid           resource-cache index of scheduled migration destination node (-1 on failure)
//! @param[in]  resourceCacheLocal local copy of global resource cache
//! @param[out] replyString        for some failures, set to a strdup()'d explanation (presumably freed by the caller)
//!
//! @return 0 on success; non-zero on failure, in which case *outresid is -1
//!
//! @note includeNodes and excludeNodes are mutually exclusive.
//!
static int schedule_instance_migration(ncInstance * instance, char **includeNodes, char **excludeNodes, int includeNodeCount, int excludeNodeCount, int inresid, int *outresid,
                                       ccResourceCache * resourceCacheLocal, char **replyString)
{
    int ret = 0;
    LOGDEBUG("invoked: include=%d, exclude=%d\n", includeNodeCount, excludeNodeCount);
    if (includeNodes && excludeNodes) {
        LOGERROR("[%s] migration scheduler cannot be called with both nodes to include and nodes to exclude; the options are mutually exclusive.\n", instance->instanceId);
        *replyString = strdup("migration scheduler cannot be called with both nodes to include and nodes to exclude");
        ret = 1;
        goto out;
    }
    // Trivial case: migration to a specific node:
    if (includeNodeCount == 1) {
        LOGINFO("[%s] attempting to schedule migration to specific node: %s\n", instance->instanceId, includeNodes[0]);
        if (!strcmp(instance->migration_src, includeNodes[0])) {
            LOGERROR("[%s] cannot schedule SAME-NODE migration from %s to %s\n", instance->instanceId, instance->migration_src, includeNodes[0]);
            *replyString = strdup("source and destination cannot be the same");
            ret = 1;
            goto out;
        }
        ret = schedule_instance_explicit(&(instance->params), includeNodes[0], outresid, TRUE);
        if (ret) {
            // node unknown, not usable or short on capacity: *outresid is not a real destination,
            // so do not inspect resources[*outresid]
            goto out;
        }
        if (resourceCacheLocal->resources[*outresid].migrationCapable == FALSE) {
            LOGWARN("[%s] cannot schedule migration to node (%s) that is not migration capable\n", instance->instanceId, includeNodes[0]);
            *replyString = strdup("requested destination is not migration capable");
            ret = 1;
            goto out;
        }
    } else {
        if (config->schedPolicy == SCHEDROUNDROBIN) {
            LOGDEBUG("[%s] scheduling migration using ROUNDROBIN scheduler\n", instance->instanceId);
        } else if (config->schedPolicy == SCHEDGREEDY || config->schedPolicy == SCHEDPOWERSAVE) {
            LOGINFO
                ("[%s] scheduling migration using ROUNDROBIN scheduler, despite GREEDY or POWERSAVE scheduler specification in Eucalyptus configuration file; GREEDY scheduling can be emulated by selecting specific destination nodes for migrations\n",
                 instance->instanceId);
        } else {
            LOGWARN("[%s] unsupported scheduler configuration--scheduling migration using ROUNDROBIN scheduler\n", instance->instanceId);
        }
        // This is relatively easy: we can keep calling the round-robin scheduler until we get a node we like.
        int first_try = -1;            // To break loops.
        int done = 0;
        int found = 0;
        while (!done) {
            ret = schedule_instance_roundrobin(&(instance->params), outresid);
            if (ret) {
                // no node can host the instance at all; retrying cannot produce a different answer
                break;
            }
            if (first_try == -1) {
                first_try = *outresid;
            } else if (*outresid == first_try) {
                LOGERROR("[%s] has looped around without scheduling a destination, breaking loop\n", instance->instanceId);
                // We've already been here. We know this one won't work.
                break;
            }
            if (*outresid == inresid) {
                // Tried to schduled to the source node, so retry.
                LOGDEBUG("[%s] cannot schedule src_index=%d == dst_index=%d (%s > %s), trying again...\n",
                         instance->instanceId, inresid, *outresid, instance->migration_src, resourceCacheLocal->resources[*outresid].hostname);
            } else if (resourceCacheLocal->resources[*outresid].migrationCapable == FALSE) {
                LOGDEBUG("[%s] cannot schedule src_index=%d, dst_index=%d because node %s is not migration capable\n", instance->instanceId, inresid, *outresid,
                         resourceCacheLocal->resources[*outresid].hostname);
            } else if (check_for_string_in_list(resourceCacheLocal->resources[*outresid].hostname, excludeNodes, excludeNodeCount)) {
                // Exclusion list takes priority over inclusion list.
                LOGDEBUG("[%s] cannot schedule src_index=%d, dst_index=%d because node %s is in destination-exclusion list\n",
                         instance->instanceId, inresid, *outresid, resourceCacheLocal->resources[*outresid].hostname);
            } else if (includeNodeCount) {
                if (!check_for_string_in_list(resourceCacheLocal->resources[*outresid].hostname, includeNodes, includeNodeCount)) {
                    LOGDEBUG("[%s] cannot schedule src_index=%d, dst_index=%d because node %s is not in destination-inclusion list\n",
                             instance->instanceId, inresid, *outresid, resourceCacheLocal->resources[*outresid].hostname);
                } else {
                    // source first, then destination, matching the other "(src > dst)" messages
                    LOGDEBUG("[%s] scheduled: src_index=%d, dst_index=%d (%s > %s) -- destination node is in inclusion list\n",
                             instance->instanceId, inresid, *outresid, instance->migration_src, resourceCacheLocal->resources[*outresid].hostname);
                    done++;
                    found++;
                }
            } else {
                // Found a destination node that's not the source node.
                LOGDEBUG("[%s] scheduled: src_index=%d, dst_index=%d (%s > %s)\n", instance->instanceId, inresid, *outresid, instance->migration_src,
                         resourceCacheLocal->resources[*outresid].hostname);
                done++;
                found++;
            }
        }
        if (!found) {
            ret = 1;
        }
    }
out:
    if (ret) {
        LOGERROR("[%s] migration scheduler could not schedule destination node\n", instance->instanceId);
        *outresid = -1;
    }
    LOGDEBUG("done\n");
    return (ret);
}
//!
//! Explicit scheduler: place the instance on the node named @p targetNode, if it can take it.
//!
//! The first resource whose hostname matches is considered. It must be RESUP or RESASLEEP and ENABLED.
//! For a migration, a STOPPED node is also accepted. It must also have enough free memory, disk and
//! cores for @p vm. A sleeping node that is chosen is passed to powerUp().
//!
//! @param[in]  vm           VM type (memory, disk and cores) the instance needs
//! @param[in]  targetNode   hostname of the requested node
//! @param[out] outresid     resource-cache index of the node (0 when it cannot be used)
//! @param[in]  is_migration when TRUE, a STOPPED target is accepted
//!
//! @return 0 on success, 1 if the target is unknown, unusable or lacks capacity
//!
int schedule_instance_explicit(virtualMachine * vm, char *targetNode, int *outresid, boolean is_migration)
{
    ccResource *target = NULL;
    int targetIdx = -1;
    int usable = 0;

    *outresid = 0;
    LOGDEBUG("scheduler using EXPLICIT policy to run VM on target node '%s'\n", targetNode);

    // locate the requested node; the first hostname match wins
    for (int k = 0; k < resourceCache->numResources; k++) {
        if (!strcmp(resourceCache->resources[k].hostname, targetNode)) {
            target = &(resourceCache->resources[k]);
            targetIdx = k;
            break;
        }
    }
    if (target == NULL) {
        // target resource is unavailable
        return (1);
    }

    usable = ((target->ncState == ENABLED) || (is_migration && (target->ncState == STOPPED)));
    if (!usable || ((target->state != RESUP) && (target->state != RESASLEEP))) {
        return (1);
    }
    if (is_migration && (target->ncState == STOPPED)) {
        LOGINFO("scheduler overriding STOPPED state of target node (due to explicit scheduling request)\n");
    }

    {
        int mem = target->availMemory - vm->mem;
        int disk = target->availDisk - vm->disk;
        int cores = target->availCores - vm->cores;
        if ((mem < 0) || (disk < 0) || (cores < 0)) {
            // not enough room on the requested node
            return (1);
        }
    }

    *outresid = targetIdx;
    if (target->state == RESASLEEP) {
        powerUp(target);
    }
    return (0);
}
//!
//! Greedy scheduler: pick the first ENABLED node, in resource-cache order, that is RESUP or RESWAKING
//! and has enough free memory, disk and cores for @p vm. If there is none, fall back to the first
//! ENABLED sleeping node with enough room, which is passed to powerUp(). Also used for the
//! POWERSAVE policy.
//!
//! @param[in]  vm       VM type (memory, disk and cores) the instance needs
//! @param[out] outresid resource-cache index of the chosen node (0 when nothing was found)
//!
//! @return 0 on success, 1 if no node qualifies
//!
int schedule_instance_greedy(virtualMachine * vm, int *outresid)
{
    int awakeIdx = -1;                 // first usable running/waking node
    int sleepingIdx = -1;              // first usable sleeping node (fallback)
    ccResource *pick = NULL;

    *outresid = 0;
    if (config->schedPolicy == SCHEDGREEDY) {
        LOGDEBUG("scheduler using GREEDY policy to find next resource\n");
    } else if (config->schedPolicy == SCHEDPOWERSAVE) {
        LOGDEBUG("scheduler using POWERSAVE policy to find next resource\n");
    }

    for (int r = 0; r < resourceCache->numResources; r++) {
        ccResource *cand = &(resourceCache->resources[r]);
        int mem, disk, cores;

        if (cand->ncState != ENABLED) {
            continue;
        }
        if ((cand->state == RESUP) || (cand->state == RESWAKING)) {
            mem = cand->availMemory - vm->mem;
            disk = cand->availDisk - vm->disk;
            cores = cand->availCores - vm->cores;
            if ((mem >= 0) && (disk >= 0) && (cores >= 0)) {
                awakeIdx = r;
                break;                 // an awake node always beats a sleeping one
            }
        } else if ((cand->state == RESASLEEP) && (sleepingIdx == -1)) {
            mem = cand->availMemory - vm->mem;
            disk = cand->availDisk - vm->disk;
            cores = cand->availCores - vm->cores;
            if ((mem >= 0) && (disk >= 0) && (cores >= 0)) {
                sleepingIdx = r;
            }
        }
    }

    if (awakeIdx != -1) {
        *outresid = awakeIdx;
        pick = &(resourceCache->resources[awakeIdx]);
    } else if (sleepingIdx != -1) {
        *outresid = sleepingIdx;
        pick = &(resourceCache->resources[sleepingIdx]);
    } else {
        // didn't find a resource
        return (1);
    }

    if (pick->state == RESASLEEP) {
        powerUp(pick);
    }
    return (0);
}
//!
//! Schedules an instance by delegating the choice of node to an external,
//! user-supplied scheduler program (config->schedPath).
//!
//! Three temporary files are created and handed to the scheduler on its command line:
//!   - a resource file, one line per cached resource ("idx=" is 1-based);
//!   - an instance file, one line per cached instance;
//!   - a data file holding the new instance's user data.
//! The value returned by timeshell() is interpreted as the chosen resource:
//! 1..N selects resourceCache->resources[rc - 1]; anything else means no resource.
//!
//! @param[in]  vm         resource requirements (mem, disk, cores) of the new instance
//! @param[in]  amiId      not used by this scheduler
//! @param[in]  kernelId   not used by this scheduler
//! @param[in]  ramdiskId  not used by this scheduler
//! @param[in]  instId     identifier of the instance being scheduled (passed to the scheduler)
//! @param[in]  userData   instance user data, written to the data file
//! @param[in]  platform   instance platform string (passed to the scheduler)
//! @param[out] outresid   index into resourceCache->resources of the chosen node
//!
//! @return 0 on success, 1 if the scheduler picked no (valid) resource,
//!         -1 if the temporary files could not be created or written.
//!
//! @note Every temporary file that was created is removed before returning,
//!       on both success and failure paths.
//!
//! NOTE(review): instId and platform are interpolated unescaped into the command
//!       string given to timeshell(); if timeshell() runs it through a shell,
//!       confirm these values come from a trusted source.
//!
int schedule_instance_user(virtualMachine * vm, char *amiId, char *kernelId, char *ramdiskId, char *instId, char *userData, char *platform, int *outresid)
{
    int i = 0;
    int rc = 0;
    int ret = -1;
    int resid = -1;
    int haveSchedFile = 0;
    int haveInstFile = 0;
    int haveDataFile = 0;
    ccResource *res = NULL;
    ccInstance *inst = NULL;
    FILE *OFH = NULL;
    char lbuf[512] = "";
    char cmd[EUCA_MAX_PATH * 3 + CHAR_BUFFER_SIZE];    // 3 paths on command line, plus other stuff
    char stdout_str[VERY_BIG_CHAR_BUFFER_SIZE];
    char stderr_str[VERY_BIG_CHAR_BUFFER_SIZE];
    // name templates; str2file() is called with TRUE, presumably to make the names unique
    char schedfile[EUCA_MAX_PATH] = "/tmp/euca-schedfile-XXXXXX";
    char instfile[EUCA_MAX_PATH] = "/tmp/euca-instfile-XXXXXX";
    char datafile[EUCA_MAX_PATH] = "/tmp/euca-datafile-XXXXXX";

    // make sure the debug log below never prints uninitialized memory
    stdout_str[0] = '\0';
    stderr_str[0] = '\0';

    // create a temporary file for relaying resource information to the scheduler
    if (str2file(NULL, schedfile, O_CREAT | O_EXCL | O_WRONLY, 0644, TRUE) != EUCA_OK) {
        goto cleanup;
    }
    haveSchedFile = 1;

    // create a temporary file for relaying information about running instances to the scheduler
    if (str2file(NULL, instfile, O_CREAT | O_EXCL | O_WRONLY, 0644, TRUE) != EUCA_OK) {
        goto cleanup;
    }
    haveInstFile = 1;

    // create a temporary file for relaying instance's user data to the scheduler
    if (str2file(userData, datafile, O_CREAT | O_EXCL | O_WRONLY, 0644, TRUE) != EUCA_OK) {
        goto cleanup;
    }
    haveDataFile = 1;

    // clear out the result
    *outresid = 0;

    // populate the file with resource information
    if ((OFH = fopen(schedfile, "w")) == NULL) {
        LOGERROR("cannot open resources file '%s' for writing\n", schedfile);
        goto cleanup;
    }
    for (i = 0; i < resourceCache->numResources; i++) {
        res = &(resourceCache->resources[i]);
        // idx is 1-based so that a scheduler exit value of 0 can mean "no resource"
        snprintf(lbuf, sizeof(lbuf), "idx=%d,ip=%s,state=%d,availmem=%d,availdisk=%d,availcores=%d", i + 1, res->ip, res->state, res->availMemory, res->availDisk,
                 res->availCores);
        fprintf(OFH, "%s\n", lbuf);
    }
    if (fclose(OFH) != 0) {
        OFH = NULL;
        LOGERROR("cannot write resources file '%s'\n", schedfile);
        goto cleanup;
    }
    OFH = NULL;

    // populate the file with information about instances
    if ((OFH = fopen(instfile, "w")) == NULL) {
        LOGERROR("cannot open temporary instance file '%s' for writing\n", instfile);
        goto cleanup;
    }
    for (i = 0; i < instanceCache->numInsts; i++) {
        inst = &(instanceCache->instances[i]);
        // argument order must match the field labels (ownerId before accountId)
        snprintf(lbuf, sizeof(lbuf), "id=%s,state=%s,nchost=%s,mem=%d,disk=%d,cores=%d,secgroupidx=%d,publicip=%s,privateip=%s,ownerId=%s,accountId=%s,launchTime=%ld",
                 inst->instanceId, inst->state, inst->serviceTag, inst->ccvm.mem, inst->ccvm.disk, inst->ccvm.cores, inst->ccnet.vlan, inst->ccnet.publicIp,
                 inst->ccnet.privateIp, inst->ownerId, inst->accountId, (long)inst->ts);
        fprintf(OFH, "%s\n", lbuf);
    }
    if (fclose(OFH) != 0) {
        OFH = NULL;
        LOGERROR("cannot write temporary instance file '%s'\n", instfile);
        goto cleanup;
    }
    OFH = NULL;

    // invoke the external scheduler, passing it the two files as well as resource requirements of the new instance
    snprintf(cmd, sizeof(cmd), "%s %s %s %d %d %d %s %s %s", config->schedPath, schedfile, instfile, vm->mem, vm->disk, vm->cores, instId, datafile, platform);
    rc = timeshell(cmd, stdout_str, stderr_str, VERY_BIG_CHAR_BUFFER_SIZE, SCHED_TIMEOUT_SEC);
    LOGDEBUG("external scheduler returned: %d, stdout: '%s', stderr: '%s'\n", rc, stdout_str, stderr_str);

    resid = rc - 1;                    // rc for valid nodes [1..N], 0 means scheduler could not find a resource
    if ((resid < 0) || (resid >= resourceCache->numResources)) {
        // didn't find a resource
        LOGWARN("couldn't find a resource or user scheduler is incorrect\n");
        ret = 1;
        goto cleanup;
    }

    *outresid = resid;
    ret = 0;

cleanup:
    // remove only the temporary files that were actually created
    if (haveSchedFile) {
        unlink(schedfile);
    }
    if (haveInstFile) {
        unlink(instfile);
    }
    if (haveDataFile) {
        unlink(datafile);
    }
    return (ret);
}
//!
//! Logs, at INFO level, how many instances an operation applies to together with a
//! comma-separated list of their IDs. The list is limited to a 60-byte buffer; when
//! not every ID fits, the list is cut short and "..." is appended.
//!
//! @param[in] gerund     word describing the operation (e.g. "running"), printed first
//! @param[in] instIds    array of instance ID strings (may be NULL when instIdsLen is 0)
//! @param[in] instIdsLen number of entries in instIds
//!
static void print_abbreviated_instances(const char *gerund, char **instIds, int instIdsLen)
{
    int k = 0;
    int written = 0;
    size_t used = 0;                   // characters currently held in list (excluding NUL)
    char list[60] = "";
    // bytes (incl. NUL) available for the ID text; the last 3 bytes stay free for "..."
    const size_t room = sizeof(list) - 3;

    for (k = 0; (instIds != NULL) && (k < instIdsLen) && (used < (room - 1)); k++) {
        written = snprintf(list + used, room - used, "%s%s", (k == 0) ? ("") : (", "), (instIds[k] != NULL) ? (instIds[k]) : ("(null)"));
        if (written < 0) {
            // encoding error: drop whatever this call may have produced
            list[used] = '\0';
            break;
        }
        if ((size_t) written >= (room - used)) {
            // Truncated. snprintf() returns the length it *wanted* to write, so advancing by
            // 'written' would point past the buffer; the text now fills exactly room - 1 bytes.
            used = room - 1;
            break;
        }
        used += (size_t) written;
    }

    if ((instIds != NULL) && (k < instIdsLen)) {
        // some IDs were cut or omitted; used <= room - 1, so "..." + NUL fits in the reserved tail
        snprintf(list + used, sizeof(list) - used, "...");
    }

    LOGINFO("%s %d instance(s): %s\n", gerund, instIdsLen, list);
}
//!
//!
//!
//! @param[in] pMeta a pointer to the node controller (NC) metadata structure
//! @param[in] amiId
//! @param[in] kernelId the kernel image identifier (eki-XXXXXXXX)
//! @param[in] ramdiskId the ramdisk image identifier (eri-XXXXXXXX)
//! @param[in] amiURL
//! @param[in] kernelURL the kernel image URL address
//! @param[in] ramdiskURL the ramdisk image URL address
//! @param[in] instIds
//! @param[in] instIdsLen
//! @param[in] netNames
//! @param[in] netNamesLen
//! @param[in] macAddrs
//! @param[in] macAddrsLen
//! @param[in] networkIndexList
//! @param[in] networkIndexListLen
//! @param[in] uuids
//! @param[in] uuidsLen
//! @param[in] minCount
//! @param[in] maxCount
//! @param[in] accountId
//! @param[in] ownerId
//! @param[in] reservationId
//! @param[in] ccvm
//! @param[in] keyName
//! @param[in] vlan
//! @param[in] userData
//! @param[in] launchIndex
//! @param[in] platform
//! @param[in] expiryTime
//! @param[in] targetNode
//! @param[out] outInsts
//! @param[out] outInstsLen
//!
//! @return
//!
//! @pre
//!
//! @note
//!
int doRunInstances(ncMetadata * pMeta, char *amiId, char *kernelId, char *ramdiskId, char *amiURL, char *kernelURL, char *ramdiskURL, char **instIds,
int instIdsLen, char **netNames, int netNamesLen, char **netIds, int netIdsLen, char **macAddrs, int macAddrsLen, int *networkIndexList, int networkIndexListLen,
char **uuids, int uuidsLen, char **privateIps, int privateIpsLen, int minCount, int maxCount, char *accountId, char *ownerId,
char *reservationId, virtualMachine * ccvm, char *keyName, int vlan, char *userData, char *credential, char *launchIndex,
char *platform, int expiryTime, char *targetNode, char *rootDirective, ccInstance ** outInsts, int *outInstsLen)
{
int rc = 0, i = 0, done = 0, runCount = 0, resid = 0, foundnet = 0, error = 0, nidx = 0, thenidx = 0, pid = 0;
ccInstance *myInstance = NULL, *retInsts = NULL;
char instId[16], uuid[48];
ccResource *res = NULL;
char *mac = NULL;
char privip[32] = "";
char pubip[32] = "";
ncInstance *outInst = NULL;
virtualMachine ncvm;
netConfig ncnet;
rc = initialize(pMeta, FALSE);
if (rc || ccIsEnabled()) {
return (1);
}
print_abbreviated_instances("running", instIds, instIdsLen);
LOGINFO("running instances\n");
LOGDEBUG("invoked: userId=%s, emiId=%s, kernelId=%s, ramdiskId=%s, emiURL=%s, kernelURL=%s, ramdiskURL=%s, instIdsLen=%d, netNamesLen=%d, "
"macAddrsLen=%d, networkIndexListLen=%d, minCount=%d, maxCount=%d, accountId=%s, ownerId=%s, reservationId=%s, keyName=%s, vlan=%d, "
"userData=%s, credential=%s, launchIndex=%s, platform=%s, targetNode=%s, rootDirective=%s\n", SP(pMeta ? pMeta->userId : "UNSET"), SP(amiId), SP(kernelId),
SP(ramdiskId), SP(amiURL), SP(kernelURL), SP(ramdiskURL), instIdsLen, netNamesLen, macAddrsLen, networkIndexListLen, minCount, maxCount, SP(accountId), SP(ownerId),
SP(reservationId), SP(keyName), vlan, SP(userData), SP(credential), SP(launchIndex), SP(platform), SP(targetNode), SP(rootDirective));
if (config->use_proxy) {
char objectStorageURL[EUCA_MAX_PATH], *strptr = NULL, newURL[EUCA_MAX_PATH];
// get objectstorage IP
done = 0;
for (i = 0; i < 16 && !done; i++) {
if (!strcmp(config->services[i].type, "objectstorage")) {
snprintf(objectStorageURL, EUCA_MAX_PATH, "%s", config->services[i].uris[0]);
done++;
}
}
if (done) {
// cache and reset endpoint
for (i = 0; i < ccvm->virtualBootRecordLen; i++) {
newURL[0] = '\0';
if (!strcmp(ccvm->virtualBootRecord[i].typeName, "machine") || !strcmp(ccvm->virtualBootRecord[i].typeName, "kernel")
|| !strcmp(ccvm->virtualBootRecord[i].typeName, "ramdisk")) {
strptr = strstr(ccvm->virtualBootRecord[i].resourceLocation, "objectstorage://");
if (strptr) {
strptr += strlen("objectstorage://");
snprintf(newURL, EUCA_MAX_PATH, "%s/%s", objectStorageURL, strptr);
LOGDEBUG("constructed cacheable URL: %s\n", newURL);
rc = image_cache(ccvm->virtualBootRecord[i].id, newURL);
if (!rc) {
snprintf(ccvm->virtualBootRecord[i].resourceLocation, CHAR_BUFFER_SIZE, "http://%s:8776/%s", config->proxyIp, ccvm->virtualBootRecord[i].id);
} else {
LOGWARN("could not cache image %s/%s\n", ccvm->virtualBootRecord[i].id, newURL);
}
}
}
}
}
}
*outInstsLen = 0;
if (!ccvm) {
LOGERROR("invalid ccvm\n");
return (-1);
}
if (minCount <= 0 || maxCount <= 0 || instIdsLen < maxCount) {
LOGERROR("bad min or max count, or not enough instIds (%d, %d, %d)\n", minCount, maxCount, instIdsLen);
return (-1);
}
// check health of the networkIndexList
nidx = -1;
if (vlan > 0) {
vlan = 0;
}
retInsts = EUCA_ZALLOC(maxCount, sizeof(ccInstance));
if (!retInsts) {
LOGFATAL("out of memory!\n");
unlock_exit(1);
}
runCount = 0;
// get updated resource information
done = 0;
for (i = 0; i < maxCount && !done; i++) {
mac = EUCA_ZALLOC(32, sizeof(char));
snprintf(instId, 16, "%s", instIds[i]);
if (uuidsLen > i) {
snprintf(uuid, 48, "%s", uuids[i]);
} else {
snprintf(uuid, 48, "UNSET");
}
LOGDEBUG("running instance %s\n", instId);
foundnet = 0;
// generate new mac
bzero(pubip, 32);
bzero(privip, 32);
// pubip is handled outside of doRunInstances
strncpy(pubip, "0.0.0.0", 32);
// if private IP is passed in, set it here
if (privateIpsLen > 0 && privateIps[i] && strlen(privateIps[i])) {
snprintf(privip, 32, "%s", privateIps[i]);
} else {
strncpy(privip, "0.0.0.0", 32);
}
sem_mywait(NETCONFIG);
{
// either set up the mac from input, or make the calls to generate some combo of priv/pub/mac values from networking subsystem
if (!strcmp(gpEucaNet->sMode, NETMODE_VPCMIDO) && macAddrsLen > 0 && macAddrs[i] && strlen(macAddrs[i])) {
// new modes, no net generation, all vals come in as input
foundnet = 1;
thenidx = -1;
snprintf(mac, 32, "%s", macAddrs[i]);
LOGDEBUG("setting instance '%s' macAddr to CLC input value '%s'\n", instId, mac);
} else {
if ((rc = euca_inst2mac(gpEucaNet->sMacPrefix, instId, &mac)) == 0) {
foundnet = 1;
if (nidx == -1) {
thenidx = -1;
} else {
thenidx = nidx;
nidx++;
}
} else {
LOGDEBUG("Failed to compute MAC address for instance '%s' - MAC Prefix '%s'\n", instId, gpEucaNet->sMacPrefix);
foundnet = 0;
}
}
}
sem_mypost(NETCONFIG);
if (thenidx != -1) {
LOGDEBUG("assigning MAC/IP: %s/%s/%s/%d\n", mac, pubip, privip, networkIndexList[thenidx]);
} else {
LOGDEBUG("assigning MAC/IP: %s/%s/%s/%d\n", mac, pubip, privip, thenidx);
}
if (mac[0] == '\0' || !foundnet) {
LOGERROR("could not find/initialize any free network address, failing doRunInstances()\n");
} else {
// "run" the instance
memcpy(&ncvm, ccvm, sizeof(virtualMachine));
ncnet.vlan = vlan;
if (thenidx >= 0) {
ncnet.networkIndex = networkIndexList[thenidx];
} else {