-
Notifications
You must be signed in to change notification settings - Fork 7
/
client.rs
117 lines (102 loc) · 3.84 KB
/
client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use {
crate::{
providers::ProviderKind,
stores::{self, StoreError::NotFound},
},
async_trait::async_trait,
sqlx::Executor,
tracing::{info, instrument},
};
#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
pub struct Client {
pub tenant_id: String,
pub push_type: ProviderKind,
#[sqlx(rename = "device_token")]
pub token: String,
pub always_raw: bool,
}
#[async_trait]
pub trait ClientStore {
async fn create_client(&self, tenant_id: &str, id: &str, client: Client) -> stores::Result<()>;
async fn get_client(&self, tenant_id: &str, id: &str) -> stores::Result<Client>;
async fn delete_client(&self, tenant_id: &str, id: &str) -> stores::Result<()>;
}
#[async_trait]
impl ClientStore for sqlx::PgPool {
#[instrument(skip(self, client))]
async fn create_client(&self, tenant_id: &str, id: &str, client: Client) -> stores::Result<()> {
info!(
"ClientStore::create_client tenant_id={tenant_id} id={id} token={} with locking",
client.token
);
let mut transaction = self.begin().await?;
// Statement for locking to prevent an issue #230
sqlx::query("SELECT pg_advisory_xact_lock(abs(hashtext($1::text)))")
.bind(id)
.execute(&mut transaction)
.await?;
sqlx::query("DELETE FROM public.clients WHERE id = $1 OR device_token = $2")
.bind(id)
.bind(client.token.clone())
.execute(&mut transaction)
.await?;
let mut insert_query = sqlx::QueryBuilder::new(
"INSERT INTO public.clients (id, tenant_id, push_type, device_token, always_raw)",
);
insert_query.push_values(
vec![(
id,
tenant_id,
client.push_type,
client.token,
client.always_raw,
)],
|mut b, client| {
b.push_bind(client.0)
.push_bind(client.1)
.push_bind(client.2)
.push_bind(client.3)
.push_bind(client.4);
},
);
insert_query.build().execute(&mut transaction).await?;
transaction.commit().await?;
Ok(())
}
#[instrument(skip(self))]
async fn get_client(&self, tenant_id: &str, id: &str) -> stores::Result<Client> {
let res = sqlx::query_as::<sqlx::postgres::Postgres, Client>(
"SELECT tenant_id, push_type, device_token, always_raw FROM public.clients WHERE id = \
$1 and tenant_id = $2",
)
.bind(id)
.bind(tenant_id)
.fetch_one(self)
.await;
match res {
Err(sqlx::Error::RowNotFound) => Err(NotFound("client".to_string(), id.to_string())),
Err(e) => Err(e.into()),
Ok(row) => Ok(row),
}
}
#[instrument(skip(self))]
async fn delete_client(&self, tenant_id: &str, id: &str) -> stores::Result<()> {
info!("ClientStore::delete_client tenant_id={tenant_id} id={id}");
let mut notification_query_builder =
sqlx::QueryBuilder::new("DELETE FROM public.notifications WHERE client_id = ");
notification_query_builder.push_bind(id);
notification_query_builder.push(" and tenant_id = ");
notification_query_builder.push_bind(tenant_id);
let notification_query = notification_query_builder.build();
self.execute(notification_query).await?;
let mut query_builder = sqlx::QueryBuilder::new("DELETE FROM public.clients WHERE id = ");
query_builder.push_bind(id);
query_builder.push(" and tenant_id = ");
query_builder.push_bind(tenant_id);
let query = query_builder.build();
match self.execute(query).await {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
}