flink-demo #48

Open · wants to merge 4 commits into master
6 changes: 6 additions & 0 deletions code/Flink/flink-basis-java/pom.xml
@@ -67,6 +67,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.0</version>
</dependency>

<!-- Add connector dependencies here. They must be in the default scope (compile). -->

<!-- Example:
@@ -0,0 +1,28 @@
package com.heibaiying.connectors;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName KafkaDemo.java
* @Description Consumes string messages from a Kafka topic with FlinkKafkaConsumer and prints them
* @createTime 2021-05-03 22:37:00
*/
public class KafkaDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka:9092");
// Subscribe to the topic and define how Kafka's byte messages are deserialized into Flink objects
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
stream.print();
env.execute("Flink Streaming");
}
}
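
To push a few test messages into flink-stream-in-topic while KafkaDemo is running, a plain kafka-clients producer is enough. A minimal sketch follows (the KafkaTestProducer class is illustrative and not part of this diff; it assumes the broker is reachable at kafka:9092, as in the consumer above, and that kafka-clients is on the classpath, which flink-connector-kafka already brings in transitively):

package com.heibaiying.connectors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // send ten string messages to the topic the Flink job listens on
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("flink-stream-in-topic", "message-" + i));
            }
        }
    }
}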
@@ -0,0 +1,38 @@
package com.heibaiying.datasource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName CustomDataSource.java
* @Description A custom SourceFunction that emits an increasing counter until cancelled
* @createTime 2021-05-03 22:23:00
*/
public class CustomDataSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SourceFunction<Long>() {

private long count = 0L;
private volatile boolean isRunning = true;

@Override
public void run(SourceContext<Long> ctx) throws Exception {
// keep emitting until cancel() is called or 1000 values have been produced
while (isRunning && count < 1000) {
// emit the current value downstream
ctx.collect(count);
count++;
}
}

@Override
public void cancel() {
isRunning = false;
}
}).print();
env.execute();

}
}
@@ -0,0 +1,26 @@
package com.heibaiying.datasource;

import java.io.Serializable;
import java.util.Iterator;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName CustomIterator.java
* @Description An iterator that produces the integers 1 to 100; it implements Serializable so that
* Flink can ship it to the cluster when used with fromCollection
* @createTime 2021-05-03 22:16:00
*/
public class CustomIterator implements Iterator<Integer>, Serializable {
private Integer i = 0;

@Override
public boolean hasNext() {
return i < 100;
}

@Override
public Integer next() {
i++;
return i;
}
}
@@ -0,0 +1,25 @@
package com.heibaiying.datasource;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName InnerDataSource.java
* @Description Demonstrates the built-in data sources: readFile (commented out), fromCollection, and
* fromCollection with a custom iterator
* @createTime 2021-05-03 22:00:00
*/
public class InnerDataSource {
public static void main(String[] args) throws Exception {
// final String filePath = "/Users/lwl/Desktop/person/github/Java-Note/数据中台.md";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.readFile(new TextInputFormat(new Path(filePath))
// , filePath, FileProcessingMode.PROCESS_ONCE
// , 1
// , BasicTypeInfo.STRING_TYPE_INFO).print();
// env.fromCollection(Arrays.asList(1, 2, 3, 4, 5)).print();
env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();
env.execute();
}
}
@@ -0,0 +1,39 @@
package com.heibaiying.transformation;

import com.heibaiying.utils.EnvironmentUtil;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName AggregationsDemo.java
* @Description Aggregations are Flink's built-in aggregation operators, which wrap the common rolling aggregations
* @createTime 2021-05-04 09:58:00
*/
public class AggregationsDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = EnvironmentUtil.getEnvironment();
DataStreamSource<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("1", 1)
, new Tuple2<>("2", 2), new Tuple2<>("3", 3), new Tuple2<>("5", 5));
tuple2DataStream.keyBy(0).sum(1).print();

KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = tuple2DataStream.keyBy(0);
// Rolling minimum of the given field; the field can be specified by index or by name
// (for tuples the valid field names are "f0", "f1", ..., so "f1" is used here rather than a non-existent "key")
keyedStream.min(1);
keyedStream.min("f1");
// Rolling maximum of the given field
keyedStream.max(1);
keyedStream.max("f1");
// Rolling minimum of the given field, returning the whole element that holds it
keyedStream.minBy(1);
keyedStream.minBy("f1");
// Rolling maximum of the given field, returning the whole element that holds it
keyedStream.maxBy(1);
keyedStream.maxBy("f1");
env.execute();
}
}
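
The practical difference between min and minBy only becomes visible when the tuple carries a field besides the key and the aggregated field. A minimal sketch (the MinVsMinByDemo class and its data are illustrative, not part of this diff):

package com.heibaiying.transformation;

import com.heibaiying.utils.EnvironmentUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinVsMinByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = EnvironmentUtil.getEnvironment();
        env.setParallelism(1);

        // elements are (key, value, tag)
        env.fromElements(Tuple3.of("a", 3, "first"), Tuple3.of("a", 1, "second"))
                .keyBy(0)
                // min(1): rolling minimum of f1 only; the non-aggregated field f2 is not
                // guaranteed to come from the same element that holds the minimum
                .min(1)
                .print();

        env.fromElements(Tuple3.of("a", 3, "first"), Tuple3.of("a", 1, "second"))
                .keyBy(0)
                // minBy(1): emits the whole element that holds the minimum,
                // so the second output here is ("a", 1, "second")
                .minBy(1)
                .print();

        env.execute();
    }
}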
@@ -0,0 +1,48 @@
package com.heibaiying.transformation;

import com.heibaiying.utils.EnvironmentUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName ConnectDemo.java
* @Description Connect joins two DataStreams whose element types may differ; it returns a ConnectedStreams,
* and the two connected streams can then share state with each other
* @createTime 2021-05-04 10:21:00
*/
@Slf4j
public class ConnectDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = EnvironmentUtil.getEnvironment();

DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(
new Tuple2<>("a", 3), new Tuple2<>("b", 5));

DataStreamSource<Integer> streamSource02 = env.fromElements(2, 3, 9);

// connect the two streams
ConnectedStreams<Tuple2<String, Integer>, Integer> connect =
streamSource01.connect(streamSource02);

connect.map(new CoMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
@Override
public Integer map1(Tuple2<String, Integer> value) throws Exception {
log.info("value.f1=" + value.f1);
return value.f1;
}

@Override
public Integer map2(Integer value) throws Exception {
log.info("value=" + value);
return value;
}
}).map(x -> x * 100).print();
env.execute();
}
}
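
The state-sharing part of the description is easiest to see when both inputs are keyed and a RichCoFlatMapFunction holds keyed state: what one side writes, the other side can read for the same key. A minimal sketch (the ConnectStateDemo class and its data are illustrative, not part of this diff; arrival order between the two streams is not deterministic, so it only shows where the shared state lives):

package com.heibaiying.transformation;

import com.heibaiying.utils.EnvironmentUtil;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConnectStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = EnvironmentUtil.getEnvironment();

        // control stream: keys to block; data stream: (key, value) records
        env.fromElements(Tuple2.of("a", 0)).keyBy(0)
                .connect(env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2)).keyBy(0))
                .flatMap(new RichCoFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                    // keyed state, scoped to the current key and visible to both flatMap methods
                    private transient ValueState<Boolean> blocked;

                    @Override
                    public void open(Configuration parameters) {
                        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
                    }

                    @Override
                    public void flatMap1(Tuple2<String, Integer> control, Collector<Tuple2<String, Integer>> out) throws Exception {
                        blocked.update(true);          // written from the control stream ...
                    }

                    @Override
                    public void flatMap2(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        if (blocked.value() == null) { // ... and read from the data stream
                            out.collect(value);
                        }
                    }
                })
                .print();
        env.execute();
    }
}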
@@ -0,0 +1,19 @@
package com.heibaiying.transformation;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static com.heibaiying.utils.EnvironmentUtil.getEnvironment;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName FilterDemo.java
* @Description Filter keeps only the elements for which the predicate returns true
* @createTime 2021-05-04 09:39:00
*/
public class FilterDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = getEnvironment();
// keep only the elements greater than 3
env.fromElements(1, 2, 3, 4, 5).filter(x -> x > 3).print();
// the job only runs once execute() is called
env.execute();
}
}
@@ -0,0 +1,31 @@
package com.heibaiying.transformation;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName FlatMapDemo.java
* @Description FlatMap turns each input element into zero, one, or more output elements
* @createTime 2021-05-04 08:57:00
*/
public class FlatMapDemo {
public static void main(String[] args) throws Exception {
String string01 = "one one one two two";
String string02 = "third third third four";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stringDataStream = env.fromElements(string01, string02);
stringDataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String s : value.split(" ")) {
out.collect(s);
}
}
}).print();
env.execute();
}
}
@@ -0,0 +1,32 @@
package com.heibaiying.transformation;

import com.heibaiying.utils.EnvironmentUtil;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName KeyByAndReduceDemo.java
* @Description KeyBy [DataStream → KeyedStream]: partitions the data so that elements with the same key end up in
* the same partition. Reduce [KeyedStream → DataStream]: performs a rolling reduce on the keyed stream.
* KeyBy has two limitations:
* a user-defined POJO type used as a key must override hashCode;
* array types cannot be used as keys.
* @createTime 2021-05-04 09:44:00
*/
public class KeyByAndReduceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = EnvironmentUtil.getEnvironment();
DataStreamSource<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("1", 1)
, new Tuple2<>("2", 2), new Tuple2<>("3", 3), new Tuple2<>("5", 5));
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = tuple2DataStream.keyBy(0);

keyedStream.reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2)
-> new Tuple2<>(value1.f0, value1.f1 + value2.f1)).print();
env.execute();
}
}
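
For reference, keyBy also accepts POJO field names rather than tuple indexes; the hashCode requirement in the class comment applies to the type that is used as the key, so keying by a String field as below sidesteps it. A minimal sketch (the PojoKeyByDemo class and its WordCount POJO are illustrative, not part of this diff):

package com.heibaiying.transformation;

import com.heibaiying.utils.EnvironmentUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PojoKeyByDemo {

    // a Flink POJO: public no-arg constructor and public fields
    public static class WordCount {
        public String word;
        public int count;

        public WordCount() {
        }

        public WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + "=" + count;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = EnvironmentUtil.getEnvironment();
        env.fromElements(new WordCount("flink", 1), new WordCount("flink", 2), new WordCount("kafka", 1))
                .keyBy("word")   // key by POJO field name
                .sum("count")    // rolling sum of the count field
                .print();
        env.execute();
    }
}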
@@ -0,0 +1,21 @@
package com.heibaiying.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName MapDemo.java
* @Description Map applies a function to every element, producing exactly one output element per input
* @createTime 2021-05-04 08:46:00
*/
public class MapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5);
integerDataStream.map((MapFunction<Integer, Integer>) value -> value * 2).print();
env.execute();
}
}
@@ -0,0 +1,29 @@
package com.heibaiying.transformation;

import com.heibaiying.utils.EnvironmentUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName UnionDemo.java
* @Description Union combines two or more DataStreams whose elements have the same type
* @createTime 2021-05-04 10:13:00
*/
public class UnionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = EnvironmentUtil.getEnvironment();
DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(
new Tuple2<>("a", 1)
,new Tuple2<>("a", 2));

DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(
new Tuple2<>("b", 1), new Tuple2<>("b", 2));

// streamSource01.union(streamSource02).print();
// union accepts several streams at once; passing streamSource01 again makes its elements appear twice in the output
streamSource01.union(streamSource01, streamSource02).print();
env.execute();
}
}
@@ -0,0 +1,17 @@
package com.heibaiying.utils;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author liuweilong
* @version 1.0.0
* @ClassName EnvironmentUtil.java
* @Description Small helper that hands out the StreamExecutionEnvironment used by the demos
* @createTime 2021-05-04 09:40:00
*/
public class EnvironmentUtil {
public static StreamExecutionEnvironment getEnvironment() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}