Skip to content

Commit

Permalink
#1 Initial SDK implementation (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarmorling committed Jun 9, 2023
1 parent 5c7f283 commit b0e8aee
Show file tree
Hide file tree
Showing 41 changed files with 1,527 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@ bin

# VSCode
.vscode

# Apache Maven
dependency-reduced-pom.xml
target

# Misc.
.DS_Store
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Decodable Java SDK

This repository contains a software development kit (SDK) for implementing Apache Flink jobs and running them on Decodable.
It integrates Flink with the Decodable environment, for instance providing easy access to Decodable streams.

## Structure

* _sdk_: The Decodable SDK
* _examples_: Examples for using the SDK

## Installation

tbd.

## Usage

tbd.

## Building the SDK

Gradle is used for building the SDK.

Run the following to produce the SDK binary:

```bash
./gradlew build
```

Run the following to install the SDK JAR into your local Maven repository:

```bash
./gradlew publishToMavenLocal
```

## License

This code base is available under the Apache License, version 2.
161 changes: 161 additions & 0 deletions examples/apache-maven/custom-pipelines-hello-world/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
SPDX-License-Identifier: Apache-2.0
Copyright The original authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>co.decodable.examples</groupId>
<artifactId>custom-pipelines-hello-world</artifactId>
<version>0.1</version>
<packaging>jar</packaging>

<name>Decodable SDK Example</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.4</flink.version>
<target.java.version>11</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.1</log4j.version>
</properties>

<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>co.decodable</groupId>
<artifactId>decodable-sdk-java</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>co.decodable.examples.cpdemo.DataStreamJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
<plugins>
</plugins>
</pluginManagement>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package co.decodable.examples.cpdemo;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import co.decodable.sdk.DecodableStreamSink;
import co.decodable.sdk.DecodableStreamSource;

public class DataStreamJob {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DecodableStreamSource<String> source =
DecodableStreamSource.builder()
.withStreamName("purchase-orders")
.build();

DecodableStreamSink<String> sink =
DecodableStreamSink.builder()
.withStreamName("purchase-orders-processed")
.build();

DataStream<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
.map(new NameConverter());

stream.sinkTo(sink);

env.execute("Purchase Order Processor");
}

public static class NameConverter extends RichMapFunction<String, String> {

private static final long serialVersionUID = 1L;

private transient ObjectMapper mapper;
@Override
public String map(String value) throws Exception {
ObjectNode purchaseOrder = (ObjectNode) mapper.readTree(value);
purchaseOrder.put("customer_name", purchaseOrder.get("customer_name").asText().toUpperCase());
return mapper.writeValueAsString(purchaseOrder);
}

@Override
public void open(Configuration parameters) throws Exception {
mapper = new ObjectMapper();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
################################################################################
# Copyright The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
21 changes: 21 additions & 0 deletions build.gradle → sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ plugins {
id 'java-library'
id 'eclipse'
id 'com.diffplug.spotless' version '6.8.0'
id 'maven-publish'
}

group = 'co.decodable'
version = '1.0.0-SNAPSHOT'

repositories {
Expand Down Expand Up @@ -51,3 +53,22 @@ spotless {
*/'''
}
}

publishing {
publications {
maven(MavenPublication) {
from components.java

pom.withXml {
asNode().dependencies.'*'.each() {
if (it.artifactId.text() == 'flink-core' || it.artifactId.text() == 'flink-streaming-java') {
it.scope*.value = 'provided'
}
else if (it.artifactId.text() == 'flink-connector-kafka') {
it.scope*.value = 'compile'
}
}
}
}
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
11 changes: 11 additions & 0 deletions sdk/src/main/java/co/decodable/sdk/DecodableCommittable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright the original authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.sdk;

import co.decodable.sdk.util.Incubating;

@Incubating
public interface DecodableCommittable {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright the original authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.sdk;

import co.decodable.sdk.util.Incubating;

@Incubating
public interface DecodableSourceEnumeratorState {}
12 changes: 12 additions & 0 deletions sdk/src/main/java/co/decodable/sdk/DecodableSourceSplit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright the original authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.sdk;

import co.decodable.sdk.util.Incubating;
import org.apache.flink.api.connector.source.SourceSplit;

@Incubating
public interface DecodableSourceSplit extends SourceSplit {}
Loading

0 comments on commit b0e8aee

Please sign in to comment.