Skip to content

Commit

Permalink
Merge branch 'main' into protoc-optional
Browse files Browse the repository at this point in the history
  • Loading branch information
losfair committed Apr 27, 2024
2 parents 0ce5ac8 + ec1132f commit c6f0ce3
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 136 deletions.
214 changes: 138 additions & 76 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ constant_time_eq = "0.3"
env_logger = "0.10.0"
futures = "0.3.28"
hex = "0.4"
http = "1"
hyper = { version = "0.14", features = ["client"] }
hyper-proxy = { version = "0.9.1", default-features = false }
log = "0.4.20"
num-bigint = "0.4"
prost = "0.11"
prost-build = "0.11"
rand = "0.8.5"
reqwest = { version = "0.11", default-features = false, features = ["json", "stream"] }
reqwest = { version = "0.12.4", default-features = false, features = ["json", "stream"] }
rusqlite = "0.29.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.107"
Expand Down
5 changes: 4 additions & 1 deletion denokv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ tokio.workspace = true
uuid.workspace = true

[dev-dependencies]
bytes.workspace = true
denokv_remote.workspace = true
http.workspace = true
num-bigint.workspace = true
tempfile.workspace = true
reqwest.workspace = true
v8_valueserializer.workspace = true
url.workspace = true
v8_valueserializer.workspace = true
114 changes: 108 additions & 6 deletions denokv/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@ use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;

use bytes::Bytes;
use denokv_proto::AtomicWrite;
use denokv_proto::Database;
use denokv_proto::KvEntry;
use denokv_proto::KvValue;
use denokv_proto::ReadRange;
use denokv_proto::WatchKeyOutput;
use denokv_remote::RemotePermissions;
use denokv_remote::RemoteResponse;
use denokv_remote::RemoteTransport;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::task::LocalSet;
use url::Url;
use v8_valueserializer::value_eq;
use v8_valueserializer::Heap;
use v8_valueserializer::Value;
Expand All @@ -18,6 +29,39 @@ use v8_valueserializer::ValueSerializer;

const ACCESS_TOKEN: &str = "1234abcd5678efgh";

#[derive(Clone)]
struct ReqwestClient(reqwest::Client);
struct ReqwestResponse(reqwest::Response);

impl RemoteTransport for ReqwestClient {
type Response = ReqwestResponse;
async fn post(
&self,
url: Url,
headers: http::HeaderMap,
body: Bytes,
) -> Result<(Url, http::StatusCode, Self::Response), anyhow::Error> {
let res = self.0.post(url).headers(headers).body(body).send().await?;
let url = res.url().clone();
let status = res.status();
Ok((url, status, ReqwestResponse(res)))
}
}

impl RemoteResponse for ReqwestResponse {
async fn bytes(self) -> Result<Bytes, anyhow::Error> {
Ok(self.0.bytes().await?)
}
fn stream(
self,
) -> impl Stream<Item = Result<Bytes, anyhow::Error>> + Send + Sync {
self.0.bytes_stream().map_err(|e| e.into())
}
async fn text(self) -> Result<String, anyhow::Error> {
Ok(self.0.text().await?)
}
}

