Permalink
Browse files

No commit message

  • Loading branch information...
hhblaze committed Sep 26, 2017
1 parent 2d8fbac commit 5cf1c396d05e91c1ac83248c872559102202deeb
@@ -83,8 +83,9 @@ internal Transaction(int transactionType, TransactionUnit transactionUnit, eTran
{
if (_transactionUnit.TransactionsCoordinator.GetSchema.Engine._transactionTablesLocker.AddSession(lockType, tables))
break;
}
break;
break;
}
this.RandomKeySorter._t = this;
@@ -35,7 +35,7 @@ class internSession
{
public string[] tables;
public eTransactionTablesLockTypes lockType= eTransactionTablesLockTypes.EXCLUSIVE;
public DbThreadsGator gator = null;
public DbThreadsGator gator = null;
}
/// <summary>
@@ -44,8 +44,9 @@ class internSession
/// <param name="lockType"></param>
/// <param name="tables"></param>
/// <returns>false if thread grants access, false if thread is in a queue</returns>
public bool AddSession(eTransactionTablesLockTypes lockType, string[] tables)
{
{
lock (lock_disposed)
{
if (disposed)
@@ -140,8 +141,10 @@ public bool AddSession(eTransactionTablesLockTypes lockType, string[] tables)
if (!ret)
{
//putting gate
iSession.gator.PutGateHere();
}
return ret;
@@ -337,7 +337,7 @@ public List<Diagnostic.ActiveTransactionState> Diagnostic_GetActiveTransactionsS
/// <param name="transactionThreadId"></param>
/// <param name="tablesNames"></param>
/// <param name="calledBySynchronizer"></param>
public void RegisterWriteTablesForTransaction(int transactionThreadId, List<string> tablesNames,bool calledBySynchronizer)
public void RegisterWriteTablesForTransaction(int transactionThreadId, List<string> tablesNames, bool calledBySynchronizer)
{
//in every transaction unit we got a list of reserved for WRITING tables
@@ -466,15 +466,23 @@ public void RegisterWriteTablesForTransaction(int transactionThreadId, List<stri
//blocking thread which requires busy tables for writing, till they are released
//ThreadsGator.PutGateHere(20000); //every 20 second (or by Gate open we give a chance to re-try, for safety reasons of hanged threads, if programmer didn't dispose DBreeze process after the programm end)
//#if ASYNC
// await ThreadsGator.PutGateHere().ConfigureAwait(false);
//#else
// ThreadsGator.PutGateHere();
//#endif
ThreadsGator.PutGateHere();
//mreWriteTransactionLock.WaitOne();
}
}//eo while
}
#endregion //Eliminating Deadlocks. Registering tables for write before starting transaction operations
#endregion //Eliminating Deadlocks. Registering tables for write before starting transaction operations
/// <summary>
View
@@ -560,6 +560,123 @@ public static void Decode_DICT_PROTO_UINT_BYTEARRAY(this byte[] encB, IDictionar
return;
}
/// <summary>
/// null elements equal to new byte[0]
/// </summary>
/// <param name="d"></param>
/// <returns></returns>
public static byte[] Encode_PROTO_ListByteArray(IList<byte[]> d)
{
//null element of the list is not allowed (the same as in protobuf), byte[0] means String.Empty
if (d == null || d.Count == 0)
return new byte[0];
byte[] tar1 = null;
using (MemoryStream ms = new MemoryStream())
{
foreach (var el in d)
{
//Setting key
ms.Write(new byte[] { 10 }, 0, 1); //complex Type - 10
if (el == null || el.Length == 0)
ms.Write(new byte[] { 0 }, 0, 1);// 0 length element
else
{
tar1 = DBreeze.Utils.Biser.GetVarintBytes((uint)el.Length);
ms.Write(tar1, 0, tar1.Length);//length of key
ms.Write(el, 0, el.Length);//key self
}
}
tar1 = ms.ToArray();
ms.Close();
}
return tar1;
}
/// <summary>
/// null elements equal to new byte[0]
/// </summary>
/// <param name="encB"></param>
/// <returns></returns>
public static List<byte[]> Decode_PROTO_ListByteArray(byte[] encB)
{
//null element of the list is not allowed (the same as in protobuf), byte[0] means String.Empty
if (encB == null)
return null;
List<byte[]> ret = new List<byte[]>();
if (encB.Length == 0)
return ret;
List<byte[]> ar = new List<byte[]>();
int i = 0;
int mode = 0;
byte[] sizer = new byte[4];
int size = 0;
uint valCnt = 0;
byte el = 0;
byte[] key = new byte[0];
Action ClearSizer = () =>
{
sizer[0] = 0;
sizer[1] = 0;
sizer[2] = 0;
sizer[3] = 0;
size = 0;
};
while (i < encB.Length)
{
el = encB[i];
switch (mode)
{
case 0:
//Always delimiter 10
mode = 1;
break;
case 1:
//Reading length of the next text
if ((el & 0x80) > 0)
{
sizer[size] = el;
size++;
}
else
{
mode = 0;
sizer[size] = el;
size++;
valCnt = DBreeze.Utils.Biser.ToUInt32(sizer);
ClearSizer();
if (valCnt > 0)
{
key = encB.Substring(i + 1, (int)valCnt);
i += (int)valCnt;
ret.Add(key);
}
else
ret.Add(new byte[0]);
}
break;
}
i++;
}
return ret;
}
/// <summary>
/// Uses protobuf concepts
/// </summary>
@@ -8,12 +8,93 @@
using System.Linq;
using System.Text;
#if ASYNC
using System.Threading.Tasks;
using System.Threading;
#endif
namespace DBreeze.Utils
{
public class DbThreadsGator:IDisposable
{
System.Threading.ManualResetEvent gate = null;
#if ASYNC
public class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();
public Task WaitAsync() { return _tcs.Task; }
public void Set() { _tcs.TrySetResult(true); }
public void Reset()
{
while (true)
{
var tcs = _tcs;
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
}
}
}
AsyncManualResetEvent gate = null;
/// <summary>
/// Creates open Gate
/// </summary>
public DbThreadsGator()
{
gate = new AsyncManualResetEvent();
gate.Set();
}
public DbThreadsGator(bool gateIsOpen)
{
gate = new AsyncManualResetEvent();
if (gateIsOpen)
gate.Set();
else
gate.Reset();
}
/// <summary>
/// Sets Gate in the code
/// </summary>
/// <returns></returns>
public async Task PutGateHere()
{
await gate.WaitAsync();
}
/// <summary>
/// If gate is closed then it will be closed timeout time in milliseconds
/// </summary>
/// <param name="milliseconds"></param>
/// <returns></returns>
public async Task PutGateHere(int milliseconds)
{
await Task.WhenAny(gate.WaitAsync(), Task.Delay(milliseconds));
}
public void OpenGate()
{
gate.Set();
}
public void CloseGate()
{
gate.Reset();
}
public void Dispose()
{
gate.Set();
gate = null;
}
#else
System.Threading.ManualResetEvent gate = null;
/// <summary>
/// Creates open Gate
@@ -53,7 +134,6 @@ public bool PutGateHere(int milliseconds)
/// <summary>
/// If gate is closed then it will be closed timeout time in milliseconds
/// </summary>
/// <param name="milliseconds"></param>
/// <param name="milliseconds">exitContext</param>
/// <returns></returns>
public bool PutGateHere(int milliseconds, bool exitContext)
@@ -75,5 +155,8 @@ public void Dispose()
{
gate.Close();
}
#endif
}
}

0 comments on commit 5cf1c39

Please sign in to comment.