Skip to content

Commit

Permalink
SAMZA-1096: StreamSpec constructors in the ExecutionEnvironments
Browse files Browse the repository at this point in the history
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Yi Pan (Data Infrastructure) <nickpan47@gmail.com>,Xinyu Liu <xiliu@linkedin.com>,Navina Ramesh <navina@apache.org>

Closes #74 from jmakes/samza-1096
  • Loading branch information
Jacob Maes committed Mar 6, 2017
1 parent d104013 commit e6c1eed
Show file tree
Hide file tree
Showing 15 changed files with 810 additions and 113 deletions.
225 changes: 168 additions & 57 deletions docs/learn/documentation/versioned/jobs/configuration-table.html

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions docs/learn/documentation/versioned/jobs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ to log4j.xml and define the system name by specifying the config:
task.log4j.system="<system-name>"
{% endhighlight %}

The default stream name for logger is generated using the following convention, though you can override it using the `StreamName` property in the log4j.xml as shown above.
```java
"__samza_%s_%s_logs" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
```

Configuring the StreamAppender will automatically encode messages using logstash's [Log4J JSON format](https://github.com/logstash/log4j-jsonevent-layout). Samza also supports pluggable serialization for those that prefer non-JSON logging events. This can be configured the same way other stream serializers are defined:

{% highlight jproperties %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.samza.system;

import java.lang.reflect.Constructor;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.ConfigException;
import org.apache.samza.operators.StreamGraphBuilder;
Expand All @@ -26,6 +27,9 @@

/**
* Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
*
* Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
* to support the {@link ExecutionEnvironment#fromConfig(Config)} static constructor.
*/
@InterfaceStability.Unstable
public interface ExecutionEnvironment {
Expand All @@ -46,13 +50,17 @@ static ExecutionEnvironment getLocalEnvironment(Config config) {
/**
* Static method to load the non-standalone environment.
*
* Requires the implementation class to define a constructor with a single {@link Config} as the argument.
*
* @param config configuration passed in to initialize the Samza processes
* @return the configure-driven {@link ExecutionEnvironment} to run the user-defined stream applications
*/
static ExecutionEnvironment fromConfig(Config config) {
try {
if (ExecutionEnvironment.class.isAssignableFrom(Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)))) {
return (ExecutionEnvironment) Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)).newInstance();
Class<?> environmentClass = Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS));
if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) {
Constructor<?> constructor = environmentClass.getConstructor(Config.class); // *sigh*
return (ExecutionEnvironment) constructor.newInstance(config);
}
} catch (Exception e) {
throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e);
Expand All @@ -70,4 +78,24 @@ static ExecutionEnvironment fromConfig(Config config) {
*/
void run(StreamGraphBuilder graphBuilder, Config config);

/**
* Constructs a {@link StreamSpec} from the configuration for the specified streamId.
*
* The stream configurations are read from the following properties in the config:
* {@code streams.{$streamId}.*}
* <br>
* All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
* properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
*
* <ul>
* <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
* the stream will be associated with the System defined in {@code job.default.system}</li>
* <li>samza.physical.name - The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
* If this property isn't defined the physical.name will be set to the streamId</li>
* </ul>
*
* @param streamId The logical identifier for the stream in Samza.
* @return The {@link StreamSpec} instance.
*/
StreamSpec streamFromConfig(String streamId);
}
15 changes: 8 additions & 7 deletions samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,8 @@ public StreamSpec(String id, String physicalName, String systemName, Map<String,
* @param config A map of properties for the stream. These may be System-specfic.
*/
public StreamSpec(String id, String physicalName, String systemName, int partitionCount, Map<String, String> config) {
if (id == null) {
throw new NullPointerException("Parameter 'id' must not be null");
}

if (systemName == null) {
throw new NullPointerException("Parameter 'systemName' must not be null");
}
validateLogicalIdentifier("id", id);
validateLogicalIdentifier("systemName", systemName);

if (partitionCount < 1) {
throw new IllegalArgumentException("Parameter 'partitionCount' must be greater than 0");
Expand Down Expand Up @@ -200,4 +195,10 @@ public String get(String propertyName) {
public String getOrDefault(String propertyName, String defaultValue) {
return config.getOrDefault(propertyName, defaultValue);
}

private void validateLogicalIdentifier(String identifierName, String identifierValue) {
if (!identifierValue.matches("[A-Za-z0-9_-]+")) {
throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system;

import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.StreamConfig;


public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment {

private final Config config;

public AbstractExecutionEnvironment(Config config) {
if (config == null) {
throw new NullPointerException("Parameter 'config' cannot be null.");
}

this.config = config;
}

@Override
public StreamSpec streamFromConfig(String streamId) {
StreamConfig streamConfig = new StreamConfig(config);
String physicalName = streamConfig.getPhysicalName(streamId, streamId);

return streamFromConfig(streamId, physicalName);
}

/**
* Constructs a {@link StreamSpec} from the configuration for the specified streamId.
*
* The stream configurations are read from the following properties in the config:
* {@code streams.{$streamId}.*}
* <br>
* All properties matching this pattern are assumed to be system-specific with one exception. The following
* property is a Samza property which is used to bind the stream to a system.
*
* <ul>
* <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
* the stream will be associated with the System defined in {@code job.default.system}</li>
* </ul>
*
* @param streamId The logical identifier for the stream in Samza.
* @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
* @return The {@link StreamSpec} instance.
*/
/*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
StreamConfig streamConfig = new StreamConfig(config);
String system = streamConfig.getSystem(streamId);

return streamFromConfig(streamId, physicalName, system);
}

/**
* Constructs a {@link StreamSpec} from the configuration for the specified streamId.
*
* The stream configurations are read from the following properties in the config:
* {@code streams.{$streamId}.*}
*
* @param streamId The logical identifier for the stream in Samza.
* @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
* @param system The name of the System on which this stream will be used.
* @return The {@link StreamSpec} instance.
*/
/*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
StreamConfig streamConfig = new StreamConfig(config);
Map<String, String> properties = streamConfig.getStreamProperties(streamId);

return new StreamSpec(streamId, physicalName, system, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
/**
* This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
*/
public class RemoteExecutionEnvironment implements ExecutionEnvironment {
public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {

public RemoteExecutionEnvironment(Config config) {
super(config);
}

@Override public void run(StreamGraphBuilder app, Config config) {
// TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
/**
* This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
*/
public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment {

public StandaloneExecutionEnvironment(Config config) {
super(config);
}

// TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
StreamGraph createGraph(StreamGraphBuilder app, Config config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object JobConfig {
val SAMZA_FWK_PATH = "samza.fwk.path"
val SAMZA_FWK_VERSION = "samza.fwk.version"
val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
val JOB_DEFAULT_SYSTEM = "job.default.system"
val JOB_CONTAINER_COUNT = "job.container.count"
val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
Expand Down Expand Up @@ -104,6 +105,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))

def getDefaultSystem = getOption(JobConfig.JOB_DEFAULT_SYSTEM)

def getContainerCount = {
getOption(JobConfig.JOB_CONTAINER_COUNT) match {
case Some(count) => count.toInt
Expand Down
Loading

0 comments on commit e6c1eed

Please sign in to comment.