Skip to content

Commit

Permalink
[NO ISSUE][CONF] Removed unused active partitions config
Browse files Browse the repository at this point in the history
- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Remove unused active partitions config.
- Improved node partitions logging.
- Logging fixes.

Change-Id: I4f2a611e2846405738401310f485db0e72844031
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15024
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed Jan 28, 2022
1 parent 9abc3a8 commit 5f52a5f
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 43 deletions.
Expand Up @@ -224,7 +224,7 @@ public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptio
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
final Set<Integer> nodePartitions = metadataProperties.getNodeActivePartitions(nodeId);
final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId);
replicaManager = new ReplicaManager(this, nodePartitions);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.asterix.app.nc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -31,7 +30,6 @@

import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
Expand Down Expand Up @@ -188,11 +186,8 @@ private boolean isSelf(ReplicaIdentifier id) {
}

private void setNodeOwnedPartitions(INcApplicationContext appCtx) {
ClusterPartition[] clusterPartitions =
appCtx.getMetadataProperties().getNodePartitions().get(appCtx.getServiceContext().getNodeId());
if (clusterPartitions != null) {
nodeOwnedPartitions.addAll(Arrays.stream(clusterPartitions).map(ClusterPartition::getPartitionId)
.collect(Collectors.toList()));
}
Set<Integer> nodePartitions =
appCtx.getMetadataProperties().getNodePartitions(appCtx.getServiceContext().getNodeId());
nodeOwnedPartitions.addAll(nodePartitions);
}
}
Expand Up @@ -36,7 +36,7 @@ public class UpdateNodeStatusTask implements INCLifecycleTask {
private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 2L;
private final NodeStatus status;
private Set<Integer> activePartitions;
private final Set<Integer> activePartitions;

public UpdateNodeStatusTask(NodeStatus status, Set<Integer> activePartitions) {
this.status = status;
Expand All @@ -61,6 +61,6 @@ private void updateNodeActivePartitions(IControllerService cs) {

@Override
public String toString() {
return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
return "UpdateNodeStatusTask{" + "status=" + status + ", activePartitions=" + activePartitions + '}';
}
}
Expand Up @@ -168,8 +168,9 @@ private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataExcepti

protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
Set<Integer> activePartitions) {
LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
state);
LOGGER.info(
"Building registration tasks for node {} with status {} and system state: {} and active partitions {}",
nodeId, nodeStatus, state, activePartitions);
final boolean isMetadataNode = nodeId.equals(metadataNodeId);
switch (nodeStatus) {
case ACTIVE:
Expand Down
Expand Up @@ -76,4 +76,10 @@ public MessageType getType() {
public Set<Integer> getActivePartitions() {
return activePartitions;
}

@Override
public String toString() {
return "NCLifecycleTaskReportMessage{" + "nodeId='" + nodeId + '\'' + ", success=" + success + ", exception="
+ exception + ", localCounters=" + localCounters + ", activePartitions=" + activePartitions + '}';
}
}
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.asterix.app.replication.message;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

Expand All @@ -39,18 +38,18 @@ public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICc

private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 2L;
protected final SystemState state;
protected final String nodeId;
protected final NodeStatus nodeStatus;
protected final Map<String, Object> secrets;
protected final Set<Integer> activePartitions;
private final SystemState state;
private final String nodeId;
private final NodeStatus nodeStatus;
private final Map<String, Object> secrets;
private final Set<Integer> activePartitions;

public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
this.state = state;
this.nodeId = nodeId;
this.nodeStatus = nodeStatus;
this.secrets = new HashMap<>(secretsEphemeral);
this.secrets = secretsEphemeral;
this.activePartitions = activePartitions;
}

Expand All @@ -59,6 +58,7 @@ public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStat
try {
RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
systemState, secretsEphemeral, activePartitions);
LOGGER.info("sending {} to CC", msg);
((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
Expand Down Expand Up @@ -95,4 +95,10 @@ public Map<String, Object> getSecrets() {
public Set<Integer> getActivePartitions() {
return activePartitions;
}

@Override
public String toString() {
return "RegistrationTasksRequestMessage{" + "state=" + state + ", nodeId='" + nodeId + '\'' + ", nodeStatus="
+ nodeStatus + ", activePartitions=" + activePartitions + '}';
}
}
Expand Up @@ -59,13 +59,9 @@ public void handle(INcApplicationContext appCtx) throws HyracksDataException, In
Throwable exception = null;
try {
for (INCLifecycleTask task : tasks) {
if (LOGGER.isInfoEnabled()) {
LOGGER.log(Level.INFO, "Starting startup task: " + task);
}
LOGGER.log(Level.INFO, "Starting startup task: {}", task);
task.perform(getCcId(), cs);
if (LOGGER.isInfoEnabled()) {
LOGGER.log(Level.INFO, "Completed startup task: " + task);
}
LOGGER.log(Level.INFO, "Completed startup task: {}", task);
}
} catch (Throwable e) { //NOSONAR all startup failures should be reported to CC
LOGGER.log(Level.ERROR, "Failed during startup task", e);
Expand Down
Expand Up @@ -300,12 +300,12 @@ public synchronized void tasksCompleted(CcId ccId) throws Exception {
final NodeStatus currentStatus = ncs.getNodeStatus();
final SystemState systemState = isPendingStartupTasks(currentStatus, ncs.getPrimaryCcId(), ccId)
? getCurrentSystemState() : SystemState.HEALTHY;
final Map httpSecrets =
final Map<String, Object> httpSecrets =
apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
: Collections.emptyMap();
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
currentStatus, systemState, httpSecrets,
runtimeContext.getMetadataProperties().getNodeActivePartitions(nodeId));
runtimeContext.getMetadataProperties().getNodePartitions(nodeId));
}

@Override
Expand Down
Expand Up @@ -124,8 +124,8 @@ public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
return accessor.getClusterPartitions();
}

public Set<Integer> getNodeActivePartitions(String nodeId) {
return accessor.getActivePartitions(nodeId);
public Set<Integer> getNodePartitions(String nodeId) {
return accessor.getNodePartitions(nodeId);
}

public Map<String, String> getTransactionLogDirs() {
Expand Down
Expand Up @@ -46,8 +46,7 @@ public enum Option implements IOption {
STARTING_PARTITION_ID(
OptionTypes.INTEGER,
-1,
"The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
ACTIVE_PARTITIONS(OptionTypes.STRING_ARRAY, null, "List of node active partitions");
"The first partition id to assign to iodevices on this node (-1 == auto-assign)");

private final IOptionType type;
private final Object defaultValue;
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.utils.PrintUtil;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.config.IApplicationConfig;
Expand Down Expand Up @@ -69,7 +70,7 @@ public class PropertiesAccessor implements IApplicationConfig {
/**
* Constructor which wraps an IApplicationConfig.
*/
private PropertiesAccessor(IApplicationConfig cfg) throws AsterixException, IOException {
private PropertiesAccessor(IApplicationConfig cfg) throws AsterixException {
this.cfg = cfg;
nodePartitionsMap = new ConcurrentHashMap<>();
clusterPartitions = Collections.synchronizedSortedMap(new TreeMap<>());
Expand All @@ -80,6 +81,7 @@ private PropertiesAccessor(IApplicationConfig cfg) throws AsterixException, IOEx
for (String ncName : cfg.getNCNames()) {
configureNc(configManager, ncName, uniquePartitionId);
}
LOGGER.info("configured partitions: {} from config {}", () -> PrintUtil.toString(nodePartitionsMap), () -> cfg);
for (String section : cfg.getSectionNames()) {
if (section.startsWith(AsterixProperties.SECTION_PREFIX_EXTENSION)) {
String className = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_EXTENSION, section);
Expand Down Expand Up @@ -194,22 +196,16 @@ public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
return clusterPartitions;
}

public Set<Integer> getActivePartitions(String nodeId) {
// by default, node actives partitions are the partitions assigned to the node
String[] activePartitions = cfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS);
if (activePartitions == null) {
ClusterPartition[] nodeClusterPartitions = nodePartitionsMap.get(nodeId);
return Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId)
.collect(Collectors.toSet());
}
return Arrays.stream(activePartitions).map(Integer::parseInt).collect(Collectors.toSet());
public Set<Integer> getNodePartitions(String nodeId) {
ClusterPartition[] nodeClusterPartitions = nodePartitionsMap.get(nodeId);
return Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
}

public List<AsterixExtension> getExtensions() {
return extensions;
}

public static PropertiesAccessor getInstance(IApplicationConfig cfg) throws IOException, AsterixException {
public static PropertiesAccessor getInstance(IApplicationConfig cfg) throws AsterixException {
PropertiesAccessor accessor = instances.get(cfg);
if (accessor == null) {
accessor = new PropertiesAccessor(cfg);
Expand Down
Expand Up @@ -59,4 +59,10 @@ public long getMaxTxnId() {
public long getMaxJobId() {
return maxJobId;
}

@Override
public String toString() {
return "NcLocalCounters{" + "maxResourceId=" + maxResourceId + ", maxTxnId=" + maxTxnId + ", maxJobId="
+ maxJobId + '}';
}
}

0 comments on commit 5f52a5f

Please sign in to comment.