Skip to content

Commit

Permalink
Merge pull request #237 from aeternity/release/0.11.2
Browse files Browse the repository at this point in the history
Release/0.11.2
  • Loading branch information
shekhar-shubhendu committed Nov 21, 2019
2 parents 870bb95 + 186b879 commit a48ab6f
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 56 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -2,3 +2,4 @@
/target
**/*.rs.bk
.env
conf/log4rs.yaml
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "aeternal"
version = "0.11.1"
version = "0.11.2"
authors = ["John Newby <@johnsnewby>", "Shubhendu Shekhar <@shekhar-shubhendu>", "Andrea Giacobino <@noandrea>" ]
description = "æternal: A caching and reporting layer for aeternity blockchain"

Expand Down
4 changes: 2 additions & 2 deletions README.md
@@ -1,4 +1,4 @@
# æternity middleware
# æternal

## Overview

Expand Down Expand Up @@ -44,7 +44,7 @@ The middleware permits you to subscribe to events via a websocket, and receive u

### Use ours!

There is a hosted middleware for the æternity mainnet at http://mdw.aepps.com/, and one for the testnet at https://testnet.mdw.aepps.com.
There is a hosted middleware for the æternity mainnet at https://mainnet.aeternal.io/, and one for the testnet at https://testnet.aeternal.io.

### Install your own

Expand Down
6 changes: 2 additions & 4 deletions docs/middleware-guide.md
Expand Up @@ -2,7 +2,7 @@

author: John Newby

date: 2019-09-27
date: 2019-11-19

corresponds to version: v0.10.0

Expand All @@ -28,11 +28,10 @@ In general when we find a useful query which is not supported by the node, we at

## Installation, or not.

aeternal is written in the Rust language, and is relatively straightforward to install. We, the æternity team, host instances for both mainnet and testnet, which you’re welcome to use. We do understand though that our users may not want to trust us, in which case you’ll be wanting to run your own instance. You can start off by priming with our database, which we publish at https://backups.aepps.com. You may verify this backup using the method described in the ‘verification’ section, below.
aeternal is written in the Rust language, and is relatively straightforward to install. We, the æternal team, host instances for both mainnet and testnet, which you’re welcome to use. We do understand though that our users may not want to trust us, in which case you’ll be wanting to run your own instance. You can start off by priming with our database, which we publish at https://backups.aeternal.io. You may verify this backup using the method described in the ‘verification’ section, below.

The installation instructions are in the README.md file on the Github repository for the project, at [https://github.com/aeternity/aeternal](https://github.com/aeternity/aeternal)


## Interfaces

aeternal has an HTTP API, and a Websocket API. The Websocket API permits subscriptions to events of interest, for instance key block, micro block or transaction generation, or a subscription to any type of object in the block chain. In particular the Websocket API is intended for use cases such as
Expand Down Expand Up @@ -720,4 +719,3 @@ and so on.
## Logging

As of version 0.10.0 aeternal uses the *log4rs* logging package. A sample configuration is given in the file `conf/log4rs.yaml`. We use a custom appender, `log4rs-email` written by one of our team, which emails log lines, in the sample config this is done for the `error` class. We suggest using this sparingly, unless you very much enjoy receiving a lot of emails.

51 changes: 22 additions & 29 deletions src/loader.rs
@@ -1,6 +1,5 @@
use super::schema::key_blocks::dsl::*;
use super::schema::transactions::dsl::*;
use chashmap::*;
use diesel::pg::PgConnection;
use diesel::query_dsl::QueryDsl;
use diesel::Connection;
Expand All @@ -15,10 +14,11 @@ use r2d2::Pool;
use r2d2_diesel::ConnectionManager;
use r2d2_postgres::PostgresConnectionManager;
use serde_json;
use std::collections::HashMap;
use std::env;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread;
use websocket::Candidate;

Expand Down Expand Up @@ -82,43 +82,36 @@ pub struct BlockLoader {
pub static BACKLOG_CLEARED: i64 = -1;

lazy_static! {
static ref TX_QUEUE: CHashMap<i64, bool> = CHashMap::<i64, bool>::new();
static ref TX_QUEUE: Arc<Mutex<HashMap<i64, bool>>> = Arc::new(Mutex::new(HashMap::new()));
}

fn is_in_queue(_height: i64) -> bool {
match TX_QUEUE.get(&_height) {
None => false,
_ => true,
fn is_in_queue(_height: i64) -> MiddlewareResult<bool> {
match (*TX_QUEUE).lock()?.get(&_height) {
None => Ok(false),
_ => Ok(true),
}
}

fn remove_from_queue(_height: i64) {
info!("TX_QUEUE -> {}", _height);
TX_QUEUE.remove(&_height);
info!("TX_QUEUE len={}", TX_QUEUE.len());
fn remove_from_queue(_height: i64) -> MiddlewareResult<()> {
(*TX_QUEUE).lock()?.remove(&_height)?;
Ok(())
}
fn add_to_queue(_height: i64) {
info!("TX_QUEUE <- {}", _height);
TX_QUEUE.insert(_height, true);
fn add_to_queue(_height: i64) -> MiddlewareResult<()> {
(*TX_QUEUE).lock()?.insert(_height, true);
Ok(())
}

pub fn queue(
_height: i64,
_tx: &std::sync::mpsc::Sender<i64>,
) -> Result<(), std::sync::mpsc::SendError<i64>> {
info!("TX_QUEUE len={}", TX_QUEUE.len());

if is_in_queue(_height) {
info!("TX_QUEUE already has {}", _height);
pub fn queue(_height: i64, _tx: &std::sync::mpsc::Sender<i64>) -> MiddlewareResult<()> {
if is_in_queue(_height)? {
return Ok(());
}
_tx.send(_height)?;
add_to_queue(_height);
add_to_queue(_height)?;
Ok(())
}

pub fn queue_length() -> usize {
TX_QUEUE.len()
pub fn queue_length() -> MiddlewareResult<usize> {
Ok((*TX_QUEUE).lock()?.len())
}

/*
Expand Down Expand Up @@ -244,7 +237,7 @@ impl BlockLoader {
);
}
current_height -= 1;
thread::sleep(std::time::Duration::new(blocks_since_last_fork + 1, 0));
thread::sleep(std::time::Duration::new(blocks_since_last_fork, 0));
}
Ok(fork_was_detected)
}
Expand All @@ -268,7 +261,6 @@ impl BlockLoader {
_tx: &std::sync::mpsc::Sender<i64>,
) -> MiddlewareResult<bool> {
debug!("Invalidating block at height {}", _height);
// diesel::delete(key_blocks.filter(height.eq(&_height))).execute(conn)?;
match queue(_height, _tx) {
Ok(()) => (),
Err(e) => {
Expand Down Expand Up @@ -602,7 +594,7 @@ impl BlockLoader {
* The very simple function which pulls heights from the queue and
* loads them into the DB
*/
pub fn start(&self) {
pub fn start(&self) -> MiddlewareResult<()> {
for b in &self.rx {
debug!("Pulling height {} from queue for storage", b);
if b == BACKLOG_CLEARED {
Expand All @@ -618,12 +610,13 @@ impl BlockLoader {
),
Err(x) => error!("Error loading blocks {}", x),
};
remove_from_queue(b);
remove_from_queue(b)?;
}
}
// if we fall through here something has gone wrong. Let's quit!
error!("Failed to read from the queue, quitting.");
BlockLoader::recover_from_db_error();
Err(MiddlewareError::new("Error in hashmap, quitting"))
}

