Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Simplify and rename Actorsystem configuration properties to be more g…
Browse files Browse the repository at this point in the history
…eneric, e.g. starting with "usergrid.cluster" instead of "collection.akka"
  • Loading branch information
snoopdave committed Jul 1, 2016
1 parent 2d5ad05 commit f0c9fd4
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 124 deletions.
56 changes: 20 additions & 36 deletions stack/config/src/main/resources/usergrid-default.properties
Expand Up @@ -410,60 +410,44 @@ usergrid.queue.lock.timeout=5
#usergrid.queue.publish.queuesize=850000 #usergrid.queue.publish.queuesize=850000




######################### Akka Actor System Configiuration ################### ######################### Usergrid Cluster Configuration ###################
# #
# Usergrid includes Akka, an Actor-based system that allows for the # Usergrid includes a multi-region clustering system.
# distribution of work across multiple Usergrid instances and multiple regions. # To user it you must specify your region, the list of regions and seeds for each region.
#
# All properties are required. If Akka is enabled then all properties in this
# section MUST be specified.
#
# For more information: https://issues.apache.org/jira/browse/USERGRID-1268
# #


# Currently, Akka is disable and not required for Usergrid # This is an experimentation new feature, disabled by default
collection.akka.enabled=false usergrid.cluster.enabled=false


# host name of this machine # Comma-separated list of regions to be considered
collection.akka.hostname=localhost usergrid.cluster.region.list=default


# The region of this Usergrid installation # The regions of this local instance of Usergrid
# Region MUST be in the region list specified in the 'usergrid.queue.regionList' property usergrid.cluster.region.local=default
collection.akka.region=


# Comma-separated lists of Akka seeds each with format {region}:{hostname}:{port}. # Comma-separated lists of cluster seeds each with format {region}:{hostname}
# All regions MUST be listed in the 'usergrid.queue.regionList' usergrid.cluster.seeds=default:localhost
collection.akka.region.seeds=

# The default authoritative region for when is not specified elsewhere
# Region MUST be in the region list specified in the 'usergrid.queue.regionList' property
collection.akka.authoritative.region=


# Default number of Akka actors to start per instance / router producer # Port used for cluster communications.
collection.akka.instances-per-node=300 usergrid.cluster.port=2551




######################### Usergrid Unique Values Validation ################## ######################### Usergrid Unique Values Validation ##################
# #
# Usergrid includes a distributed unique values validation that ensure that # These only apply if the above Usergrid cluster system is enabled.
# unique values rename unique across a distributed and multi-region system.
# This system is based on the Akka actor system and requires some additional
# configuration.
#
# The system uses consistent hashing to ensure that one single-threaded actor
# ever accesses a unique value record at one time.
#
# For more information: https://issues.apache.org/jira/browse/USERGRID-1268
# #


# The number of unique value actors to start on each Usergrid instance. # The number of unique value actors to start on each Usergrid instance.
collection.akka.uniquevalue.actors=300 collection.uniquevalues.actors=300


# TTL of unique value reservation in in-memory cache # TTL of unique value reservation in in-memory cache
collection.akka.uniquevalue.cache.ttl=10 collection.uniquevalues.cache.ttl=10


# TTL of a unique value reservation when written to Cassandra # TTL of a unique value reservation when written to Cassandra
collection.akka.uniquevalue.reservation.ttl=10 collection.uniquevalues.reservation.ttl=10

# The default authoritative region for when is not specified elsewhere
collection.uniquevalues.authoritative.region=default




