Skip to content
Permalink
Browse files
[FLINK-25935] Add LocalEnvironmentEntrypoint
The LocalEnvironmentEntrypoint runs a Stateful Functions applications within a
single process. It uses Flink's LocalStreamExecutionEnvironment for this. The
Entrypoint can be configured via

* --module <PATH_TO_MODULE>
* --set <CONFIG_OPTION>=<CONFIG_VALUE>
  • Loading branch information
tillrohrmann committed Feb 7, 2022
1 parent 8f0149e commit e956104c2e097891e03f62092844c687d984ebe0
Showing 7 changed files with 429 additions and 0 deletions.
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Build the functions code ...
FROM maven:3.6.3-jdk-11 AS builder
COPY src /usr/src/app/src
COPY pom.xml /usr/src/app
RUN mvn -f /usr/src/app/pom.xml clean package

# ... and run the web server!
FROM openjdk:11
WORKDIR /
COPY --from=builder /usr/src/app/target/statefun-playground-entrypoint*.jar statefun-playground-entrypoint.jar
ENTRYPOINT ["java", "-jar", "statefun-playground-entrypoint.jar"]
@@ -0,0 +1,12 @@
# Stateful Functions Playground Entrypoint

A simple Stateful Functions entrypoint that runs Stateful Functions within a single process.

## Configuring a module.yaml

Per default the `LocalEnvironmentEntrypoint` expects a `module.yaml` to be on the classpath.
Alternatively, one can provide a different location via `--module file://<PATH>`.

## Configuring the Flink Runtime

One can configure the underlying Flink runtime via `--set <CONFIG_OPTION>=<CONFIG_VALUE>`.
@@ -0,0 +1,145 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.flink</groupId>
<artifactId>statefun-playground-entrypoint</artifactId>
<version>3.2.0</version>
<packaging>jar</packaging>

<properties>
<statefun.version>3.2.0</statefun.version>
<flink.version>1.14.3</flink.version>
<slf4j.version>1.7.35</slf4j.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencies>
<!-- StateFun Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-core</artifactId>
<version>${statefun.version}</version>
</dependency>
<!-- StateFun Distribution -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-io-bundle</artifactId>
<version>${statefun.version}</version>
</dependency>

<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Build a fat executable jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.statefun.playground.internal.entrypoint.LocalEnvironmentEntrypoint</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

<!-- Java code style -->
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>1.20.0</version>
<configuration>
<java>
<googleJavaFormat>
<version>1.7</version>
<style>GOOGLE</style>
</googleJavaFormat>
<removeUnusedImports/>
</java>
</configuration>
<executions>
<execution>
<id>spotless-check</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.playground.internal.entrypoint;

import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseProvider;
import org.apache.flink.statefun.flink.core.spi.Modules;

/**
* This class duplicates StatefulFunctionsUniverses.ClassPathUniverseProvider because it is not
* public. Needs a new Statefun release to change the visibility. @Todo Update Statefun to expose
* this functionality
*/
final class ClassPathUniverseProvider implements StatefulFunctionsUniverseProvider {

private static final long serialVersionUID = 1;

@Override
public StatefulFunctionsUniverse get(
ClassLoader classLoader, StatefulFunctionsConfig configuration) {
Modules modules = Modules.loadFromClassPath(configuration);
return modules.createStatefulFunctionsUniverse();
}
}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.playground.internal.entrypoint;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.util.StringUtils;

public final class ConfigurationValidator {

private ConfigurationValidator() {}

static void validate(Configuration configuration) {
validateCustomPayloadSerializerClassName(configuration);
validateNoHeapBackedTimers(configuration);
validateUnalignedCheckpointsDisabled(configuration);
}

private static void validateCustomPayloadSerializerClassName(Configuration configuration) {

MessageFactoryType factoryType =
configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
String customPayloadSerializerClassName =
configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);

if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {
throw new StatefulFunctionsInvalidConfigException(
StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS,
"custom payload serializer class must be supplied with WITH_CUSTOM_PAYLOADS serializer");
}
} else {
if (customPayloadSerializerClassName != null) {
throw new StatefulFunctionsInvalidConfigException(
StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS,
"custom payload serializer class may only be supplied with WITH_CUSTOM_PAYLOADS serializer");
}
}
}

private static final ConfigOption<String> TIMER_SERVICE_FACTORY =
ConfigOptions.key("state.backend.rocksdb.timer-service.factory")
.stringType()
.defaultValue("rocksdb");

private static final ConfigOption<Boolean> ENABLE_UNALIGNED_CHECKPOINTS =
ConfigOptions.key("execution.checkpointing.unaligned").booleanType().defaultValue(false);

private static void validateNoHeapBackedTimers(Configuration configuration) {
final String timerFactory = configuration.getString(TIMER_SERVICE_FACTORY);
if (!timerFactory.equalsIgnoreCase("rocksdb")) {
throw new StatefulFunctionsInvalidConfigException(
TIMER_SERVICE_FACTORY,
"StateFun only supports non-heap timers with a rocksdb state backend.");
}
}

private static void validateUnalignedCheckpointsDisabled(Configuration configuration) {
final boolean unalignedCheckpoints = configuration.getBoolean(ENABLE_UNALIGNED_CHECKPOINTS);
if (unalignedCheckpoints) {
throw new StatefulFunctionsInvalidConfigException(
ENABLE_UNALIGNED_CHECKPOINTS,
"StateFun currently does not support unaligned checkpointing.");
}
}
}

0 comments on commit e956104

Please sign in to comment.