Skip to content
This repository has been archived by the owner on Aug 30, 2024. It is now read-only.

Commit

Permalink
EC2 vm state callback performance - EUCA-10953
Browse files Browse the repository at this point in the history
  • Loading branch information
sjones4 committed Sep 1, 2015
1 parent 556baad commit 8957374
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,45 @@

package com.eucalyptus.cluster.callback;

import static com.eucalyptus.compute.common.internal.vm.VmInstance.VmState.PENDING;
import static com.eucalyptus.compute.common.internal.vm.VmInstance.VmState.RUNNING;
import static com.eucalyptus.compute.common.internal.vm.VmInstance.VmState.SHUTTING_DOWN;
import static com.eucalyptus.compute.common.internal.vm.VmInstance.VmState.STOPPING;
import static com.eucalyptus.compute.common.internal.vm.VmInstances.TerminatedInstanceException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.EntityTransaction;

import com.eucalyptus.component.id.ClusterController;
import com.eucalyptus.entities.TransactionResource;
import com.eucalyptus.event.ClockTick;
import com.eucalyptus.event.EventListener;
import com.eucalyptus.event.Listeners;
import com.eucalyptus.system.Threads;
import com.eucalyptus.util.Either;
import com.eucalyptus.util.NonNullFunction;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.log4j.Logger;
import org.hibernate.Criteria;
import com.eucalyptus.bootstrap.Databases;
import com.eucalyptus.compute.common.CloudMetadatas;
import com.eucalyptus.cluster.Cluster;
import com.eucalyptus.compute.common.network.InstanceResourceReportType;
import com.eucalyptus.compute.common.network.Networking;
import com.eucalyptus.compute.common.network.UpdateInstanceResourcesType;
import com.eucalyptus.entities.Entities;
import com.eucalyptus.entities.TransactionException;
import com.eucalyptus.entities.TransactionResource;
import com.eucalyptus.records.Logs;
import com.eucalyptus.util.TypeMapper;
import com.eucalyptus.util.TypeMappers;
import com.eucalyptus.util.async.FailedRequestException;
import com.eucalyptus.util.async.SubjectMessageCallback;
import com.eucalyptus.compute.common.internal.vm.VmBundleTask.BundleState;
import com.eucalyptus.compute.common.internal.vm.VmInstance;
import com.eucalyptus.compute.common.internal.vm.VmInstance.VmState;
Expand All @@ -96,10 +113,8 @@
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
import edu.ucsb.eucalyptus.cloud.VmDescribeResponseType;
import edu.ucsb.eucalyptus.cloud.VmDescribeType;
Expand All @@ -108,38 +123,41 @@

