From 34ecb0e66970c78ad0664ffa48faedafde52ce6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=81=E6=9D=BE=E6=9D=B0?= <377973147@qq.com> Date: Fri, 19 Jan 2024 15:25:40 +0800 Subject: [PATCH 1/4] =?UTF-8?q?mongodb=20busiCall=20=E4=B8=AD=E5=85=81?= =?UTF-8?q?=E8=AE=B8=E6=B7=BB=E5=8A=A0=E8=BF=94=E5=9B=9E=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=8F=90=E4=BA=A4=E4=BA=8B=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/DtmMongoBarrier/MongoBranchBarrier.cs | 72 +++++++++++++++++++---- 1 file changed, 62 insertions(+), 10 deletions(-) diff --git a/src/DtmMongoBarrier/MongoBranchBarrier.cs b/src/DtmMongoBarrier/MongoBranchBarrier.cs index 57f78c7..98bb03f 100644 --- a/src/DtmMongoBarrier/MongoBranchBarrier.cs +++ b/src/DtmMongoBarrier/MongoBranchBarrier.cs @@ -41,7 +41,7 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func< if (isNullCompensation || isDuplicateOrPend) { - bb?.Logger?.LogInformation("mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend); + bb?.Logger?.LogInformation("mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend); await session.CommitTransactionAsync(); return; } @@ -60,19 +60,71 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func< } } + public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func> busiCall) + { + bb.BarrierID = bb.BarrierID + 1; + var bid = bb.BarrierID.ToString().PadLeft(2, '0'); + + var session = await mc.StartSessionAsync(); + + session.StartTransaction(); + + try + { + var originOp = Constant.Barrier.OpDict.TryGetValue(bb.Op, out var ot) ? ot : string.Empty; + + var (originAffected, oEx) = await MongoInsertBarrier(bb, session, bb.BranchID, originOp, bid, bb.Op); + var (currentAffected, rEx) = await MongoInsertBarrier(bb, session, bb.BranchID, bb.Op, bid, bb.Op); + + bb?.Logger?.LogDebug("mongo originAffected: {originAffected} currentAffected: {currentAffected}", originAffected, currentAffected); + + if (bb.IsMsgRejected(rEx?.Message, bb.Op, currentAffected)) + throw new DtmDuplicatedException(); + + if (oEx != null || rEx != null) + { + throw oEx ?? rEx; + } + + var isNullCompensation = bb.IsNullCompensation(bb.Op, originAffected); + var isDuplicateOrPend = bb.IsDuplicateOrPend(currentAffected); + + if (isNullCompensation || isDuplicateOrPend) + { + bb?.Logger?.LogInformation("mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend); + await session.CommitTransactionAsync(); + return; + } + + var autoCommit = await busiCall.Invoke(session); + if (autoCommit) + { + await session.CommitTransactionAsync(); + } + } + catch (Exception ex) + { + bb?.Logger?.LogError(ex, "Mongo Call error, gid={gid}, trans_type={trans_type}", bb.Gid, bb.TransType); + + await session.AbortTransactionAsync(); + + throw; + } + } + public static async Task MongoQueryPrepared(this BranchBarrier bb, IMongoClient mc) { var session = await mc.StartSessionAsync(); try { - await MongoInsertBarrier( - bb, - session, - Constant.Barrier.MSG_BRANCHID, - Constant.TYPE_MSG, - Constant.Barrier.MSG_BARRIER_ID, - Constant.Barrier.MSG_BARRIER_REASON); + await MongoInsertBarrier( + bb, + session, + Constant.Barrier.MSG_BRANCHID, + Constant.TYPE_MSG, + Constant.Barrier.MSG_BARRIER_ID, + Constant.Barrier.MSG_BARRIER_REASON); } catch (Exception ex) { @@ -81,7 +133,7 @@ await MongoInsertBarrier( } var reason = string.Empty; - + try { var barrier = session.Client.GetDatabase(bb.DtmOptions.BarrierMongoDbName) @@ -155,7 +207,7 @@ await barrier.InsertOneAsync(new DtmBarrierDocument } private static FilterDefinition BuildFilters(string gid, string branchId, string op, string barrierId) - { + { return new FilterDefinitionBuilder().And( Builders.Filter.Eq(x => x.GId, gid), Builders.Filter.Eq(x => x.BranchId, branchId), From 6a5181b9d293aed37cd7fca556fc94f723fa53ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=81=E6=9D=BE=E6=9D=B0?= <377973147@qq.com> Date: Wed, 24 Jan 2024 16:51:35 +0800 Subject: [PATCH 2/4] Extract duplicate code logic to MongoCallCore --- src/DtmMongoBarrier/MongoBranchBarrier.cs | 57 +++++------------------ 1 file changed, 12 insertions(+), 45 deletions(-) diff --git a/src/DtmMongoBarrier/MongoBranchBarrier.cs b/src/DtmMongoBarrier/MongoBranchBarrier.cs index 98bb03f..b7a7a0b 100644 --- a/src/DtmMongoBarrier/MongoBranchBarrier.cs +++ b/src/DtmMongoBarrier/MongoBranchBarrier.cs @@ -12,55 +12,23 @@ public static class MongoBranchBarrier { public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func busiCall) { - bb.BarrierID = bb.BarrierID + 1; - var bid = bb.BarrierID.ToString().PadLeft(2, '0'); - - var session = await mc.StartSessionAsync(); - - session.StartTransaction(); - - try + Func> innerCall = async session => { - var originOp = Constant.Barrier.OpDict.TryGetValue(bb.Op, out var ot) ? ot : string.Empty; - - var (originAffected, oEx) = await MongoInsertBarrier(bb, session, bb.BranchID, originOp, bid, bb.Op); - var (currentAffected, rEx) = await MongoInsertBarrier(bb, session, bb.BranchID, bb.Op, bid, bb.Op); - - bb?.Logger?.LogDebug("mongo originAffected: {originAffected} currentAffected: {currentAffected}", originAffected, currentAffected); - - if (bb.IsMsgRejected(rEx?.Message, bb.Op, currentAffected)) - throw new DtmDuplicatedException(); - - if (oEx != null || rEx != null) - { - throw oEx ?? rEx; - } - - var isNullCompensation = bb.IsNullCompensation(bb.Op, originAffected); - var isDuplicateOrPend = bb.IsDuplicateOrPend(currentAffected); - - if (isNullCompensation || isDuplicateOrPend) - { - bb?.Logger?.LogInformation("mongo Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend); - await session.CommitTransactionAsync(); - return; - } - await busiCall.Invoke(session); - await session.CommitTransactionAsync(); - } - catch (Exception ex) - { - bb?.Logger?.LogError(ex, "Mongo Call error, gid={gid}, trans_type={trans_type}", bb.Gid, bb.TransType); - - await session.AbortTransactionAsync(); - - throw; - } + return true; + }; + await MongoCallCore(bb, mc, innerCall); } public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func> busiCall) + { + Func> innerCall = async session => + await busiCall.Invoke(session); + + await MongoCallCore(bb, mc, innerCall); + } + private static async Task MongoCallCore(this BranchBarrier bb, IMongoClient mc, Func> innerCall) { bb.BarrierID = bb.BarrierID + 1; var bid = bb.BarrierID.ToString().PadLeft(2, '0'); @@ -95,8 +63,8 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func< await session.CommitTransactionAsync(); return; } + var autoCommit = await innerCall.Invoke(session); - var autoCommit = await busiCall.Invoke(session); if (autoCommit) { await session.CommitTransactionAsync(); @@ -111,7 +79,6 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func< throw; } } - public static async Task MongoQueryPrepared(this BranchBarrier bb, IMongoClient mc) { var session = await mc.StartSessionAsync(); From 3d26defddda748fb779700a105f51bba5d1902e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=81=E6=9D=BE=E6=9D=B0?= <377973147@qq.com> Date: Wed, 24 Jan 2024 20:12:59 +0800 Subject: [PATCH 3/4] Optimize code --- src/DtmMongoBarrier/MongoBranchBarrier.cs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/DtmMongoBarrier/MongoBranchBarrier.cs b/src/DtmMongoBarrier/MongoBranchBarrier.cs index b7a7a0b..fb60c3c 100644 --- a/src/DtmMongoBarrier/MongoBranchBarrier.cs +++ b/src/DtmMongoBarrier/MongoBranchBarrier.cs @@ -18,17 +18,10 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func< return true; }; - await MongoCallCore(bb, mc, innerCall); + await MongoCall(bb, mc, innerCall); } public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func> busiCall) - { - Func> innerCall = async session => - await busiCall.Invoke(session); - - await MongoCallCore(bb, mc, innerCall); - } - private static async Task MongoCallCore(this BranchBarrier bb, IMongoClient mc, Func> innerCall) { bb.BarrierID = bb.BarrierID + 1; var bid = bb.BarrierID.ToString().PadLeft(2, '0'); From b109d13888467e1fce861f3e04c4292ebb12782d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=81=E6=9D=BE=E6=9D=B0?= <377973147@qq.com> Date: Thu, 25 Jan 2024 08:47:44 +0800 Subject: [PATCH 4/4] fix busiCall --- src/DtmMongoBarrier/MongoBranchBarrier.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DtmMongoBarrier/MongoBranchBarrier.cs b/src/DtmMongoBarrier/MongoBranchBarrier.cs index fb60c3c..c7926d1 100644 --- a/src/DtmMongoBarrier/MongoBranchBarrier.cs +++ b/src/DtmMongoBarrier/MongoBranchBarrier.cs @@ -56,7 +56,7 @@ public static async Task MongoCall(this BranchBarrier bb, IMongoClient mc, Func< await session.CommitTransactionAsync(); return; } - var autoCommit = await innerCall.Invoke(session); + var autoCommit = await busiCall.Invoke(session); if (autoCommit) {