Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Modifying the way we are handling index updates on delete - will now …
…record the updates in a separate internal table, not subject to write locks from the index.
  • Loading branch information
ayende committed Apr 19, 2011
1 parent 7472a1c commit 0b16e1e
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 10 deletions.
1 change: 1 addition & 0 deletions Raven.Abstractions/Data/IndexStats.cs
Expand Up @@ -15,6 +15,7 @@ public class IndexStats
public int IndexingErrors { get; set; }
public Guid LastIndexedEtag { get; set; }
public DateTime LastIndexedTimestamp { get; set; }
public int TouchCount { get; set; }

public int? ReduceIndexingAttempts { get; set; }
public int? ReduceIndexingSuccesses { get; set; }
Expand Down
6 changes: 3 additions & 3 deletions Raven.Database/Server/Responders/Index.cs
Expand Up @@ -200,13 +200,13 @@ private Guid GetIndexEtag(string indexName)
Guid lastDocEtag = Guid.Empty;
Guid? lastReducedEtag = null;
bool isStale = false;
Tuple<DateTime, Guid> indexLastUpdatedAt = null;
int touchCount = 0;
Database.TransactionalStorage.Batch(accessor =>
{
isStale = accessor.Staleness.IsIndexStale(indexName, null, null);
lastDocEtag = accessor.Staleness.GetMostRecentDocumentEtag();
lastReducedEtag = accessor.Staleness.GetMostRecentReducedEtag(indexName);
indexLastUpdatedAt = accessor.Staleness.IndexLastUpdatedAt(indexName);
touchCount = accessor.Staleness.GetIndexTouchCount(indexName);
});
var indexDefinition = Database.GetIndexDefinition(indexName);
using(var md5 = MD5.Create())
Expand All @@ -215,7 +215,7 @@ private Guid GetIndexEtag(string indexName)
list.AddRange(indexDefinition.GetIndexHash());
list.AddRange(Encoding.Unicode.GetBytes(indexName));
list.AddRange(lastDocEtag.ToByteArray());
list.AddRange(indexLastUpdatedAt.Item2.ToByteArray());
list.AddRange(BitConverter.GetBytes(touchCount));
list.AddRange(BitConverter.GetBytes(isStale));
if(lastReducedEtag != null)
{
Expand Down
1 change: 1 addition & 0 deletions Raven.Database/Storage/IIndexingStorageActions.cs
Expand Up @@ -33,5 +33,6 @@ public interface IIndexingStorageActions

void UpdateLastIndexed(string index, Guid etag, DateTime timestamp);
void UpdateLastReduced(string index, Guid etag, DateTime timestamp);
void TouchIndexEtag(string index);
}
}
1 change: 1 addition & 0 deletions Raven.Database/Storage/IStalenessStorageActions.cs
Expand Up @@ -17,5 +17,6 @@ public interface IStalenessStorageActions
Tuple<DateTime, Guid> IndexLastUpdatedAt(string name);
Guid GetMostRecentDocumentEtag();
Guid? GetMostRecentReducedEtag(string name);
int GetIndexTouchCount(string indexName);
}
}
3 changes: 1 addition & 2 deletions Raven.Database/Tasks/RemoveFromIndexTask.cs
Expand Up @@ -39,8 +39,7 @@ public override void Execute(WorkContext context)
{
keysToRemove.Add(key);
}
var indexLastUpdatedAt = accessor.Staleness.IndexLastUpdatedAt(Index);
accessor.Indexing.UpdateLastIndexed(Index, indexLastUpdatedAt.Item2, DateTime.Now);
accessor.Indexing.TouchIndexEtag(Index);
});
context.IndexStorage.RemoveFromIndex(Index, keysToRemove.ToArray(), context);
}
Expand Down
1 change: 1 addition & 0 deletions Raven.Storage.Esent/Raven.Storage.Esent.csproj
Expand Up @@ -63,6 +63,7 @@
<Compile Include="EsentExtension.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="SchemaCreator.cs" />
<Compile Include="SchemaUpdates\From35To36.cs" />
<Compile Include="SchemaUpdates\From34To35.cs" />
<Compile Include="SchemaUpdates\From32To33.cs" />
<Compile Include="SchemaUpdates\From29To30.cs" />
Expand Down
32 changes: 31 additions & 1 deletion Raven.Storage.Esent/SchemaCreator.cs
Expand Up @@ -12,7 +12,7 @@ namespace Raven.Storage.Esent
[CLSCompliant(false)]
public class SchemaCreator
{
public const string SchemaVersion = "3.5";
public const string SchemaVersion = "3.6";
private readonly Session session;

public SchemaCreator(Session session)
Expand All @@ -36,6 +36,7 @@ public void Create(string database)
CreateMapResultsTable(dbid);
CreateIndexingStatsTable(dbid);
CreateIndexingStatsReduceTable(dbid);
CreateIndexingEtagsTable(dbid);
CreateFilesTable(dbid);
CreateQueueTable(dbid);
CreateIdentityTable(dbid);
Expand Down Expand Up @@ -128,6 +129,35 @@ private void CreateIndexingStatsTable(JET_DBID dbid)
100);
}

