Skip to content

Commit

Permalink
0004391: Rate staging performance
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 6, 2020
1 parent e28c21e commit d40f2e8
Show file tree
Hide file tree
Showing 3 changed files with 369 additions and 0 deletions.
@@ -0,0 +1,219 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io.stage;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.RandomStringUtils;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.util.AppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StagingPerf {

protected final static String STAGE_PATH = "test";

protected final static String STAT_LOCK_ACQUIRE = "Acquire Lock";

protected final static String STAT_BATCH_CREATE = "Create Batch File";

protected final static String STAT_BATCH_WRITE = "Write Batch File";

protected final static String STAT_BATCH_RENAME = "Rename Batch File";

protected final static String STAT_BATCH_FIND = "Find Batch File";

protected final static String STAT_BATCH_READ = "Read Batch File";

protected Logger log = LoggerFactory.getLogger(getClass());

protected IStagingManager stagingMgr;

protected StagingPerfListener listener;

protected String serverInfo;

public StagingPerf(IStagingManager stagingMgr, StagingPerfListener listener) {
this.stagingMgr = stagingMgr;
this.listener = listener;
serverInfo = String.format("Server: '%s' Host: '%s' IP: '%s'", getClass().getName(), AppUtils.getHostName(), AppUtils.getIpAddress());
}

public List<StagingPerfResult> run(int seconds) {
Map<String, StagingPerfResult> results = new HashMap<String, StagingPerfResult>();
long startTime = System.currentTimeMillis();
long lastCallbackTime = startTime;
long totalSeconds = 0;
log.info("Starting staging test, duration of {} seconds", seconds);

try {
for (long batchId = 1; batchId < seconds * 500; batchId++) {
Batch batch = new Batch(BatchType.EXTRACT, 0, "default", BinaryEncoding.HEX, "master", "1", true);
batch.setBatchId(1);
testBatch(batch, results);

if (Thread.interrupted()) {
log.warn("Test ending because thread interrupted");
break;
}
long time = System.currentTimeMillis();
totalSeconds = ((time - startTime) / 1000);
if (totalSeconds >= seconds) {
break;
}
if (time - lastCallbackTime > 1000) {
List<StagingPerfResult> resultsAsList = getResultsAsList(results);
logResults(totalSeconds, resultsAsList);
listener.update(getResultsAsList(results), (totalSeconds / (float) seconds));
lastCallbackTime = time;
}
}
} catch (Exception e) {
log.error("Failed to run test", e);
}

List<StagingPerfResult> resultsAsList = getResultsAsList(results);
logResults(totalSeconds, resultsAsList);
return resultsAsList;
}

protected void logResults(long totalSeconds, List<StagingPerfResult> resultsAsList) {
log.info("Running for {} seconds", totalSeconds);
for (StagingPerfResult result : resultsAsList) {
log.info(result.toString());
}
}

protected void testBatch(Batch batch, Map<String, StagingPerfResult> results) {
long ts = System.currentTimeMillis();
StagingFileLock lock = stagingMgr.acquireFileLock(serverInfo, STAGE_PATH, batch.getStagedLocation(), batch.getBatchId());
if (lock.isAcquired()) {
increment(results, STAT_LOCK_ACQUIRE, System.currentTimeMillis() - ts);
lock.releaseLock();
} else {
throw new RuntimeException("Failed to create lock file");
}

ts = System.currentTimeMillis();
IStagedResource resource = stagingMgr.create(STAGE_PATH, batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
increment(results, STAT_BATCH_CREATE, System.currentTimeMillis() - ts);

ts = System.currentTimeMillis();
try (BufferedWriter writer = resource.getWriter(0l)) {
for (int i = 0; i < 100; i++) {
writer.write(RandomStringUtils.random(1000));
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
resource.close();
increment(results, STAT_BATCH_WRITE, System.currentTimeMillis() - ts);
}

ts = System.currentTimeMillis();
resource.setState(State.DONE);
increment(results, STAT_BATCH_RENAME, System.currentTimeMillis() - ts);
} else {
throw new RuntimeException("Failed to create staging file");
}

ts = System.currentTimeMillis();
resource = stagingMgr.find(STAGE_PATH, batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
increment(results, STAT_BATCH_FIND, System.currentTimeMillis() - ts);

ts = System.currentTimeMillis();
try (BufferedReader reader = resource.getReader()) {
while (reader.readLine() != null) {
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
resource.close();
increment(results, STAT_BATCH_READ, System.currentTimeMillis() - ts);
}
resource.delete();
} else {
throw new RuntimeException("Failed to find staging file");
}
}

protected void increment(Map<String, StagingPerfResult> results, String statName, long millis) {
StagingPerfResult result = results.get(statName);
if (result == null) {
result = new StagingPerfResult(statName);
results.put(statName, result);
}
result.incrementCount(1);
result.incrementMillis(millis);
}

public static List<StagingPerfResult> getEmptyResults() {
List<StagingPerfResult> list = new ArrayList<StagingPerfResult>();
list.add(new StagingPerfResult(STAT_LOCK_ACQUIRE));
list.add(new StagingPerfResult(STAT_BATCH_CREATE));
list.add(new StagingPerfResult(STAT_BATCH_WRITE));
list.add(new StagingPerfResult(STAT_BATCH_RENAME));
list.add(new StagingPerfResult(STAT_BATCH_FIND));
list.add(new StagingPerfResult(STAT_BATCH_READ));
return list;
}

protected List<StagingPerfResult> getResultsAsList(Map<String, StagingPerfResult> results) {
List<StagingPerfResult> list = new ArrayList<StagingPerfResult>();
updateRating(STAT_LOCK_ACQUIRE, results, list, 50, 8000);
updateRating(STAT_BATCH_CREATE, results, list, 150, 25000);
updateRating(STAT_BATCH_WRITE, results, list, 10, 400);
updateRating(STAT_BATCH_RENAME, results, list, 150, 25000);
updateRating(STAT_BATCH_FIND, results, list, 300, 45000);
updateRating(STAT_BATCH_READ, results, list, 20, 800);
return list;
}

protected void updateRating(String statName, Map<String, StagingPerfResult> results, List<StagingPerfResult> list,
long lowCount, long highCount) {
StagingPerfResult result = results.get(statName);
if (result != null) {
long opSec = result.getOperationsPerSecond();
if (opSec <= lowCount) {
result.setRating(1.0f);
} else if (opSec >= highCount) {
result.setRating(9.9f);
} else {
float rating = 9.9f * ((opSec - lowCount) / ((float) (highCount - lowCount)));
result.setRating(rating);
}
list.add(result);
}
}

}
@@ -0,0 +1,29 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io.stage;

import java.util.List;

public interface StagingPerfListener {

public void update(List<StagingPerfResult> results, float percentComplete);

}
@@ -0,0 +1,121 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io.stage;

public class StagingPerfResult {

private String name;

private long count;

private long millis;

private float rating;

public StagingPerfResult(String name, long count, long millis, float rating) {
this.name = name;
this.count = count;
this.millis = millis;
this.rating = rating;
}

public StagingPerfResult(String name) {
this.name = name;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
StagingPerfResult other = (StagingPerfResult) obj;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}

@Override
public String toString() {
return getClass().getSimpleName() + " { name=" + name + ", count=" + count + ", millis=" + millis + ", ops=" + getOperationsPerSecond() + " }";
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public long getCount() {
return count;
}

public void incrementCount(long inc) {
count += inc;
}

public void setCount(long count) {
this.count = count;
}

public long getMillis() {
return millis;
}

public void incrementMillis(long inc) {
millis += inc;
}

public void setMillis(long millis) {
this.millis = millis;
}

public long getOperationsPerSecond() {
if (millis > 0) {
return (long) (count / (millis / 1000f));
}
return count;
}

public float getRating() {
return rating;
}

public void setRating(float rating) {
this.rating = rating;
}

}

0 comments on commit d40f2e8

Please sign in to comment.