Skip to content

Commit

Permalink
Merge pull request #6 from dasatomic/btrees
Browse files Browse the repository at this point in the history
Btrees as new collection
  • Loading branch information
dasatomic committed Dec 1, 2021
2 parents 12666e4 + a13ff3e commit 34cb754
Show file tree
Hide file tree
Showing 18 changed files with 1,057 additions and 45 deletions.
494 changes: 494 additions & 0 deletions DataStructures/BTreeCollection.cs

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions DataStructures/Exceptions/KeyAlreadyExists.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;

namespace DataStructures.Exceptions
{
public class KeyAlreadyExists : Exception
{
}

public class KeyNotFound : Exception
{

}
}
11 changes: 10 additions & 1 deletion DataStructures/PageListCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ public interface IPageCollection<T>
Task<U> Max<U>(Func<T, U> projector, U startMin, ITransaction tran) where U : IComparable;
Task<bool> IsEmpty(ITransaction tran);
IAsyncEnumerable<T> Iterate(ITransaction tran);
public ColumnInfo[] GetColumnTypes();
ColumnInfo[] GetColumnTypes();
bool SupportsSeek();
IAsyncEnumerable<T> Seek<K>(K seekVal, ITransaction tran) where K : unmanaged, IComparable<K>;
}

public class PageListCollection : IPageCollection<RowHolder>
Expand Down Expand Up @@ -169,5 +171,12 @@ public async Task<bool> IsEmpty(ITransaction tran)
}

public ColumnInfo[] GetColumnTypes() => this.columnTypes;

public bool SupportsSeek() => false;

public IAsyncEnumerable<RowHolder> Seek<K>(K seekVal, ITransaction tran) where K : unmanaged, IComparable<K>
{
throw new NotImplementedException();
}
}
}
8 changes: 8 additions & 0 deletions LockManager/LockImplementation/Releaser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ internal Releaser(AsyncReadWriterLock toRelease, bool writer, int lockId, ulong
this.isDisposed = false;
}

public static Releaser FakeReleaser
{
get
{
return new Releaser(null, false, 0, 0);
}
}

internal bool IsWriter() => writer;

public void SetReleaseCallback(Action callback)
Expand Down
6 changes: 1 addition & 5 deletions PageManager/PageManagerConstants.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace PageManager
namespace PageManager
{
public static class PageManagerConstants
{
Expand Down
12 changes: 10 additions & 2 deletions PageManager/PagePointerOffsetPair.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Runtime.InteropServices;
using System;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;

namespace PageManager
{
[StructLayout(LayoutKind.Sequential)]
public struct PagePointerOffsetPair
public struct PagePointerOffsetPair : IComparable<PagePointerOffsetPair>
{
public const uint Size = sizeof(long) + sizeof(int);

Expand All @@ -15,5 +17,11 @@ public PagePointerOffsetPair(long pageId, int offsetInPage)
this.PageId = pageId;
this.OffsetInPage = offsetInPage;
}

public int CompareTo([AllowNull] PagePointerOffsetPair other)
{
// For now not supported.
throw new NotImplementedException();
}
}
}
2 changes: 2 additions & 0 deletions PageManager/PageTypes/IPageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public void ResetDirty()

public abstract void Update(ST item, ushort position, ITransaction transaction);
public abstract int Insert(ST item, ITransaction transaction);
public abstract int InsertOrdered(ST item, ITransaction transaction, ColumnInfo[] columnTypes, Func<RowHolder, RowHolder, int> comparer);
public abstract void At(ushort position, ITransaction tran, ref ST item);
public abstract void SplitPage(MixedPage newPage, ref RowHolder splitValue, int elemNumForSplit, ITransaction transaction);

public ulong GetBufferPoolToken() => this.bufferPoolToken;

Expand Down
53 changes: 53 additions & 0 deletions PageManager/PageTypes/MixedPageType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ public override IEnumerable<RowHolder> Fetch(ITransaction tran)
}
}

public IEnumerable<RowHolder> FetchReverse(ITransaction tran)
{
tran.VerifyLock(this.pageId, LockManager.LockTypeEnum.Shared);

lock (this.lockObject)
{
return this.items.IterateReverse(this.columnTypes);
}
}

public override int Insert(RowHolder item, ITransaction transaction)
{
transaction.VerifyLock(this.pageId, LockManager.LockTypeEnum.Exclusive);
Expand All @@ -113,6 +123,49 @@ public override int Insert(RowHolder item, ITransaction transaction)
}
}

