Skip to content

Commit

Permalink
Refactor resource API to allow resources to be used independently of …
Browse files Browse the repository at this point in the history
…the resource manager.
  • Loading branch information
kuujo committed Oct 18, 2015
1 parent dbe4e49 commit 7f6e679
Show file tree
Hide file tree
Showing 89 changed files with 2,037 additions and 2,122 deletions.
8 changes: 7 additions & 1 deletion atomic/pom.xml
Expand Up @@ -29,8 +29,14 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>io.atomix</groupId> <groupId>io.atomix</groupId>
<artifactId>atomix</artifactId> <artifactId>atomix-resource</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>io.atomix</groupId>
<artifactId>catalyst-local</artifactId>
<version>${catalyst.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>
Expand Up @@ -15,7 +15,8 @@
*/ */
package io.atomix.atomic; package io.atomix.atomic;


import io.atomix.Consistency; import io.atomix.copycat.client.RaftClient;
import io.atomix.resource.Consistency;


import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Function; import java.util.function.Function;
Expand All @@ -28,6 +29,10 @@
public class DistributedAtomicLong extends DistributedAtomicValue<Long> { public class DistributedAtomicLong extends DistributedAtomicValue<Long> {
private Long value; private Long value;


public DistributedAtomicLong(RaftClient client) {
super(client);
}

@Override @Override
public DistributedAtomicLong with(Consistency consistency) { public DistributedAtomicLong with(Consistency consistency) {
super.with(consistency); super.with(consistency);
Expand Down
28 changes: 15 additions & 13 deletions atomic/src/main/java/io/atomix/atomic/DistributedAtomicValue.java
Expand Up @@ -15,12 +15,13 @@
*/ */
package io.atomix.atomic; package io.atomix.atomic;


import io.atomix.DistributedResource;
import io.atomix.atomic.state.AtomicValueCommands; import io.atomix.atomic.state.AtomicValueCommands;
import io.atomix.atomic.state.AtomicValueState; import io.atomix.atomic.state.AtomicValueState;
import io.atomix.catalyst.util.Listener; import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.server.StateMachine; import io.atomix.copycat.client.RaftClient;
import io.atomix.resource.ResourceContext; import io.atomix.resource.AbstractResource;
import io.atomix.resource.Consistency;
import io.atomix.resource.ResourceInfo;


import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
Expand All @@ -33,24 +34,25 @@
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class DistributedAtomicValue<T> extends DistributedResource<DistributedAtomicValue<T>> { @ResourceInfo(stateMachine=AtomicValueState.class)
public class DistributedAtomicValue<T> extends AbstractResource {
private final java.util.Set<Consumer<T>> changeListeners = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final java.util.Set<Consumer<T>> changeListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());


@Override public DistributedAtomicValue(RaftClient client) {
protected Class<? extends StateMachine> stateMachine() { super(client);
return AtomicValueState.class; client.session().<T>onEvent("change", event -> {
}

@Override
protected void open(ResourceContext context) {
super.open(context);
context.session().<T>onEvent("change", event -> {
for (Consumer<T> listener : changeListeners) { for (Consumer<T> listener : changeListeners) {
listener.accept(event); listener.accept(event);
} }
}); });
} }


@Override
public DistributedAtomicValue<T> with(Consistency consistency) {
super.with(consistency);
return this;
}

/** /**
* Gets the current value. * Gets the current value.
* *
Expand Down
17 changes: 15 additions & 2 deletions atomic/src/main/java/io/atomix/atomic/state/AtomicValueState.java
Expand Up @@ -18,8 +18,8 @@
import io.atomix.catalyst.util.concurrent.Scheduled; import io.atomix.catalyst.util.concurrent.Scheduled;
import io.atomix.copycat.client.session.Session; import io.atomix.copycat.client.session.Session;
import io.atomix.copycat.server.Commit; import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.StateMachine;
import io.atomix.copycat.server.StateMachineExecutor; import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.resource.ResourceStateMachine;


import java.time.Duration; import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
Expand All @@ -31,7 +31,7 @@
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class AtomicValueState extends StateMachine { public class AtomicValueState extends ResourceStateMachine {
private final Map<Session, Commit<AtomicValueCommands.Listen>> listeners = new HashMap<>(); private final Map<Session, Commit<AtomicValueCommands.Listen>> listeners = new HashMap<>();
private Object value; private Object value;
private Commit<? extends AtomicValueCommands.ValueCommand> current; private Commit<? extends AtomicValueCommands.ValueCommand> current;
Expand Down Expand Up @@ -155,4 +155,17 @@ protected Object getAndSet(Commit<AtomicValueCommands.GetAndSet> commit) {
return result; return result;
} }


@Override
public void delete() {
if (current != null) {
current.clean();
current = null;
value = null;
}
if (timer != null) {
timer.cancel();
timer = null;
}
}

} }
176 changes: 176 additions & 0 deletions atomic/src/test/java/io/atomix/atomic/AbstractAtomicTest.java
@@ -0,0 +1,176 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.atomic;

import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.LocalServerRegistry;
import io.atomix.catalyst.transport.LocalTransport;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.RaftClient;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.RaftServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.resource.ResourceStateMachine;
import net.jodah.concurrentunit.ConcurrentTestCase;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

/**
* Abstract atomix tests.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public abstract class AbstractAtomicTest extends ConcurrentTestCase {
private static final File directory = new File("target/test-logs");
protected LocalServerRegistry registry;
protected int port;
protected List<Address> members;
protected List<RaftClient> clients = new ArrayList<>();
protected List<RaftServer> servers = new ArrayList<>();

/**
* Creates a new resource state machine.
*
* @return A new resource state machine.
*/
protected abstract ResourceStateMachine createStateMachine();

/**
* Returns the next server address.
*
* @return The next server address.
*/
private Address nextAddress() {
Address address = new Address("localhost", port++);
members.add(address);
return address;
}

/**
* Creates a set of Raft servers.
*/
protected List<RaftServer> createServers(int nodes) throws Throwable {
List<RaftServer> servers = new ArrayList<>();

List<Address> members = new ArrayList<>();
for (int i = 0; i < nodes; i++) {
members.add(nextAddress());
}

for (int i = 0; i < nodes; i++) {
RaftServer server = createServer(members.get(i));
server.open().thenRun(this::resume);
servers.add(server);
}

await(0, nodes);

return servers;
}

/**
* Creates a set of Raft servers.
*/
protected List<RaftServer> createServers(int live, int total) throws Throwable {
List<RaftServer> servers = new ArrayList<>();

List<Address> members = new ArrayList<>();
for (int i = 0; i < total; i++) {
members.add(nextAddress());
}

for (int i = 0; i < live; i++) {
RaftServer server = createServer(members.get(i));
server.open().thenRun(this::resume);
servers.add(server);
}

await(0, live);

return servers;
}

/**
* Creates a Raft server.
*/
protected RaftServer createServer(Address address) {
RaftServer server = CopycatServer.builder(address, members)
.withTransport(new LocalTransport(registry))
.withStorage(new Storage(StorageLevel.MEMORY))
.withStateMachine(createStateMachine())
.build();
servers.add(server);
return server;
}

/**
* Creates a Copycat client.
*/
protected RaftClient createClient() throws Throwable {
RaftClient client = CopycatClient.builder(members).withTransport(new LocalTransport(registry)).build();
client.open().thenRun(this::resume);
await();
clients.add(client);
return client;
}

@BeforeMethod
@AfterMethod
public void clearTests() throws Exception {
deleteDirectory(directory);
registry = new LocalServerRegistry();
members = new ArrayList<>();
port = 5000;

if (!clients.isEmpty()) {
clients.forEach(c -> c.close().join());
}

if (!servers.isEmpty()) {
servers.forEach(s -> s.close().join());
}

clients = new ArrayList<>();
servers = new ArrayList<>();
}

/**
* Deletes a directory recursively.
*/
private void deleteDirectory(File directory) throws IOException {
if (directory.exists()) {
File[] files = directory.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
deleteDirectory(file);
} else {
Files.delete(file.toPath());
}
}
}
Files.delete(directory.toPath());
}
}

}

0 comments on commit 7f6e679

Please sign in to comment.