############################## Usergrid Scheduler ########################### ############################## Usergrid Scheduler ###########################
Expand Down
Expand Up @@ -142,7 +142,7 @@ public CpEntityManagerFactory(


logger.info("EntityManagerFactoring starting..."); logger.info("EntityManagerFactoring starting...");


if ( actorSystemFig.getAkkaEnabled() ) { if ( actorSystemFig.getEnabled() ) {
try { try {
logger.info("Akka cluster starting..."); logger.info("Akka cluster starting...");


Expand Down
Expand Up @@ -30,64 +30,54 @@
@FigSingleton @FigSingleton
public interface ActorSystemFig extends GuicyFig, Serializable { public interface ActorSystemFig extends GuicyFig, Serializable {


String AKKA_ENABLED = "collection.akka.enabled"; String CLUSTER_ENABLED = "usergrid.cluster.enabled";


String AKKA_HOSTNAME = "collection.akka.hostname"; String CLUSTER_REGIONS_LIST = "usergrid.cluster.region.list";


String AKKA_REGION = "collection.akka.region"; String CLUSTER_REGIONS_LOCAL = "usergrid.cluster.region.local";


String AKKA_REGION_LIST = "usergrid.queue.regionList"; // same region list used by queues String CLUSTER_SEEDS = "usergrid.cluster.seeds";


String AKKA_REGION_SEEDS = "collection.akka.region.seeds"; String CLUSTER_PORT = "usergrid.cluster.port";

String AKKA_AUTHORITATIVE_REGION = "collection.akka.authoritative.region";

String AKKA_INSTANCES_PER_NODE = "collection.akka.instances-per-node";




/** /**
* Use Akka or nah * Use Cluster or nah
*/ */
@Key(AKKA_ENABLED) @Key(CLUSTER_ENABLED)
@Default("true") @Default("true")
boolean getAkkaEnabled(); boolean getEnabled();

/**
* Hostname to be used in Akka configuration.
*/
@Key(AKKA_HOSTNAME)
String getHostname();


/** /**
* Local region to be used in Akka configuration. * Local region to be used in Akka configuration.
*/ */
@Key(AKKA_REGION) @Key(CLUSTER_REGIONS_LOCAL)
String getRegion(); @Default("default")
String getRegionLocal();


/** /**
* Comma separated list of regions known to cluster. * Comma separated list of regions known to cluster.
*/ */
@Key(AKKA_REGION_LIST) @Key(CLUSTER_REGIONS_LIST)
String getRegionList(); @Default("default")
String getRegionsList();


/** /**
* Comma-separated lists of seeds each with format {region}:{hostname}:{port}. * Comma-separated lists of seeds each with format {region}:{hostname}
* Regions MUST be listed in the 'usergrid.queue.regionList'
*/ */
@Key(AKKA_REGION_SEEDS) @Key(CLUSTER_SEEDS)
String getRegionSeeds(); @Default("default:localhost")
String getSeeds();


/** /**
* If no region specified for type, use the authoritative region * Port for cluster comms.
*/ */
@Key(AKKA_AUTHORITATIVE_REGION) @Key(CLUSTER_PORT)
String getAkkaAuthoritativeRegion(); @Default("2551")
String getPort();




/** @Key("usergrid.cluster.hostname")
* Number of actor instances to create on each node for each router. @Default("")
*/ String getHostname();
@Key(AKKA_INSTANCES_PER_NODE)
@Default("300")
int getInstancesPerNode();
} }
Expand Up @@ -39,6 +39,8 @@
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;


import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


Expand Down Expand Up @@ -77,8 +79,17 @@ public ActorSystemManagerImpl( ActorSystemFig actorSystemFig ) {
@Override @Override
public void start() { public void start() {


this.hostname = actorSystemFig.getHostname(); if ( !StringUtils.isEmpty( actorSystemFig.getHostname()) ) {
this.currentRegion = actorSystemFig.getRegion(); this.hostname = actorSystemFig.getHostname();
} else {
try {
this.hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
logger.error("Cannot get hostname, defaulting to 'localhost': " + e.getMessage());
}
}

this.currentRegion = actorSystemFig.getRegionLocal();
this.port = null; this.port = null;


initAkka(); initAkka();
Expand Down Expand Up @@ -155,32 +166,22 @@ private void initAkka() {


// Create one actor system with request actor for each region // Create one actor system with request actor for each region


if ( StringUtils.isEmpty( hostname )) {
throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_HOSTNAME );
}

if ( StringUtils.isEmpty( currentRegion )) { if ( StringUtils.isEmpty( currentRegion )) {
throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION ); throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LOCAL );
} }


if ( StringUtils.isEmpty( actorSystemFig.getRegionList() )) { if ( StringUtils.isEmpty( actorSystemFig.getRegionsList() )) {
throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_LIST ); throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LIST );
} }


if ( StringUtils.isEmpty( actorSystemFig.getRegionSeeds() )) { if ( StringUtils.isEmpty( actorSystemFig.getSeeds() )) {
throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_SEEDS); throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_SEEDS );
} }


if ( StringUtils.isEmpty( actorSystemFig.getAkkaAuthoritativeRegion() )) { List regionList = Arrays.asList( actorSystemFig.getRegionsList().toLowerCase().split(",") );
logger.warn("No value for {} specified, will use current region as authoriative region",
ActorSystemFig.AKKA_AUTHORITATIVE_REGION);
//throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_AUTHORITATIVE_REGION);
}

List regionList = Arrays.asList( actorSystemFig.getRegionList().toLowerCase().split(",") );


logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}", logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}",
hostname, currentRegion, regionList, actorSystemFig.getRegionSeeds() ); hostname, currentRegion, regionList, actorSystemFig.getSeeds() );


