Commit 21424ad
fix
zhaomin1423 committed Aug 30, 2022
1 parent 90376c2 commit 21424ad
Showing 6 changed files with 25 additions and 14 deletions.
@@ -45,6 +45,8 @@ public final class Constants {

    public static final String NOW = "now";

+   public static final String SAVE_MODE = "saveMode";
+
    private Constants() {
    }
}
@@ -125,7 +125,7 @@ private LocalDateTime randomLocalDateTime() {
        return LocalDateTime.of(
            LocalDateTime.now().getYear(),
            RandomUtils.nextInt(1, 12),
-           RandomUtils.nextInt(1, 28),
+           RandomUtils.nextInt(1, LocalDateTime.now().getDayOfMonth()),
            RandomUtils.nextInt(0, 24),
            RandomUtils.nextInt(0, 59)
        );
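Side note (not part of this commit; the class name below is illustrative): the patched call relies on org.apache.commons.lang3.RandomUtils.nextInt(startInclusive, endExclusive) using an exclusive upper bound, so the generated day stays below today's day of month, and Commons Lang returns the lower bound when the two bounds are equal (e.g. on the 1st of the month). A minimal sketch of that behaviour:

    // Illustration only: RandomUtils.nextInt(startInclusive, endExclusive) has an
    // exclusive upper bound; when both bounds are equal it returns the lower bound.
    import java.time.LocalDateTime;
    import org.apache.commons.lang3.RandomUtils;

    public class RandomDayBoundsSketch {
        public static void main(String[] args) {
            int today = LocalDateTime.now().getDayOfMonth();
            for (int i = 0; i < 5; i++) {
                // result lies in [1, today - 1], or is 1 when today == 1
                int day = RandomUtils.nextInt(1, today);
                System.out.println("random day " + day + " (today is day " + today + ")");
            }
        }
    }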
@@ -31,7 +31,6 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.auto.service.AutoService;
-import com.google.common.annotations.VisibleForTesting;

@AutoService(SeaTunnelSource.class)
public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
@@ -40,15 +39,6 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
    private SeaTunnelContext seaTunnelContext;
    private SeaTunnelSchema schema;

-   public FakeSource() {
-   }
-
-   @VisibleForTesting
-   public FakeSource(Config pluginConfig, SeaTunnelContext seaTunnelContext) {
-       this.pluginConfig = pluginConfig;
-       this.seaTunnelContext = seaTunnelContext;
-   }
-
    @Override
    public Boundedness getBoundedness() {
        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
@@ -20,6 +20,7 @@
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -73,7 +74,16 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws
            Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input);
            // TODO modify checkpoint location
            seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
-           SparkSinkInjector.inject(dataset.write(), seaTunnelSink).option("checkpointLocation", "/tmp").save();
+           String saveMode;
+           if (sinkConfig.hasPath(Constants.SAVE_MODE)) {
+               saveMode = sinkConfig.getString(Constants.SAVE_MODE);
+           } else {
+               saveMode = SaveMode.ErrorIfExists.name();
+           }
+           SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
+               .option("checkpointLocation", "/tmp")
+               .mode(saveMode)
+               .save();
        }
        // the sink is the last stream
        return null;
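For reference (not part of the commit; the class name below is illustrative): when the sink config has no saveMode entry, the change above falls back to SaveMode.ErrorIfExists.name(), and the resolved string is passed to Spark's DataFrameWriter.mode(String), which accepts the org.apache.spark.sql.SaveMode names (Append, Overwrite, ErrorIfExists, Ignore) case-insensitively. A sketch of validating a configured value against that enum:

    // Illustration only: map a user-supplied "saveMode" string onto Spark's SaveMode enum.
    // Values such as "append" or "ErrorIfExists" are accepted; anything else is rejected early.
    import org.apache.spark.sql.SaveMode;

    public final class SaveModeSketch {

        static SaveMode resolve(String configured) {
            for (SaveMode mode : SaveMode.values()) {
                if (mode.name().equalsIgnoreCase(configured)) {
                    return mode;
                }
            }
            throw new IllegalArgumentException("Unsupported saveMode: " + configured);
        }

        public static void main(String[] args) {
            System.out.println(resolve("append"));        // Append
            System.out.println(resolve("ErrorIfExists")); // ErrorIfExists
        }
    }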
@@ -5,7 +5,7 @@
    <parent>
        <artifactId>seatunnel</artifactId>
        <groupId>org.apache.seatunnel</groupId>
-       <version>2.1.3-SNAPSHOT</version>
+       <version>${revision}</version>
        <relativePath>../../../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
@@ -35,6 +35,13 @@ source {
  FakeSource {
    result_table_name = "fake"
    field_name = "name,age,timestamp"
+   schema {
+     fields {
+       name = string
+       age = int
+       timestamp = timestamp
+     }
+   }
  }

  # You can also use other input plugins, such as hdfs
@@ -63,7 +70,9 @@ transform {

sink {
  # choose stdout output plugin to output data to console
- Console {}
+ Console {
+   saveMode = "append"
+ }

  # you can also you other output plugins, such as sql
  # hdfs {
