Skip to content

Commit

Permalink
Add perspective crate
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Stein <steinlink@gmail.com>
  • Loading branch information
texodus committed May 20, 2024
1 parent 838a97d commit 4396022
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"rust/bootstrap-runtime",
"rust/perspective-viewer",
"rust/bundle",
"rust/perspective",
"rust/perspective-client",
"rust/perspective-js",
"rust/perspective-python",
Expand Down
35 changes: 35 additions & 0 deletions examples/rust-axum/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
# ┃ Copyright (c) 2017, the Perspective Authors. ┃
# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
# ┃ This file is part of the Perspective library, distributed under the terms ┃
# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

[package]
name = "rust-axum"
version = "0.1.0"
edition = "2021"

[dependencies]
perspective = { version = "2.10.1", path = "../../rust/perspective" }
anyhow = "1.0.66"
axum = { version = "=0.7.4", features = ["ws"] }
axum-extra = { version = "0.9", features = ["typed-header"] }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = [
"sink",
"std",
] }
headers = "0.4"
tokio = { version = "1.0", features = ["full"] }
tokio-tungstenite = "0.21"
tower = { version = "0.4", features = ["util"] }
tower-http = { version = "0.5.0", features = ["fs", "trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
prost = "0.12"
13 changes: 13 additions & 0 deletions examples/rust-axum/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"name": "rust-axum",
"private": true,
"version": "2.10.1",
"description": "An example of a Rust/Axum virtual Perspective server",
"scripts": {
"start": "cargo run"
},
"keywords": [],
"license": "Apache-2.0",
"dependencies": {},
"devDependencies": {}
}
100 changes: 100 additions & 0 deletions examples/rust-axum/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors. ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

use std::fs::{self, File};
use std::io::Read;
use std::net::SocketAddr;

use axum::extract::connect_info::ConnectInfo;
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::extract::State;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use perspective::client::{TableData, TableInitOptions};
use perspective::server::PerspectiveServer;
use tower_http::trace::TraceLayer;

fn init_tracing() {
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::fmt::layer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry;
registry()
.with(layer().pretty().with_filter(LevelFilter::INFO))
.init();
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
init_tracing();
let server = PerspectiveServer::new();
let client = perspective::create_local_client(&server).await?;

const FILENAME: &str = "../../node_modules/superstore-arrow/superstore.lz4.arrow";
let mut f = File::open(FILENAME).expect("no file found");
let metadata = fs::metadata(FILENAME).expect("unable to read metadata");
let mut feather = Vec::with_capacity(metadata.len() as usize);
f.read_to_end(&mut feather).expect("buffer overflow");

let _table = client
.table(TableData::Arrow(feather), TableInitOptions {
name: Some("superstore".to_owned()),
..TableInitOptions::default()
})
.await?;

let app = Router::new()
.route("/ws", get(websocket_handshake))
.with_state(server)
.layer(TraceLayer::new_for_http());

let service = app.into_make_service_with_connect_info::<SocketAddr>();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
tracing::info!("listening on {}", listener.local_addr()?);
axum::serve(listener, service).await?;
Ok(())
}

async fn websocket_handshake(
ws: WebSocketUpgrade,
State(server): State<PerspectiveServer>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
) -> impl IntoResponse {
tracing::info!("{addr} Connected.");
ws.on_upgrade(move |socket| websocket_session(server, socket, addr))
}

async fn websocket_session(server: PerspectiveServer, mut socket: WebSocket, addr: SocketAddr) {
while let Some(msg) = socket.recv().await {
if let Ok(Message::Binary(x)) = msg {
for (_client_id, resp) in server.handle_message(0, &x) {
if let Err(e) = socket.send(Message::Binary(resp)).await {
tracing::error!("{addr} unexpected error {e:?}");
return;
}
}

for (_client_id, resp) in server.poll() {
if let Err(e) = socket.send(Message::Binary(resp)).await {
tracing::error!("{addr} unexpected error {e:?}");
return;
}
}
} else {
tracing::error!("{addr} Unexpected msg {msg:?}");
tracing::debug!("{addr} Unexpected msg {msg:?}");
}
}

tracing::info!("{addr} disconnected");
}
32 changes: 32 additions & 0 deletions rust/perspective/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
# ┃ Copyright (c) 2017, the Perspective Authors. ┃
# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
# ┃ This file is part of the Perspective library, distributed under the terms ┃
# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

[package]
name = "perspective"
version = "2.10.1"
authors = ["Andrew Stein <steinlink@gmail.com>"]
edition = "2021"
description = "A data visualization and analytics component, especially well-suited for large and/or streaming datasets."
repository = "https://github.com/finos/perspective"
license = "Apache-2.0"
homepage = "https://perspective.finos.org"
keywords = []
include = ["src/**/*", "Cargo.toml"]

[lib]
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"

[dependencies]
perspective-client = { version = "2.10.1", path = "../perspective-client" }
perspective-server = { version = "2.10.1", path = "../perspective-server" }
tracing = { version = ">=0.1.36" }
37 changes: 37 additions & 0 deletions rust/perspective/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors. ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

use perspective_client::*;
use perspective_server::*;
pub use {perspective_client as client, perspective_server as server};

pub async fn create_local_client(server: &PerspectiveServer) -> ClientResult<Client> {
let server = server.clone();
let client = Client::new_sync({
move |client, req| {
for (_client_id, resp) in server.handle_message(0, req) {
client
.receive(&resp)
.unwrap_or_else(|e| tracing::error!("Unknown server message: {}", e));
}

for (_client_id, resp) in server.poll() {
client
.receive(&resp)
.unwrap_or_else(|e| tracing::error!("Unknown server message: {}", e));
}
}
});

client.init().await?;
Ok(client)
}

0 comments on commit 4396022

Please sign in to comment.