Skip to content

Commit

Permalink
Implement subscriptions api
Browse files Browse the repository at this point in the history
  • Loading branch information
gemcoder21 committed Sep 6, 2023
1 parent 99408e7 commit dab1202
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 24 deletions.
18 changes: 4 additions & 14 deletions api/src/device/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ extern crate rocket;
use std::error::Error;

use primitives::platform::Platform;
use storage::DatabaseClient;
use storage::{DatabaseClient, models::UpdateDevice};

pub struct DevicesClient {
database: DatabaseClient,
Expand All @@ -20,13 +20,13 @@ impl DevicesClient {

pub fn add_device(&mut self, device: primitives::device::Device) -> Result<primitives::device::Device, Box<dyn Error>> {
let device_id = device.id.clone();
let add_device = self.map_device(device);
let add_device = UpdateDevice::from_primitive(device);
let _ = self.database.add_device(add_device)?;
return self.get_device(device_id.as_str());
}

pub fn get_device(&mut self, device_id: &str) -> Result<primitives::device::Device, Box<dyn Error>> {
let device = self.database.get_device(device_id.to_string())?;
let device = self.database.get_device(device_id)?;
Ok(
primitives::device::Device {
id: device.device_id,
Expand All @@ -39,18 +39,8 @@ impl DevicesClient {
}
pub fn update_device(&mut self, device: primitives::device::Device) -> Result<primitives::device::Device, Box<dyn Error>> {
let device_id = device.id.clone();
let update_device = self.map_device(device);
let update_device = UpdateDevice::from_primitive(device);
let _ = self.database.update_device(update_device)?;
return self.get_device(device_id.as_str());
}

pub fn map_device(&self, device: primitives::device::Device) -> storage::models::Device {
return storage::models::Device {
device_id: device.id,
platform: device.platform.as_str().to_string(),
token: device.token,
locale: device.locale,
is_push_enabled: device.is_push_enabled,
};
}
}
9 changes: 8 additions & 1 deletion api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod name;
mod charts;
mod device;
mod asset;
mod subscription;

use asset::client::AssetsClient;
use fiat::mercuryo::MercuryoClient;
Expand All @@ -29,6 +30,7 @@ use plausible_client:: Client as PlausibleClient;
use storage::DatabaseClient as DatabaseClient;
use name_resolver::client::Client as NameClient;
use device::client::DevicesClient;
use subscription::client::SubscriptionsClient;
use rocket::tokio::sync::Mutex;
use rocket_prometheus::PrometheusMetrics;

Expand All @@ -51,6 +53,7 @@ async fn rocket(settings: Settings) -> Rocket<Build> {
settings.name.spaceid.url,
);
let devices_client = DevicesClient::new(postgres_url).await;
let subscriptions_client = SubscriptionsClient::new(postgres_url).await;
let assets_client = AssetsClient::new(postgres_url).await;
let plausible_client = PlausibleClient::new(&settings.plausible.url);
let request_client = FiatClient::request_client(settings.fiat.timeout);
Expand Down Expand Up @@ -83,7 +86,8 @@ async fn rocket(settings: Settings) -> Rocket<Build> {
.manage(Mutex::new(plausible_client))
.manage(Mutex::new(name_client))
.manage(Mutex::new(devices_client))
.manage(Mutex::new(assets_client))
.manage(Mutex::new(assets_client))
.manage(Mutex::new(subscriptions_client))
.mount("/", routes![
status::get_status,
])
Expand All @@ -101,6 +105,9 @@ async fn rocket(settings: Settings) -> Rocket<Build> {
device::get_device,
device::update_device,
asset::get_asset,
subscription::add_subscriptions,
subscription::get_subscriptions,
subscription::delete_subscriptions,
])
.mount(settings.metrics.path, prometheus)
}
Expand Down
2 changes: 1 addition & 1 deletion api/src/name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub async fn get_name_resolve(
chain: &str,
name_client: &State<Mutex<NameClient>>,
) -> Json<NameRecord> {
let chain = Chain::new(chain).unwrap();
let chain = Chain::from_str(chain).unwrap();
let name = name_client.lock().await.resolve(name, chain).await.unwrap();
Json(name)
}
56 changes: 56 additions & 0 deletions api/src/subscription/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
extern crate rocket;
use std::error::Error;

use primitives::Subscription;
use storage::DatabaseClient;

pub struct SubscriptionsClient {
database: DatabaseClient,
}

impl SubscriptionsClient {
pub async fn new(
database_url: &str
) -> Self {
let database = DatabaseClient::new(database_url);
Self {
database,
}
}

pub fn add_subscriptions(&mut self, device_id: &str, subscriptions: Vec<Subscription>) -> Result<usize, Box<dyn Error>> {
let device = self.database.get_device(device_id)?;
let subscriptions = subscriptions
.into_iter()
.map(|x| storage::models::Subscription::from_primitive(x, device.id))
.collect();
let result = self.database.add_subscriptions(subscriptions)?;

return Ok(result)
}

pub fn get_subscriptions(&mut self, device_id: &str) -> Result<Vec<primitives::Subscription>, Box<dyn Error>> {
let subscriptions = self.database
.get_subscriptions_by_device_id(device_id)?
.into_iter()
.map(|x| x.as_primitive())
.collect();
return Ok(subscriptions)
}

pub fn delete_subscriptions(&mut self, device_id: &str, subscriptions: Vec<Subscription>) -> Result<usize, Box<dyn Error>> {
let device = self.database.get_device(device_id)?;
let values = subscriptions
.into_iter()
.map(|x| storage::models::Subscription::from_primitive(x, device.id))
.collect::<Vec<storage::models::Subscription>>();

//TODO: Implement to delete all subscriptions at once
let mut result = 0;
for subscription in values {
let size = self.database.delete_subscription(subscription)?;
result += size;
}
Ok(result)
}
}
39 changes: 39 additions & 0 deletions api/src/subscription/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
extern crate rocket;
use primitives::Subscription;
use rocket::serde::json::Json;
use self::client::SubscriptionsClient;
use rocket::State;
use rocket::tokio::sync::Mutex;

pub mod client;

#[get("/subscriptions/<device_id>")]
pub async fn get_subscriptions(
device_id: &str,
client: &State<Mutex<SubscriptionsClient>>,
) -> Json<Vec<Subscription>> {
let subscriptions = client.lock().await.get_subscriptions(device_id).unwrap();
Json(subscriptions)
}

#[delete("/subscriptions/<device_id>", format = "json", data = "<subscriptions>")]
pub async fn delete_subscriptions(
subscriptions: Json<Vec<Subscription>>,
device_id: &str,
client: &State<Mutex<SubscriptionsClient>>,
) -> Json<usize> {
let result = client.lock().await.delete_subscriptions(device_id, subscriptions.0).unwrap();
Json(result)
}


#[post("/subscriptions/<device_id>", format = "json", data = "<subscriptions>")]
pub async fn add_subscriptions(
subscriptions: Json<Vec<Subscription>>,
#[allow(unused)]
device_id: &str,
client: &State<Mutex<SubscriptionsClient>>,
) -> Json<usize> {
let subscriptions = client.lock().await.add_subscriptions(device_id, subscriptions.0).unwrap();
Json(subscriptions)
}
2 changes: 1 addition & 1 deletion primitives/src/asset_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl AssetId {
if parts.is_empty() || parts.len() > 2 {
return None;
}
let chain = Chain::new(parts[0])?;
let chain = Chain::from_str(parts[0])?;
let token_id = parts.get(1).map(|s| s.to_owned());
Some(Self { chain, token_id: token_id.map(|s| s.to_owned()) })
}
Expand Down
2 changes: 1 addition & 1 deletion primitives/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl PartialEq for Chain {
}

impl Chain {
pub fn new(chain: &str) -> Option<Self> {
pub fn from_str(chain: &str) -> Option<Self> {
match chain {
"bitcoin" => Some(Self::Bitcoin),
"binance" => Some(Self::Binance),
Expand Down
5 changes: 4 additions & 1 deletion primitives/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// lib.rs

pub mod chain;
pub use self::chain::Chain;
pub mod chain_type;
pub mod name;
pub mod node;
Expand Down Expand Up @@ -30,4 +31,6 @@ pub use self::transaction_type::TransactionType;
pub mod transaction_state;
pub use self::transaction_state::TransactionState;
pub mod transaction_direction;
pub use self::transaction_direction::TransactionDirection;
pub use self::transaction_direction::TransactionDirection;
pub mod subscription;
pub use self::subscription::Subscription;
11 changes: 11 additions & 0 deletions primitives/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use typeshare::typeshare;
use serde::{Serialize, Deserialize};

use crate::chain::Chain;

#[derive(Clone, Debug, Serialize, Deserialize)]
#[typeshare(swift = "Equatable, Codable, Hashable")]
pub struct Subscription {
pub chain: Chain,
pub address: String,
}
34 changes: 31 additions & 3 deletions storage/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use diesel::{Connection, upsert::excluded};
use diesel::pg::PgConnection;
use primitives::chain::Chain;
use crate::models::*;
use crate::schema::devices;
use diesel::prelude::*;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("../storage/src/migrations");
Expand Down Expand Up @@ -195,7 +196,7 @@ impl DatabaseClient {
.execute(&mut self.connection)
}

pub fn add_device(&mut self, device: Device) -> Result<usize, diesel::result::Error> {
pub fn add_device(&mut self, device: UpdateDevice) -> Result<usize, diesel::result::Error> {
use crate::schema::devices::dsl::*;
diesel::insert_into(devices)
.values(device)
Expand All @@ -210,15 +211,15 @@ impl DatabaseClient {
.first(&mut self.connection)
}

pub fn get_device(&mut self, _device_id: String) -> Result<Device, diesel::result::Error> {
pub fn get_device(&mut self, _device_id: &str) -> Result<Device, diesel::result::Error> {
use crate::schema::devices::dsl::*;
devices
.filter(device_id.eq(_device_id))
.select(Device::as_select())
.first(&mut self.connection)
}

pub fn update_device(&mut self, device: Device) -> Result<usize, diesel::result::Error> {
pub fn update_device(&mut self, device: UpdateDevice) -> Result<usize, diesel::result::Error> {
use crate::schema::devices::dsl::*;
diesel::insert_into(devices)
.values(device)
Expand Down Expand Up @@ -254,6 +255,25 @@ impl DatabaseClient {
.execute(&mut self.connection)
}

pub fn get_subscriptions_by_device_id(&mut self, _device_id: &str) -> Result<Vec<Subscription>, diesel::result::Error> {
use crate::schema::subscriptions::dsl::*;
subscriptions
.inner_join(devices::table)
.filter(devices::device_id.eq(_device_id))
.select(Subscription::as_select())
.load(&mut self.connection)
}

pub fn delete_subscription(&mut self, subscription: Subscription) -> Result<usize, diesel::result::Error> {
use crate::schema::subscriptions::dsl::*;
return diesel::delete(
subscriptions
.filter(device_id.eq(subscription.device_id))
.filter(chain.eq(subscription.chain))
.filter(address.eq(subscription.address))
).execute(&mut self.connection);
}

pub fn get_subscriptions(&mut self, _chain: Chain, addresses: Vec<String>) -> Result<Vec<Subscription>, diesel::result::Error> {
use crate::schema::subscriptions::dsl::*;
subscriptions
Expand All @@ -263,6 +283,14 @@ impl DatabaseClient {
.load(&mut self.connection)
}

pub fn add_subscriptions(&mut self, _subscriptions: Vec<Subscription>) -> Result<usize, diesel::result::Error> {
use crate::schema::subscriptions::dsl::*;
diesel::insert_into(subscriptions)
.values(&_subscriptions)
.on_conflict_do_nothing()
.execute(&mut self.connection)
}

pub fn add_transactions(&mut self, _transactions: Vec<Transaction>) -> Result<usize, diesel::result::Error> {
use crate::schema::transactions::dsl::*;
diesel::insert_into(transactions)
Expand Down
2 changes: 1 addition & 1 deletion storage/src/models/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Asset {
impl Asset {
pub fn as_primitive(&self) -> primitives::asset::Asset {
primitives::asset::Asset{
id: AssetId {chain: Chain::new(&self.chain).unwrap(), token_id: self.token_id.clone() },
id: AssetId {chain: Chain::from_str(&self.chain).unwrap(), token_id: self.token_id.clone() },
name: self.name.clone(),
symbol: self.symbol.clone(),
asset_type: AssetType::from_str(&self.asset_type).unwrap(),
Expand Down
24 changes: 24 additions & 0 deletions storage/src/models/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ use serde::{Deserialize, Serialize};
#[diesel(table_name = crate::schema::devices)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct Device {
pub id: i32,
pub device_id: String,
pub platform: String,
pub token: String,
pub locale: String,
pub is_push_enabled: bool,
}

#[derive(Debug, Queryable, Selectable, Serialize, Deserialize, Insertable, AsChangeset, Clone)]
#[diesel(table_name = crate::schema::devices)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct UpdateDevice {
pub device_id: String,
pub platform: String,
pub token: String,
Expand All @@ -23,4 +35,16 @@ impl Device {
is_push_enabled: self.is_push_enabled,
}
}
}

impl UpdateDevice {
pub fn from_primitive(device: primitives::Device) -> Self {
Self {
device_id: device.id,
platform: device.platform.as_str().to_string(),
token: device.token,
locale: device.locale,
is_push_enabled: device.is_push_enabled,
}
}
}
1 change: 1 addition & 0 deletions storage/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use self::asset::Asset;
pub use self::chart::Chart;
pub use self::chart::ChartResult;
pub use self::device::Device;
pub use self::device::UpdateDevice;
pub use self::fiat_asset::FiatAsset;
pub use self::fiat_rate::FiatRate;
pub use self::node::Node;
Expand Down
Loading

0 comments on commit dab1202

Please sign in to comment.