diff --git a/src/Orleans/Configuration/GlobalConfiguration.cs b/src/Orleans/Configuration/GlobalConfiguration.cs index 365f4f7905..d576e011c4 100644 --- a/src/Orleans/Configuration/GlobalConfiguration.cs +++ b/src/Orleans/Configuration/GlobalConfiguration.cs @@ -170,6 +170,10 @@ public bool PrimaryNodeIsRequired /// The number of seconds to attempt to join a cluster of silos before giving up. /// public TimeSpan MaxJoinAttemptTime { get; set; } + /// + /// The number of seconds to refresh the cluster grain interface map + /// + public TimeSpan TypeMapRefreshInterval { get; set; } internal ConfigValue ExpectedClusterSizeConfigValue { get; set; } /// /// The expected size of a cluster. Need not be very accurate, can be an overestimate. @@ -470,6 +474,7 @@ internal bool UseAzureSystemStore private static readonly TimeSpan DEFAULT_LIVENESS_DEATH_VOTE_EXPIRATION_TIMEOUT = TimeSpan.FromSeconds(120); private static readonly TimeSpan DEFAULT_LIVENESS_I_AM_ALIVE_TABLE_PUBLISH_TIMEOUT = TimeSpan.FromMinutes(5); private static readonly TimeSpan DEFAULT_LIVENESS_MAX_JOIN_ATTEMPT_TIME = TimeSpan.FromMinutes(5); // 5 min + private static readonly TimeSpan DEFAULT_REFRESH_CLUSTER_INTERFACEMAP_TIME = TimeSpan.FromMinutes(1); private const int DEFAULT_LIVENESS_NUM_MISSED_PROBES_LIMIT = 3; private const int DEFAULT_LIVENESS_NUM_PROBED_SILOS = 3; private const int DEFAULT_LIVENESS_NUM_VOTES_FOR_DEATH_DECLARATION = 2; @@ -522,6 +527,7 @@ internal GlobalConfiguration() UseLivenessGossip = DEFAULT_LIVENESS_USE_LIVENESS_GOSSIP; ValidateInitialConnectivity = DEFAULT_VALIDATE_INITIAL_CONNECTIVITY; MaxJoinAttemptTime = DEFAULT_LIVENESS_MAX_JOIN_ATTEMPT_TIME; + TypeMapRefreshInterval = DEFAULT_REFRESH_CLUSTER_INTERFACEMAP_TIME; MaxMultiClusterGateways = DEFAULT_MAX_MULTICLUSTER_GATEWAYS; BackgroundGossipInterval = DEFAULT_BACKGROUND_GOSSIP_INTERVAL; UseGlobalSingleInstanceByDefault = DEFAULT_USE_GLOBAL_SINGLE_INSTANCE; diff --git a/src/Orleans/Configuration/NodeConfiguration.cs b/src/Orleans/Configuration/NodeConfiguration.cs index d3ae789dfb..66a29a8e52 100644 --- a/src/Orleans/Configuration/NodeConfiguration.cs +++ b/src/Orleans/Configuration/NodeConfiguration.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; @@ -212,6 +213,8 @@ public string TraceFilePattern public Dictionary AdditionalAssemblyDirectories { get; set; } + public List ExcludedGrainTypes { get; set; } + public string SiloShutdownEventName { get; set; } internal const string DEFAULT_NODE_NAME = "default"; @@ -270,6 +273,7 @@ public NodeConfiguration() UseNagleAlgorithm = false; AdditionalAssemblyDirectories = new Dictionary(); + ExcludedGrainTypes = new List(); } public NodeConfiguration(NodeConfiguration other) @@ -320,6 +324,7 @@ public NodeConfiguration(NodeConfiguration other) StartupTypeName = other.StartupTypeName; AdditionalAssemblyDirectories = other.AdditionalAssemblyDirectories; + ExcludedGrainTypes = other.ExcludedGrainTypes.ToList(); } public override string ToString() @@ -487,7 +492,6 @@ internal void Load(XmlElement root) case "AdditionalAssemblyDirectories": ConfigUtilities.ParseAdditionalAssemblyDirectories(AdditionalAssemblyDirectories, child); break; - } } } diff --git a/src/Orleans/Logging/ErrorCodes.cs b/src/Orleans/Logging/ErrorCodes.cs index 9a6ec47def..2d923ed0a8 100644 --- a/src/Orleans/Logging/ErrorCodes.cs +++ b/src/Orleans/Logging/ErrorCodes.cs @@ -1093,6 +1093,8 @@ internal enum ErrorCode GlobalSingleInstance_MaintainerException = GlobalSingleInstanceBase + 3, GlobalSingleInstance_MultipleOwners = GlobalSingleInstanceBase + 4, + TypeManagerBase = Runtime + 4200, + TypeManager_GetSiloGrainInterfaceMapError = TypeManagerBase + 1, } } // ReSharper restore InconsistentNaming diff --git a/src/Orleans/Messaging/ProxiedMessageCenter.cs b/src/Orleans/Messaging/ProxiedMessageCenter.cs index 842f446d0d..39d04c7079 100644 --- a/src/Orleans/Messaging/ProxiedMessageCenter.cs +++ b/src/Orleans/Messaging/ProxiedMessageCenter.cs @@ -278,7 +278,7 @@ private void RejectOrResend(Message msg) public Task GetTypeCodeMap(GrainFactory grainFactory) { var silo = GetLiveGatewaySiloAddress(); - return GetTypeManager(silo, grainFactory).GetTypeCodeMap(silo); + return GetTypeManager(silo, grainFactory).GetClusterTypeCodeMap(); } public Task GetImplicitStreamSubscriberTable(GrainFactory grainFactory) @@ -397,9 +397,9 @@ public int ReceiveQueueLength #endregion - private ITypeManager GetTypeManager(SiloAddress destination, GrainFactory grainFactory) + private IClusterTypeManager GetTypeManager(SiloAddress destination, GrainFactory grainFactory) { - return grainFactory.GetSystemTarget(Constants.TypeManagerId, destination); + return grainFactory.GetSystemTarget(Constants.TypeManagerId, destination); } private SiloAddress GetLiveGatewaySiloAddress() diff --git a/src/Orleans/Runtime/GrainInterfaceMap.cs b/src/Orleans/Runtime/GrainInterfaceMap.cs index 2395780411..bf23cec350 100644 --- a/src/Orleans/Runtime/GrainInterfaceMap.cs +++ b/src/Orleans/Runtime/GrainInterfaceMap.cs @@ -67,7 +67,6 @@ public override string ToString() private readonly Dictionary table; private readonly HashSet unordered; - [NonSerialized] private readonly Dictionary implementationIndex; [NonSerialized] // Client shouldn't need this @@ -75,22 +74,59 @@ public override string ToString() private readonly bool localTestMode; private readonly HashSet loadedGrainAsemblies; + + private readonly PlacementStrategy defaultPlacementStrategy; - private readonly PlacementStrategy defaultPlacementStrategy; + internal IList SupportedGrainTypes + { + get { return implementationIndex.Keys.ToList(); } + } public GrainInterfaceMap(bool localTestMode, PlacementStrategy defaultPlacementStrategy) { - this.defaultPlacementStrategy = defaultPlacementStrategy; table = new Dictionary(); typeToInterfaceData = new Dictionary(); primaryImplementations = new Dictionary(); implementationIndex = new Dictionary(); unordered = new HashSet(); this.localTestMode = localTestMode; + this.defaultPlacementStrategy = defaultPlacementStrategy; if(localTestMode) // if we are running in test mode, we'll build a list of loaded grain assemblies to help with troubleshooting deployment issue loadedGrainAsemblies = new HashSet(); } + internal void AddMap(GrainInterfaceMap map) + { + foreach (var kvp in map.typeToInterfaceData) + { + if (!typeToInterfaceData.ContainsKey(kvp.Key)) + { + typeToInterfaceData.Add(kvp.Key, kvp.Value); + } + } + + foreach (var kvp in map.table) + { + if (!table.ContainsKey(kvp.Key)) + { + table.Add(kvp.Key, kvp.Value); + } + } + + foreach (var grainClassTypeCode in map.unordered) + { + unordered.Add(grainClassTypeCode); + } + + foreach (var kvp in map.implementationIndex) + { + if (!implementationIndex.ContainsKey(kvp.Key)) + { + implementationIndex.Add(kvp.Key, kvp.Value); + } + } + } + internal void AddEntry(int interfaceId, Type iface, int grainTypeCode, string grainInterface, string grainClass, string assembly, bool isGenericGrainClass, PlacementStrategy placement, MultiClusterRegistrationStrategy registrationStrategy, bool primaryImplementation = false) { @@ -173,6 +209,14 @@ internal bool ContainsGrainInterface(int interfaceId) } } + internal bool ContainsGrainImplementation(int typeCode) + { + lock (this) + { + return implementationIndex.ContainsKey(typeCode); + } + } + internal bool TryGetTypeInfo(int typeCode, out string grainClass, out PlacementStrategy placement, out MultiClusterRegistrationStrategy registrationStrategy, string genericArguments = null) { lock (this) diff --git a/src/Orleans/Runtime/ITypeManager.cs b/src/Orleans/Runtime/ITypeManager.cs index c60fed9982..ce126b9e69 100644 --- a/src/Orleans/Runtime/ITypeManager.cs +++ b/src/Orleans/Runtime/ITypeManager.cs @@ -6,10 +6,23 @@ namespace Orleans.Runtime /// /// Client gateway interface for obtaining the grain interface/type map. /// - internal interface ITypeManager : ISystemTarget + internal interface IClusterTypeManager : ISystemTarget { - Task GetTypeCodeMap(SiloAddress silo); + /// + /// Acquires grain interface map for all grain types supported across the entire cluster + /// + /// + Task GetClusterTypeCodeMap(); Task GetImplicitStreamSubscriberTable(SiloAddress silo); } + + internal interface ISiloTypeManager : ISystemTarget + { + /// + /// Acquires grain interface map for all grain types supported by hosted silo. + /// + /// + Task GetSiloTypeCodeMap(); + } } diff --git a/src/OrleansRuntime/Catalog/Catalog.cs b/src/OrleansRuntime/Catalog/Catalog.cs index 3157aa60ef..f5a617af8d 100644 --- a/src/OrleansRuntime/Catalog/Catalog.cs +++ b/src/OrleansRuntime/Catalog/Catalog.cs @@ -119,6 +119,7 @@ public override void GetObjectData(SerializationInfo info, StreamingContext cont public GrainTypeManager GrainTypeManager { get; private set; } + public SiloAddress LocalSilo { get; private set; } internal ISiloStatusOracle SiloStatusOracle { get; set; } internal readonly ActivationCollector ActivationCollector; @@ -200,6 +201,15 @@ public override void GetObjectData(SerializationInfo info, StreamingContext cont /// public Dispatcher Dispatcher { get; } + public IList GetCompatibleSiloList(GrainId grain) + { + var typeCode = grain.GetTypeCode(); + var compatibleSilos = GrainTypeManager.GetSupportedSilos(typeCode).Intersect(AllActiveSilos).ToList(); + if (compatibleSilos.Count == 0) + throw new OrleansException($"TypeCode ${typeCode} not supported in the cluster"); + return compatibleSilos; + } + internal void SetStorageManager(IStorageProviderManager storageManager) { storageProviderManager = storageManager; diff --git a/src/OrleansRuntime/GrainTypeManager/GrainTypeManager.cs b/src/OrleansRuntime/GrainTypeManager/GrainTypeManager.cs index 466693920a..df00e45bc8 100644 --- a/src/OrleansRuntime/GrainTypeManager/GrainTypeManager.cs +++ b/src/OrleansRuntime/GrainTypeManager/GrainTypeManager.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Net; using System.Reflection; @@ -13,18 +15,26 @@ namespace Orleans.Runtime internal class GrainTypeManager { private IDictionary grainTypes; + private Dictionary grainInterfaceMapsBySilo; + private Dictionary> supportedSilosByTypeCode; private readonly Logger logger = LogManager.GetLogger("GrainTypeManager"); private readonly GrainInterfaceMap grainInterfaceMap; private readonly Dictionary invokers = new Dictionary(); private readonly SiloAssemblyLoader loader; private static readonly object lockable = new object(); + private readonly PlacementStrategy defaultPlacementStrategy; - private readonly PlacementStrategy defaultPlacementStrategy; + internal IReadOnlyDictionary GrainInterfaceMapsBySilo + { + get { return grainInterfaceMapsBySilo; } + } public static GrainTypeManager Instance { get; private set; } public IEnumerable> GrainClassTypeData { get { return grainTypes; } } + public GrainInterfaceMap ClusterGrainInterfaceMap { get; private set; } + public static void Stop() { Instance = null; @@ -40,6 +50,8 @@ public GrainTypeManager(bool localTestMode, SiloAssemblyLoader loader, DefaultPl this.defaultPlacementStrategy = defaultPlacementStrategy.PlacementStrategy; this.loader = loader; grainInterfaceMap = new GrainInterfaceMap(localTestMode, this.defaultPlacementStrategy); + ClusterGrainInterfaceMap = grainInterfaceMap; + grainInterfaceMapsBySilo = new Dictionary(); lock (lockable) { if (Instance != null) @@ -134,10 +146,21 @@ internal bool TryGetPrimaryImplementation(string grainInterface, out string grai internal void GetTypeInfo(int typeCode, out string grainClass, out PlacementStrategy placement, out MultiClusterRegistrationStrategy activationStrategy, string genericArguments = null) { - if (!grainInterfaceMap.TryGetTypeInfo(typeCode, out grainClass, out placement, out activationStrategy, genericArguments)) + if (!ClusterGrainInterfaceMap.TryGetTypeInfo(typeCode, out grainClass, out placement, out activationStrategy, genericArguments)) throw new OrleansException(String.Format("Unexpected: Cannot find an implementation class for grain interface {0}", typeCode)); } + internal void SetInterfaceMapsBySilo(Dictionary value) + { + grainInterfaceMapsBySilo = value; + RebuildFullGrainInterfaceMap(); + } + + internal IList GetSupportedSilos(int typeCode) + { + return supportedSilosByTypeCode[typeCode]; + } + private void InitializeGrainClassData(SiloAssemblyLoader loader, bool strict) { grainTypes = loader.GetGrainClassTypes(strict); @@ -196,7 +219,7 @@ public bool TryGetData(string name, out GrainTypeData result) return grainTypes.TryGetValue(name, out result); } - internal IGrainTypeResolver GetTypeCodeMap() + internal GrainInterfaceMap GetTypeCodeMap() { // the map is immutable at this point return grainInterfaceMap; @@ -241,6 +264,28 @@ internal IGrainMethodInvoker GetInvoker(int interfaceId, string genericGrainType interfaceName, interfaceId)); } + private void RebuildFullGrainInterfaceMap() + { + var newClusterGrainInterfaceMap = new GrainInterfaceMap(false, defaultPlacementStrategy); + var newSupportedSilosByTypeCode = new Dictionary>(); + newClusterGrainInterfaceMap.AddMap(grainInterfaceMap); + foreach (var kvp in grainInterfaceMapsBySilo) + { + newClusterGrainInterfaceMap.AddMap(kvp.Value); + foreach (var grainType in kvp.Value.SupportedGrainTypes) + { + IList supportedSilos; + if (!newSupportedSilosByTypeCode.TryGetValue(grainType, out supportedSilos)) + { + newSupportedSilosByTypeCode[grainType] = supportedSilos = new List(); + } + supportedSilos.Add(kvp.Key); + } + } + ClusterGrainInterfaceMap = newClusterGrainInterfaceMap; + supportedSilosByTypeCode = newSupportedSilosByTypeCode; + } + private class InvokerData { private readonly Type baseInvokerType; diff --git a/src/OrleansRuntime/GrainTypeManager/SiloAssemblyLoader.cs b/src/OrleansRuntime/GrainTypeManager/SiloAssemblyLoader.cs index f35c582997..591012d1f6 100644 --- a/src/OrleansRuntime/GrainTypeManager/SiloAssemblyLoader.cs +++ b/src/OrleansRuntime/GrainTypeManager/SiloAssemblyLoader.cs @@ -13,17 +13,21 @@ namespace Orleans.Runtime { internal class SiloAssemblyLoader { + private readonly List excludedGrains; private readonly LoggerImpl logger = LogManager.GetLogger("AssemblyLoader.Silo"); private List discoveredAssemblyLocations; private Dictionary directories; public SiloAssemblyLoader(NodeConfiguration nodeConfig) - : this(nodeConfig.AdditionalAssemblyDirectories) + : this(nodeConfig.AdditionalAssemblyDirectories, nodeConfig.ExcludedGrainTypes) { } - public SiloAssemblyLoader(IDictionary additionalDirectories) + public SiloAssemblyLoader(IDictionary additionalDirectories, IEnumerable excludedGrains = null) { + this.excludedGrains = excludedGrains != null + ? new List(excludedGrains) + : new List(); var exeRoot = Path.GetDirectoryName(typeof(SiloAssemblyLoader).GetTypeInfo().Assembly.Location); var appRoot = Path.Combine(exeRoot, "Applications"); var cwd = Directory.GetCurrentDirectory(); @@ -80,6 +84,9 @@ private void LoadApplicationAssemblies() foreach (var grainType in grainTypes) { var className = TypeUtils.GetFullName(grainType); + if (excludedGrains.Contains(className)) + continue; + if (result.ContainsKey(className)) throw new InvalidOperationException( string.Format("Precondition violated: GetLoadedGrainTypes should not return a duplicate type ({0})", className)); diff --git a/src/OrleansRuntime/GrainTypeManager/TypeManager.cs b/src/OrleansRuntime/GrainTypeManager/TypeManager.cs index 758b4483d7..b168c1c3e7 100644 --- a/src/OrleansRuntime/GrainTypeManager/TypeManager.cs +++ b/src/OrleansRuntime/GrainTypeManager/TypeManager.cs @@ -1,21 +1,49 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Orleans.Runtime.Providers; +using Orleans.Runtime.Scheduler; namespace Orleans.Runtime { - internal class TypeManager : SystemTarget, ITypeManager + internal class TypeManager : SystemTarget, IClusterTypeManager, ISiloTypeManager, ISiloStatusListener { + private readonly Logger logger = LogManager.GetLogger("TypeManager"); private readonly GrainTypeManager grainTypeManager; + private readonly ISiloStatusOracle statusOracle; + private readonly OrleansTaskScheduler scheduler; + private bool hasToRefreshClusterGrainInterfaceMap; + private readonly AsyncTaskSafeTimer refreshClusterGrainInterfaceMapTimer; - internal TypeManager(SiloAddress myAddr, GrainTypeManager grainTypeManager) + internal TypeManager(SiloAddress myAddr, GrainTypeManager grainTypeManager, ISiloStatusOracle oracle, OrleansTaskScheduler scheduler, TimeSpan refreshClusterMapTimeout) : base(Constants.TypeManagerId, myAddr) { + if (grainTypeManager == null) + throw new ArgumentNullException(nameof(grainTypeManager)); + if (oracle == null) + throw new ArgumentNullException(nameof(oracle)); + if (scheduler == null) + throw new ArgumentNullException(nameof(scheduler)); + this.grainTypeManager = grainTypeManager; + statusOracle = oracle; + this.scheduler = scheduler; + hasToRefreshClusterGrainInterfaceMap = true; + this.refreshClusterGrainInterfaceMapTimer = new AsyncTaskSafeTimer( + OnRefreshClusterMapTimer, + null, + TimeSpan.Zero, // Force to do it once right now + refreshClusterMapTimeout); } - public Task GetTypeCodeMap(SiloAddress silo) + public Task GetClusterTypeCodeMap() + { + return Task.FromResult(grainTypeManager.ClusterGrainInterfaceMap); + } + + public Task GetSiloTypeCodeMap() { return Task.FromResult(grainTypeManager.GetTypeCodeMap()); } @@ -29,6 +57,80 @@ public Task GetImplicitStreamSubscriberTa } return Task.FromResult(table); } + + public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus status) + { + hasToRefreshClusterGrainInterfaceMap = true; + } + + private async Task OnRefreshClusterMapTimer(object _) + { + // Check if we have to refresh + if (!hasToRefreshClusterGrainInterfaceMap) + { + logger.Verbose3("OnRefreshClusterMapTimer: no refresh required"); + return; + } + hasToRefreshClusterGrainInterfaceMap = false; + + logger.Info("OnRefreshClusterMapTimer: refresh start"); + var activeSilos = statusOracle.GetApproximateSiloStatuses(onlyActive: true); + var knownSilosClusterGrainInterfaceMap = grainTypeManager.GrainInterfaceMapsBySilo; + + // Build the new map. Always start by himself + var newSilosClusterGrainInterfaceMap = new Dictionary + { + {this.Silo, grainTypeManager.GetTypeCodeMap()} + }; + var getGrainInterfaceMapTasks = new List>>(); + + + foreach (var siloAddress in activeSilos.Keys) + { + if (siloAddress.Equals(this.Silo)) continue; + + GrainInterfaceMap value; + if (knownSilosClusterGrainInterfaceMap.TryGetValue(siloAddress, out value)) + { + logger.Verbose3($"OnRefreshClusterMapTimer: value already found locally for {siloAddress}"); + newSilosClusterGrainInterfaceMap[siloAddress] = value; + } + else + { + // Value not found, let's get it + logger.Verbose3($"OnRefreshClusterMapTimer: value not found locally for {siloAddress}"); + getGrainInterfaceMapTasks.Add(GetTargetSiloGrainInterfaceMap(siloAddress)); + } + } + + if (getGrainInterfaceMapTasks.Any()) + { + foreach (var keyValuePair in await Task.WhenAll(getGrainInterfaceMapTasks)) + { + if (keyValuePair.Value != null) + newSilosClusterGrainInterfaceMap.Add(keyValuePair.Key, keyValuePair.Value); + } + } + + grainTypeManager.SetInterfaceMapsBySilo(newSilosClusterGrainInterfaceMap); + } + + private async Task> GetTargetSiloGrainInterfaceMap(SiloAddress siloAddress) + { + try + { + var remoteTypeManager = InsideRuntimeClient.Current.InternalGrainFactory.GetSystemTarget(Constants.TypeManagerId, siloAddress); + var siloTypeCodeMap = await scheduler.QueueTask(() => remoteTypeManager.GetSiloTypeCodeMap(), SchedulingContext); + return new KeyValuePair(siloAddress, siloTypeCodeMap); + } + catch (Exception ex) + { + // Will be retried on the next timer hit + logger.Error(ErrorCode.TypeManager_GetSiloGrainInterfaceMapError, $"Exception when trying to get GrainInterfaceMap for silos {siloAddress}", ex); + hasToRefreshClusterGrainInterfaceMap = true; + return new KeyValuePair(siloAddress, null); + } + } } } diff --git a/src/OrleansRuntime/Placement/ActivationCountPlacementDirector.cs b/src/OrleansRuntime/Placement/ActivationCountPlacementDirector.cs index ececdd2c65..57de373970 100644 --- a/src/OrleansRuntime/Placement/ActivationCountPlacementDirector.cs +++ b/src/OrleansRuntime/Placement/ActivationCountPlacementDirector.cs @@ -89,11 +89,13 @@ private Task MakePlacement(PlacementStrategy strategy, GrainId public Task SelectSiloPowerOfK(PlacementStrategy strategy, GrainId grain, IPlacementContext context) { - // Exclude overloaded silos + var compatibleSilos = context.GetCompatibleSiloList(grain); + // Exclude overloaded and non-compatible silos var relevantSilos = new List(); foreach (CachedLocalStat current in localCache.Values) { if (IsSiloOverloaded(current.SiloStats)) continue; + if (!compatibleSilos.Contains(current.Address)) continue; relevantSilos.Add(current); } diff --git a/src/OrleansRuntime/Placement/IPlacementContext.cs b/src/OrleansRuntime/Placement/IPlacementContext.cs index f6133f4856..ea819f956e 100644 --- a/src/OrleansRuntime/Placement/IPlacementContext.cs +++ b/src/OrleansRuntime/Placement/IPlacementContext.cs @@ -22,7 +22,7 @@ internal interface IPlacementContext bool LocalLookup(GrainId grain, out List addresses); - List AllActiveSilos { get; } + IList GetCompatibleSiloList(GrainId grain); SiloAddress LocalSilo { get; } diff --git a/src/OrleansRuntime/Placement/PreferLocalPlacementDirector.cs b/src/OrleansRuntime/Placement/PreferLocalPlacementDirector.cs index a4b87cfe0b..9c80dfd674 100644 --- a/src/OrleansRuntime/Placement/PreferLocalPlacementDirector.cs +++ b/src/OrleansRuntime/Placement/PreferLocalPlacementDirector.cs @@ -15,8 +15,8 @@ internal class PreferLocalPlacementDirector : RandomPlacementDirector, IPlacemen public override Task OnAddActivation(PlacementStrategy strategy, GrainId grain, IPlacementContext context) { - // if local silo is not active, revert to random placement - if (context.LocalSiloStatus != SiloStatus.Active) + // if local silo is not active or does not support this type of grain, revert to random placement + if (context.LocalSiloStatus != SiloStatus.Active || !context.GetCompatibleSiloList(grain).Contains(context.LocalSilo)) return base.OnAddActivation(strategy, grain, context); var grainType = context.GetGrainTypeName(grain); diff --git a/src/OrleansRuntime/Placement/RandomPlacementDirector.cs b/src/OrleansRuntime/Placement/RandomPlacementDirector.cs index 9d5153382b..48573b70fb 100644 --- a/src/OrleansRuntime/Placement/RandomPlacementDirector.cs +++ b/src/OrleansRuntime/Placement/RandomPlacementDirector.cs @@ -41,7 +41,7 @@ protected PlacementResult ChooseRandomActivation(List places, PlacementStrategy strategy, GrainId grain, IPlacementContext context) { var grainType = context.GetGrainTypeName(grain); - var allSilos = context.AllActiveSilos; + var allSilos = context.GetCompatibleSiloList(grain); return Task.FromResult( PlacementResult.SpecifyCreation(allSilos[random.Next(allSilos.Count)], strategy, grainType)); } diff --git a/src/OrleansRuntime/Silo/Silo.cs b/src/OrleansRuntime/Silo/Silo.cs index 339d5fd497..7a092f6159 100644 --- a/src/OrleansRuntime/Silo/Silo.cs +++ b/src/OrleansRuntime/Silo/Silo.cs @@ -60,7 +60,8 @@ public enum SiloType private readonly IncomingMessageAgent incomingSystemAgent; private readonly IncomingMessageAgent incomingPingAgent; private readonly Logger logger; - private readonly GrainTypeManager typeManager; + private readonly GrainTypeManager grainTypeManager; + private TypeManager typeManager; private readonly ManualResetEvent siloTerminatedEvent; private readonly SiloStatisticsManager siloStatistics; private readonly MembershipFactory membershipFactory; @@ -93,6 +94,7 @@ public enum SiloType internal GlobalConfiguration GlobalConfig => this.initializationParams.GlobalConfig; internal NodeConfiguration LocalConfig => this.initializationParams.NodeConfig; internal OrleansTaskScheduler LocalScheduler { get { return scheduler; } } + internal GrainTypeManager LocalGrainTypeManager { get { return grainTypeManager; } } internal ILocalGrainDirectory LocalGrainDirectory { get { return localGrainDirectory; } } internal ISiloStatusOracle LocalSiloStatusOracle { get { return membershipOracle; } } internal IMultiClusterOracle LocalMultiClusterOracle { get { return multiClusterOracle; } } @@ -304,7 +306,7 @@ internal Silo(SiloInitializationParameters initializationParams) throw; } - typeManager = Services.GetRequiredService(); + grainTypeManager = Services.GetRequiredService(); // Performance metrics siloStatistics = Services.GetRequiredService(); @@ -380,8 +382,10 @@ private void CreateSystemTargets() RegisterSystemTarget(LocalGrainDirectory.RemoteClusterGrainDirectory); logger.Verbose("Creating {0} System Target", "ClientObserverRegistrar + TypeManager"); + this.RegisterSystemTarget(this.Services.GetRequiredService()); - this.RegisterSystemTarget(new TypeManager(this.SiloAddress, this.typeManager)); + typeManager = new TypeManager(SiloAddress, this.grainTypeManager, membershipOracle, LocalScheduler, GlobalConfig.TypeMapRefreshInterval); + this.RegisterSystemTarget(typeManager); logger.Verbose("Creating {0} System Target", "MembershipOracle"); if (this.membershipOracle is SystemTarget) @@ -416,6 +420,8 @@ private void InjectDependencies() // consistentRingProvider is not a system target per say, but it behaves like the localGrainDirectory, so it is here LocalSiloStatusOracle.SubscribeToSiloStatusEvents((ISiloStatusListener)RingProvider); + LocalSiloStatusOracle.SubscribeToSiloStatusEvents(typeManager); + LocalSiloStatusOracle.SubscribeToSiloStatusEvents(Services.GetRequiredService()); if (!GlobalConfig.ReminderServiceType.Equals(GlobalConfiguration.ReminderServiceProviderType.Disabled)) @@ -474,7 +480,7 @@ private void DoStart() ConfigureThreadPoolAndServicePointSettings(); // This has to start first so that the directory system target factory gets loaded before we start the router. - typeManager.Start(); + grainTypeManager.Start(); runtimeClient.Start(); // The order of these 4 is pretty much arbitrary. diff --git a/test/Tester/ExceptionPropagationTests.cs b/test/Tester/ExceptionPropagationTests.cs index b75d3287e0..1fb9995ab9 100644 --- a/test/Tester/ExceptionPropagationTests.cs +++ b/test/Tester/ExceptionPropagationTests.cs @@ -33,6 +33,7 @@ protected override TestCluster CreateTestCluster() var options = new TestClusterOptions(2); options.ClientConfiguration.SerializationProviders.Add(typeof(OneWaySerializer).GetTypeInfo()); options.ClusterConfiguration.Globals.SerializationProviders.Add(typeof(OneWaySerializer).GetTypeInfo()); + options.ClusterConfiguration.Globals.TypeMapRefreshInterval = TimeSpan.FromMilliseconds(200); return new TestCluster(options); } } diff --git a/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs new file mode 100644 index 0000000000..063a06d233 --- /dev/null +++ b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs @@ -0,0 +1,102 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Orleans; +using Orleans.Runtime; +using Orleans.TestingHost; +using TestExtensions; +using UnitTests.GrainInterfaces; +using UnitTests.Grains; +using Xunit; + +namespace Tester.HeterogeneousSilosTests +{ + public class HeterogeneousTests : OrleansTestingBase, IDisposable + { + private TestCluster cluster; + private readonly TimeSpan refreshInterval = TimeSpan.FromMilliseconds(200); + + private void SetupAndDeployCluster(string defaultPlacementStrategy, params Type[] blackListedTypes) + { + cluster?.StopAllSilos(); + var typesName = blackListedTypes.Select(t => t.FullName).ToList(); + var options = new TestClusterOptions(1); + options.ClusterConfiguration.Globals.TypeMapRefreshInterval = refreshInterval; + options.ClusterConfiguration.Globals.DefaultPlacementStrategy = defaultPlacementStrategy; + options.ClusterConfiguration.Overrides[Silo.PrimarySiloName].ExcludedGrainTypes = typesName; + cluster = new TestCluster(options); + cluster.Deploy(); + } + + public void Dispose() + { + cluster?.StopAllSilos(); + cluster = null; + } + + [Fact] + public void GrainExcludedTest() + { + SetupAndDeployCluster("RandomPlacement", typeof(TestGrain)); + + // Should fail + var exception = Assert.Throws(() => GrainFactory.GetGrain(0)); + Assert.Contains("Cannot find an implementation class for grain interface", exception.Message); + + // Should not fail + GrainFactory.GetGrain(0); + } + + + [Fact] + public async Task MergeGrainResolverTests() + { + await MergeGrainResolverTestsImpl("RandomPlacement", typeof(TestGrain)); + await MergeGrainResolverTestsImpl("PreferLocalPlacement", typeof(TestGrain)); + await MergeGrainResolverTestsImpl("ActivationCountBasedPlacement", typeof(TestGrain)); + } + + private async Task MergeGrainResolverTestsImpl(string defaultPlacementStrategy, params Type[] blackListedTypes) + { + SetupAndDeployCluster(defaultPlacementStrategy, blackListedTypes); + + var delayTimeout = refreshInterval.Add(refreshInterval); + + // Should fail + var exception = Assert.Throws(() => GrainFactory.GetGrain(0)); + Assert.Contains("Cannot find an implementation class for grain interface", exception.Message); + + // Start a new silo with TestGrain + cluster.StartAdditionalSilo(); + await Task.Delay(delayTimeout); + + // Disconnect/Reconnect the client + GrainClient.Uninitialize(); + cluster.InitializeClient(); + + for (var i = 0; i < 5; i++) + { + // Success + var g = GrainFactory.GetGrain(i); + await g.SetLabel("Hello world"); + } + + // Stop the latest silos + cluster.StopSecondarySilos(); + await Task.Delay(delayTimeout); + + var grain = GrainFactory.GetGrain(0); + var orleansException = await Assert.ThrowsAsync(() => grain.SetLabel("Hello world")); + Assert.Contains("Cannot find an implementation class for grain interface", orleansException.Message); + + // Disconnect/Reconnect the client + GrainClient.Uninitialize(); + cluster.InitializeClient(); + + // Should fail + exception = Assert.Throws(() => GrainFactory.GetGrain(0)); + Assert.Contains("Cannot find an implementation class for grain interface", exception.Message); + } + } +} diff --git a/test/Tester/MembershipTests/SilosStopTests.cs b/test/Tester/MembershipTests/SilosStopTests.cs index 0702928409..f89c31520c 100644 --- a/test/Tester/MembershipTests/SilosStopTests.cs +++ b/test/Tester/MembershipTests/SilosStopTests.cs @@ -18,6 +18,7 @@ public override TestCluster CreateTestCluster() options.ClusterConfiguration.Globals.DefaultPlacementStrategy = "ActivationCountBasedPlacement"; options.ClusterConfiguration.Globals.NumMissedProbesLimit = 1; options.ClusterConfiguration.Globals.NumVotesForDeathDeclaration = 1; + options.ClusterConfiguration.Globals.TypeMapRefreshInterval = TimeSpan.FromMilliseconds(100); // use only Primary as the gateway options.ClientConfiguration.Gateways = options.ClientConfiguration.Gateways.Take(1).ToList(); diff --git a/test/Tester/Tester.csproj b/test/Tester/Tester.csproj index b2df34b8fc..fc2bef456f 100644 --- a/test/Tester/Tester.csproj +++ b/test/Tester/Tester.csproj @@ -48,6 +48,7 @@ + diff --git a/test/TesterInternal/General/ElasticPlacementTest.cs b/test/TesterInternal/General/ElasticPlacementTest.cs index 9f1b2ed026..621f05acc1 100644 --- a/test/TesterInternal/General/ElasticPlacementTest.cs +++ b/test/TesterInternal/General/ElasticPlacementTest.cs @@ -32,6 +32,7 @@ public override TestCluster CreateTestCluster() options.ClusterConfiguration.Globals.LivenessType = GlobalConfiguration.LivenessProviderType.AzureTable; options.ClientConfiguration.GatewayProvider = ClientConfiguration.GatewayProviderType.AzureTable; + options.ClusterConfiguration.Globals.TypeMapRefreshInterval = TimeSpan.FromMilliseconds(100); return new TestCluster(options); }