public override int InsertOrdered(RowHolder item, ITransaction transaction, ColumnInfo[] columnTypes, Func<RowHolder, RowHolder, int> comparer)
{
transaction.VerifyLock(this.pageId, LockManager.LockTypeEnum.Exclusive);

lock (this.lockObject)
{
int position = this.items.InsertRowOrdered(item, columnTypes, comparer);

if (position == -1)
{
return position;
}

this.rowCount++;

// TODO: Ordered insert may result in shift operations.
// We need to log that as well to keep things consistent.
ILogRecord rc = new InsertRowRecord(this.pageId, (ushort)(position), item.Storage, transaction.TranscationId(), this.columnTypes, this.PageType());
transaction.AddRecord(rc);

this.isDirty = true;

return position;
}
}

public override void SplitPage(MixedPage newPage, ref RowHolder splitValue, int elemNumForSplit, ITransaction transaction)
{
transaction.VerifyLock(this.pageId, LockManager.LockTypeEnum.Exclusive);
transaction.VerifyLock(newPage.pageId, LockManager.LockTypeEnum.Exclusive);

lock (this.lockObject)
{
this.items.SplitPage(newPage.inMemoryStorage, ref splitValue, elemNumForSplit);
}

this.isDirty = true;
this.rowCount = (uint)this.items.GetRowCount();
newPage.items.UpdateRowCount();
newPage.rowCount = (uint)newPage.items.GetRowCount();
newPage.isDirty = true;
}

public override uint MaxRowCount() => this.items.MaxRowCount();

public override bool CanFit(RowHolder item, ITransaction transaction)
Expand Down
10 changes: 10 additions & 0 deletions PageManager/PageTypes/StringOnlyPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -321,5 +321,15 @@ public override void At(ushort position, ITransaction tran, ref char[] item)
{
throw new NotImplementedException();
}

public override int InsertOrdered(char[] item, ITransaction transaction, ColumnInfo[] columnTypes, Func<RowHolder, RowHolder, int> comparer)
{
throw new NotImplementedException();
}

public override void SplitPage(MixedPage newPage, ref RowHolder splitValue, int elemNumForSplit, ITransaction transaction)
{
throw new NotImplementedException();
}
}
}
2 changes: 1 addition & 1 deletion PageManager/RowHolder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void Fill(Span<byte> arr)
arr.CopyTo(this.Storage);
}

