Skip to content

Commit

Permalink
RavenDB-618 Includes for Load<T> not working in sharded environment
Browse files Browse the repository at this point in the history
  • Loading branch information
ayende committed Oct 15, 2012
1 parent ca10f46 commit 4cc039e
Showing 1 changed file with 55 additions and 50 deletions.
105 changes: 55 additions & 50 deletions Raven.Client.Lightweight/Shard/ShardedDocumentSession.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace Raven.Client.Shard
/// Implements Unit of Work for accessing a set of sharded RavenDB servers /// Implements Unit of Work for accessing a set of sharded RavenDB servers
/// </summary> /// </summary>
public class ShardedDocumentSession : BaseShardedDocumentSession<IDatabaseCommands>, IDocumentQueryGenerator, public class ShardedDocumentSession : BaseShardedDocumentSession<IDatabaseCommands>, IDocumentQueryGenerator,
IDocumentSessionImpl, ISyncAdvancedSessionOperation IDocumentSessionImpl, ISyncAdvancedSessionOperation
{ {
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="ShardedDocumentSession"/> class. /// Initializes a new instance of the <see cref="ShardedDocumentSession"/> class.
Expand All @@ -43,7 +43,7 @@ public class ShardedDocumentSession : BaseShardedDocumentSession<IDatabaseComman
/// <param name="documentStore"></param> /// <param name="documentStore"></param>
/// <param name="listeners"></param> /// <param name="listeners"></param>
public ShardedDocumentSession(ShardedDocumentStore documentStore, DocumentSessionListeners listeners, Guid id, public ShardedDocumentSession(ShardedDocumentStore documentStore, DocumentSessionListeners listeners, Guid id,
ShardStrategy shardStrategy, IDictionary<string, IDatabaseCommands> shardDbCommands) ShardStrategy shardStrategy, IDictionary<string, IDatabaseCommands> shardDbCommands)
: base(documentStore, listeners, id, shardStrategy, shardDbCommands) : base(documentStore, listeners, id, shardStrategy, shardDbCommands)
{ {
} }
Expand All @@ -52,27 +52,27 @@ protected override JsonDocument GetJsonDocument(string documentKey)
{ {
var shardRequestData = new ShardRequestData var shardRequestData = new ShardRequestData
{ {
EntityType = typeof (object), EntityType = typeof(object),
Keys = {documentKey} Keys = { documentKey }
}; };
var dbCommands = GetCommandsToOperateOn(shardRequestData); var dbCommands = GetCommandsToOperateOn(shardRequestData);


var documents = shardStrategy.ShardAccessStrategy.Apply(dbCommands, var documents = shardStrategy.ShardAccessStrategy.Apply(dbCommands,
shardRequestData, shardRequestData,
(commands, i) => commands.Get(documentKey)); (commands, i) => commands.Get(documentKey));


var document = documents.FirstOrDefault(x => x != null); var document = documents.FirstOrDefault(x => x != null);
if (document != null) if (document != null)
return document; return document;


throw new InvalidOperationException("Document '" + documentKey + "' no longer exists and was probably deleted"); throw new InvalidOperationException("Document '" + documentKey + "' no longer exists and was probably deleted");
} }


protected override string GenerateKey(object entity) protected override string GenerateKey(object entity)
{ {
var shardId = shardStrategy.ShardResolutionStrategy.MetadataShardIdFor(entity); var shardId = shardStrategy.ShardResolutionStrategy.MetadataShardIdFor(entity);
IDatabaseCommands value; IDatabaseCommands value;
if (shardDbCommands.TryGetValue(shardId, out value) == false) if (shardDbCommands.TryGetValue(shardId, out value) == false)
throw new InvalidOperationException("Could not find shard: " + shardId); throw new InvalidOperationException("Could not find shard: " + shardId);
return Conventions.GenerateDocumentKey(value, entity); return Conventions.GenerateDocumentKey(value, entity);
} }
Expand All @@ -87,7 +87,7 @@ protected override Task<string> GenerateKeyAsync(object entity)
ISyncAdvancedSessionOperation IDocumentSession.Advanced ISyncAdvancedSessionOperation IDocumentSession.Advanced
{ {
get { return this; } get { return this; }
} }


/// <summary> /// <summary>
/// Access the lazy operations /// Access the lazy operations
Expand Down Expand Up @@ -115,15 +115,15 @@ public T Load<T>(string id)
{ {
object existingEntity; object existingEntity;
if (entitiesByKey.TryGetValue(id, out existingEntity)) if (entitiesByKey.TryGetValue(id, out existingEntity))
{ {
return (T) existingEntity; return (T)existingEntity;
} }


IncrementRequestCount(); IncrementRequestCount();
var shardRequestData = new ShardRequestData var shardRequestData = new ShardRequestData
{ {
EntityType = typeof (T), EntityType = typeof(T),
Keys = {id} Keys = { id }
}; };
var dbCommands = GetCommandsToOperateOn(shardRequestData); var dbCommands = GetCommandsToOperateOn(shardRequestData);
var results = shardStrategy.ShardAccessStrategy.Apply(dbCommands, shardRequestData, (commands, i) => var results = shardStrategy.ShardAccessStrategy.Apply(dbCommands, shardRequestData, (commands, i) =>
Expand All @@ -145,7 +145,7 @@ public T Load<T>(string id)
if (shardsContainThisDocument.Count() > 1) if (shardsContainThisDocument.Count() > 1)
{ {
throw new InvalidOperationException("Found document with id: " + id + throw new InvalidOperationException("Found document with id: " + id +
" on more than a single shard, which is not allowed. Document keys have to be unique cluster-wide."); " on more than a single shard, which is not allowed. Document keys have to be unique cluster-wide.");
} }


return shardsContainThisDocument.FirstOrDefault(); return shardsContainThisDocument.FirstOrDefault();
Expand All @@ -163,7 +163,7 @@ public T[] Load<T>(IEnumerable<string> ids)


public T Load<T>(ValueType id) public T Load<T>(ValueType id)
{ {
var documentKey = Conventions.FindFullDocumentKeyFromNonStringIdentifier(id, typeof (T), false); var documentKey = Conventions.FindFullDocumentKeyFromNonStringIdentifier(id, typeof(T), false);
return Load<T>(documentKey); return Load<T>(documentKey);
} }


Expand All @@ -187,7 +187,7 @@ public T[] LoadInternal<T>(string[] ids, string[] includes)
var currentShardIds = shard.Select(x => x.Id).ToArray(); var currentShardIds = shard.Select(x => x.Id).ToArray();
var multiLoadOperations = shardStrategy.ShardAccessStrategy.Apply(shard.Key, new ShardRequestData var multiLoadOperations = shardStrategy.ShardAccessStrategy.Apply(shard.Key, new ShardRequestData
{ {
EntityType = typeof (T), EntityType = typeof(T),
Keys = currentShardIds.ToList() Keys = currentShardIds.ToList()
}, (dbCmd, i) => }, (dbCmd, i) =>
{ {
Expand All @@ -213,15 +213,20 @@ public T[] LoadInternal<T>(string[] ids, string[] includes)
var id = currentShardIds[i]; var id = currentShardIds[i];
var itemPosition = Array.IndexOf(ids, id); var itemPosition = Array.IndexOf(ids, id);
if (ReferenceEquals(results[itemPosition], default(T)) == false) if (ReferenceEquals(results[itemPosition], default(T)) == false)
{ {
throw new InvalidOperationException("Found document with id: " + id + throw new InvalidOperationException("Found document with id: " + id +
" on more than a single shard, which is not allowed. Document keys have to be unique cluster-wide."); " on more than a single shard, which is not allowed. Document keys have to be unique cluster-wide.");
} }
results[itemPosition] = loadResults[i]; results[itemPosition] = loadResults[i];
} }
} }
} }
return results; return ids.Select(id => // so we get items that were skipped because they are already in the session cache
{
object val;
entitiesByKey.TryGetValue(id, out val);
return (T)val;
}).ToArray();
} }


public ILoaderWithInclude<object> Include(string path) public ILoaderWithInclude<object> Include(string path)
Expand Down Expand Up @@ -251,7 +256,7 @@ public Lazy<TResult[]> Load<TResult>(IEnumerable<string> ids, Action<TResult[]>
/// </summary> /// </summary>
Lazy<TResult> ILazySessionOperations.Load<TResult>(string id) Lazy<TResult> ILazySessionOperations.Load<TResult>(string id)
{ {
return Lazily.Load(id, (Action<TResult>) null); return Lazily.Load(id, (Action<TResult>)null);
} }


/// <summary> /// <summary>
Expand All @@ -261,28 +266,28 @@ public Lazy<TResult> Load<TResult>(string id, Action<TResult> onEval)
{ {
var cmds = GetCommandsToOperateOn(new ShardRequestData var cmds = GetCommandsToOperateOn(new ShardRequestData
{ {
Keys = {id}, Keys = { id },
EntityType = typeof (TResult) EntityType = typeof(TResult)
}); });


var lazyLoadOperation = new LazyLoadOperation<TResult>(id, new LoadOperation(this, () => var lazyLoadOperation = new LazyLoadOperation<TResult>(id, new LoadOperation(this, () =>
{ {
var list = cmds.Select(databaseCommands => databaseCommands.DisableAllCaching()).ToList(); var list = cmds.Select(databaseCommands => databaseCommands.DisableAllCaching()).ToList();
return new DisposableAction(() => list.ForEach(x => x.Dispose())); return new DisposableAction(() => list.ForEach(x => x.Dispose()));
}, id)); }, id));
return AddLazyOperation(lazyLoadOperation, onEval, cmds); return AddLazyOperation(lazyLoadOperation, onEval, cmds);
} }


internal Lazy<T> AddLazyOperation<T>(ILazyOperation operation, Action<T> onEval, IList<IDatabaseCommands> cmds) internal Lazy<T> AddLazyOperation<T>(ILazyOperation operation, Action<T> onEval, IList<IDatabaseCommands> cmds)
{ {
pendingLazyOperations.Add(Tuple.Create(operation, cmds)); pendingLazyOperations.Add(Tuple.Create(operation, cmds));
var lazyValue = new Lazy<T>(() => var lazyValue = new Lazy<T>(() =>
{ {
ExecuteAllPendingLazyOperations(); ExecuteAllPendingLazyOperations();
return (T) operation.Result; return (T)operation.Result;
}); });
if (onEval != null) if (onEval != null)
onEvaluateLazy[operation] = result => onEval((T) result); onEvaluateLazy[operation] = result => onEval((T)result);


return lazyValue; return lazyValue;
} }
Expand Down Expand Up @@ -318,7 +323,7 @@ Lazy<TResult> ILazySessionOperations.Load<TResult>(ValueType id)
/// </remarks> /// </remarks>
public Lazy<TResult> Load<TResult>(ValueType id, Action<TResult> onEval) public Lazy<TResult> Load<TResult>(ValueType id, Action<TResult> onEval)
{ {
var documentKey = Conventions.FindFullDocumentKeyFromNonStringIdentifier(id, typeof (TResult), false); var documentKey = Conventions.FindFullDocumentKeyFromNonStringIdentifier(id, typeof(TResult), false);
return Lazily.Load<TResult>(documentKey); return Lazily.Load<TResult>(documentKey);
} }


Expand Down Expand Up @@ -359,8 +364,8 @@ public Lazy<T[]> LazyLoadInternal<T>(string[] ids, string[] includes, Action<T[]
id, id,
shards = GetCommandsToOperateOn(new ShardRequestData shards = GetCommandsToOperateOn(new ShardRequestData
{ {
Keys = {id}, Keys = { id },
EntityType = typeof (T), EntityType = typeof(T),
}) })
}) })
.GroupBy(x => x.shards, new DbCmdsListComparer()); .GroupBy(x => x.shards, new DbCmdsListComparer());
Expand Down Expand Up @@ -413,15 +418,15 @@ private bool ExecuteLazyOperationsSingleStep()
var lazyOperations = operationPerShard.Select(x => x.Item1).ToArray(); var lazyOperations = operationPerShard.Select(x => x.Item1).ToArray();
var requests = lazyOperations.Select(x => x.CraeteRequest()).ToArray(); var requests = lazyOperations.Select(x => x.CraeteRequest()).ToArray();
var multiResponses = shardStrategy.ShardAccessStrategy.Apply(operationPerShard.Key, new ShardRequestData(), var multiResponses = shardStrategy.ShardAccessStrategy.Apply(operationPerShard.Key, new ShardRequestData(),
(commands, i) => commands.MultiGet(requests)); (commands, i) => commands.MultiGet(requests));


