Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions src/DtmMongoBarrier/MongoBranchBarrier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ namespace DtmMongoBarrier
public static class MongoBranchBarrier
{
public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func<IClientSessionHandle, Task> busiCall)
{
Func<IClientSessionHandle, Task<bool>> innerCall = async session =>
{
await busiCall.Invoke(session);

return true;
};
await MongoCall(bb, mc, innerCall);
}

public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func<IClientSessionHandle, Task<bool>> busiCall)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

两个方法代码重合度很高,最好做一个抽取

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

两个方法代码重合度很高,最好做一个抽取

好的,我重新提交了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
bb.BarrierID = bb.BarrierID + 1;
var bid = bb.BarrierID.ToString().PadLeft(2, '0');
Expand Down Expand Up @@ -41,14 +52,16 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func<

if (isNullCompensation || isDuplicateOrPend)
{
bb?.Logger?.LogInformation("mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend);
bb?.Logger?.LogInformation("mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend);
await session.CommitTransactionAsync();
return;
}
var autoCommit = await busiCall.Invoke(session);

await busiCall.Invoke(session);

await session.CommitTransactionAsync();
if (autoCommit)
{
await session.CommitTransactionAsync();
}
}
catch (Exception ex)
{
Expand All @@ -59,20 +72,19 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func<
throw;
}
}

public static async Task<string> MongoQueryPrepared(this BranchBarrier bb, IMongoClient mc)
{
var session = await mc.StartSessionAsync();

try
{
await MongoInsertBarrier(
bb,
session,
Constant.Barrier.MSG_BRANCHID,
Constant.TYPE_MSG,
Constant.Barrier.MSG_BARRIER_ID,
Constant.Barrier.MSG_BARRIER_REASON);
await MongoInsertBarrier(
bb,
session,
Constant.Barrier.MSG_BRANCHID,
Constant.TYPE_MSG,
Constant.Barrier.MSG_BARRIER_ID,
Constant.Barrier.MSG_BARRIER_REASON);
}
catch (Exception ex)
{
Expand All @@ -81,7 +93,7 @@ await MongoInsertBarrier(
}

var reason = string.Empty;

try
{
var barrier = session.Client.GetDatabase(bb.DtmOptions.BarrierMongoDbName)
Expand Down Expand Up @@ -155,7 +167,7 @@ await barrier.InsertOneAsync(new DtmBarrierDocument
}

private static FilterDefinition<DtmBarrierDocument> BuildFilters(string gid, string branchId, string op, string barrierId)
{
{
return new FilterDefinitionBuilder<DtmBarrierDocument>().And(
Builders<DtmBarrierDocument>.Filter.Eq(x => x.GId, gid),
Builders<DtmBarrierDocument>.Filter.Eq(x => x.BranchId, branchId),
Expand Down