Skip to content

Commit

Permalink
Refactor REST API to reference primitive types in URIs. (#760)
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo authored and johnou committed Aug 3, 2018
1 parent d2f141d commit 0560bfa
Show file tree
Hide file tree
Showing 41 changed files with 882 additions and 602 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/io/atomix/core/Atomix.java
Expand Up @@ -664,9 +664,9 @@ public <E> WorkQueue<E> getWorkQueue(String name) {
} }


@Override @Override
public <P extends SyncPrimitive> CompletableFuture<P> getPrimitiveAsync(String name) { public PrimitiveType getPrimitiveType(String typeName) {
checkRunning(); checkRunning();
return primitives.getPrimitiveAsync(name); return primitives.getPrimitiveType(typeName);
} }


@Override @Override
Expand Down
202 changes: 2 additions & 200 deletions core/src/main/java/io/atomix/core/PrimitivesService.java
Expand Up @@ -103,18 +103,9 @@
import io.atomix.core.workqueue.WorkQueue; import io.atomix.core.workqueue.WorkQueue;
import io.atomix.core.workqueue.WorkQueueBuilder; import io.atomix.core.workqueue.WorkQueueBuilder;
import io.atomix.core.workqueue.WorkQueueType; import io.atomix.core.workqueue.WorkQueueType;
import io.atomix.primitive.PrimitiveBuilder; import io.atomix.primitive.PrimitiveFactory;
import io.atomix.primitive.PrimitiveInfo;
import io.atomix.primitive.PrimitiveType; import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.SyncPrimitive; import io.atomix.primitive.SyncPrimitive;
import io.atomix.primitive.config.PrimitiveConfig;
import io.atomix.utils.AtomixRuntimeException;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


