[FLINK-18194][walkthroughs] Add table api walkthrough skeleton code
This closes #13
sjwiesman committed Jun 23, 2020
1 parent bc804b9 commit 19e83056dbfd68899362fc37250de810cf190cc1
Showing 14 changed files with 767 additions and 29 deletions.
DataGenerator.java
@@ -21,6 +21,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/** A basic data generator for continuously writing data into a Kafka topic. */
 public class DataGenerator {
 
   private static final Logger LOG = LoggerFactory.getLogger(DataGenerator.class);
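
The generator's entry point falls outside this hunk; for orientation, here is a minimal sketch of how DataGenerator might drive the Producer shown below. The broker address and topic name are illustrative assumptions, not values from the commit; only the Producer(brokers, topic) constructor and its Runnable/AutoCloseable contract appear in the diff.

  public static void main(String[] args) {
    // Assumed wiring: broker and topic names here are hypothetical.
    Producer producer = new Producer("kafka:9092", "transactions");
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  try {
                    producer.close(); // AutoCloseable: stops the produce loop
                  } catch (Exception e) {
                    // ignore during shutdown
                  }
                }));
    producer.run(); // Runnable: blocks, emitting throttled records until closed
  }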
Producer.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.playground.datagen;
 
-import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
-import java.util.Iterator;
 import java.util.Properties;
-import java.util.Random;
-import java.util.function.UnaryOperator;
-import java.util.stream.Stream;
+import org.apache.flink.playground.datagen.model.Transaction;
+import org.apache.flink.playground.datagen.model.TransactionSerializer;
+import org.apache.flink.playground.datagen.model.TransactionSupplier;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 
+/** Generates CSV transaction records at a rate */
 public class Producer implements Runnable, AutoCloseable {
 
   private static final DateTimeFormatter formatter =
@@ -51,36 +49,20 @@ public Producer(String brokers, String topic) {
 
   @Override
   public void run() {
-    KafkaProducer<Long, String> producer = new KafkaProducer<>(getProperties());
+    KafkaProducer<Long, Transaction> producer = new KafkaProducer<>(getProperties());
 
     Throttler throttler = new Throttler(100);
 
-    Random generator = new Random();
-
-    Iterator<Long> accounts =
-        Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
-            .flatMap(UnaryOperator.identity())
-            .iterator();
-
-    Iterator<LocalDateTime> timestamps =
-        Stream.iterate(
-                LocalDateTime.of(2000, 1, 1, 1, 0),
-                time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
-            .iterator();
+    TransactionSupplier transactions = new TransactionSupplier();
 
     while (isRunning) {
 
-      Long account = accounts.next();
-      LocalDateTime timestamp = timestamps.next();
-      long millis = timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
+      Transaction transaction = transactions.get();
 
-      String transaction =
-          String.format(
-              "%s, %s, %s",
-              account.toString(), generator.nextInt(1000), timestamp.format(formatter));
+      long millis = transaction.timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
 
-      ProducerRecord<Long, String> record =
-          new ProducerRecord<>(topic, null, millis, account, transaction);
+      ProducerRecord<Long, Transaction> record =
+          new ProducerRecord<>(topic, null, millis, transaction.accountId, transaction);
       producer.send(record);
 
       try {
@@ -104,7 +86,7 @@ private Properties getProperties() {
     props.put(ProducerConfig.ACKS_CONFIG, "all");
     props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TransactionSerializer.class);
 
     return props;
   }
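
A note on the record construction above: the five-argument ProducerRecord constructor is (topic, partition, timestamp, key, value). Passing null for the partition lets Kafka derive one from the key, the record's event timestamp is set to the transaction time rather than the wall clock, and keying by accountId keeps each account's records ordered within a partition.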
Throttler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.playground.datagen;
 
+/** A data throttler that controls the rate at which data is written out to Kafka. */
 final class Throttler {
 
   private final long throttleBatchSize;
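
Only the javadoc and one field of Throttler appear in this diff. As a rough illustration of the idea (a sketch under assumptions, not the commit's actual implementation), a throttler constructed as Throttler(recordsPerSecond) and called once per record might look like:

  // Sketch only: spreads the per-second budget across ten sub-batches.
  final class Throttler {
    private final long throttleBatchSize; // records to emit before pausing
    private final long nanosPerBatch; // target duration of one sub-batch
    private long endOfNextBatchNanos;
    private int currentBatch;

    Throttler(long maxRecordsPerSecond) {
      this.throttleBatchSize = maxRecordsPerSecond / 10;
      this.nanosPerBatch = 1_000_000_000L / 10;
      this.endOfNextBatchNanos = System.nanoTime() + nanosPerBatch;
    }

    void throttle() throws InterruptedException {
      if (++currentBatch < throttleBatchSize) {
        return;
      }
      currentBatch = 0;
      long remainingNanos = endOfNextBatchNanos - System.nanoTime();
      if (remainingNanos > 0) {
        Thread.sleep(remainingNanos / 1_000_000, (int) (remainingNanos % 1_000_000));
      }
      endOfNextBatchNanos += nanosPerBatch;
    }
  }

The try block in Producer.run() above suggests the real throttle call can throw InterruptedException, which fits a sleep-based design like this.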
Transaction.java (new file)
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.playground.datagen.model;

import java.time.LocalDateTime;

/** A simple financial transaction. */
public class Transaction {
  public long accountId;

  public int amount;

  public LocalDateTime timestamp;
}
TransactionSerializer.java (new file)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.playground.datagen.model;

import java.time.format.DateTimeFormatter;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

/** Serializes a {@link Transaction} into a CSV record. */
public class TransactionSerializer implements Serializer<Transaction> {

  private static final DateTimeFormatter formatter =
      DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");

  @Override
  public void configure(Map<String, ?> map, boolean b) {}

  @Override
  public byte[] serialize(String s, Transaction transaction) {
    String csv =
        String.format(
            "%s, %s, %s",
            transaction.accountId, transaction.amount, transaction.timestamp.format(formatter));

    return csv.getBytes();
  }

  @Override
  public void close() {}
}
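
The commit only ever writes these records; reading them back takes a mirror-image parser. Below is a hedged sketch of a matching Kafka deserializer, a hypothetical class that is not part of this commit, assuming the same package and kafka-clients 2.x (where configure and close have default implementations). One caveat is baked into the comments: the serializer's "hh" pattern is the 12-hour clock, so the sketch parses with "HH", which matches the generated digits only until the generated timestamps pass noon.

  import java.time.LocalDateTime;
  import java.time.format.DateTimeFormatter;
  import org.apache.kafka.common.serialization.Deserializer;

  /** Hypothetical counterpart to TransactionSerializer: parses "accountId, amount, timestamp". */
  public class TransactionDeserializer implements Deserializer<Transaction> {

    // "HH" (hour-of-day) rather than the serializer's "hh": LocalDateTime cannot be
    // resolved from a 12-hour field without an accompanying AM/PM marker.
    private static final DateTimeFormatter formatter =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Override
    public Transaction deserialize(String topic, byte[] bytes) {
      String[] fields = new String(bytes).split(",\\s*");
      Transaction transaction = new Transaction();
      transaction.accountId = Long.parseLong(fields[0]);
      transaction.amount = Integer.parseInt(fields[1]);
      transaction.timestamp = LocalDateTime.parse(fields[2], formatter);
      return transaction;
    }
  }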
TransactionSupplier.java (new file)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.playground.datagen.model;

import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Random;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

/** A supplier that generates an arbitrary transaction. */
public class TransactionSupplier implements Supplier<Transaction> {

  private final Random generator = new Random();

  private final Iterator<Long> accounts =
      Stream.generate(() -> Stream.of(1L, 2L, 3L, 4L, 5L))
          .flatMap(UnaryOperator.identity())
          .iterator();

  private final Iterator<LocalDateTime> timestamps =
      Stream.iterate(
              LocalDateTime.of(2000, 1, 1, 1, 0),
              time -> time.plusMinutes(5).plusSeconds(generator.nextInt(58) + 1))
          .iterator();

  @Override
  public Transaction get() {
    Transaction transaction = new Transaction();
    transaction.accountId = accounts.next();
    transaction.amount = generator.nextInt(1000);
    transaction.timestamp = timestamps.next();

    return transaction;
  }
}
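
Taken together, the supplier and serializer can be exercised without a broker at all. A small illustrative main (hypothetical, assuming the same package) prints records exactly as they would land on the topic:

  // Illustration only: generate a few transactions and show their CSV encoding.
  public class SupplierDemo {
    public static void main(String[] args) {
      TransactionSupplier transactions = new TransactionSupplier();
      TransactionSerializer serializer = new TransactionSerializer();
      for (int i = 0; i < 3; i++) {
        Transaction transaction = transactions.get();
        // Prints something like: 1, 562, 2000-01-01 01:05:43 (amount and seconds are random)
        System.out.println(new String(serializer.serialize("ignored-topic", transaction)));
      }
    }
  }

The serializer ignores its topic argument, so any string works for the first parameter.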
Dockerfile (new file)
@@ -0,0 +1,45 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

FROM maven:3.6-jdk-8-slim AS builder

COPY ./pom.xml /opt/pom.xml
COPY ./src /opt/src
RUN cd /opt; mvn clean install -Dmaven.test.skip

FROM flink:1.11-SNAPSHOT-scala_2.11

# Download connector libraries for snapshot version
RUN wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/1.11-SNAPSHOT/flink-sql-connector-kafka_2.11-1.11-20200610.034108-152.jar; \
    wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-connector-jdbc_2.11/1.11-SNAPSHOT/flink-connector-jdbc_2.11-1.11-20200610.033814-8.jar; \
    wget -P /opt/flink/lib/ https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-csv/1.11-SNAPSHOT/flink-csv-1.11-20200610.033438-153.jar; \
    wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;


# Download connector libraries
#RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/${FLINK_VERSION}/flink-sql-connector-kafka_2.11-${FLINK_VERSION}.jar; \
# wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-jdbc_2.11/${FLINK_VERSION}/flink-jdbc_2.11-${FLINK_VERSION}.jar; \
# wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar; \
# wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar;

COPY --from=builder /opt/target/spend-report-*.jar /opt/flink/usrlib/spend-report.jar

RUN echo "execution.checkpointing.interval: 10s" >> /opt/flink/conf/flink-conf.yaml; \
    echo "pipeline.object-reuse: true" >> /opt/flink/conf/flink-conf.yaml; \
    echo "pipeline.time-characteristic: EventTime" >> /opt/flink/conf/flink-conf.yaml; \
    echo "taskmanager.memory.jvm-metaspace.size: 256m" >> /opt/flink/conf/flink-conf.yaml;
docker-compose.yml (new file)
@@ -0,0 +1,91 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

version: '2.1'
services:
  jobmanager:
    image: apache/flink-table-walkthrough:1-FLINK-1.11-scala_2.11
    build: .
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8082:8081"
    command: standalone-job
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    depends_on:
      - kafka
      - mysql
  taskmanager:
    image: apache/flink-playground-walkthrough:1-FLINK-1.11-scala_2.11
    build: .
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: "kafka"
      KAFKA_ADVERTISED_PORT: "9092"
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "kafka:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  data-generator:
    image: apache/data-generator:1
    build: ../docker/data-generator
    depends_on:
      - kafka
  mysql:
    image: mysql:8.0.19
    command: --default-authentication-plugin=mysql_native_password --secure_file_priv=/data
    environment:
      MYSQL_USER: "sql-demo"
      MYSQL_PASSWORD: "demo-sql"
      MYSQL_DATABASE: "sql-demo"
      MYSQL_RANDOM_ROOT_PASSWORD: "yes"
    volumes:
      - ../docker/mysql-spend-report-init:/docker-entrypoint-initdb.d
      - ./data:/data
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    depends_on:
      - mysql
    volumes:
      - ../docker/grafana-spend-report-init/provisioning/:/etc/grafana/provisioning/
      - ../docker/grafana-spend-report-init/dashboard.json:/etc/grafana/dashboard.json
      - ../docker/grafana-spend-report-init/grafana.ini:/etc/grafana/grafana.ini
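
With these services defined, the playground comes up from the directory containing this file; per the port mappings above, the Flink web UI lands on host port 8082 and Grafana on 3000:

  docker-compose build
  docker-compose up -d
  # Flink web UI: http://localhost:8082   Grafana: http://localhost:3000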
