Skip to content
Permalink
Browse files
GERONIMO-6583 - Adding bulkhead support.
With this change all duration checking is done in nanos (this is only checked in the bulkhead part of the TCK for some reason)
  • Loading branch information
johnament committed Oct 28, 2017
1 parent 75d1acd commit 964551d17f1ff3b1743c41d7cc629b8397f56557
Showing 23 changed files with 625 additions and 33 deletions.
@@ -19,6 +19,7 @@

package org.apache.safeguard.api;

import org.apache.safeguard.api.bulkhead.BulkheadManager;
import org.apache.safeguard.api.circuitbreaker.CircuitBreakerManager;
import org.apache.safeguard.api.retry.RetryManager;

@@ -30,4 +31,6 @@ public interface ExecutionManager {
CircuitBreakerManager getCircuitBreakerManager();

RetryManager getRetryManager();

BulkheadManager getBulkheadManager();
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.safeguard.api.bulkhead;

import java.util.concurrent.Callable;

public interface Bulkhead {
BulkheadDefinition getBulkheadDefinition();

int getCurrentQueueDepth();

int getCurrentExecutions();

<T> T execute(Callable<T> callable);
}
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.safeguard.api.bulkhead;

public interface BulkheadBuilder {
BulkheadBuilder withMaxConcurrency(int maxConcurrency);

BulkheadBuilder withMaxWaiting(int overflowCapacity);

BulkheadBuilder asynchronous();

BulkheadDefinition build();
}
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.safeguard.api.bulkhead;

public interface BulkheadDefinition {
int getMaxConcurrentExecutions();

int getMaxWaitingExecutions();

boolean isAsynchronous();
}
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.safeguard.api.bulkhead;

public interface BulkheadManager {
BulkheadBuilder newBulkheadBuilder(String name);
Bulkhead getBulkhead(String name);
}
@@ -19,7 +19,7 @@

package org.apache.safeguard.exception;

public class AsyncException extends RuntimeException {
public class AsyncException extends SafeguardException {
public AsyncException(Exception e) {
super(e);
}
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.safeguard.exception;

import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceException;

public class SafeguardException extends FaultToleranceException{
public SafeguardException() {
}

public SafeguardException(String message) {
super(message);
}

public SafeguardException(String message, Throwable cause) {
super(message, cause);
}

public SafeguardException(Throwable cause) {
super(cause);
}

}
@@ -20,6 +20,8 @@
package org.apache.safeguard.impl;

import org.apache.safeguard.api.ExecutionManager;
import org.apache.safeguard.api.bulkhead.BulkheadManager;
import org.apache.safeguard.impl.bulkhead.BulkheadManagerImpl;
import org.apache.safeguard.impl.circuitbreaker.FailsafeCircuitBreakerManager;
import org.apache.safeguard.impl.executionPlans.ExecutionPlanFactory;
import org.apache.safeguard.impl.retry.FailsafeRetryManager;
@@ -32,14 +34,17 @@

@Vetoed
public class FailsafeExecutionManager implements ExecutionManager {
private BulkheadManagerImpl bulkheadManager;
private FailsafeCircuitBreakerManager circuitBreakerManager;
private FailsafeRetryManager retryManager;
private ExecutionPlanFactory executionPlanFactory;

public FailsafeExecutionManager() {
this.circuitBreakerManager = new FailsafeCircuitBreakerManager();
this.retryManager = new FailsafeRetryManager();
this.executionPlanFactory = new ExecutionPlanFactory(this.circuitBreakerManager, this.retryManager);
this.bulkheadManager = new BulkheadManagerImpl();
this.executionPlanFactory = new ExecutionPlanFactory(this.circuitBreakerManager, this.retryManager,
this.bulkheadManager);
}

public Object execute(InvocationContext invocationContext) {
@@ -74,4 +79,9 @@ public FailsafeRetryManager getRetryManager() {
return retryManager;
}

@Override
public BulkheadManager getBulkheadManager() {
return bulkheadManager;
}

}
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.safeguard.impl.bulkhead;

import org.apache.safeguard.api.bulkhead.BulkheadBuilder;
import org.apache.safeguard.api.bulkhead.BulkheadDefinition;

public class BulkheadBuilderImpl implements BulkheadBuilder{
private final String name;
private final BulkheadManagerImpl bulkheadManager;
private int maxWaitingExecutions;
private int maxConcurrentExecutions;
private boolean asynchronous = false;

BulkheadBuilderImpl(String name, BulkheadManagerImpl bulkheadManager) {
this.name = name;
this.bulkheadManager = bulkheadManager;
}

@Override
public BulkheadBuilder withMaxConcurrency(int maxConcurrency) {
this.maxConcurrentExecutions = maxConcurrency;
return this;
}

@Override
public BulkheadBuilder withMaxWaiting(int overflowCapacity) {
this.maxWaitingExecutions = overflowCapacity;
return this;
}

@Override
public BulkheadBuilder asynchronous() {
this.asynchronous = true;
return this;
}

@Override
public BulkheadDefinition build() {
BulkheadDefinitionImpl definition = new BulkheadDefinitionImpl(maxConcurrentExecutions, maxWaitingExecutions, asynchronous);
bulkheadManager.register(name, definition);
return definition;

}
}
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.safeguard.impl.bulkhead;

import org.apache.safeguard.api.bulkhead.BulkheadDefinition;

public class BulkheadDefinitionImpl implements BulkheadDefinition{
private final int maxConcurrentExecutions;
private final int maxWaitingExecutions;
private final boolean asynchronous;

public BulkheadDefinitionImpl(int maxConcurrentExecutions, int maxWaitingExecutions, boolean asynchronous) {
this.maxConcurrentExecutions = maxConcurrentExecutions;
this.maxWaitingExecutions = maxWaitingExecutions;
this.asynchronous = asynchronous;
}

@Override
public int getMaxConcurrentExecutions() {
return maxConcurrentExecutions;
}

@Override
public int getMaxWaitingExecutions() {
return maxWaitingExecutions;
}

@Override
public boolean isAsynchronous() {
return asynchronous;
}
}
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.safeguard.impl.bulkhead;

import org.apache.safeguard.api.bulkhead.Bulkhead;
import org.apache.safeguard.api.bulkhead.BulkheadBuilder;
import org.apache.safeguard.api.bulkhead.BulkheadManager;

import java.util.HashMap;
import java.util.Map;

public class BulkheadManagerImpl implements BulkheadManager{
private Map<String, Bulkhead> bulkheads = new HashMap<>();;
@Override
public BulkheadBuilder newBulkheadBuilder(String name) {
return new BulkheadBuilderImpl(name, this);
}

@Override
public Bulkhead getBulkhead(String name) {
return bulkheads.get(name);
}

void register(String name, BulkheadDefinitionImpl bulkheadDefinition) {
Bulkhead bulkhead;
if (bulkheadDefinition.isAsynchronous()) {
bulkhead = new ThreadPoolBulkhead(bulkheadDefinition);
}
else {
bulkhead = new SemaphoreBulkhead(bulkheadDefinition);
}
bulkheads.put(name, bulkhead);
}
}

0 comments on commit 964551d

Please sign in to comment.