/** /**
* Manages the creation of distributed primitive instances. * Manages the creation of distributed primitive instances.
Expand Down Expand Up @@ -145,7 +136,7 @@
* } * }
* </pre> * </pre>
*/ */
public interface PrimitivesService { public interface PrimitivesService extends PrimitiveFactory {


/** /**
* Creates a new named {@link DistributedMap} builder. * Creates a new named {@link DistributedMap} builder.
Expand Down Expand Up @@ -1508,193 +1499,4 @@ default TransactionBuilder transactionBuilder() {
*/ */
<E> WorkQueue<E> getWorkQueue(String name); <E> WorkQueue<E> getWorkQueue(String name);


/**
* Gets or creates a primitive.
* <p>
* A new primitive will be created if no primitive instance with the given {@code name} exists on this node, otherwise
* the existing instance will be returned. The name is used to reference a distinct instance of the primitive within
* the cluster. The returned primitive will share the same state with primitives of the same name on other nodes.
* <p>
* The constructed primitive will be based on any pre-existing primitive configuration for the given {@code name}.
* <p>
* To get an asynchronous instance of the primitive, use the {@link SyncPrimitive#async()} method:
* <pre>
* {@code
* AsyncPrimitive async = atomix.getPrimitive("my-primitive").async();
* }
* </pre>
*
* @param name the primitive name
* @param <P> the primitive type
* @return the primitive instance
*/
default <P extends SyncPrimitive> P getPrimitive(String name) {
try {
return this.<P>getPrimitiveAsync(name).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new AtomixRuntimeException(e);
}
}

/**
* Gets or creates a distributed primitive.
* <p>
* A new primitive of the given {@code primitiveType} will be created if no primitive instance with the given
* {@code name} exists on this node, otherwise the existing instance will be returned. The name is used to reference
* a distinct instance of the primitive within the cluster. The returned primitive will share the same state with
* primitives of the same name on other nodes.
* <p>
* When the instance is initially constructed, it will be configured with any pre-existing primitive configuration
* defined in {@code atomix.conf}.
* <p>
* To get an asynchronous instance of the primitive, use the {@link SyncPrimitive#async()} method:
* <pre>
* {@code
* AsyncPrimitive async = atomix.getPrimitive("my-primitive").async();
* }
* </pre>
*
* @param name the primitive name
* @param primitiveType the primitive type
* @param <P> the primitive type
* @return the primitive instance
*/
default <P extends SyncPrimitive> P getPrimitive(String name, PrimitiveType<?, ?, P> primitiveType) {
try {
return getPrimitiveAsync(name, primitiveType).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new AtomixRuntimeException(e);
}
}

/**
* Gets or creates a distributed primitive.
* <p>
* A new primitive of the given {@code primitiveType} will be created if no primitive instance with the given
* {@code name} exists on this node, otherwise the existing instance will be returned. The name is used to reference
* a distinct instance of the primitive within the cluster. The returned primitive will share the same state with
* primitives of the same name on other nodes.
* <p>
* When the instance is initially constructed, it will be configured with any pre-existing primitive configuration
* defined in {@code atomix.conf}.
* <p>
* To get an asynchronous instance of the primitive, use the {@link SyncPrimitive#async()} method:
* <pre>
* {@code
* AsyncPrimitive async = atomix.getPrimitive("my-primitive").async();
* }
* </pre>
*
* @param name the primitive name
* @param primitiveType the primitive type
* @param primitiveConfig the primitive configuration
* @param <C> the primitive configuration type
* @param <P> the primitive type
* @return the primitive instance
*/
default <C extends PrimitiveConfig<C>, P extends SyncPrimitive> P getPrimitive(
String name,
PrimitiveType<?, C, P> primitiveType,
C primitiveConfig) {
try {
return getPrimitiveAsync(name, primitiveType, primitiveConfig).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new AtomixRuntimeException(e);
}
}

/**
* Gets or creates a primitive asynchronously.
* <p>
* A new primitive will be created if no primitive instance with the given {@code name} exists on this node, otherwise
* the existing instance will be returned. The name is used to reference a distinct instance of the primitive within
* the cluster. The returned primitive will share the same state with primitives of the same name on other nodes.
* <p>
* The constructed primitive will be based on any pre-existing primitive configuration for the given {@code name}.
*
* @param name the primitive name
* @param <P> the primitive type
* @return the primitive instance
*/
<P extends SyncPrimitive> CompletableFuture<P> getPrimitiveAsync(String name);

/**
* Gets or creates a distributed primitive asynchronously.
* <p>
* A new primitive of the given {@code primitiveType} will be created if no primitive instance with the given
* {@code name} exists on this node, otherwise the existing instance will be returned. The name is used to reference
* a distinct instance of the primitive within the cluster. The returned primitive will share the same state with
* primitives of the same name on other nodes.
* <p>
* When the instance is initially constructed, it will be configured with any pre-existing primitive configuration
* defined in {@code atomix.conf}.
*
* @param name the primitive name
* @param primitiveType the primitive type
* @param <P> the primitive type
* @return the primitive instance
*/
<P extends SyncPrimitive> CompletableFuture<P> getPrimitiveAsync(String name, PrimitiveType<?, ?, P> primitiveType);

/**
* Gets or creates a distributed primitive asynchronously.
* <p>
* A new primitive of the given {@code primitiveType} will be created if no primitive instance with the given
* {@code name} exists on this node, otherwise the existing instance will be returned. The name is used to reference
* a distinct instance of the primitive within the cluster. The returned primitive will share the same state with
* primitives of the same name on other nodes.
* <p>
* When the instance is initially constructed, it will be configured with any pre-existing primitive configuration
* defined in {@code atomix.conf}.
*
* @param name the primitive name
* @param primitiveType the primitive type
* @param primitiveConfig the primitive configuration
* @param <C> the primitive configuration type
* @param <P> the primitive type
* @return a future to be completed with the primitive instance
*/
<C extends PrimitiveConfig<C>, P extends SyncPrimitive> CompletableFuture<P> getPrimitiveAsync(
String name, PrimitiveType<?, C, P> primitiveType, C primitiveConfig);

/**
* Creates a new named primitive builder of the given {@code primitiveType}.
* <p>
* The primitive name must be provided when constructing the builder. The name is used to reference a distinct instance of
* the primitive within the cluster. Multiple instances of the primitive with the same name will share the same state.
* However, the instance of the primitive constructed by the returned builder will be distinct and will not share
* local memory (e.g. cache) with any other instance on this node.
* <p>
* To get an asynchronous instance of the primitive, use the {@link SyncPrimitive#async()} method:
* <pre>
* {@code
* AsyncPrimitive async = atomix.primitiveBuilder("my-primitive", MyPrimitiveType.instance()).build().async();
* }
* </pre>
*
* @param name the primitive name
* @param primitiveType the primitive type
* @param <B> the primitive builder type
* @param <P> the primitive type
* @return the primitive builder
*/
<B extends PrimitiveBuilder<B, C, P>, C extends PrimitiveConfig<C>, P extends SyncPrimitive> B primitiveBuilder(
String name,
PrimitiveType<B, C, P> primitiveType);

/**
* Returns a collection of open primitives.
*
* @return a collection of open primitives
*/
Collection<PrimitiveInfo> getPrimitives();

/**
* Returns a collection of open primitives of the given type.
*
* @param primitiveType the primitive type
* @return a collection of open primitives of the given type
*/
Collection<PrimitiveInfo> getPrimitives(PrimitiveType primitiveType);

} }
Expand Up @@ -17,6 +17,8 @@