public class VmStateCallback extends StateUpdateMessageCallback<Cluster, VmDescribeType, VmDescribeResponseType> {
private static Logger LOG = Logger.getLogger( VmStateCallback.class );

private static ConcurrentMap<String, Long> pendingUpdates = Maps.newConcurrentMap( );

private final Supplier<Set<String>> initialInstances;

public VmStateCallback( ) {
super( new VmDescribeType( ) {
{
regarding( );
}
} );
this.initialInstances = createInstanceSupplier( this, Predicates.and( VmState.STOPPED.not( ), partitionFilter( this ) ) );
super( new VmDescribeType( ).<VmDescribeType>regarding( ) );
this.initialInstances = createInstanceSupplier( this, PENDING, RUNNING, SHUTTING_DOWN, STOPPING );
}

private static Supplier<Set<String>> createInstanceSupplier( final StateUpdateMessageCallback<Cluster, ?, ?> cb, final Predicate<VmInstance> filter ) {
private static Supplier<Set<String>> createInstanceSupplier(
final StateUpdateMessageCallback<Cluster, ?, ?> cb,
final VmState... states
) {
return Suppliers.memoize( new Supplier<Set<String>>( ) {

@Override
public Set<String> get( ) {
try ( final TransactionResource tx = Entities.readOnlyDistinctTransactionFor( VmInstance.class ) ) {
Collection<VmInstance> clusterInstances = VmInstances.list( null, filter );
Collection<String> instanceNames = Collections2.transform( clusterInstances, CloudMetadatas.toDisplayName( ) );
return Sets.newHashSet( instanceNames );
try ( final TransactionResource db = Entities.readOnlyDistinctTransactionFor( VmInstance.class ) ) {
final Criteria query = Entities.createCriteria( VmInstance.class )
.setReadOnly( true )
.setFetchSize( 50_000 )
.add( VmInstance.criterion( states ) )
.add( VmInstance.nonNullNodeCriterion( ) )
.add( VmInstance.zoneCriterion( cb.getSubject( ).getConfiguration( ).getPartition( ) ) )
.setProjection( VmInstance.instanceIdProjection( ) );
//noinspection unchecked
return Sets.newHashSet( (List<String>) query.list( ) );
} catch ( Exception ex ) {
Logs.extreme( ).error( ex, ex );
return Sets.newHashSet( );
}
}
} );
}

/**
* @see com.eucalyptus.cluster.callback.StateUpdateMessageCallback#fireException(com.eucalyptus.util.async.FailedRequestException)
* @param t
*/

@Override
public void fireException( FailedRequestException t ) {
LOG.debug( "Request to " + this.getSubject( ).getName( ) + " failed: " + t.getMessage( ) );
Expand All @@ -154,41 +172,53 @@ public void fire( VmDescribeResponseType reply ) {

if ( Databases.isVolatile( ) ) {
return;
} else {
reply.setOriginCluster( this.getSubject( ).getConfiguration( ).getName( ) );
final Set<String> reportedInstances = Sets.newHashSet( );
for ( VmInfo vmInfo : reply.getVms( ) ) {
reportedInstances.add( vmInfo.getInstanceId( ) );
vmInfo.setPlacement( this.getSubject( ).getConfiguration( ).getName( ) );
VmTypeInfo typeInfo = vmInfo.getInstanceType( );
if ( typeInfo.getName( ) == null || "".equals( typeInfo.getName( ) ) ) {
for ( VmType t : VmTypes.list( ) ) {
if ( t.getCpu( ).equals( typeInfo.getCores( ) ) && t.getDisk( ).equals( typeInfo.getDisk( ) ) && t.getMemory( ).equals( typeInfo.getMemory( ) ) ) {
typeInfo.setName( t.getName( ) );
}
}

reply.setOriginCluster( this.getSubject( ).getConfiguration( ).getName( ) );
final Set<String> reportedInstances = Sets.newHashSet( );
for ( VmInfo vmInfo : reply.getVms( ) ) {
reportedInstances.add( vmInfo.getInstanceId( ) );
vmInfo.setPlacement( this.getSubject( ).getConfiguration( ).getName( ) );
VmTypeInfo typeInfo = vmInfo.getInstanceType( );
if ( typeInfo.getName( ) == null || "".equals( typeInfo.getName( ) ) ) {
for ( VmType t : VmTypes.list( ) ) {
if ( t.getCpu( ).equals( typeInfo.getCores( ) ) && t.getDisk( ).equals( typeInfo.getDisk( ) ) &&
t.getMemory( ).equals( typeInfo.getMemory( ) ) ) {
typeInfo.setName( t.getName( ) );
}
}
}
final Set<String> unreportedInstances = Sets.newHashSet( Sets.difference( this.initialInstances.get( ), reportedInstances ) );
final Set<String> unknownInstances = Sets.newHashSet( Sets.difference( reportedInstances, this.initialInstances.get( ) ) );
for ( final VmInfo runVm : reply.getVms( ) ) {
if ( Databases.isVolatile( ) ) {
return;
} else if ( this.initialInstances.get( ).contains( runVm.getInstanceId( ) ) ) {
VmStateCallback.handleReportedState( runVm );
} else if ( unknownInstances.contains( runVm.getInstanceId( ) ) ) {
VmStateCallback.handleUnknown( runVm );
}
}
for ( final String vmId : unreportedInstances ) {
if ( Databases.isVolatile( ) ) {
return;
} else {
VmStateCallback.handleUnreported( vmId );
}
}

final Set<String> unreportedInstances =
Sets.newHashSet( Sets.difference( this.initialInstances.get( ), reportedInstances ) );
if ( Databases.isVolatile( ) ) {
return;
}

final Set<String> unknownInstances =
Sets.newHashSet( Sets.difference( reportedInstances, this.initialInstances.get( ) ) );

final List<Optional<Runnable>> taskList = Lists.newArrayList( );

for ( final VmInfo runVm : reply.getVms( ) ) {
if ( this.initialInstances.get( ).contains( runVm.getInstanceId( ) ) ) {
taskList.add( UpdateTaskFunction.REPORTED.apply( Either.<String, VmInfo>right( runVm ) ) );
} else if ( unknownInstances.contains( runVm.getInstanceId( ) ) ) {
taskList.add( UpdateTaskFunction.UNKNOWN.apply( Either.<String, VmInfo>right( runVm ) ) );
}
}
for ( final String vmId : unreportedInstances ) {
taskList.add( UpdateTaskFunction.UNREPORTED.apply( Either.<String,VmInfo>left( vmId ) ) );
}
for ( final Runnable task : Optional.presentInstances( taskList ) ) {
Threads.enqueue(
ClusterController.class,
VmStateCallback.class,
( Runtime.getRuntime( ).availableProcessors( ) * 2 ) + 1,
Executors.callable( task )
);
}
}

private static void handleUnreported( final String vmId ) {
Expand Down Expand Up @@ -225,8 +255,7 @@ private static void handleUnreported( final String vmId ) {
private static void handleReportedState( final VmInfo runVm ) {
final VmState runVmState = VmState.Mapper.get( runVm.getStateName( ) );
try {
final EntityTransaction db = Entities.get( VmInstance.class );
try {
try ( final TransactionResource db = Entities.transactionFor( VmInstance.class ) ) {
VmInstance vm = VmInstances.lookupAny( runVm.getInstanceId() );
if ( VmStateSet.DONE.apply( vm ) ) {
db.rollback( );
Expand Down Expand Up @@ -261,8 +290,6 @@ private static void handleReportedState( final VmInfo runVm ) {
LOG.error( ex );
Logs.extreme( ).error( ex, ex );
throw ex;
} finally {
if ( db.isActive() ) db.rollback();
}
} catch ( TerminatedInstanceException ex1 ) {
LOG.trace( "Ignore state update to terminated instance: " + runVm.getInstanceId( ) );
Expand All @@ -274,6 +301,66 @@ private static void handleReportedState( final VmInfo runVm ) {
}
}

enum UpdateTaskFunction implements NonNullFunction<Either<String,VmInfo>, Optional<Runnable>> {
REPORTED {
void task( final Either<String,VmInfo> idOrVmInfo ) {
VmStateCallback.handleReportedState( idOrVmInfo.getRight( ) );
}
},
UNKNOWN {
@Override
void task( final Either<String,VmInfo> idOrVmInfo ) {
VmStateCallback.handleUnknown( idOrVmInfo.getRight( ) );
}
},
UNREPORTED {
@Override
void task( final Either<String,VmInfo> idOrVmInfo ) {
VmStateCallback.handleUnreported( idOrVmInfo.getLeft( ) );
}
};

abstract void task( Either<String,VmInfo> idOrVmInfo );

@Nonnull
@Override
public Optional<Runnable> apply( final Either<String,VmInfo> input ) {
final String instanceId = input == null ?
null :
input.isLeft( ) ? input.getLeft( ) : input.getRight( ).getInstanceId( );
try {
final Runnable run = new Runnable( ) {
@Override
public void run() {
try {
UpdateTaskFunction.this.task( input );
} catch ( Exception e ) {
LOG.error(
"Failed to handle "
+ UpdateTaskFunction.this.name().toLowerCase()
+ " instance: "
+ instanceId
+ " because of "
+ e.getMessage()
);
} finally {
pendingUpdates.remove( instanceId );
}
}
};
if ( input != null
&& instanceId != null
&& pendingUpdates.putIfAbsent( instanceId, System.currentTimeMillis( ) ) == null ) {
return Optional.of( run );
} else {
return Optional.absent( );
}
} catch ( Exception e ) {
return Optional.absent( );
}
}
}

private static void handleUnknown( final VmInfo runVm ) {
for ( final Optional<VmInstances.RestoreHandler> restoreHandler :
VmInstances.RestoreHandler.parseList( VmInstances.UNKNOWN_INSTANCE_HANDLERS ) ) {
Expand Down Expand Up @@ -335,33 +422,14 @@ private static void handleReportedTeardown( VmInstance vm, final VmInfo runVm )
}
}

/**
 * Builds a predicate that accepts instances whose creation split time is greater than
 * {@code VmInstances.VM_STATE_SETTLE_TIME * 1000}.
 *
 * @return predicate over {@link VmInstance}
 */
private static Predicate<VmInstance> stateSettleFilter( ) {
  final Predicate<VmInstance> settled = new Predicate<VmInstance>( ) {
    @Override
    public boolean apply( final VmInstance instance ) {
      // The settle time is read on every evaluation rather than captured once.
      // NOTE(review): the factor 1000 presumably converts seconds to milliseconds - confirm.
      return instance.getCreationSplitTime( ) > ( VmInstances.VM_STATE_SETTLE_TIME * 1000 );
    }
  };
  return settled;
}

/**
 * Builds a predicate that accepts instances whose partition equals the partition
 * configured for the subject of the given callback.
 *
 * @param cb callback whose subject supplies the partition to compare against
 * @return predicate over {@link VmInstance}
 */
private static Predicate<VmInstance> partitionFilter( final SubjectMessageCallback<Cluster, ?, ?> cb ) {
  final Predicate<VmInstance> samePartition = new Predicate<VmInstance>( ) {
    @Override
    public boolean apply( final VmInstance instance ) {
      // The subject's partition is looked up on every evaluation, not at creation time.
      return instance.getPartition( ).equals( cb.getSubject( ).getConfiguration( ).getPartition( ) );
    }
  };
  return samePartition;
}

public static class VmPendingCallback extends StateUpdateMessageCallback<Cluster, VmDescribeType, VmDescribeResponseType> {
@SuppressWarnings( "unchecked" )
private final Predicate<VmInstance> filter = Predicates.and( VmStateSet.TORNDOWN.not( ), stateSettleFilter( ), partitionFilter( this ) );
public static class VmPendingCallback extends
StateUpdateMessageCallback<Cluster, VmDescribeType, VmDescribeResponseType> {

private final Supplier<Set<String>> initialInstances;

public VmPendingCallback( Cluster cluster ) {
super( cluster );
this.initialInstances = createInstanceSupplier( this, this.filter );
this.initialInstances = createInstanceSupplier( this, PENDING, STOPPING, SHUTTING_DOWN );
this.setRequest( new VmDescribeType( ) {
{
regarding( );
Expand All @@ -372,7 +440,7 @@ public VmPendingCallback( Cluster cluster ) {
throw new CancellationException( );
}
}

@Override
public void fire( VmDescribeResponseType reply ) {
for ( final VmInfo runVm : reply.getVms( ) ) {
Expand All @@ -383,16 +451,11 @@ public void fire( VmDescribeResponseType reply ) {
}
}
}

/**
* @see com.eucalyptus.cluster.callback.StateUpdateMessageCallback#fireException(com.eucalyptus.util.async.FailedRequestException)
* @param t
*/

@Override
public void fireException( FailedRequestException t ) {
  // A failed request is only reported at debug level; no further action is taken here.
  final String failure = "Request to " + this.getSubject( ).getName( ) + " failed: " + t.getMessage( );
  LOG.debug( failure );
}

}

@Override
Expand Down Expand Up @@ -420,4 +483,22 @@ public InstanceResourceReportType apply( final VmDescribeResponseType response )
return report;
}
}

/**
 * Clock tick listener that drops entries from {@code pendingUpdates} once they are more
 * than five minutes old, logging a warning for each entry it removes.
 */
private static final class StateTaskExpiryEventListener implements EventListener<ClockTick> {

  /**
   * Registers a new listener instance for {@link ClockTick} events.
   */
  public static void register( ) {
    final StateTaskExpiryEventListener listener = new StateTaskExpiryEventListener( );
    Listeners.register( ClockTick.class, listener );
  }

  /**
   * Removes every pending update whose recorded timestamp is older than five minutes.
   *
   * @param event the clock tick that triggered the sweep (not otherwise used)
   */
  @Override
  public void fireEvent( final ClockTick event ) {
    final long cutoff = System.currentTimeMillis( ) - TimeUnit.MINUTES.toMillis( 5 );
    for ( final Map.Entry<String,Long> pending : pendingUpdates.entrySet( ) ) {
      final Long queuedAt = pending.getValue( );
      if ( queuedAt >= cutoff ) {
        continue;
      }
      final String instanceId = pending.getKey( );
      // Conditional removal: an entry that was replaced in the meantime is left alone.
      if ( pendingUpdates.remove( instanceId, queuedAt ) ) {
        LOG.warn( "Expired state update task for instance " + instanceId );
      }
    }
  }
}
}
Loading

0 comments on commit 8957374

Please sign in to comment.