fn denokv_exe() -> PathBuf {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("../target");
Expand Down Expand Up @@ -100,7 +144,7 @@ impl denokv_remote::RemotePermissions for DummyPermissions {
#[tokio::test]
async fn basics() {
let (_child, addr) = start_server().await;
let client = reqwest::Client::new();
let client = ReqwestClient(reqwest::Client::new());
let url = format!("http://localhost:{}", addr.port()).parse().unwrap();

let metadata_endpoint = denokv_remote::MetadataEndpoint {
Expand Down Expand Up @@ -171,10 +215,68 @@ async fn basics() {
println!("remote");
}

#[tokio::test]
async fn watch() {
let (_child, addr) = start_server().await;
let client = ReqwestClient(reqwest::Client::new());
let url = format!("http://localhost:{}", addr.port()).parse().unwrap();

let metadata_endpoint = denokv_remote::MetadataEndpoint {
url,
access_token: ACCESS_TOKEN.to_string(),
};

let remote =
denokv_remote::Remote::new(client, DummyPermissions, metadata_endpoint);
let remote2 = remote.clone();
let local = LocalSet::new();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
local.spawn_local(async move {
let mut s = remote2.watch(vec![vec![1]]);
while let Some(w) = s.next().await {
let w = w.expect("watch success");
eprintln!("Watch output: {w:?}");
for w in w {
if let WatchKeyOutput::Changed { entry: Some(_) } = w {
tx.send(w).expect("send success");
return;
}
}
}
});
local.spawn_local(async move {
for i in 0..10 {
_ = remote
.atomic_write(AtomicWrite {
checks: vec![],
mutations: vec![denokv_proto::Mutation {
key: vec![1],
kind: denokv_proto::MutationKind::Set(denokv_proto::KvValue::U64(
i,
)),
expire_at: None,
}],
enqueues: vec![],
})
.await
.unwrap()
.expect("commit success");
}
});
tokio::time::timeout(Duration::from_secs(60), local)
.await
.expect("no timeout");
let w = rx.try_recv().expect("recv success");
let WatchKeyOutput::Changed { entry: Some(entry) } = w else {
panic!("Unexpected watch result");
};
assert_eq!(entry.key, vec![1]);
}

#[tokio::test]
async fn no_auth() {
let (_child, addr) = start_server().await;
let client = reqwest::Client::new();
let client = ReqwestClient(reqwest::Client::new());
let url = format!("http://localhost:{}", addr.port()).parse().unwrap();

let metadata_endpoint = denokv_remote::MetadataEndpoint {
Expand Down Expand Up @@ -204,7 +306,7 @@ async fn no_auth() {
#[tokio::test]
async fn sum_type_mismatch() {
let (_child, addr) = start_server().await;
let client = reqwest::Client::new();
let client = ReqwestClient(reqwest::Client::new());
let url = format!("http://localhost:{}", addr.port()).parse().unwrap();

let metadata_endpoint = denokv_remote::MetadataEndpoint {
Expand Down Expand Up @@ -281,7 +383,7 @@ async fn sum_type_mismatch() {
#[tokio::test]
async fn sum_values() {
let (_child, addr) = start_server().await;
let client = reqwest::Client::new();
let client = ReqwestClient(reqwest::Client::new());
let url = format!("http://localhost:{}", addr.port()).parse().unwrap();

let metadata_endpoint = denokv_remote::MetadataEndpoint {
Expand Down Expand Up @@ -605,8 +707,8 @@ async fn sum_values() {
));
}

async fn read_key_1<P: RemotePermissions>(
remote: &denokv_remote::Remote<P>,
async fn read_key_1<P: RemotePermissions, T: RemoteTransport>(
remote: &denokv_remote::Remote<P, T>,
) -> denokv_proto::KvEntry {
let ranges = remote
.snapshot_read(
Expand Down
1 change: 1 addition & 0 deletions proto/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ pub struct CommitResult {
pub versionstamp: Versionstamp,
}

#[derive(Debug)]
/// The message notifying about the status of a single key in a watch request.
pub enum WatchKeyOutput {
/// The key has not changed since the last delivery. Deliver the entry.
Expand Down
5 changes: 4 additions & 1 deletion remote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ bytes.workspace = true
chrono.workspace = true
denokv_proto.workspace = true
futures.workspace = true
http.workspace = true
log.workspace = true
prost.workspace = true
rand.workspace = true
reqwest.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio.workspace = true
tokio-util.workspace = true
url.workspace = true
uuid.workspace = true

[dev-dependencies]
reqwest.workspace = true
Loading

0 comments on commit c6f0ce3

Please sign in to comment.