Skip to content

Commit

Permalink
feat: Add RoCacheEndpoint::redirect_cache. (#1027)
Browse files Browse the repository at this point in the history
* chore: Lift `Arc` around `dyn RoCache` to `RoCacheEndpoint`

In preparation for wrapping the `CacheReader` in `RoCacheEndpoint` with `ArcSwap`.

* feat: Add `RoCacheEndpoint::redirect_cache`.

This is for handling the `AliasCreated` message, which is yet to be
sent from the app server.

* chore: clippy fix
  • Loading branch information
chubei committed Feb 25, 2023
1 parent 69d9e97 commit 605aee1
Show file tree
Hide file tree
Showing 17 changed files with 157 additions and 126 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ tower = "0.4.13"
hyper = "0.14.24"
actix-http = "3.3.0"
tower-http = {version = "0.3.5", features = ["full"]}
arc-swap = "1.6.0"

6 changes: 6 additions & 0 deletions dozer-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub enum ApiError {
ApiAuthError(#[from] AuthError),
#[error("Failed to generate openapi documentation")]
ApiGenerationError(#[source] GenerationError),
#[error("Failed to open cache: {0}")]
OpenCache(#[source] CacheError),
#[error("Failed to open cache: {0}")]
CacheNotFound(String),
#[error("Cannot find schema by name")]
SchemaNotFound(#[source] CacheError),
#[error("Get by primary key is not supported when it is composite: {0:?}")]
Expand Down Expand Up @@ -140,6 +144,8 @@ impl actix_web::error::ResponseError for ApiError {
| ApiError::SchemaNotFound(_)
| ApiError::MultiIndexFetch(_) => StatusCode::UNPROCESSABLE_ENTITY,
ApiError::InternalError(_)
| ApiError::OpenCache(_)
| ApiError::CacheNotFound(_)
| ApiError::QueryFailed(_)
| ApiError::CountFailed(_)
| ApiError::PortAlreadyInUse(_) => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down
8 changes: 4 additions & 4 deletions dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use dozer_types::{
models::{api_config::GrpcApiOptions, api_security::ApiSecurity, flags::Flags},
};
use futures_util::{FutureExt, StreamExt};
use std::{collections::HashMap, path::PathBuf};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::sync::broadcast::{self, Receiver, Sender};
use tonic::{transport::Server, Streaming};
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
Expand Down Expand Up @@ -49,7 +49,7 @@ impl ApiServer {
}
fn get_dynamic_service(
&self,
endpoints: Vec<RoCacheEndpoint>,
cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
rx1: Option<broadcast::Receiver<PipelineResponse>>,
) -> Result<
(
Expand Down Expand Up @@ -81,7 +81,7 @@ impl ApiServer {
let typed_service = if self.flags.dynamic {
Some(TypedService::new(
&descriptor_path,
endpoints,
cache_endpoints,
rx1.map(|r| r.resubscribe()),
self.security.clone(),
)?)
Expand Down Expand Up @@ -109,7 +109,7 @@ impl ApiServer {

pub async fn run(
&self,
cache_endpoints: Vec<RoCacheEndpoint>,
cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
receiver_shutdown: tokio::sync::oneshot::Receiver<()>,
rx1: Option<Receiver<PipelineResponse>>,
) -> Result<(), GRPCError> {
Expand Down
16 changes: 9 additions & 7 deletions dozer-api/src/grpc/common/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::auth::Access;

Expand All @@ -22,13 +23,13 @@ type ResponseStream = ReceiverStream<Result<Operation, tonic::Status>>;
// #[derive(Clone)]
pub struct CommonService {
/// For look up endpoint from its name. `key == value.endpoint.name`.
pub endpoint_map: HashMap<String, RoCacheEndpoint>,
pub endpoint_map: HashMap<String, Arc<RoCacheEndpoint>>,
pub event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
}

impl CommonService {
pub fn new(
endpoints: Vec<RoCacheEndpoint>,
endpoints: Vec<Arc<RoCacheEndpoint>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
) -> Self {
let endpoint_map = endpoints
Expand Down Expand Up @@ -67,7 +68,7 @@ impl CommonGrpcService for CommonService {
let (cache_endpoint, query_request, access) = self.parse_request(request)?;

let count = shared_impl::count(
&cache_endpoint.cache_reader,
&cache_endpoint.cache_reader(),
&cache_endpoint.endpoint.name,
query_request.query.as_deref(),
access,
Expand All @@ -85,8 +86,9 @@ impl CommonGrpcService for CommonService {
) -> Result<Response<QueryResponse>, Status> {
let (cache_endpoint, query_request, access) = self.parse_request(request)?;

let cache_reader = cache_endpoint.cache_reader();
let (schema, records) = shared_impl::query(
&cache_endpoint.cache_reader,
&cache_reader,
&cache_endpoint.endpoint.name,
query_request.query.as_deref(),
access,
Expand All @@ -113,7 +115,7 @@ impl CommonGrpcService for CommonService {
.ok_or_else(|| Status::invalid_argument(endpoint))?;

shared_impl::on_event(
&cache_endpoint.cache_reader,
&cache_endpoint.cache_reader(),
&cache_endpoint.endpoint.name,
query_request.filter.as_deref(),
self.event_notifier.as_ref().map(|r| r.resubscribe()),
Expand Down Expand Up @@ -147,8 +149,8 @@ impl CommonGrpcService for CommonService {
.get(&endpoint)
.map_or(Err(Status::invalid_argument(&endpoint)), Ok)?;

let schema = &cache_endpoint
.cache_reader
let cache_reader = cache_endpoint.cache_reader();
let schema = &cache_reader
.get_schema_and_indexes_by_name(&endpoint)
.map_err(|_| Status::invalid_argument(endpoint))?
.0;
Expand Down
44 changes: 18 additions & 26 deletions dozer-api/src/grpc/typed/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use tonic::{

#[derive(Debug, Clone)]
struct TypedEndpoint {
reader: CacheReader,
endpoint_name: String,
cache_endpoint: Arc<RoCacheEndpoint>,
service_desc: ServiceDesc,
}

Expand Down Expand Up @@ -59,20 +58,19 @@ impl Clone for TypedService {
impl TypedService {
pub fn new(
descriptor_path: &Path,
endpoints: Vec<RoCacheEndpoint>,
cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
security: Option<ApiSecurity>,
) -> Result<Self, GRPCError> {
let endpoint_map = endpoints
let endpoint_map = cache_endpoints
.into_iter()
.map(|endpoint| {
.map(|cache_endpoint| {
let service_desc =
ProtoGenerator::read_schema(descriptor_path, &endpoint.endpoint.name)?;
ProtoGenerator::read_schema(descriptor_path, &cache_endpoint.endpoint.name)?;
Ok::<_, GenerationError>((
service_desc.service.full_name().to_string(),
TypedEndpoint {
reader: endpoint.cache_reader,
endpoint_name: endpoint.endpoint.name,
cache_endpoint,
service_desc,
},
))
Expand Down Expand Up @@ -113,8 +111,7 @@ impl TypedService {
let method_name = current_path[2];
if method_name == typed_endpoint.service_desc.count.method.name() {
struct CountService {
reader: CacheReader,
endpoint_name: String,
cache_endpoint: Arc<RoCacheEndpoint>,
response_desc: Option<CountResponseDesc>,
}
impl tonic::server::UnaryService<DynamicMessage> for CountService {
Expand All @@ -123,8 +120,8 @@ impl TypedService {
fn call(&mut self, request: Request<DynamicMessage>) -> Self::Future {
let response = count(
request,
&self.reader,
&self.endpoint_name,
&self.cache_endpoint.cache_reader(),
&self.cache_endpoint.endpoint.name,
self.response_desc
.take()
.expect("This future shouldn't be polled twice"),
Expand All @@ -135,8 +132,7 @@ impl TypedService {

let mut grpc = self.create_grpc(typed_endpoint.service_desc.count.method.clone());
let method = CountService {
reader: typed_endpoint.reader.clone(),
endpoint_name: typed_endpoint.endpoint_name.clone(),
cache_endpoint: typed_endpoint.cache_endpoint.clone(),
response_desc: Some(typed_endpoint.service_desc.count.response_desc.clone()),
};
Some(Box::pin(async move {
Expand All @@ -145,8 +141,7 @@ impl TypedService {
}))
} else if method_name == typed_endpoint.service_desc.query.method.name() {
struct QueryService {
reader: CacheReader,
endpoint_name: String,
cache_endpoint: Arc<RoCacheEndpoint>,
response_desc: Option<QueryResponseDesc>,
}
impl tonic::server::UnaryService<DynamicMessage> for QueryService {
Expand All @@ -155,8 +150,8 @@ impl TypedService {
fn call(&mut self, request: Request<DynamicMessage>) -> Self::Future {
let response = query(
request,
&self.reader,
&self.endpoint_name,
&self.cache_endpoint.cache_reader(),
&self.cache_endpoint.endpoint.name,
self.response_desc
.take()
.expect("This future shouldn't be polled twice"),
Expand All @@ -167,8 +162,7 @@ impl TypedService {

let mut grpc = self.create_grpc(typed_endpoint.service_desc.query.method.clone());
let method = QueryService {
reader: typed_endpoint.reader.clone(),
endpoint_name: typed_endpoint.endpoint_name.clone(),
cache_endpoint: typed_endpoint.cache_endpoint.clone(),
response_desc: Some(typed_endpoint.service_desc.query.response_desc.clone()),
};
Some(Box::pin(async move {
Expand All @@ -178,8 +172,7 @@ impl TypedService {
} else if let Some(on_event_method_desc) = &typed_endpoint.service_desc.on_event {
if method_name == on_event_method_desc.method.name() {
struct EventService {
reader: CacheReader,
endpoint_name: String,
cache_endpoint: Arc<RoCacheEndpoint>,
event_desc: Option<EventDesc>,
event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
}
Expand All @@ -193,8 +186,8 @@ impl TypedService {
fn call(&mut self, request: tonic::Request<DynamicMessage>) -> Self::Future {
future::ready(on_event(
request,
&self.reader,
&self.endpoint_name,
&self.cache_endpoint.cache_reader(),
&self.cache_endpoint.endpoint.name,
self.event_desc
.take()
.expect("This future shouldn't be polled twice"),
Expand All @@ -205,8 +198,7 @@ impl TypedService {

let mut grpc = self.create_grpc(on_event_method_desc.method.clone());
let method = EventService {
reader: typed_endpoint.reader.clone(),
endpoint_name: typed_endpoint.endpoint_name.clone(),
cache_endpoint: typed_endpoint.cache_endpoint.clone(),
event_desc: Some(on_event_method_desc.response_desc.clone()),
event_notifier: self.event_notifier.as_ref().map(|r| r.resubscribe()),
};
Expand Down
20 changes: 10 additions & 10 deletions dozer-api/src/grpc/typed/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use crate::{
},
RoCacheEndpoint,
};
use dozer_cache::{
cache::expression::{FilterExpression, QueryExpression},
CacheReader,
};
use dozer_cache::cache::expression::{FilterExpression, QueryExpression};
use dozer_types::grpc_types::{
generated::films::FilmEventRequest,
generated::films::{
Expand All @@ -24,7 +21,7 @@ use dozer_types::grpc_types::{
};
use dozer_types::models::{api_config::default_api_config, api_security::ApiSecurity};
use futures_util::FutureExt;
use std::{env, str::FromStr, time::Duration};
use std::{env, str::FromStr, sync::Arc, time::Duration};

use crate::test_utils;
use tokio::{
Expand All @@ -41,12 +38,15 @@ use tonic::{
Code, Request,
};

pub fn setup_pipeline() -> (Vec<RoCacheEndpoint>, Receiver<PipelineResponse>) {
pub fn setup_pipeline() -> (Vec<Arc<RoCacheEndpoint>>, Receiver<PipelineResponse>) {
let endpoint = test_utils::get_endpoint();
let cache_endpoint = RoCacheEndpoint {
cache_reader: CacheReader::new(test_utils::initialize_cache(&endpoint.name, None)),
endpoint,
};
let cache_endpoint = Arc::new(
RoCacheEndpoint::new(
&*test_utils::initialize_cache(&endpoint.name, None),
endpoint,
)
.unwrap(),
);

let (tx, rx1) = broadcast::channel::<PipelineResponse>(16);
let default_api_internal = default_api_config().app_grpc.unwrap_or_default();
Expand Down
47 changes: 38 additions & 9 deletions dozer-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,49 @@
use dozer_cache::{cache::RoCache, CacheReader};
use std::{ops::Deref, sync::Arc};

use arc_swap::ArcSwap;
use dozer_cache::{cache::CacheManager, CacheReader};
use dozer_types::models::api_endpoint::ApiEndpoint;
use std::sync::Arc;
mod api_helper;

#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct RoCacheEndpoint {
pub cache_reader: CacheReader,
pub endpoint: ApiEndpoint,
cache_reader: ArcSwap<CacheReader>,
endpoint: ApiEndpoint,
}

impl RoCacheEndpoint {
pub fn new(cache: Arc<dyn RoCache>, endpoint: ApiEndpoint) -> Self {
Self {
cache_reader: CacheReader::new(cache),
pub fn new(cache_manager: &dyn CacheManager, endpoint: ApiEndpoint) -> Result<Self, ApiError> {
let cache_reader = open_cache_reader(cache_manager, &endpoint.name)?;
Ok(Self {
cache_reader: ArcSwap::from_pointee(cache_reader),
endpoint,
}
})
}

pub fn cache_reader(&self) -> impl Deref<Target = Arc<CacheReader>> + '_ {
self.cache_reader.load()
}

pub fn endpoint(&self) -> &ApiEndpoint {
&self.endpoint
}

pub fn redirect_cache(&self, cache_manager: &dyn CacheManager) -> Result<(), ApiError> {
let cache_reader = open_cache_reader(cache_manager, &self.endpoint.name)?;
self.cache_reader.store(Arc::new(cache_reader));
Ok(())
}
}

fn open_cache_reader(
cache_manager: &dyn CacheManager,
name: &str,
) -> Result<CacheReader, ApiError> {
let cache = cache_manager
.open_ro_cache(name)
.map_err(ApiError::OpenCache)?;
let cache = cache.ok_or_else(|| ApiError::CacheNotFound(name.to_string()))?;
Ok(CacheReader::new(cache))
}

// Exports
Expand All @@ -27,6 +55,7 @@ pub mod rest;
// Re-exports
pub use actix_web;
pub use async_trait;
use errors::ApiError;
pub use openapiv3;
pub use tokio;
pub use tonic;
Expand Down
Loading

0 comments on commit 605aee1

Please sign in to comment.