import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.atomix.core.collection.AsyncDistributedCollection; import io.atomix.core.collection.AsyncDistributedCollection;
import io.atomix.core.collection.DistributedCollectionConfig;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.resource.PrimitiveResource; import io.atomix.primitive.resource.PrimitiveResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -35,26 +37,36 @@
/** /**
* Distributed collection resource. * Distributed collection resource.
*/ */
public abstract class DistributedCollectionResource implements PrimitiveResource { public abstract class DistributedCollectionResource<P extends AsyncDistributedCollection<String>, C extends DistributedCollectionConfig<C>> extends PrimitiveResource<P, C> {
private final Logger LOGGER = LoggerFactory.getLogger(getClass()); private final Logger LOGGER = LoggerFactory.getLogger(getClass());


private final AsyncDistributedCollection<String> collection; protected DistributedCollectionResource(PrimitiveType type) {

super(type);
public DistributedCollectionResource(AsyncDistributedCollection<String> collection) {
this.collection = collection;
} }


@GET @GET
@Path("/{name}")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public void get(@Suspended AsyncResponse response) { public void get(
response.resume(Response.ok(Sets.newHashSet(collection.iterator().sync()))); @PathParam("name") String name,
@Suspended AsyncResponse response) {
getPrimitive(name).whenComplete((collection, error) -> {
if (error == null) {
response.resume(Response.ok(Sets.newHashSet(collection.iterator().sync())));
} else {
response.resume(Response.serverError().build());
}
});
} }


@PUT @PUT
@Path("/{element}") @Path("/{name}/{element}")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public void add(@PathParam("element") String element, @Suspended AsyncResponse response) { public void add(
collection.add(element).whenComplete((result, error) -> { @PathParam("name") String name,
@PathParam("element") String element,
@Suspended AsyncResponse response) {
getPrimitive(name).thenCompose(collection -> collection.add(element)).whenComplete((result, error) -> {
if (error == null) { if (error == null) {
response.resume(Response.ok(result).build()); response.resume(Response.ok(result).build());
} else { } else {
Expand All @@ -65,10 +77,13 @@ public void add(@PathParam("element") String element, @Suspended AsyncResponse r
} }


@GET @GET
@Path("/{element}") @Path("/{name}/{element}")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public void contains(@PathParam("element") String element, @Suspended AsyncResponse response) { public void contains(
collection.contains(element).whenComplete((result, error) -> { @PathParam("name") String name,
@PathParam("element") String element,
@Suspended AsyncResponse response) {
getPrimitive(name).thenCompose(collection -> collection.contains(element)).whenComplete((result, error) -> {
if (error == null) { if (error == null) {
response.resume(Response.ok(result).build()); response.resume(Response.ok(result).build());
} else { } else {
Expand All @@ -79,10 +94,13 @@ public void contains(@PathParam("element") String element, @Suspended AsyncRespo
} }


@DELETE @DELETE
@Path("/{element}") @Path("/{name}/{element}")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public void remove(@PathParam("element") String element, @Suspended AsyncResponse response) { public void remove(
collection.remove(element).whenComplete((result, error) -> { @PathParam("name") String name,
@PathParam("element") String element,
@Suspended AsyncResponse response) {
getPrimitive(name).thenCompose(collection -> collection.remove(element)).whenComplete((result, error) -> {
if (error == null) { if (error == null) {
response.resume(Response.ok(result).build()); response.resume(Response.ok(result).build());
} else { } else {
Expand All @@ -93,10 +111,12 @@ public void remove(@PathParam("element") String element, @Suspended AsyncRespons
} }


@GET @GET
@Path("/size") @Path("/{name}/size")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public void size(@Suspended AsyncResponse response) { public void size(
collection.size().whenComplete((result, error) -> { @PathParam("name") String name,
@Suspended AsyncResponse response) {
getPrimitive(name).thenCompose(collection -> collection.size()).whenComplete((result, error) -> {
if (error == null) { if (error == null) {
response.resume(Response.ok(result).build()); response.resume(Response.ok(result).build());
} else { } else {
Expand All @@ -107,8 +127,11 @@ public void size(@Suspended AsyncResponse response) {
} }


@DELETE @DELETE
public void clear(@Suspended AsyncResponse response) { @Path("/{name}")
collection.clear().whenComplete((result, error) -> { public void clear(
@PathParam("name") String name,
@Suspended AsyncResponse response) {
getPrimitive(name).thenCompose(collection -> collection.clear()).whenComplete((result, error) -> {
if (error == null) { if (error == null) {
response.resume(Response.ok().build()); response.resume(Response.ok().build());
} else { } else {
Expand Down
Expand Up @@ -15,12 +15,11 @@
*/ */
package io.atomix.core.counter; package io.atomix.core.counter;


import io.atomix.core.counter.impl.DefaultAtomicCounterBuilder;
import io.atomix.core.counter.impl.AtomicCounterResource; import io.atomix.core.counter.impl.AtomicCounterResource;
import io.atomix.core.counter.impl.DefaultAtomicCounterBuilder;
import io.atomix.core.counter.impl.DefaultAtomicCounterService; import io.atomix.core.counter.impl.DefaultAtomicCounterService;
import io.atomix.primitive.PrimitiveManagementService; import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.PrimitiveType; import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.resource.PrimitiveResource;
import io.atomix.primitive.service.PrimitiveService; import io.atomix.primitive.service.PrimitiveService;
import io.atomix.primitive.service.ServiceConfig; import io.atomix.primitive.service.ServiceConfig;


Expand Down Expand Up @@ -53,8 +52,8 @@ public PrimitiveService newService(ServiceConfig config) {
} }


@Override @Override
public PrimitiveResource newResource(AtomicCounter primitive) { public Class<?> getResourceClass() {
return new AtomicCounterResource(primitive.async()); return AtomicCounterResource.class;
} }


@Override @Override
Expand Down

0 comments on commit 0560bfa

Please sign in to comment.