// this table exists solely so that other threads can touch the index
// etag, such as when we remove an item from the index, without causing
// concurrency conflicts with the indexing thread
private void CreateIndexingEtagsTable(JET_DBID dbid)
{
JET_TABLEID tableid;
Api.JetCreateTable(session, dbid, "indexes_etag", 16, 100, out tableid);
JET_COLUMNID columnid;

Api.JetAddColumn(session, tableid, "key", new JET_COLUMNDEF
{
cbMax = 255,
coltyp = JET_coltyp.Text,
cp = JET_CP.Unicode,
grbit = ColumndefGrbit.ColumnTagged
}, null, 0, out columnid);

var defaultValue = BitConverter.GetBytes(0);
Api.JetAddColumn(session, tableid, "touches", new JET_COLUMNDEF
{
coltyp = JET_coltyp.Long,
grbit = ColumndefGrbit.ColumnFixed | ColumndefGrbit.ColumnNotNULL | ColumndefGrbit.ColumnEscrowUpdate
}, defaultValue, defaultValue.Length, out columnid);

const string indexDef = "+key\0\0";
Api.JetCreateIndex(session, tableid, "by_key", CreateIndexGrbit.IndexPrimary, indexDef, indexDef.Length,
100);
}

private void CreateIndexingStatsReduceTable(JET_DBID dbid)
{
JET_TABLEID tableid;
Expand Down
97 changes: 97 additions & 0 deletions Raven.Storage.Esent/SchemaUpdates/From35To36.cs
@@ -0,0 +1,97 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Isam.Esent.Interop;
using Raven.Database.Impl;

namespace Raven.Storage.Esent.SchemaUpdates
{
public class From35To36 : ISchemaUpdate
{
#region ISchemaUpdate Members

public string FromSchemaVersion
{
get { return "3.5"; }
}

public void Init(IUuidGenerator generator)
{
}

public void Update(Session session, JET_DBID dbid)
{
Transaction tx;
using (tx = new Transaction(session))
{
CreateIndexingEtagsTable(dbid, session);

using (var stats = new Table(session, dbid, "indexes_stats",OpenTableGrbit.None))
using (var reduce = new Table(session, dbid, "indexes_etag", OpenTableGrbit.None))
{
var tblKeyColumn = Api.GetColumnDictionary(session, stats)["key"];
var reduceKeyCol = Api.GetColumnDictionary(session, reduce)["key"];

Api.MoveBeforeFirst(session, stats);
while (Api.TryMoveNext(session, stats))
{
using(var update = new Update(session, reduce, JET_prep.Insert))
{
var indexName = Api.RetrieveColumnAsString(session, stats, tblKeyColumn, Encoding.Unicode);
Api.SetColumn(session, reduce, reduceKeyCol, indexName, Encoding.Unicode);
update.Save();
}
}
}

tx.Commit(CommitTransactionGrbit.LazyFlush);
tx.Dispose();
tx = new Transaction(session);
}

using (var details = new Table(session, dbid, "details", OpenTableGrbit.None))
{
Api.JetMove(session, details, JET_Move.First, MoveGrbit.None);
var columnids = Api.GetColumnDictionary(session, details);

using (var update = new Update(session, details, JET_prep.Replace))
{
Api.SetColumn(session, details, columnids["schema_version"], "3.6", Encoding.Unicode);

update.Save();
}
}
tx.Commit(CommitTransactionGrbit.None);
tx.Dispose();
}

private static void CreateIndexingEtagsTable(JET_DBID dbid, JET_SESID session)
{
JET_TABLEID tableid;
Api.JetCreateTable(session, dbid, "indexes_etag", 16, 100, out tableid);
JET_COLUMNID columnid;

Api.JetAddColumn(session, tableid, "key", new JET_COLUMNDEF
{
cbMax = 255,
coltyp = JET_coltyp.Text,
cp = JET_CP.Unicode,
grbit = ColumndefGrbit.ColumnTagged
}, null, 0, out columnid);

var defaultValue = BitConverter.GetBytes(0);
Api.JetAddColumn(session, tableid, "touches", new JET_COLUMNDEF
{
coltyp = JET_coltyp.Long,
grbit = ColumndefGrbit.ColumnFixed | ColumndefGrbit.ColumnNotNULL | ColumndefGrbit.ColumnEscrowUpdate
}, defaultValue, defaultValue.Length, out columnid);

const string indexDef = "+key\0\0";
Api.JetCreateIndex(session, tableid, "by_key", CreateIndexGrbit.IndexPrimary, indexDef, indexDef.Length,
100);
}


#endregion
}
}
17 changes: 16 additions & 1 deletion Raven.Storage.Esent/StorageActions/Indexing.cs
Expand Up @@ -85,9 +85,14 @@ public IEnumerable<IndexStats> GetIndexesStats()
var indexName = Api.RetrieveColumnAsString(session, IndexesStats, tableColumnsCache.IndexesStatsColumns["key"]);
Api.MakeKey(session, IndexesStatsReduce, indexName, Encoding.Unicode, MakeKeyGrbit.NewKey);
var hasReduce = Api.TrySeek(session, IndexesStatsReduce, SeekGrbit.SeekEQ);

Api.MakeKey(session, IndexesEtags, indexName, Encoding.Unicode, MakeKeyGrbit.NewKey);
Api.TrySeek(session, IndexesEtags, SeekGrbit.SeekEQ);

yield return new IndexStats
{
Name = indexName,
TouchCount = Api.RetrieveColumnAsInt32(session, IndexesStats, tableColumnsCache.IndexesStatsColumns["touches"]).Value,
IndexingAttempts =
Api.RetrieveColumnAsInt32(session, IndexesStats, tableColumnsCache.IndexesStatsColumns["attempts"]).Value,
IndexingSuccesses =
Expand Down Expand Up @@ -198,7 +203,17 @@ public void UpdateLastIndexed(string index, Guid etag, DateTime timestamp)
}
}