Config config = readClusterSystemConfig(); Config config = readClusterSystemConfig();


Expand All @@ -205,7 +206,7 @@ private ListMultimap<String, String> getSeedsByRegion() {


seedsByRegion = ArrayListMultimap.create(); seedsByRegion = ArrayListMultimap.create();


String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," ); String[] regionSeeds = actorSystemFig.getSeeds().split( "," );


logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds ); logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds );


Expand All @@ -226,7 +227,8 @@ private ListMultimap<String, String> getSeedsByRegion() {
String[] parts = regionSeed.split( ":" ); String[] parts = regionSeed.split( ":" );
String region = parts[0]; String region = parts[0];
String hostname = parts[1]; String hostname = parts[1];
String regionPortString = parts[2];
String regionPortString = parts.length > 2 ? parts[2] : actorSystemFig.getPort();


// all seeds in same region must use same port // all seeds in same region must use same port
// we assume 0th seed has the right port // we assume 0th seed has the right port
Expand Down Expand Up @@ -269,19 +271,16 @@ private Config readClusterSystemConfig() {


try { try {


int numInstancesPerNode = actorSystemFig.getInstancesPerNode(); int numInstancesPerNode = 300; // expect this to be overridden by RouterProducers


String region = currentRegion; String region = currentRegion;


List<String> seeds = getSeedsByRegion().get( region ); List<String> seeds = getSeedsByRegion().get( region );
int lastColon = seeds.get(0).lastIndexOf(":") + 1; int lastColon = seeds.get(0).lastIndexOf(":") + 1;
final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon )); final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon ));


logger.info( "Akka Config for region {} is:\n" + logger.info( "Akka Config for region {} is:\n" + " Hostname {}\n" + " Seeds {}\n",
" Hostname {}\n" + region, hostname, seeds );
" Seeds {}\n" +
" Authoritative Region {}\n",
region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() );


