Skip to content

Commit

Permalink
Moved Stats to java instance since it now only has visibility in java…
Browse files Browse the repository at this point in the history
… world only (#144)
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 937b57d commit 7af9d02
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 44 deletions.
Expand Up @@ -29,7 +29,6 @@
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.runtime.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.runtime.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.stats.FunctionStats;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;

/**
Expand Down Expand Up @@ -84,19 +83,13 @@ public void stop() {

@Override
public CompletableFuture<FunctionStatus> getFunctionStatus() {
FunctionStats stats = javaInstanceRunnable.getStats();
FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus();
if (javaInstanceRunnable.getFailureException() != null) {
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setFailureException(javaInstanceRunnable.getFailureException().getMessage());
} else {
functionStatusBuilder.setRunning(true);
}
functionStatusBuilder.setNumProcessed(stats.getTotalProcessed());
functionStatusBuilder.setNumSuccessfullyProcessed(stats.getTotalSuccessfullyProcessed());
functionStatusBuilder.setNumUserExceptions(stats.getTotalUserExceptions());
functionStatusBuilder.setNumSystemExceptions(stats.getTotalSystemExceptions());
functionStatusBuilder.setNumTimeouts(stats.getTotalTimeoutExceptions());
return CompletableFuture.completedFuture(functionStatusBuilder.build());
}

Expand Down
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.stats;
package org.apache.pulsar.functions.runtime.instance;

import com.yahoo.sketches.quantiles.DoublesSketch;
import org.apache.pulsar.shade.io.netty.util.Timeout;
Expand All @@ -31,7 +31,7 @@
* Function stats.
*/
@Slf4j
public class FunctionStats implements AutoCloseable {
public class FunctionStats {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -158,7 +158,6 @@ private void cancelStatsTimeout() {
}
}

@Override
public void close() {
cancelStatsTimeout();
}
Expand Down
Expand Up @@ -33,7 +33,6 @@
import org.apache.pulsar.functions.runtime.container.InstanceConfig;
import org.apache.pulsar.functions.runtime.functioncache.FunctionCacheManagerImpl;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.stats.FunctionStats;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;

import java.util.HashMap;
Expand Down Expand Up @@ -205,17 +204,12 @@ public InstanceControlImpl(JavaInstanceRunnable javaInstanceRunnable) {

@Override
public void getFunctionStatus(Empty request, StreamObserver<InstanceCommunication.FunctionStatus> responseObserver) {
FunctionStats stats = javaInstanceRunnable.getStats();
InstanceCommunication.FunctionStatus.Builder builder = javaInstanceRunnable.getFunctionStatus();
String failureException = javaInstanceRunnable.getFailureException() != null
? javaInstanceRunnable.getFailureException().getMessage() : "";
InstanceCommunication.FunctionStatus response = InstanceCommunication.FunctionStatus.newBuilder()
InstanceCommunication.FunctionStatus response = builder
.setRunning(true)
.setFailureException(failureException)
.setNumProcessed(stats.getTotalProcessed())
.setNumSuccessfullyProcessed(stats.getTotalSuccessfullyProcessed())
.setNumTimeouts(stats.getTotalTimeoutExceptions())
.setNumSystemExceptions(stats.getTotalSystemExceptions())
.setNumUserExceptions(stats.getTotalUserExceptions())
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.apache.pulsar.functions.runtime.container.InstanceConfig;
import org.apache.pulsar.functions.runtime.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.stats.FunctionStats;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.Reflections;

Expand Down Expand Up @@ -73,7 +72,6 @@ private class InputMessage {
}

// function stats
@Getter
private final FunctionStats stats;

public JavaInstanceRunnable(InstanceConfig instanceConfig,
Expand Down Expand Up @@ -315,6 +313,16 @@ public InstanceCommunication.MetricsData getAndResetMetrics() {
return bldr.build();
}

public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setNumProcessed(stats.getTotalProcessed());
functionStatusBuilder.setNumSuccessfullyProcessed(stats.getTotalSuccessfullyProcessed());
functionStatusBuilder.setNumUserExceptions(stats.getTotalUserExceptions());
functionStatusBuilder.setNumSystemExceptions(stats.getTotalSystemExceptions());
functionStatusBuilder.setNumTimeouts(stats.getTotalTimeoutExceptions());
return functionStatusBuilder;
}

private static void addSystemMetrics(String metricName, double value, InstanceCommunication.MetricsData.Builder bldr) {
InstanceCommunication.MetricsData.DataDigest digest =
InstanceCommunication.MetricsData.DataDigest.newBuilder()
Expand All @@ -332,5 +340,4 @@ private static SerDe initializeSerDe(String serdeClassName, ClassLoader clsLoade
clsLoader);
}
}

}

This file was deleted.

0 comments on commit 7af9d02

Please sign in to comment.