public void UpdateLastReduced(string index, Guid etag, DateTime timestamp)
public void TouchIndexEtag(string index)
{
Api.JetSetCurrentIndex(session, IndexesEtags, "by_key");
Api.MakeKey(session, IndexesEtags, index, Encoding.Unicode, MakeKeyGrbit.NewKey);
if (Api.TrySeek(session, IndexesEtags, SeekGrbit.SeekEQ) == false)
throw new IndexDoesNotExistsException("There is no reduce index named: " + index);

Api.EscrowUpdate(session, IndexesEtags, tableColumnsCache.IndexesEtagsColumns["touches"], 1);
}

public void UpdateLastReduced(string index, Guid etag, DateTime timestamp)
{
Api.JetSetCurrentIndex(session, IndexesStatsReduce, "by_key");
Api.MakeKey(session, IndexesStatsReduce, index, Encoding.Unicode, MakeKeyGrbit.NewKey);
Expand Down
7 changes: 7 additions & 0 deletions Raven.Storage.Esent/StorageActions/TableProperties.cs
Expand Up @@ -63,6 +63,13 @@ protected Table IndexesStatsReduce
get { return indexesStatsReduce ?? (indexesStatsReduce = new Table(session, dbid, "indexes_stats_reduce", OpenTableGrbit.None)); }
}