/*
Expand Down
7 changes: 6 additions & 1 deletion src/main.rs
Expand Up @@ -289,7 +289,12 @@ fn main() {
Err(x) => error!("fill_missing_heights() returned an error: {}", x),
};
populate_thread = Some(thread::spawn(move || {
loader.start();
loop {
match loader.start() {
Ok(_) => (), // this should never happene
Err(e) => error!("Error in chain population: {:?}", e),
};
}
}));
if websocket {
websocket::start_ws();
Expand Down
3 changes: 3 additions & 0 deletions src/middleware_result.rs
Expand Up @@ -81,5 +81,8 @@ middleware_error_from!(bigdecimal::ParseBigDecimalError);
middleware_error_from!(std::env::VarError);
middleware_error_from!(reqwest::header::InvalidHeaderValue);
middleware_error_from!(base64::DecodeError);
middleware_error_from!(
std::sync::PoisonError<std::sync::MutexGuard<'_, std::collections::HashMap<i64, bool>>>
);

pub type MiddlewareResult<T> = Result<T, MiddlewareError>;
46 changes: 27 additions & 19 deletions src/server.rs
Expand Up @@ -54,9 +54,13 @@ macro_rules! offset_limit_vec {
{$limit:expr, $page:expr, $result:expr} => {
if let Some(_limit) = $limit {
if let Some(_page) = $page {
$result = $result[((_page - 1) * _limit) as usize
..std::cmp::min((_page * _limit) as usize, $result.len() as usize)]
.to_vec();
if $result.len() > (_limit * _page) as usize {
$result = $result[((_page - 1) * _limit) as usize
..std::cmp::min((_page * _limit) as usize, $result.len() as usize)]
.to_vec();
} else {
$result = vec!();
}
}
}
}
Expand Down Expand Up @@ -653,7 +657,7 @@ fn calls_for_contract_address(
let caller_id: String = row.get(2);
let arguments: serde_json::Value = row.get(3);
let callinfo: serde_json::Value = row.get(4);
let result: serde_json::Value = row.get(5);
let result: Option<serde_json::Value> = row.get(5);
calls.push(json!({
"transaction_id": transaction_id,
"contract_id": contract_id,
Expand Down Expand Up @@ -994,7 +998,7 @@ fn active_names(
reverse: Option<String>,
) -> Json<Vec<Name>> {
let connection = PGCONNECTION.get().unwrap();
let top_height = KeyBlock::top_height(&*connection).unwrap();
let top_height = KeyBlock::top_height(&*connection).unwrap();
let sql: String = match owner {
Some(owner) => format!(
"select * from \
Expand All @@ -1003,7 +1007,8 @@ fn active_names(
expires_at >= {} and \
owner = '{}' \
order by expires_at desc",
top_height, top_height,
top_height,
top_height,
sanitize(&owner),
),
_ => format!(
Expand Down Expand Up @@ -1103,7 +1108,7 @@ fn active_name_auctions_internal(
let mut cmp_func: Box<dyn Fn(&NameAuctionEntry, &NameAuctionEntry) -> std::cmp::Ordering> =
match _sort.as_ref() {
"name" => Box::new(|a, b| a.name.cmp(&b.name)),
"max_bid" => Box::new(|a,b| b.winning_bid.cmp(&a.winning_bid)),
"max_bid" => Box::new(|a, b| b.winning_bid.cmp(&a.winning_bid)),
_ => Box::new(|a, b| a.expiration.cmp(&b.expiration)),
};

Expand Down Expand Up @@ -1183,17 +1188,17 @@ struct AuctionInfo {
}

#[get("/names/auctions/<name>/info")]
fn info_for_auction(_state: State<MiddlewareServer>, name: String) ->
Result<Json<AuctionInfo>, Status>
{
fn info_for_auction(
_state: State<MiddlewareServer>,
name: String,
) -> Result<Json<AuctionInfo>, Status> {
let connection = &PGCONNECTION.get().unwrap();
let bids = crate::models::Name::bids_for_name(connection, name.clone()).unwrap();
if let Ok(info) =
crate::models::NameAuctionEntry::load_for_name(connection, name.clone()) {
Ok(Json(AuctionInfo{ bids, info }))
} else {
Err(rocket::http::Status::new(404, "Not found"))
}
if let Ok(info) = crate::models::NameAuctionEntry::load_for_name(connection, name.clone()) {
Ok(Json(AuctionInfo { bids, info }))
} else {
Err(rocket::http::Status::new(404, "Not found"))
}
}

#[get("/names/auctions/bids/<name>?<limit>&<page>")]
Expand Down Expand Up @@ -1236,7 +1241,7 @@ fn status(_state: State<MiddlewareServer>) -> Response {
.unwrap_or("900".into())
.parse::<i64>()
.unwrap();
let queue_length = crate::loader::queue_length();
let queue_length = crate::loader::queue_length().unwrap();
let max_queue_length: i64 = std::env::var("STATUS_MAX_QUEUE_LENGTH")
.unwrap_or("2".into())
.parse::<i64>()
Expand Down Expand Up @@ -1307,7 +1312,10 @@ impl MiddlewareServer {
let allowed_origins = AllowedOrigins::all();
let options = rocket_cors::CorsOptions {
allowed_origins,
allowed_methods: vec!(Method::Get,Method::Post,Method::Options).into_iter().map(From::from).collect(),
allowed_methods: vec![Method::Get, Method::Post, Method::Options]
.into_iter()
.map(From::from)
.collect(),
allowed_headers: AllowedHeaders::All,
allow_credentials: true,
..Default::default()
Expand All @@ -1332,7 +1340,7 @@ impl MiddlewareServer {
.mount("/middleware", routes![get_available_compilers])
.mount("/middleware", routes![generations_by_range])
.mount("/middleware", routes![height_at_epoch])
.mount("/middleware", routes![info_for_auction])
.mount("/middleware", routes![info_for_auction])
.mount("/middleware", routes![name_for_hash])
.mount("/middleware", routes![oracles_all])
.mount("/middleware", routes![oracle_requests_responses])
Expand Down

0 comments on commit a48ab6f

Please sign in to comment.