Skip to content
Permalink
Browse files
[NO ISSUE][STO] Add API to get datasets pending IO
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Add a new API that gets the number of pending io (flush/merge)
  ops for all datasets on an NC.

Change-Id: I062de60e36677f138c60855ff565a0610d80c998
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7644
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
  • Loading branch information
mhubail committed Aug 24, 2020
1 parent ae092dd commit 3977ad24df8e2d3cc72c2226ec8ad6f6f58dddf6
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 8 deletions.
@@ -25,6 +25,7 @@
import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.StorageIOStats;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -143,4 +144,9 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
* @throws HyracksDataException
*/
void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException;

/**
* @return the current datasets io stats
*/
StorageIOStats getDatasetsIOStats();
}
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.context;

import static org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType.REPLICATE;

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -38,7 +40,7 @@ public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) {
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
dsInfo.declareActiveIOOperation();
dsInfo.declareActiveIOOperation(REPLICATE);
}
}

@@ -54,7 +56,7 @@ public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOper
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
dsInfo.undeclareActiveIOOperation();
dsInfo.undeclareActiveIOOperation(REPLICATE);
}
}

@@ -28,6 +28,7 @@
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -42,6 +43,8 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
private final ILogManager logManager;
private final LogRecord waitLog = new LogRecord();
private int numActiveIOOps;
private int pendingFlushes;
private int pendingMerges;
private long lastAccess;
private boolean isExternal;
private boolean isRegistered;
@@ -70,12 +73,32 @@ public void untouch() {
setLastAccess(System.currentTimeMillis());
}

public synchronized void declareActiveIOOperation() {
public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
numActiveIOOps++;
switch (opType) {
case FLUSH:
pendingFlushes++;
break;
case MERGE:
pendingMerges++;
break;
default:
break;
}
}

public synchronized void undeclareActiveIOOperation() {
public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
numActiveIOOps--;
switch (opType) {
case FLUSH:
pendingFlushes--;
break;
case MERGE:
pendingMerges--;
break;
default:
break;
}
//notify threads waiting on this dataset info
notifyAll();
}
@@ -204,7 +227,7 @@ public void waitForIO() throws HyracksDataException {
while (numActiveIOOps > 0) {
try {
/**
* Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation()}
* Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType)}
*/
wait();
} catch (InterruptedException e) {
@@ -220,4 +243,12 @@ public void waitForIO() throws HyracksDataException {
}
}
}

public synchronized int getPendingFlushes() {
return pendingFlushes;
}

public synchronized int getPendingMerges() {
return pendingMerges;
}
}
@@ -38,6 +38,7 @@
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.storage.StorageIOStats;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
@@ -519,6 +520,16 @@ public void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDa
}
}

@Override
public StorageIOStats getDatasetsIOStats() {
StorageIOStats stats = new StorageIOStats();
for (DatasetResource dsr : datasets.values()) {
stats.addPendingFlushes(dsr.getDatasetInfo().getPendingFlushes());
stats.addPendingMerges(dsr.getDatasetInfo().getPendingMerges());
}
return stats;
}

private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
if (indexInfo.isOpen()) {
ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
@@ -259,7 +259,7 @@ public void recycled(ILSMMemoryComponent component) throws HyracksDataException

@Override
public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException {
dsInfo.declareActiveIOOperation();
dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
pendingFlushes++;
FlushOperation flush = (FlushOperation) operation;
@@ -282,7 +282,7 @@ public synchronized void completed(ILSMIOOperation operation) {
pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
}
}
dsInfo.undeclareActiveIOOperation();
dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
}

public synchronized boolean hasPendingFlush() {
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.common.storage;

public class StorageIOStats {

private int pendingFlushes;
private int pendingMerges;

public void addPendingFlushes(int pending) {
pendingFlushes += pending;
}

public void addPendingMerges(int pending) {
pendingMerges += pending;
}

public int getPendingFlushes() {
return pendingFlushes;
}

public int getPendingMerges() {
return pendingMerges;
}
}
@@ -37,7 +37,8 @@ enum LSMIOOperationType {
FLUSH,
MERGE,
LOAD,
NOOP
NOOP,
REPLICATE
}

/**

0 comments on commit 3977ad2

Please sign in to comment.