-
Notifications
You must be signed in to change notification settings - Fork 175
/
Copy pathcommit_transaction.rs
92 lines (78 loc) · 2.62 KB
/
commit_transaction.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::time::Duration;
use bson::rawdoc;
use crate::{
cmap::{Command, RawCommandResponse, StreamDescription},
error::Result,
operation::{
append_options_to_raw_document,
remove_empty_write_concern,
OperationWithDefaults,
Retryability,
},
options::{Acknowledgment, TransactionOptions, WriteConcern},
};
use super::{ExecutionContext, WriteConcernOnlyBody};
pub(crate) struct CommitTransaction {
options: Option<TransactionOptions>,
}
impl CommitTransaction {
pub(crate) fn new(options: Option<TransactionOptions>) -> Self {
Self { options }
}
}
impl OperationWithDefaults for CommitTransaction {
type O = ();
const NAME: &'static str = "commitTransaction";
fn build(&mut self, _description: &StreamDescription) -> Result<Command> {
let mut body = rawdoc! {
Self::NAME: 1,
};
remove_empty_write_concern!(self.options);
append_options_to_raw_document(&mut body, self.options.as_ref())?;
Ok(Command::new(
Self::NAME.to_string(),
"admin".to_string(),
body,
))
}
fn handle_response<'a>(
&'a self,
response: RawCommandResponse,
_context: ExecutionContext<'a>,
) -> Result<Self::O> {
let response: WriteConcernOnlyBody = response.body()?;
response.validate()
}
fn write_concern(&self) -> Option<&WriteConcern> {
self.options
.as_ref()
.and_then(|opts| opts.write_concern.as_ref())
}
fn retryability(&self) -> Retryability {
Retryability::Write
}
// Updates the write concern to use w: majority and a w_timeout of 10000 if w_timeout is not
// already set. The write concern on a commitTransaction command should be updated if a
// commit is being retried internally or by the user.
fn update_for_retry(&mut self) {
let options = self
.options
.get_or_insert_with(|| TransactionOptions::builder().build());
match &mut options.write_concern {
Some(write_concern) => {
write_concern.w = Some(Acknowledgment::Majority);
if write_concern.w_timeout.is_none() {
write_concern.w_timeout = Some(Duration::from_millis(10000));
}
}
None => {
options.write_concern = Some(
WriteConcern::builder()
.w(Acknowledgment::Majority)
.w_timeout(Duration::from_millis(10000))
.build(),
);
}
}
}
}