Skip to content

Commit

Permalink
Improve parser and complete pusher first iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
gemcoder21 committed Sep 5, 2023
1 parent 5d6f1fc commit 0ab8c86
Show file tree
Hide file tree
Showing 51 changed files with 872 additions and 235 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Settings.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,11 @@ path = "/metrics"
url = "https://raw.githubusercontent.com/gemwalletcom/assets/master"

[chains.binance]
url = "https://dex.binance.org"
url = "https://dex.binance.org"
api = "https://api.binance.org"

[pusher]
url = ""

[pusher.ios]
topic = ""
2 changes: 2 additions & 0 deletions api/src/device/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl DevicesClient {
id: device.device_id,
platform: Platform::from_str(device.platform.as_str()).unwrap(),
token: device.token,
locale: device.locale,
is_push_enabled: device.is_push_enabled,
}
)
Expand All @@ -48,6 +49,7 @@ impl DevicesClient {
device_id: device.id,
platform: device.platform.as_str().to_string(),
token: device.token,
locale: device.locale,
is_push_enabled: device.is_push_enabled,
};
}
Expand Down
7 changes: 1 addition & 6 deletions api/src/prices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ pub async fn get_assets_prices(request: Json<AssetPricesRequest>, price_client:
fn price_response(prices: Vec<Price>) -> Vec<AssetPrice> {
let mut response = Vec::new();
for asset_price in prices {
let price_response = AssetPrice{
asset_id: asset_price.asset_id,
price: asset_price.price,
price_change_percentage_24h: asset_price.price_change_percentage_24h,
};
response.push(price_response);
response.push(asset_price.as_primitive());
}
response
}
2 changes: 1 addition & 1 deletion bin/generate/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn main() {
let platform = std::env::args().nth(1).expect("no platform specified");
let platform_directory_path = std::env::args().nth(2).expect("no path specified");

let ignored_files = vec!["lib.rs", "mod.rs"];
let ignored_files = vec!["lib.rs", "mod.rs", "client.rs"];

for folder in folders {
let paths = get_paths(folder, format!("{}/src", folder));
Expand Down
4 changes: 3 additions & 1 deletion blockchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ typeshare = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
reqwest = { workspace = true }
async-trait = { workspace = true }
async-trait = { workspace = true }
primitives = { path = "../primitives" }
chrono = { workspace = true }
54 changes: 45 additions & 9 deletions blockchain/src/bnbchain/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::error::Error;

use crate::ChainProvider;
use async_trait::async_trait;
use primitives::{chain::Chain, Transaction, TransactionType, TransactionState, TransactionDirection, asset_id::AssetId};
use chrono::Utc;

use super::model::{Block, NodeInfo};

Expand All @@ -21,11 +23,45 @@ impl BNBChainClient {
client,
}
}

pub fn map_transaction(&self, transaction: super::model::Transaction) -> Option<primitives::Transaction> {
if transaction.r#type != "TRANSFER" {
return None;
}
let token_id = if transaction.asset == Some("BNB".to_string()) { None } else { transaction.asset };
let asset_id = AssetId{chain: self.get_chain(), token_id};

let transaction = primitives::Transaction{
id: "".to_string(), //transaction.id,
hash: transaction.hash,
asset_id,
from: transaction.from_addr,
to: transaction.to_addr.unwrap_or_default(),
contract: None,
transaction_type: TransactionType::Transfer,
state: TransactionState::Confirmed,
block_number: transaction.block_height.into(),
sequence: transaction.sequence,
fee: transaction.fee.to_string(),
fee_asset_id: AssetId::from_chain(self.get_chain()),
value: format!("{:?}", transaction.amount.unwrap_or_default()),
memo: transaction.memo.into(),
direction: TransactionDirection::SelfTransfer,
created_at: Utc::now().naive_utc(),
updated_at: Utc::now().naive_utc(),
};
return Some(transaction)
}
}

