provide Hadoop counters exposing the I/O stats
gather stats for all I/O operations within es-hadoop
the stats are not coupled to Hadoop and can be exposed through other means
fixes #141
costin committed Apr 8, 2014
1 parent dcf11e7 commit a889751
Showing 18 changed files with 361 additions and 26 deletions.
39 changes: 39 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/mr/Counters.java
@@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.mr;

/**
* Enum used for representing the stats inside Hadoop.
*/
public enum Counters {

    BYTES_WRITTEN,
    DOCS_WRITTEN,
    BULK_WRITES,

    DOCS_RETRIED,
    BYTES_RETRIED,
    BULK_RETRIES,

    BYTES_READ,
    DOCS_READ,

    NODE_RETRIES,
    NET_RETRIES;
}
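
Once a job completes, these counters can be read back by enum constant like any other Hadoop counter. A minimal sketch of the lookup (assuming Hadoop 2's mapreduce API; job setup is elided and names are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.elasticsearch.hadoop.mr.Counters;

public class CounterReadExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "es-hadoop-counters");
        // ... configure input/output formats, es.resource, etc. ...
        job.waitForCompletion(true);

        // counters are looked up by enum constant, like any built-in Hadoop counter
        long docsWritten = job.getCounters().findCounter(Counters.DOCS_WRITTEN).getValue();
        long bulkRetries = job.getCounters().findCounter(Counters.BULK_RETRIES).getValue();
        System.out.printf("docs written: %d, bulk retries: %d%n", docsWritten, bulkRetries);
    }
}

With the old mapred API, the equivalent is running.getCounters().getCounter(Counters.DOCS_WRITTEN) on the RunningJob returned by JobClient.runJob(conf).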
13 changes: 13 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/mr/Counters.properties
@@ -0,0 +1,13 @@
CounterGroupName=Elasticsearch Hadoop Counters
BYTES_READ.name=Bytes Read
BYTES_WRITTEN.name=Bytes Written

BULK_RETRIES.name=Bulk Retries
BULK_WRITES.name=Bulk Writes

DOCS_READ.name=Documents Read
DOCS_RETRIED.name=Documents Retried
DOCS_WRITTEN.name=Documents Written

NODE_RETRIES.name=Node Retries
NET_RETRIES.name=Network Retries
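
Hadoop resolves these display names through a ResourceBundle sharing the enum's fully qualified name, which is why this file sits next to Counters.java: the CounterGroupName key titles the group and each CONSTANT.name key titles a counter. A sketch of the equivalent lookup (illustrative only, using the standard ResourceBundle mechanism):

import java.util.ResourceBundle;

public class CounterNameLookup {
    public static void main(String[] args) {
        // the same lookup Hadoop performs when rendering the counter group
        ResourceBundle bundle = ResourceBundle.getBundle("org.elasticsearch.hadoop.mr.Counters");
        System.out.println(bundle.getString("CounterGroupName"));  // Elasticsearch Hadoop Counters
        System.out.println(bundle.getString("DOCS_WRITTEN.name")); // Documents Written
    }
}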
35 changes: 25 additions & 10 deletions src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
@@ -50,6 +50,7 @@
 import org.elasticsearch.hadoop.rest.dto.Node;
 import org.elasticsearch.hadoop.rest.dto.Shard;
 import org.elasticsearch.hadoop.rest.dto.mapping.Field;
+import org.elasticsearch.hadoop.rest.stats.Stats;
 import org.elasticsearch.hadoop.serialization.ScrollReader;
 import org.elasticsearch.hadoop.serialization.builder.ValueReader;
 import org.elasticsearch.hadoop.util.IOUtils;
@@ -148,6 +149,7 @@ protected static abstract class ShardRecordReader<K,V> extends RecordReader<K, V
         private long size = 0;
 
         private HeartBeat beat;
+        private Progressable progressable;
 
         // default constructor used by the NEW api
         public ShardRecordReader() {
@@ -164,6 +166,7 @@ public ShardRecordReader(org.apache.hadoop.mapred.InputSplit split, Configuratio
         public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
             context.setStatus(split.toString());
             init((ShardInputSplit) split, context.getConfiguration(), context);
+
         }
 
void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable) {
Expand Down Expand Up @@ -203,6 +206,8 @@ void init(ShardInputSplit esSplit, Configuration cfg, Progressable progressable)

queryBuilder.fields(settings.getScrollFields());

this.progressable = progressable;

if (log.isDebugEnabled()) {
log.debug(String.format("Initializing RecordReader for [%s]", esSplit));
}
@@ -239,19 +244,29 @@ public float getProgress() {
 
         @Override
         public void close() throws IOException {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Closing RecordReader for [%s]", esSplit));
-            }
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Closing RecordReader for [%s]", esSplit));
+                }
 
-            if (beat != null) {
-                beat.stop();
-            }
+                if (beat != null) {
+                    beat.stop();
+                }
 
-            if (result != null) {
-                result.close();
-            }
-            client.close();
+                if (result != null) {
+                    result.close();
+                    result = null;
+                }
+                client.close();
+
+            } finally {
+                Stats stats = new Stats();
+                stats.aggregate(client.stats());
+                if (result != null) {
+                    stats.aggregate(result.stats());
+                }
+                ReportingUtils.report(progressable, stats);
+            }
         }
 
         @Override
src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java
@@ -221,11 +221,15 @@ private int detectCurrentInstance(Configuration conf) {
 
         @Override
         public void close(TaskAttemptContext context) throws IOException {
-            close((Reporter) null);
+            doClose(context);
         }
 
         @Override
         public void close(Reporter reporter) throws IOException {
+            doClose(reporter);
+        }
+
+        private void doClose(Progressable progressable) throws IOException {
             if (log.isTraceEnabled()) {
                 log.trace(String.format("Closing RecordWriter [%s][%s]", uri, resource));
             }
@@ -237,6 +241,9 @@ public void close(Reporter reporter) throws IOException {
             if (client != null) {
                 client.close();
             }
+
+            ReportingUtils.report(progressable, client.stats());
+
             initialized = false;
         }
     }
61 changes: 61 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/mr/ReportingUtils.java
@@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.mr;

import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.Progressable;
import org.elasticsearch.hadoop.rest.stats.Stats;

class ReportingUtils {

    // handles Hadoop 'old' and 'new' API reporting classes, namely {@link Reporter} and {@link TaskInputOutputContext}
    @SuppressWarnings({ "rawtypes", "unchecked" })
    static void report(Progressable progressable, Stats stats) {
        if (progressable == null) {
            return;
        }

        if (progressable instanceof Reporter) {
            Reporter reporter = (Reporter) progressable;

            reporter.getCounter(Counters.BYTES_WRITTEN).increment(stats.bytesWritten);
            reporter.getCounter(Counters.DOCS_WRITTEN).increment(stats.docsWritten);
            reporter.getCounter(Counters.BULK_WRITES).increment(stats.bulkWrites);
            reporter.getCounter(Counters.DOCS_RETRIED).increment(stats.docsRetried);
            reporter.getCounter(Counters.BULK_RETRIES).increment(stats.bulkRetries);
            reporter.getCounter(Counters.DOCS_READ).increment(stats.docsRead);
            reporter.getCounter(Counters.NODE_RETRIES).increment(stats.nodeRetries);
            reporter.getCounter(Counters.NET_RETRIES).increment(stats.netRetries);
        }

        if (progressable instanceof TaskInputOutputContext) {
            TaskInputOutputContext tioc = (TaskInputOutputContext) progressable;

            tioc.getCounter(Counters.BYTES_WRITTEN).increment(stats.bytesWritten);
            tioc.getCounter(Counters.DOCS_WRITTEN).increment(stats.docsWritten);
            tioc.getCounter(Counters.BULK_WRITES).increment(stats.bulkWrites);
            tioc.getCounter(Counters.DOCS_RETRIED).increment(stats.docsRetried);
            tioc.getCounter(Counters.BULK_RETRIES).increment(stats.bulkRetries);
            tioc.getCounter(Counters.DOCS_READ).increment(stats.docsRead);
            tioc.getCounter(Counters.NODE_RETRIES).increment(stats.nodeRetries);
            tioc.getCounter(Counters.NET_RETRIES).increment(stats.netRetries);
        }
    }
}
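
The Stats and StatsAware types used above live in org.elasticsearch.hadoop.rest.stats and are among this commit's 18 files, though not shown on this page. Judging from the usage in ReportingUtils, NetworkClient and RestClient, a rough sketch of their shape (the field list follows the counters incremented above; everything else is an assumption):

package org.elasticsearch.hadoop.rest.stats;

public class Stats {
    // counters mutated directly by NetworkClient and RestClient
    public long bytesWritten, docsWritten, bulkWrites;
    public long docsRetried, bytesRetried, bulkRetries;
    public long bytesRead, docsRead;
    public long nodeRetries, netRetries;

    // presumably sums another instance into this one and returns it,
    // to match 'return stats.aggregate(network.stats());' in RestClient
    public Stats aggregate(Stats other) {
        if (other != null) {
            bytesWritten += other.bytesWritten;
            docsWritten += other.docsWritten;
            bulkWrites += other.bulkWrites;
            docsRetried += other.docsRetried;
            bytesRetried += other.bytesRetried;
            bulkRetries += other.bulkRetries;
            bytesRead += other.bytesRead;
            docsRead += other.docsRead;
            nodeRetries += other.nodeRetries;
            netRetries += other.netRetries;
        }
        return this;
    }
}

// declared in its own file; implemented by NetworkClient and RestClient below
public interface StatsAware {
    Stats stats();
}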
@@ -76,4 +76,8 @@ public void reset() throws IOException {
     public boolean markSupported() {
         return delegate.markSupported();
     }
+
+    public boolean isNull() {
+        return delegate == null;
+    }
 }
32 changes: 30 additions & 2 deletions src/main/java/org/elasticsearch/hadoop/rest/NetworkClient.java
@@ -25,9 +25,13 @@
 import org.apache.commons.logging.LogFactory;
 import org.elasticsearch.hadoop.cfg.Settings;
 import org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport;
+import org.elasticsearch.hadoop.rest.stats.Stats;
+import org.elasticsearch.hadoop.rest.stats.StatsAware;
 import org.elasticsearch.hadoop.util.Assert;
+import org.elasticsearch.hadoop.util.ByteSequence;
 
-public class NetworkClient {
+public class NetworkClient implements StatsAware {
 
     private static Log log = LogFactory.getLog(NetworkClient.class);
 
@@ -38,6 +42,8 @@ public class NetworkClient {
     private String currentUri;
     private int nextClient = 0;
 
+    private final Stats stats = new Stats();
+
     public NetworkClient(Settings settings, List<String> hostURIs) {
         this.settings = settings.copy();
         this.nodes = hostURIs;
@@ -52,6 +58,10 @@ private boolean selectNextNode() {
             return false;
         }
 
+        if (currentTransport != null) {
+            stats.nodeRetries++;
+        }
+
         currentUri = nodes.get(nextClient++);
         close();
 
@@ -72,6 +82,15 @@ public Response execute(Request request) throws IOException {
                 newNode = false;
                 try {
                     response = currentTransport.execute(routedRequest);
+                    ByteSequence body = routedRequest.body();
+                    if (body != null) {
+                        stats.bytesWritten += body.length();
+                    }
+
+                    if (response.hasBody()) {
+                        stats.bytesRead += response.body().available();
+                    }
+
                 } catch (Exception ex) {
                     if (log.isTraceEnabled()) {
                         log.trace(String.format("Caught exception while performing request [%s][%s] - falling back to the next node in line...", currentUri, request.path()), ex);
@@ -90,9 +109,18 @@ public Response execute(Request request) throws IOException {
         return response;
     }
 
-    public void close() {
+    void close() {
         if (currentTransport != null) {
+            if (currentTransport instanceof StatsAware) {
+                stats.aggregate(((StatsAware) currentTransport).stats());
+            }
+
             currentTransport.close();
         }
     }
+
+    @Override
+    public Stats stats() {
+        return stats;
+    }
 }
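
Note the accounting semantics here: bytesWritten and bytesRead are only incremented after currentTransport.execute() returns, so an attempt that dies with an exception contributes nothing to the byte counters. It shows up instead as a nodeRetries increment when selectNextNode() swaps in the next host, and the currentTransport != null guard keeps the initial node selection from being counted as a retry.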
2 changes: 2 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/rest/Response.java
@@ -43,4 +43,6 @@ public interface Response {
     boolean hasSucceeded();
 
     boolean hasFailed();
+
+    boolean hasBody();
 }
26 changes: 25 additions & 1 deletion src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
@@ -37,6 +37,8 @@
 import org.elasticsearch.hadoop.cfg.Settings;
 import org.elasticsearch.hadoop.rest.Request.Method;
 import org.elasticsearch.hadoop.rest.dto.Node;
+import org.elasticsearch.hadoop.rest.stats.Stats;
+import org.elasticsearch.hadoop.rest.stats.StatsAware;
 import org.elasticsearch.hadoop.serialization.ParsingUtils;
 import org.elasticsearch.hadoop.serialization.json.JacksonJsonParser;
 import org.elasticsearch.hadoop.util.ByteSequence;
@@ -49,14 +51,16 @@
 
 import static org.elasticsearch.hadoop.rest.Request.Method.*;
 
-public class RestClient implements Closeable {
+public class RestClient implements Closeable, StatsAware {
 
     private NetworkClient network;
     private ObjectMapper mapper = new ObjectMapper();
     private TimeValue scrollKeepAlive;
     private boolean indexReadMissingAsEmpty;
     private final HttpRetryPolicy retryPolicy;
 
+    private final Stats stats = new Stats();
+
     public enum HEALTH {
         RED, YELLOW, GREEN
     }
@@ -115,8 +119,23 @@ public void bulk(Resource resource, TrackingBytesArray data) throws IOException
         Retry retry = retryPolicy.init();
         int httpStatus = 0;
 
+        boolean isRetry = false;
+
         do {
             Response response = execute(PUT, resource.bulk(), data);
+
+            stats.bulkWrites++;
+            stats.docsWritten += data.entries();
+            // bytes will be counted by the transport layer
+
+            if (isRetry) {
+                stats.docsRetried += data.entries();
+                stats.bytesRetried += data.length();
+                stats.bulkRetries++;
+            }
+
+            isRetry = true;
+
             httpStatus = (retryFailedEntries(response.body(), data) ? HttpStatus.SERVICE_UNAVAILABLE : HttpStatus.OK);
         } while (data.length() > 0 && retry.retry(httpStatus));
     }
@@ -285,4 +304,9 @@ public boolean health(String index, HEALTH health, TimeValue timeout) throws IOE
 
         return (Boolean.TRUE.equals(get(sb.toString(), "timed_out")));
     }
+
+    @Override
+    public Stats stats() {
+        return stats.aggregate(network.stats());
+    }
 }
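
The isRetry flag makes the bulk accounting per attempt rather than per unique document. As a worked example, a bulk call carrying 100 documents of which 10 are rejected and then succeed on the second pass ends with BULK_WRITES = 2 and DOCS_WRITTEN = 110 (each attempt counts its current entries), plus DOCS_RETRIED = 10, BULK_RETRIES = 1, and BYTES_RETRIED equal to the payload of the 10 resubmitted documents, since retryFailedEntries() trims data down to the failed entries between passes.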