private Table indexesEtags;
protected Table IndexesEtags
{
get { return indexesEtags ?? (indexesEtags = new Table(session, dbid, "indexes_etag", OpenTableGrbit.None)); }
}


private Table mappedResults;
protected Table MappedResults
{
Expand Down
8 changes: 6 additions & 2 deletions Raven.Storage.Esent/TableColumnsCache.cs
Expand Up @@ -18,8 +18,11 @@ public class TableColumnsCache
public IDictionary<string, JET_COLUMNID> FilesColumns { get; set; }

public IDictionary<string, JET_COLUMNID> IndexesStatsColumns { get; set; }

public IDictionary<string, JET_COLUMNID> IndexesStatsReduceColumns { get; set; }

public IDictionary<string, JET_COLUMNID> IndexesEtagsColumns { get; set; }

public IDictionary<string, JET_COLUMNID> MappedResultsColumns { get; set; }

public IDictionary<string, JET_COLUMNID> DocumentsModifiedByTransactionsColumns { get; set; }
Expand Down Expand Up @@ -50,8 +53,9 @@ public void InitColumDictionaries(JET_INSTANCE instance, string database)
IndexesStatsColumns = Api.GetColumnDictionary(session, indexStats);
using (var indexStatsReduce = new Table(session, dbid, "indexes_stats_reduce", OpenTableGrbit.None))
IndexesStatsReduceColumns = Api.GetColumnDictionary(session, indexStatsReduce);

using (var mappedResults = new Table(session, dbid, "mapped_results", OpenTableGrbit.None))
using (var indexEtags = new Table(session, dbid, "indexes_etag", OpenTableGrbit.None))
IndexesEtagsColumns = Api.GetColumnDictionary(session, indexEtags);
using (var mappedResults = new Table(session, dbid, "mapped_results", OpenTableGrbit.None))
MappedResultsColumns = Api.GetColumnDictionary(session, mappedResults);
using (
var documentsModifiedByTransactions = new Table(session, dbid, "documents_modified_by_transaction",
Expand Down
15 changes: 14 additions & 1 deletion Raven.Storage.Managed/IndexingStorageActions.cs
Expand Up @@ -108,6 +108,8 @@ public IEnumerable<IndexStats> GetIndexesStats()
continue;
yield return new IndexStats
{
TouchCount = readResult.Key.Value<int>("touches"),

IndexingAttempts = readResult.Key.Value<int>("attempts"),
IndexingErrors = readResult.Key.Value<int>("failures"),
IndexingSuccesses = readResult.Key.Value<int>("successes"),
Expand Down Expand Up @@ -137,6 +139,7 @@ public void AddIndex(string name, bool createMapReduce)
{"attempts", 0},
{"successes", 0},
{"failures", 0},
{"touches", 0},
{"lastEtag", Guid.Empty.ToByteArray()},
{"lastTimestamp", DateTime.MinValue},

Expand Down Expand Up @@ -171,7 +174,17 @@ public IndexFailureInformation GetFailureRate(string index)
return indexFailureInformation;
}

public void UpdateLastIndexed(string index, Guid etag, DateTime timestamp)
public void TouchIndexEtag(string index)
{
var readResult = storage.IndexingStats.Read(index);
if (readResult == null)
throw new ArgumentException("There is no index with the name: " + currentIndex.Value);
var key = (RavenJObject)readResult.Key;
key["touches"] = key.Value<int>("touches") + 1;
storage.IndexingStats.UpdateKey(key);
}

public void UpdateLastIndexed(string index, Guid etag, DateTime timestamp)
{
var readResult = storage.IndexingStats.Read(index);
if (readResult == null)
Expand Down

0 comments on commit 0b16e1e

Please sign in to comment.