Skip to content

Commit

Permalink
Made Window Context up to date with Context (#2932)
Browse files Browse the repository at this point in the history
* Made Window Context up to date with Context

* Added state interfaces as well
  • Loading branch information
srkukarni committed Nov 6, 2018
1 parent be0fe35 commit da0cc00
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 21 deletions.
Expand Up @@ -20,9 +20,25 @@

import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public interface WindowContext {

/**
* The tenant this function belongs to
* @return the tenant this function belongs to
*/
String getTenant();

/**
* The namespace this function belongs to
* @return the namespace this function belongs to
*/
String getNamespace();

/**
* The name of the function that we are executing
* @return The Function name
Expand All @@ -42,44 +58,95 @@ public interface WindowContext {
*/
int getInstanceId();

/**
* Get the number of instances that invoke this function.
*
* @return the number of instances that invoke this function.
*/
int getNumInstances();

/**
* The version of the function that we are executing
* @return The version id
*/
String getFunctionVersion();

/**
* The memory limit that this function is limited to
* @return Memory limit in bytes
* Get a list of all input topics
* @return a list of all input topics
*/
long getMemoryLimit();
Collection<String> getInputTopics();

/**
* The time budget in ms that the function is limited to.
* @return Time budget in msecs.
* Get the output topic of the function
* @return output topic name
*/
long getTimeBudgetInMs();
String getOutputTopic();

/**
* The time in ms remaining for this function execution to complete before it
* will be flagged as an error
* @return Time remaining in ms.
* Get output schema builtin type or custom class name
* @return output schema builtin type or custom class name
*/
long getRemainingTimeInMs();
String getOutputSchemaType();

/**
* The logger object that can be used to log in a function
* @return the logger object
*/
Logger getLogger();

/**
* Increment the builtin distributed counter refered by key
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);

/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);

/**
* Updare the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);

/**
* Retrieve the state value for the key.
*
* @param key name of the key
* @return the state value for the key.
*/
ByteBuffer getState(String key);

/**
* Get a map of all user-defined key/value configs for the function
* @return The full map of user-defined config values
*/
Map<String, Object> getUserConfigMap();

/**
* Get Any user defined key/value
* @param key The key
* @return The value specified by the user for that key. null if no such key
*/
String getUserConfigValue(String key);

/**
* Get any user-defined key/value or a default value if none is present
* @param key
* @param defaultValue
* @return Either the user config value associated with a given key or a supplied default value
*/
Object getUserConfigValueOrDefault(String key, Object defaultValue);

/**
* Record a user defined metric
* @param metricName The name of the metric
Expand All @@ -89,10 +156,22 @@ public interface WindowContext {

/**
* Publish an object using serDe for serializing to the topic
*
* @param topicName
* The name of the topic for publishing
* @param object
* The object that needs to be published
* @param schemaOrSerdeClassName
* Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class
* @return A future that completes when the framework is done publishing the message
*/
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);

/**
* Publish an object to the topic using default schemas
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @param serDeClassName The class name of the class that needs to be used to serialize the object before publishing
* @return
* @return A future that completes when the framework is done publishing the message
*/
CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName);
<O> CompletableFuture<Void> publish(String topicName, O object);
}
Expand Up @@ -21,6 +21,9 @@
import org.apache.pulsar.functions.api.Context;
import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class WindowContextImpl implements WindowContext {
Expand All @@ -31,6 +34,16 @@ public WindowContextImpl(Context context) {
this.context = context;
}

@Override
public String getTenant() {
return this.context.getTenant();
}

@Override
public String getNamespace() {
return this.context.getNamespace();
}

@Override
public String getFunctionName() {
return this.context.getFunctionName();
Expand All @@ -46,43 +59,83 @@ public int getInstanceId() {
return this.context.getInstanceId();
}

@Override
public int getNumInstances() {
return this.context.getNumInstances();
}

@Override
public String getFunctionVersion() {
return this.getFunctionVersion();
}

@Override
public long getMemoryLimit() {
return this.getMemoryLimit();
public Collection<String> getInputTopics() {
return this.context.getInputTopics();
}

@Override
public long getTimeBudgetInMs() {
return this.getTimeBudgetInMs();
public String getOutputTopic() {
return this.context.getOutputTopic();
}

@Override
public long getRemainingTimeInMs() {
return this.getRemainingTimeInMs();
public String getOutputSchemaType() {
return this.context.getOutputSchemaType();
}

@Override
public Logger getLogger() {
return this.getLogger();
}

@Override
public void incrCounter(String key, long amount) {
this.context.incrCounter(key, amount);
}

@Override
public long getCounter(String key) {
return this.context.getCounter(key);
}

@Override
public void putState(String key, ByteBuffer value) {
this.context.putState(key, value);
}

@Override
public ByteBuffer getState(String key) {
return this.context.getState(key);
}

@Override
public Map<String, Object> getUserConfigMap() {
return this.context.getUserConfigMap();
}

@Override
public String getUserConfigValue(String key) {
return this.getUserConfigValue(key);
}

@Override
public Object getUserConfigValueOrDefault(String key, Object defaultValue) {
return this.context.getUserConfigValueOrDefault(key, defaultValue);
}

@Override
public void recordMetric(String metricName, double value) {
this.context.recordMetric(metricName, value);
}

@Override
public CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName) {
return this.context.publish(topicName, object, serDeClassName);
public <O> CompletableFuture<Void> publish(String topicName, O object) {
return this.context.publish(topicName, object);
}

@Override
public CompletableFuture<Void> publish(String topicName, Object object, String schemaOrSerdeClassName) {
return this.context.publish(topicName, object, schemaOrSerdeClassName);
}
}

0 comments on commit da0cc00

Please sign in to comment.