Map<String, Object> configMap = new HashMap<String, Object>() {{ Map<String, Object> configMap = new HashMap<String, Object>() {{


Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
private static final Logger logger = LoggerFactory.getLogger( WriteCommit.class ); private static final Logger logger = LoggerFactory.getLogger( WriteCommit.class );


ActorSystemFig actorSystemFig; ActorSystemFig actorSystemFig;
UniqueValuesFig uniqueValuesFig;
UniqueValuesService akkaUvService; UniqueValuesService akkaUvService;


@Inject @Inject
Expand All @@ -82,6 +84,7 @@ public WriteCommit( final MvccLogEntrySerializationStrategy logStrat,
final MvccEntitySerializationStrategy entryStrat, final MvccEntitySerializationStrategy entryStrat,
final UniqueValueSerializationStrategy uniqueValueStrat, final UniqueValueSerializationStrategy uniqueValueStrat,
final ActorSystemFig actorSystemFig, final ActorSystemFig actorSystemFig,
final UniqueValuesFig uniqueValuesFig,
final UniqueValuesService akkaUvService ) { final UniqueValuesService akkaUvService ) {


Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" ); Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" );
Expand All @@ -92,6 +95,7 @@ public WriteCommit( final MvccLogEntrySerializationStrategy logStrat,
this.entityStrat = entryStrat; this.entityStrat = entryStrat;
this.uniqueValueStrat = uniqueValueStrat; this.uniqueValueStrat = uniqueValueStrat;
this.actorSystemFig = actorSystemFig; this.actorSystemFig = actorSystemFig;
this.uniqueValuesFig = uniqueValuesFig;
this.akkaUvService = akkaUvService; this.akkaUvService = akkaUvService;
} }


Expand Down Expand Up @@ -130,13 +134,13 @@ private CollectionIoEvent<MvccEntity> confirmUniqueFields(CollectionIoEvent<Mvcc
logMutation.mergeShallow( entityMutation ); logMutation.mergeShallow( entityMutation );


// akkaFig may be null when this is called from JUnit tests // akkaFig may be null when this is called from JUnit tests
if ( actorSystemFig != null && actorSystemFig.getAkkaEnabled() ) { if ( actorSystemFig != null && actorSystemFig.getEnabled() ) {
String region = ioEvent.getRegion(); String region = ioEvent.getRegion();
if ( region == null ) { if ( region == null ) {
region = actorSystemFig.getAkkaAuthoritativeRegion(); region = uniqueValuesFig.getAuthoritativeRegion();
} }
if ( region == null ) { if ( region == null ) {
region = actorSystemFig.getRegion(); region = actorSystemFig.getRegionLocal();
} }
confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, region ); confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, region );
} else { } else {
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope;
Expand All @@ -63,6 +64,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
private static final Logger logger = LoggerFactory.getLogger( WriteUniqueVerify.class ); private static final Logger logger = LoggerFactory.getLogger( WriteUniqueVerify.class );


ActorSystemFig actorSystemFig; ActorSystemFig actorSystemFig;
UniqueValuesFig uniqueValuesFig;
UniqueValuesService akkaUvService; UniqueValuesService akkaUvService;


private final UniqueValueSerializationStrategy uniqueValueStrat; private final UniqueValueSerializationStrategy uniqueValueStrat;
Expand All @@ -83,11 +85,13 @@ public WriteUniqueVerify(final UniqueValueSerializationStrategy uniqueValueSeria
final Keyspace keyspace, final Keyspace keyspace,
final CassandraConfig cassandraFig, final CassandraConfig cassandraFig,
final ActorSystemFig actorSystemFig, final ActorSystemFig actorSystemFig,
final UniqueValuesFig uniqueValuesFig,
final UniqueValuesService akkaUvService ) { final UniqueValuesService akkaUvService ) {


this.keyspace = keyspace; this.keyspace = keyspace;
this.cassandraFig = cassandraFig; this.cassandraFig = cassandraFig;
this.actorSystemFig = actorSystemFig; this.actorSystemFig = actorSystemFig;
this.uniqueValuesFig = uniqueValuesFig;
this.akkaUvService = akkaUvService; this.akkaUvService = akkaUvService;


Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" ); Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
Expand All @@ -102,7 +106,7 @@ public WriteUniqueVerify(final UniqueValueSerializationStrategy uniqueValueSeria


@Override @Override
public void call( final CollectionIoEvent<MvccEntity> ioevent ) { public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
if ( actorSystemFig != null && actorSystemFig.getAkkaEnabled() ) { if ( actorSystemFig != null && actorSystemFig.getEnabled() ) {
verifyUniqueFieldsAkka( ioevent ); verifyUniqueFieldsAkka( ioevent );
} else { } else {
verifyUniqueFields( ioevent ); verifyUniqueFields( ioevent );
Expand All @@ -121,10 +125,10 @@ private void verifyUniqueFieldsAkka(CollectionIoEvent<MvccEntity> ioevent) {


String region = ioevent.getRegion(); String region = ioevent.getRegion();
if ( region == null ) { if ( region == null ) {
region = actorSystemFig.getAkkaAuthoritativeRegion(); region = uniqueValuesFig.getAuthoritativeRegion();
} }
if ( region == null ) { if ( region == null ) {
region = actorSystemFig.getRegion(); region = actorSystemFig.getRegionLocal();
} }
try { try {
akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), region ); akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), region );
Expand Down

0 comments on commit f0c9fd4

Please sign in to comment.