public T GetField<T>(int col) where T : unmanaged
public T GetField<T>(int col) where T : unmanaged, IComparable<T>
{
fixed (byte* ptr = this.Storage)
{
Expand Down
91 changes: 74 additions & 17 deletions PageManager/RowsetHolder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using PageManager.UtilStructures;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

Expand Down Expand Up @@ -81,6 +82,11 @@ public RowsetHolder(ColumnInfo[] columnTypes, Memory<byte> storage, bool init)
maxRowCount = (ushort)((storage.Length - dataStartPosition) / rowSize);
}

public void UpdateRowCount()
{
this.rowCount = BitArray.CountSet(storage.Span.Slice(0, this.reservedPresenceBitmaskCount));
}

public T GetRowGeneric<T>(int row, int col) where T : unmanaged
{
System.Diagnostics.Debug.Assert(IsPresent(row));
Expand Down Expand Up @@ -142,6 +148,8 @@ public int InsertRowOrdered(RowHolder rowHolderToInsert, ColumnInfo[] columnType
// find the first element that is bigger than one to insert.
// TODO: this can be logn.
int positionToInsert = -1;
bool insertAtEnd = false;

for (int i = 0; i < this.maxRowCount; i++)
{
if (BitArray.IsSet(i, this.storage.Span))
Expand All @@ -150,17 +158,24 @@ public int InsertRowOrdered(RowHolder rowHolderToInsert, ColumnInfo[] columnType
GetRow(i, ref rowHolder);
if (comparer(rowHolderToInsert, rowHolder) != 1)
{
// I am smaller than you, I should be at your place.
// I am bigger than you, I should be at your place.
positionToInsert = i;
break;
}
}
else
}

if (positionToInsert == -1)
{
// either I am bigger than everyone or this is an empty collection.
if (this.rowCount == 0)
{
if (positionToInsert == -1)
{
positionToInsert = i;
}
positionToInsert = 0;
}
else
{
insertAtEnd = true;
positionToInsert = this.maxRowCount - 1;
}
}

Expand Down Expand Up @@ -193,6 +208,7 @@ public int InsertRowOrdered(RowHolder rowHolderToInsert, ColumnInfo[] columnType
if (firstFreeElement == -1)
{
// No free space.
Debug.Assert(false, "No free space");
return -1;
}
}
Expand All @@ -201,22 +217,36 @@ public int InsertRowOrdered(RowHolder rowHolderToInsert, ColumnInfo[] columnType

if (positionToInsert < firstFreeElement)
{
// shift right.
// shift right one element.
ByteSliceOperations.ShiftSlice<byte>(
this.storage,
this.dataStartPosition + positionToInsert * this.rowSize, // Source.
this.dataStartPosition + positionToInsert * this.rowSize + this.rowSize, // Destination.
this.dataStartPosition + (positionToInsert + 1) * this.rowSize, // Destination.
numOfElemToCopy * this.rowSize);
}
else
{
// shift left.
ByteSliceOperations.ShiftSlice<byte>(
this.storage,
this.dataStartPosition + positionToInsert * this.rowSize, // Source.
this.dataStartPosition + firstFreeElement * this.rowSize - this.rowSize, // Destination.
numOfElemToCopy * this.rowSize);
if (insertAtEnd)
{
// shift left. We are at the end.
ByteSliceOperations.ShiftSlice<byte>(
this.storage,
this.dataStartPosition + (firstFreeElement + 1) * this.rowSize, // Source.
this.dataStartPosition + firstFreeElement * this.rowSize, // Destination.
numOfElemToCopy * this.rowSize);
}
else
{
// shift prev elements to the left.
ByteSliceOperations.ShiftSlice<byte>(
this.storage,
this.dataStartPosition + (firstFreeElement + 1) * this.rowSize, // Source.
this.dataStartPosition + firstFreeElement * this.rowSize, // Destination.
(numOfElemToCopy - 1) * this.rowSize);
positionToInsert--;
}
}

BitArray.Set(firstFreeElement, this.storage.Span);
}

Expand All @@ -240,11 +270,24 @@ public void SplitPage(Memory<byte> newPage, ref RowHolder splitValue, int elemNu
{
if (this.rowCount % 2 != 1)
{
throw new ArgumentException("Page needs to have uneven number of elements to make the split.");
throw new ArgumentException("Page needs to have odd number of elements to make the split.");
}

this.storage.CopyTo(newPage);

if (!BitArray.IsSet(elemNumForSplit, this.storage.Span))
{
for (int i = elemNumForSplit + 1; i < this.maxRowCount; i++)
{
// try to find optimal element.
if (BitArray.IsSet(i, this.storage.Span))
{
elemNumForSplit = i;
break;
}
}
}

for (int i = 0; i < elemNumForSplit + 1; i++)
{
// Unset presence in the first half of new page.
Expand All @@ -253,13 +296,14 @@ public void SplitPage(Memory<byte> newPage, ref RowHolder splitValue, int elemNu

this.GetRow(elemNumForSplit, ref splitValue);

// Caller will have to refresh row count in this item with UpdateRowCount call.
// we can't do it here since we are operating on row memory.
for (int i = elemNumForSplit; i < this.maxRowCount; i++)
{
// Unset presence in the second half of this page.
BitArray.Unset(i, this.storage.Span);
}

this.rowCount /= 2;
this.UpdateRowCount();
}

// TODO: This is not performant and it is not natural to pass column type here.
Expand All @@ -276,6 +320,19 @@ public IEnumerable<RowHolder> Iterate(ColumnInfo[] columnTypes)
}
}

public IEnumerable<RowHolder> IterateReverse(ColumnInfo[] columnTypes)
{
for (int i = this.maxRowCount - 1; i >= 0; i--)
{
if (BitArray.IsSet(i, this.storage.Span))
{
RowHolder rowHolder = new RowHolder(columnTypes);
GetRow(i, ref rowHolder);
yield return rowHolder;
}
}
}

public ushort MaxRowCount() => this.maxRowCount;

public int FreeSpaceForItems()
Expand Down
2 changes: 1 addition & 1 deletion QueryProcessing/Builders/AstToOpTreeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public RowProvider(IAsyncEnumerable<RowHolder> enumerator, MetadataColumn[] colu
public IAsyncEnumerable<RowHolder> Enumerator { get; }
public MetadataColumn[] ColumnInfo { get; }

public T GetValue<T>(RowHolder rh, string columnName) where T : unmanaged
public T GetValue<T>(RowHolder rh, string columnName) where T : unmanaged, IComparable<T>
{
if (columnPositions.TryGetValue(columnName, out int position))
{
Expand Down
Loading

0 comments on commit 34cb754

Please sign in to comment.