Skip to content

Commit

Permalink
Ensure partitioned clusters are exposed to discrete resources interna…
Browse files Browse the repository at this point in the history
…lly.
  • Loading branch information
kuujo committed Apr 27, 2015
1 parent daffd61 commit abad202
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
Expand Up @@ -27,7 +27,7 @@
*/ */
public abstract class AbstractResource<T extends Resource<?>> implements Resource<T> { public abstract class AbstractResource<T extends Resource<?>> implements Resource<T> {
protected final String name; protected final String name;
protected final ManagedCluster cluster; private final ManagedCluster cluster;


protected AbstractResource(ResourceConfig config) { protected AbstractResource(ResourceConfig config) {
this.name = config.getName(); this.name = config.getName();
Expand Down
Expand Up @@ -30,7 +30,7 @@
*/ */
public abstract class DiscreteResource<T extends DiscreteResource<?, U>, U extends Resource<?>> extends AbstractResource<U> { public abstract class DiscreteResource<T extends DiscreteResource<?, U>, U extends Resource<?>> extends AbstractResource<U> {
protected final Protocol protocol; protected final Protocol protocol;
protected final Cluster partitionedCluster; protected final Cluster cluster;
protected final ReplicationStrategy replicationStrategy; protected final ReplicationStrategy replicationStrategy;
protected final Serializer serializer; protected final Serializer serializer;


Expand All @@ -39,10 +39,10 @@ protected DiscreteResource(DiscreteResourceConfig config) {
this.protocol = config.getProtocol(); this.protocol = config.getProtocol();
this.replicationStrategy = config.getReplicationStrategy(); this.replicationStrategy = config.getReplicationStrategy();
this.serializer = config.getSerializer(); this.serializer = config.getSerializer();
this.partitionedCluster = new PartitionedCluster(config.getCluster(), config.getReplicationStrategy(), config.getPartitionId(), config.getPartitions()); this.cluster = new PartitionedCluster(config.getCluster(), config.getReplicationStrategy(), config.getPartitionId(), config.getPartitions());
protocol.setTopic(String.format("%s-%d", config.getName(), config.getPartitionId())); protocol.setTopic(String.format("%s-%d", config.getName(), config.getPartitionId()));
protocol.setCluster(partitionedCluster); protocol.setCluster(cluster);
protocol.setContext(new ExecutionContext(String.format("copycat-protocol-%s-%d-%d", config.getName(), config.getPartitionId(), partitionedCluster.member().id()))); protocol.setContext(new ExecutionContext(String.format("copycat-protocol-%s-%d-%d", config.getName(), config.getPartitionId(), cluster.member().id())));
} }


/** /**
Expand All @@ -52,7 +52,7 @@ protected DiscreteResource(DiscreteResourceConfig config) {


@Override @Override
public Cluster cluster() { public Cluster cluster() {
return partitionedCluster; return cluster;
} }


@Override @Override
Expand Down

0 comments on commit abad202

Please sign in to comment.