#[async_trait]
impl ChainProvider for BNBChainClient {
async fn get_latest_block(&self) -> Result<i32, Box<dyn Error>> {

fn get_chain(&self) -> Chain {
Chain::Binance
}

async fn get_latest_block(&self) -> Result<i32, Box<dyn Error + Send + Sync>> {
let url = format!("{}/api/v1/node-info", self.url);
let response = self.client
.get(url)
Expand All @@ -34,21 +70,21 @@ impl ChainProvider for BNBChainClient {
.json::<NodeInfo>()
.await?;

return Ok(response.sync_info.latest_block_height);
return Ok(response.sync_info.latest_block_height.into());
}

async fn get_transactions(&self, block: i32) -> Result<Vec<i32>, Box<dyn Error>> {
async fn get_transactions(&self, block: i32) -> Result<Vec<Transaction>, Box<dyn Error + Send + Sync>> {
let url = format!("{}/bc/api/v1/blocks/{}/txs", self.api_url, block);

let response = self.client
let transactions = self.client
.get(url)
.send()
.await?
.json::<Block>()
.await?;

let _transactions = response.txs;
.await?
.txs.into_iter()
.flat_map(|x| self.map_transaction(x))
.collect::<Vec<primitives::Transaction>>();

return Ok(vec![]);
return Ok(transactions);
}
}
6 changes: 3 additions & 3 deletions blockchain/src/bnbchain/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ pub struct Block {
pub struct Transaction {
pub hash: String,
pub r#type: String,
pub fee: i64,
pub fee: i32,
pub memo: String,
pub asset: Option<String>,
pub amount: Option<i64>,
pub from_addr: String,
pub to_addr: Option<String>,
pub block_height: i64,
pub sequence: i64,
pub block_height: i32,
pub sequence: i32,
}

#[typeshare]
Expand Down
29 changes: 25 additions & 4 deletions blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,34 @@

mod aptos;
mod bnbchain;
use std::error::Error;

pub use self::bnbchain::client::BNBChainClient;
use async_trait::async_trait;
use primitives::{chain::Chain, Transaction};

use std::sync::Arc;

#[async_trait]
pub trait ChainProvider: Send + Sync {
fn get_chain(&self) -> Chain;
async fn get_latest_block(&self) -> Result<i32, Box<dyn std::error::Error + Send + Sync>>;
async fn get_transactions(&self, block_number: i32) -> Result<Vec<Transaction>, Box<dyn std::error::Error + Send + Sync>>;
}

#[async_trait]
pub trait ChainProvider {
async fn get_latest_block(&self) -> Result<i32, Box<dyn Error>>;
async fn get_transactions(&self, block: i32) -> Result<Vec<i32>, Box<dyn Error>>;
impl<T: Send + Sync> ChainProvider for Arc<T>
where
T: ChainProvider + ?Sized,
{
fn get_chain(&self) -> Chain {
(**self).get_chain()
}

async fn get_latest_block(&self) -> Result<i32, Box<dyn std::error::Error + Send + Sync>> {
(**self).get_latest_block().await
}

async fn get_transactions(&self, block_number: i32) -> Result<Vec<Transaction>, Box<dyn std::error::Error + Send + Sync>> {
(**self).get_transactions(block_number).await
}
}
4 changes: 3 additions & 1 deletion parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ primitives = { path = "../primitives" }
storage = { path = "../storage" }
settings = { path = "../settings" }
blockchain = { path = "../blockchain" }
tokio = { workspace = true }
tokio = { workspace = true }
reqwest = { workspace = true }
rust_decimal = { workspace = true }
71 changes: 65 additions & 6 deletions parser/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::{thread::{sleep, self}, time::Duration};

pub mod pusher;

use blockchain::ChainProvider;
use primitives::chain::Chain;
use primitives::{chain::Chain, Transaction};
use settings::Settings;
use storage::database::DatabaseClient;
use storage::DatabaseClient;

use crate::pusher::Pusher;

#[tokio::main]
pub async fn main() {
Expand All @@ -12,9 +16,39 @@ pub async fn main() {
let settings: Settings = Settings::new().unwrap();
let mut database_client: DatabaseClient = DatabaseClient::new(&settings.postgres.url);
let bnbchain_client = blockchain::BNBChainClient::new(
settings.chains.binance.url.to_string(),
"https://api.binance.org".to_string()
settings.chains.binance.url,
settings.chains.binance.api
);
let mut pusher = Pusher::new(
settings.pusher.url,
settings.postgres.url,
settings.pusher.ios.topic,
);

// let providers: Vec<Box<dyn ChainProvider>> = vec![
// Box::new(blockchain::BNBChainClient::new(
// settings.chains.binance.url,
// settings.chains.binance.api
// )),
// Box::new(blockchain::BNBChainClient::new(
// settings.chains.binance.url,
// settings.chains.binance.api
// )),
// ];

// for provider in providers {
// tokio::spawn(async move {

// println!("launch provider: {:?}", provider.get_chain());

// loop {
// let latest_block: i32 = provider.get_latest_block().await.unwrap();
// println!("latest_block: {:?}", latest_block);

// //thread::sleep(Duration::from_secs(2))
// }
// });
// }

loop {
let state = database_client.get_parser_state(Chain::Binance).unwrap();
Expand Down Expand Up @@ -45,10 +79,35 @@ pub async fn main() {

let transactions = bnbchain_client.get_transactions(next_block).await;
match transactions {
Ok(_) => {
Ok(transactions) => {
let _ = database_client.set_parser_state_current_block(Chain::Binance, next_block);

//println!("transactions: {:?}", transactions);
let addresses = transactions.clone().into_iter().map(|x| x.addresses() ).flatten().collect();
let subscriptions = database_client.get_subscriptions(Chain::Binance, addresses).unwrap();
let mut store_transactions: Vec<Transaction> = vec![];

for subscription in subscriptions {
for transaction in transactions.clone() {
if transaction.addresses().contains(&subscription.address) {
let device = database_client.get_device_by_id(subscription.device_id).unwrap();
println!("Push: device: {}, transaction: {:?}", subscription.device_id, transaction.hash);

store_transactions.push(transaction.clone());

let result = pusher.push(device.as_primitive(), transaction.clone()).await;
match result {
Ok(result) => { println!("Push: result: {:?}", result); },
Err(err) => { println!("Push: error: {:?}", err); }
}
}
}
}

let db_transactions = store_transactions.into_iter().map(|transaction| {
storage::models::Transaction::from_primitive(transaction)
}).collect();

database_client.add_transactions(db_transactions).unwrap();
},
Err(err) => {
println!("get transactions error: {:?}", err);
Expand Down
Empty file added parser/src/parser.rs
Empty file.
31 changes: 31 additions & 0 deletions parser/src/pusher/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use super::model::{Notifications, Response};
pub struct PusherClient {
url: String,
client: reqwest::Client,
}

impl PusherClient {
pub fn new(
url: String,
) -> Self {
let client = reqwest::Client::new();
Self {
url,
client,
}
}

pub async fn push(&self, notifications: Notifications) -> Result<usize, reqwest::Error> {
let url = format!("{}/api/push", self.url);

let _ = self.client
.post(&url)
.json(&notifications)
.send()
.await?
.json::<Response>()
.await?;

Ok(notifications.notifications.len())
}
}
Loading

0 comments on commit 0ab8c86

Please sign in to comment.