Skip to content

Commit

Permalink
Ensure serializable types are registered for resources prior to recei…
Browse files Browse the repository at this point in the history
…ving and deserializing resource operations.
  • Loading branch information
kuujo committed Mar 2, 2016
1 parent a0a4225 commit 0c7e749
Show file tree
Hide file tree
Showing 42 changed files with 275 additions and 165 deletions.
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.collections; package io.atomix.collections;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.collections.state.MapCommands; import io.atomix.collections.state.MapCommands;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource; import io.atomix.resource.AbstractResource;
Expand Down Expand Up @@ -70,11 +69,6 @@ public DistributedMap(CopycatClient client, Properties options) {
super(client, options); super(client, options);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new MapCommands.TypeResolver().resolve(registry);
}

/** /**
* Returns {@code true} if the map is empty. * Returns {@code true} if the map is empty.
* <p> * <p>
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.collections; package io.atomix.collections;


import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.collections.state.MapCommands;
import io.atomix.collections.state.MapState; import io.atomix.collections.state.MapState;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory; import io.atomix.resource.ResourceFactory;
Expand All @@ -29,6 +31,11 @@
*/ */
public class DistributedMapFactory implements ResourceFactory<DistributedMap<?, ?>> { public class DistributedMapFactory implements ResourceFactory<DistributedMap<?, ?>> {


@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new MapCommands.TypeResolver();
}

@Override @Override
public ResourceStateMachine createStateMachine(Properties config) { public ResourceStateMachine createStateMachine(Properties config) {
return new MapState(config); return new MapState(config);
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.collections; package io.atomix.collections;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.collections.state.MultiMapCommands; import io.atomix.collections.state.MultiMapCommands;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource; import io.atomix.resource.AbstractResource;
Expand Down Expand Up @@ -123,11 +122,6 @@ public DistributedMultiMap(CopycatClient client, Properties options) {
super(client, options); super(client, options);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new MultiMapCommands.TypeResolver().resolve(registry);
}

@Override @Override
public Resource.Config config() { public Resource.Config config() {
return new Config(super.config()); return new Config(super.config());
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.collections; package io.atomix.collections;


import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.collections.state.MultiMapCommands;
import io.atomix.collections.state.MultiMapState; import io.atomix.collections.state.MultiMapState;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory; import io.atomix.resource.ResourceFactory;
Expand All @@ -29,6 +31,11 @@
*/ */
public class DistributedMultiMapFactory implements ResourceFactory<DistributedMultiMap<?, ?>> { public class DistributedMultiMapFactory implements ResourceFactory<DistributedMultiMap<?, ?>> {


@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new MultiMapCommands.TypeResolver();
}

@Override @Override
public ResourceStateMachine createStateMachine(Properties config) { public ResourceStateMachine createStateMachine(Properties config) {
return new MultiMapState(config); return new MultiMapState(config);
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.collections; package io.atomix.collections;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.collections.state.QueueCommands; import io.atomix.collections.state.QueueCommands;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource; import io.atomix.resource.AbstractResource;
Expand Down Expand Up @@ -62,11 +61,6 @@ public DistributedQueue(CopycatClient client, Properties options) {
super(client, options); super(client, options);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new QueueCommands.TypeResolver().resolve(registry);
}

/** /**
* Adds a value to the set. * Adds a value to the set.
* *
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.collections; package io.atomix.collections;


import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.collections.state.QueueCommands;
import io.atomix.collections.state.QueueState; import io.atomix.collections.state.QueueState;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory; import io.atomix.resource.ResourceFactory;
Expand All @@ -29,6 +31,11 @@
*/ */
public class DistributedQueueFactory implements ResourceFactory<DistributedQueue<?>> { public class DistributedQueueFactory implements ResourceFactory<DistributedQueue<?>> {


@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new QueueCommands.TypeResolver();
}

@Override @Override
public ResourceStateMachine createStateMachine(Properties config) { public ResourceStateMachine createStateMachine(Properties config) {
return new QueueState(config); return new QueueState(config);
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.collections; package io.atomix.collections;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.collections.state.SetCommands; import io.atomix.collections.state.SetCommands;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource; import io.atomix.resource.AbstractResource;
Expand Down Expand Up @@ -58,11 +57,6 @@ public DistributedSet(CopycatClient client, Properties options) {
super(client, options); super(client, options);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new SetCommands.TypeResolver().resolve(registry);
}

/** /**
* Adds a value to the set. * Adds a value to the set.
* *
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.collections; package io.atomix.collections;


import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.collections.state.SetCommands;
import io.atomix.collections.state.SetState; import io.atomix.collections.state.SetState;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory; import io.atomix.resource.ResourceFactory;
Expand All @@ -29,6 +31,11 @@
*/ */
public class DistributedSetFactory implements ResourceFactory<DistributedSet<?>> { public class DistributedSetFactory implements ResourceFactory<DistributedSet<?>> {


@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new SetCommands.TypeResolver();
}

@Override @Override
public ResourceStateMachine createStateMachine(Properties config) { public ResourceStateMachine createStateMachine(Properties config) {
return new SetState(config); return new SetState(config);
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.collections.state; package io.atomix.collections.state;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.concurrent.Scheduled; import io.atomix.catalyst.util.concurrent.Scheduled;
import io.atomix.copycat.server.Commit; import io.atomix.copycat.server.Commit;
Expand All @@ -36,11 +35,6 @@ public MapState(Properties config) {
super(config); super(config);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new MapCommands.TypeResolver().resolve(registry);
}

/** /**
* Handles a contains key commit. * Handles a contains key commit.
*/ */
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.collections.state; package io.atomix.collections.state;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.concurrent.Scheduled; import io.atomix.catalyst.util.concurrent.Scheduled;
import io.atomix.collections.DistributedMultiMap; import io.atomix.collections.DistributedMultiMap;
import io.atomix.copycat.server.Commit; import io.atomix.copycat.server.Commit;
Expand All @@ -39,11 +38,6 @@ public MultiMapState(Properties properties) {
this.order = DistributedMultiMap.Order.valueOf(config.getProperty("order", DistributedMultiMap.Order.INSERT.name().toLowerCase()).toUpperCase()); this.order = DistributedMultiMap.Order.valueOf(config.getProperty("order", DistributedMultiMap.Order.INSERT.name().toLowerCase()).toUpperCase());
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new MultiMapCommands.TypeResolver().resolve(registry);
}

/** /**
* Creates a new value map. * Creates a new value map.
*/ */
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.collections.state; package io.atomix.collections.state;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.copycat.server.Commit; import io.atomix.copycat.server.Commit;
import io.atomix.resource.ResourceStateMachine; import io.atomix.resource.ResourceStateMachine;


Expand All @@ -36,11 +35,6 @@ public QueueState(Properties properties) {
super(properties); super(properties);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new QueueCommands.TypeResolver().resolve(registry);
}

/** /**
* Handles a contains commit. * Handles a contains commit.
*/ */
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.collections.state; package io.atomix.collections.state;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.concurrent.Scheduled; import io.atomix.catalyst.util.concurrent.Scheduled;
import io.atomix.copycat.server.Commit; import io.atomix.copycat.server.Commit;
import io.atomix.resource.ResourceStateMachine; import io.atomix.resource.ResourceStateMachine;
Expand All @@ -38,11 +37,6 @@ public SetState(Properties properties) {
super(properties); super(properties);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new SetCommands.TypeResolver().resolve(registry);
}

/** /**
* Handles a contains commit. * Handles a contains commit.
*/ */
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.coordination; package io.atomix.coordination;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.transport.Address; import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Server; import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.util.ConfigurationException; import io.atomix.catalyst.util.ConfigurationException;
Expand Down Expand Up @@ -319,11 +318,6 @@ public DistributedGroup(CopycatClient client, Properties options) {
this.connections = new GroupConnectionManager(client.transport().client(), client.context()); this.connections = new GroupConnectionManager(client.transport().client(), client.context());
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new GroupCommands.TypeResolver().resolve(registry);
}

@Override @Override
public Options options() { public Options options() {
return new Options(super.options()); return new Options(super.options());
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.coordination; package io.atomix.coordination;


import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.coordination.state.GroupCommands;
import io.atomix.coordination.state.GroupState; import io.atomix.coordination.state.GroupState;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory; import io.atomix.resource.ResourceFactory;
Expand All @@ -29,6 +31,11 @@
*/ */
public class DistributedGroupFactory implements ResourceFactory<DistributedGroup> { public class DistributedGroupFactory implements ResourceFactory<DistributedGroup> {


@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new GroupCommands.TypeResolver();
}

@Override @Override
public ResourceStateMachine createStateMachine(Properties config) { public ResourceStateMachine createStateMachine(Properties config) {
return new GroupState(config); return new GroupState(config);
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.coordination; package io.atomix.coordination;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.coordination.state.LockCommands; import io.atomix.coordination.state.LockCommands;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource; import io.atomix.resource.AbstractResource;
Expand Down Expand Up @@ -144,11 +143,6 @@ public DistributedLock(CopycatClient client, Properties options) {
super(client, options); super(client, options);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new LockCommands.TypeResolver().resolve(registry);
}

@Override @Override
public CompletableFuture<DistributedLock> open() { public CompletableFuture<DistributedLock> open() {
return super.open().thenApply(result -> { return super.open().thenApply(result -> {
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/ */
package io.atomix.coordination; package io.atomix.coordination;


import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.coordination.state.LockCommands;
import io.atomix.coordination.state.LockState; import io.atomix.coordination.state.LockState;
import io.atomix.copycat.client.CopycatClient; import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory; import io.atomix.resource.ResourceFactory;
Expand All @@ -29,6 +31,11 @@
*/ */
public class DistributedLockFactory implements ResourceFactory<DistributedLock> { public class DistributedLockFactory implements ResourceFactory<DistributedLock> {


@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new LockCommands.TypeResolver();
}

@Override @Override
public ResourceStateMachine createStateMachine(Properties config) { public ResourceStateMachine createStateMachine(Properties config) {
return new LockState(config); return new LockState(config);
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.coordination.state; package io.atomix.coordination.state;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.transport.Address; import io.atomix.catalyst.transport.Address;
import io.atomix.coordination.GroupMemberInfo; import io.atomix.coordination.GroupMemberInfo;
import io.atomix.coordination.GroupTask; import io.atomix.coordination.GroupTask;
Expand Down Expand Up @@ -45,11 +44,6 @@ public GroupState(Properties config) {
super(config); super(config);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new GroupCommands.TypeResolver().resolve(registry);
}

@Override @Override
public void close(ServerSession session) { public void close(ServerSession session) {
Map<Long, Member> left = new HashMap<>(); Map<Long, Member> left = new HashMap<>();
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package io.atomix.coordination.state; package io.atomix.coordination.state;


import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.concurrent.Scheduled; import io.atomix.catalyst.util.concurrent.Scheduled;
import io.atomix.copycat.server.Commit; import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.session.ServerSession; import io.atomix.copycat.server.session.ServerSession;
Expand All @@ -39,11 +38,6 @@ public LockState(Properties config) {
super(config); super(config);
} }


@Override
protected void registerTypes(SerializerRegistry registry) {
new LockCommands.TypeResolver().resolve(registry);
}

@Override @Override
public void close(ServerSession session) { public void close(ServerSession session) {
if (lock != null && lock.session().id() == session.id()) { if (lock != null && lock.session().id() == session.id()) {
Expand Down

0 comments on commit 0c7e749

Please sign in to comment.