Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix error code #188

Merged
merged 3 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
target/
chaostest/__pycache__/

4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ docker-rebuild-bin:

# Image for testing undermoon-operator
docker-build-test-image:
rm examples/target_volume/debug/*
rm examples/target_volume/release/*
docker image build -f examples/Dockerfile-builder-test -t undermoon_builder_test .
mkdir -p ./examples/target_volume/debug
docker rm undermoon-builder-container-debug || true
Expand All @@ -68,6 +70,8 @@ docker-build-test-image:
# The release builder will build the binaries and move it out by `docker cp`.
# When the release undermoon image is built, the binaries will be moved into it.
docker-build-release:
rm examples/target_volume/debug/*
rm examples/target_volume/release/*
docker image build -f examples/Dockerfile-builder-release -t undermoon_builder_release .
mkdir -p ./examples/target_volume/release
docker rm undermoon-builder-container || true
Expand Down
3 changes: 2 additions & 1 deletion docs/broker_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ HTTP 409
```

##### (8) PUT /api/v2/clusters/migrations
Try to commit the migration
Try to commit the migration.
The memory broker implementation also cleans up the free nodes after the migration is done.
```
Request:
{
Expand Down
44 changes: 41 additions & 3 deletions docs/memory_broker_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,33 @@ HTTP 200
HTTP 409 { "error": "INVALID_META_VERSION" }
```

#### Get cluster info
`GET` /api/v2/clusters/info/<cluster_name>

##### Success
```
HTTP 200

{
"name": "cluster_name",
"node_number": 8,
"node_number_with_slots": 8,
"is_migrating": false
}
```

##### Error
```
HTTP 404 { "error": "CLUSTER_NOT_FOUND" }
```

#### Create cluster
`POST` /api/v2/clusters/meta/<cluster_name>

##### Request
```json
{
"node_number": 8,
"node_number": 8
}
```
- `cluster_name`
Expand Down Expand Up @@ -167,6 +187,23 @@ HTTP 409 { "error": "FREE_NODE_NOT_FOUND" }
HTTP 409 { "error": "MIGRATION_RUNNING" }
```

#### Add or remove nodes and start migration
`POST` /api/v2/clusters/migrations/auto/<cluster_name>/<node_number>

##### Success
```
HTTP 200
```

##### Error
```
HTTP 400 { "error": "INVALID_CLUSTER_NAME" }
HTTP 404 { "error": "CLUSTER_NOT_FOUND" }
HTTP 409 { "error": "MIGRATION_RUNNING" }
HTTP 400 { "error": "INVALID_NODE_NUMBER" }
HTTP 409 { "error": "NO_AVAILABLE_RESOURCE" }
```

#### Start migration for scaling out
Note that you need to call `Add nodes to cluster` beforehand.

Expand All @@ -181,7 +218,7 @@ HTTP 200
```
HTTP 400 { "error": "INVALID_CLUSTER_NAME" }
HTTP 404 { "error": "CLUSTER_NOT_FOUND" }
HTTP 409 { "error": "FreeNodeFound" }
HTTP 409 { "error": "FREE_NODE_NOT_FOUND" }
HTTP 409 { "error": "MIGRATION_RUNNING" }
```

Expand All @@ -199,8 +236,9 @@ HTTP 200
##### Error
```
HTTP 400 { "error": "INVALID_CLUSTER_NAME" }
HTTP 400 { "error": "INVALID_NODE_NUMBER" }
HTTP 404 { "error": "CLUSTER_NOT_FOUND" }
HTTP 409 { "error": "SLOTS_ALREADY_EVEN" }
HTTP 409 { "error": "FREE_NODE_FOUND" }
HTTP 409 { "error": "MIGRATION_RUNNING" }
```

Expand Down
15 changes: 14 additions & 1 deletion src/broker/query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::store::{
ChunkRolePosition, ClusterStore, HostProxy, MetaStore, CHUNK_HALF_NODE_NUM, CHUNK_NODE_NUM,
ChunkRolePosition, ClusterInfo, ClusterStore, HostProxy, MetaStore, CHUNK_HALF_NODE_NUM,
CHUNK_NODE_NUM,
};
use crate::common::cluster::{Cluster, Node, PeerProxy, Proxy, ReplMeta, ReplPeer};
use crate::common::cluster::{ClusterName, Role};
Expand Down Expand Up @@ -142,6 +143,18 @@ impl<'a> MetaStoreQuery<'a> {
Some(Self::cluster_store_to_cluster(&cluster_store))
}

/// Fetch a lightweight summary (`ClusterInfo`) for the named cluster.
///
/// Returns `None` when `cluster_name` is not a valid `ClusterName`
/// or when no cluster with that name exists in the store.
pub fn get_cluster_info_by_name(
    &self,
    cluster_name: &str,
    migration_limit: u64,
) -> Option<ClusterInfo> {
    // Invalid names are treated the same as missing clusters.
    let name = ClusterName::try_from(cluster_name).ok()?;
    Self::get_cluster_store(&self.store.clusters, &name, migration_limit)
        .map(|cluster_store| cluster_store.get_info())
}

fn cluster_store_to_cluster(cluster_store: &ClusterStore) -> Cluster {
let cluster_name = cluster_store.name.clone();

Expand Down
54 changes: 46 additions & 8 deletions src/broker/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::persistence::{MetaStorage, MetaSyncError};
use super::replication::MetaReplicator;
use super::resource::ResourceChecker;
use super::store::{MetaStore, MetaStoreError, CHUNK_HALF_NODE_NUM};
use super::store::{ClusterInfo, MetaStore, MetaStoreError, CHUNK_HALF_NODE_NUM};
use crate::broker::recovery::{fetch_largest_epoch, EpochFetchResult};
use crate::common::cluster::{Cluster, ClusterName, MigrationTaskMeta, Node, Proxy};
use crate::common::version::UNDERMOON_VERSION;
Expand Down Expand Up @@ -82,6 +82,7 @@ pub fn configure_app(cfg: &mut web::ServiceConfig, service: Arc<MemBrokerService
.route("/proxies/failed/addresses", web::get().to(get_failed_proxies))

// Additional api
.route("/clusters/info/{cluster_name}", web::get().to(get_cluster_info_by_name))
.route("/clusters/meta/{cluster_name}", web::post().to(add_cluster))
.route("/clusters/meta/{cluster_name}", web::delete().to(remove_cluster))
.route(
Expand All @@ -98,6 +99,7 @@ pub fn configure_app(cfg: &mut web::ServiceConfig, service: Arc<MemBrokerService
web::post().to(migrate_slots_to_scale_down),
)
.route("/clusters/migrations/expand/{cluster_name}", web::post().to(migrate_slots))
.route("/clusters/migrations/auto/{cluster_name}/{node_number}", web::post().to(auto_scale_node_number))
.route("/clusters/config/{cluster_name}", web::patch().to(change_config))
.route("/clusters/balance/{cluster_name}", web::put().to(balance_masters))

Expand Down Expand Up @@ -244,6 +246,14 @@ impl MemBrokerService {
.get_cluster_by_name(name, migration_limit)
}

/// Read-lock the meta store and look up summary info for `name`,
/// applying the configured migration limit.
pub fn get_cluster_info_by_name(&self, name: &str) -> Option<ClusterInfo> {
    let store = self
        .store
        .read()
        .expect("MemBrokerService::get_cluster_info_by_name");
    store.get_cluster_info_by_name(name, self.config.migration_limit)
}

pub fn add_proxy(&self, proxy_resource: ProxyResourcePayload) -> Result<(), MetaStoreError> {
let ProxyResourcePayload {
proxy_address,
Expand Down Expand Up @@ -368,6 +378,17 @@ impl MemBrokerService {
.migrate_slots_to_scale_down(cluster_name, new_node_num)
}

/// Write-lock the meta store and resize the cluster to
/// `new_node_num` nodes, delegating the scaling decision to the store.
pub fn auto_scale_node_number(
    &self,
    cluster_name: String,
    new_node_num: usize,
) -> Result<(), MetaStoreError> {
    let mut store = self
        .store
        .write()
        .expect("MemBrokerService::auto_scale_node_number");
    store.auto_scale_node_number(cluster_name, new_node_num)
}

pub fn get_failures(&self) -> Vec<String> {
let failure_ttl = chrono::Duration::seconds(self.config.failure_ttl as i64);
let failure_quorum = self.config.failure_quorum;
Expand All @@ -388,7 +409,7 @@ impl MemBrokerService {
self.store
.write()
.expect("MemBrokerService::commit_migration")
.commit_migration(task)
.commit_migration(task, true)
}

pub fn replace_failed_proxy(
Expand Down Expand Up @@ -509,6 +530,16 @@ async fn get_cluster_by_name(
web::Json(ClusterPayload { cluster })
}

/// HTTP handler: GET /api/v2/clusters/info/{cluster_name}.
/// Responds with the cluster's summary info as JSON, or a
/// `ClusterNotFound` error when the lookup yields nothing.
async fn get_cluster_info_by_name(
    (path, state): (web::Path<(String,)>, ServiceState),
) -> Result<web::Json<ClusterInfo>, MetaStoreError> {
    let (name,) = path.into_inner();
    state
        .get_cluster_info_by_name(&name)
        .map(web::Json)
        .ok_or(MetaStoreError::ClusterNotFound)
}

async fn get_failures(state: ServiceState) -> impl Responder {
let addresses = state.get_failures();
web::Json(FailuresPayload { addresses })
Expand Down Expand Up @@ -680,20 +711,27 @@ async fn migrate_slots(
(path, state): (web::Path<(String,)>, ServiceState),
) -> Result<&'static str, MetaStoreError> {
let (cluster_name,) = path.into_inner();
let res = state.migrate_slots(cluster_name).map(|()| "")?;
state.migrate_slots(cluster_name)?;
state.trigger_update().await?;
Ok(res)
Ok("")
}

async fn migrate_slots_to_scale_down(
(path, state): (web::Path<(String, usize)>, ServiceState),
) -> Result<&'static str, MetaStoreError> {
let (cluster_name, new_node_num) = path.into_inner();
let res = state
.migrate_slots_to_scale_down(cluster_name, new_node_num)
.map(|()| "")?;
state.migrate_slots_to_scale_down(cluster_name, new_node_num)?;
state.trigger_update().await?;
Ok(res)
Ok("")
}

/// HTTP handler: POST /api/v2/clusters/migrations/auto/{cluster_name}/{node_number}.
/// Resizes the cluster, then triggers a metadata update broadcast;
/// any failure from either step is propagated to the caller.
async fn auto_scale_node_number(
    (path, state): (web::Path<(String, usize)>, ServiceState),
) -> Result<&'static str, MetaStoreError> {
    let (cluster_name, node_number) = path.into_inner();
    state.auto_scale_node_number(cluster_name, node_number)?;
    state.trigger_update().await?;
    Ok("")
}

async fn add_failure(
Expand Down