var sb = new StringBuilder(); var sb = new StringBuilder();
foreach (var response in from shardReponses in multiResponses foreach (var response in from shardReponses in multiResponses
from getResponse in shardReponses from getResponse in shardReponses
where getResponse.RequestHasErrors() where getResponse.RequestHasErrors()
select getResponse) select getResponse)
sb.AppendFormat("Got an error from server, status code: {0}{1}{2}", response.Status, Environment.NewLine, sb.AppendFormat("Got an error from server, status code: {0}{1}{2}", response.Status, Environment.NewLine,
response.Result) response.Result)
.AppendLine(); .AppendLine();


if (sb.Length > 0) if (sb.Length > 0)
Expand Down Expand Up @@ -468,13 +473,13 @@ protected override IAsyncDocumentQuery<T> IDocumentQueryGeneratorAsyncQuery<T>(s
public IDocumentQuery<T> LuceneQuery<T>(string indexName) public IDocumentQuery<T> LuceneQuery<T>(string indexName)
{ {
return new ShardedDocumentQuery<T>(this, GetShardsToOperateOn, shardStrategy, indexName, null, null, return new ShardedDocumentQuery<T>(this, GetShardsToOperateOn, shardStrategy, indexName, null, null,
listeners.QueryListeners); listeners.QueryListeners);
} }


public IDocumentQuery<T> LuceneQuery<T>() public IDocumentQuery<T> LuceneQuery<T>()
{ {
return LuceneQuery<T>(GetDynamicIndexName<T>()); return LuceneQuery<T>(GetDynamicIndexName<T>());
} }


#endregion #endregion


Expand Down Expand Up @@ -530,8 +535,8 @@ void ISyncAdvancedSessionOperation.Refresh<T>(T entity)


var shardRequestData = new ShardRequestData var shardRequestData = new ShardRequestData
{ {
EntityType = typeof (T), EntityType = typeof(T),
Keys = {value.Key} Keys = { value.Key }
}; };
var dbCommands = GetCommandsToOperateOn(shardRequestData); var dbCommands = GetCommandsToOperateOn(shardRequestData);


Expand All @@ -542,7 +547,7 @@ void ISyncAdvancedSessionOperation.Refresh<T>(T entity)
return false; return false;
value.Metadata = jsonDocument.Metadata; value.Metadata = jsonDocument.Metadata;
value.OriginalMetadata = (RavenJObject) jsonDocument.Metadata.CloneToken(); value.OriginalMetadata = (RavenJObject)jsonDocument.Metadata.CloneToken();
value.ETag = jsonDocument.Etag; value.ETag = jsonDocument.Etag;
value.OriginalValue = jsonDocument.DataAsJson; value.OriginalValue = jsonDocument.DataAsJson;
var newEntity = ConvertToEntity<T>(value.Key, jsonDocument.DataAsJson, jsonDocument.Metadata); var newEntity = ConvertToEntity<T>(value.Key, jsonDocument.DataAsJson, jsonDocument.Metadata);
Expand All @@ -567,13 +572,13 @@ public IEnumerable<T> LoadStartingWith<T>(string keyPrefix, string matches = nul
IncrementRequestCount(); IncrementRequestCount();
var shards = GetCommandsToOperateOn(new ShardRequestData var shards = GetCommandsToOperateOn(new ShardRequestData
{ {
EntityType = typeof (T), EntityType = typeof(T),
Keys = {keyPrefix} Keys = { keyPrefix }
}); });
var results = shardStrategy.ShardAccessStrategy.Apply(shards, new ShardRequestData var results = shardStrategy.ShardAccessStrategy.Apply(shards, new ShardRequestData
{ {
EntityType = typeof (T), EntityType = typeof(T),
Keys = {keyPrefix} Keys = { keyPrefix }
}, (dbCmd, i) => dbCmd.StartsWith(keyPrefix, matches, start, pageSize)); }, (dbCmd, i) => dbCmd.StartsWith(keyPrefix, matches, start, pageSize));


return results.SelectMany(x => x).Select(TrackEntity<T>) return results.SelectMany(x => x).Select(TrackEntity<T>)
Expand All @@ -597,6 +602,6 @@ string ISyncAdvancedSessionOperation.GetDocumentUrl(object entity)
throw new InvalidOperationException("Could not find matching shard for shard id: " + shardId); throw new InvalidOperationException("Could not find matching shard for shard id: " + shardId);
return commands.UrlFor(value.Key); return commands.UrlFor(value.Key);
} }
} }
} }
#endif #endif

0 comments on commit 4cc039e

Please sign in to comment.