Skip to content

Commit

Permalink
Fix up IcebergIO Write path
Browse files Browse the repository at this point in the history
 - remove Read path (will propose separately)
 - re-enable checking, fix type errors
 - some style adjustments
  • Loading branch information
kennknowles committed Mar 29, 2024
1 parent edda070 commit a06a187
Show file tree
Hide file tree
Showing 36 changed files with 636 additions and 1,448 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@ class BeamModulePlugin implements Plugin<Project> {
options.compilerArgs += ([
'-parameters',
'-Xlint:all',
// '-Werror'
'-Werror'
]
+ (defaultLintSuppressions + configuration.disableLintWarnings).collect { "-Xlint:-${it}" })
}
Expand Down
34 changes: 0 additions & 34 deletions sdks/java/io/catalog/build.gradle

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,52 +31,51 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;

@SuppressWarnings("all") // TODO: Remove this once development is stable.
public class PrepareWrite<InputT, DestinationT, OutputT>
extends PTransform<PCollection<InputT>, PCollection<KV<DestinationT, OutputT>>> {
class AssignDestinations<ElementT, DestinationT, OutputT>
extends PTransform<PCollection<ElementT>, PCollection<KV<DestinationT, OutputT>>> {

private DynamicDestinations<InputT, DestinationT> dynamicDestinations;
private SerializableFunction<InputT, OutputT> formatFunction;
private Coder outputCoder;
private DynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private SerializableFunction<ElementT, OutputT> formatFunction;
private Coder<OutputT> outputCoder;

public PrepareWrite(
DynamicDestinations<InputT, DestinationT> dynamicDestinations,
SerializableFunction<InputT, OutputT> formatFunction,
Coder outputCoder) {
public AssignDestinations(
DynamicDestinations<ElementT, DestinationT> dynamicDestinations,
SerializableFunction<ElementT, OutputT> formatFunction,
Coder<OutputT> outputCoder) {
this.dynamicDestinations = dynamicDestinations;
this.formatFunction = formatFunction;
this.outputCoder = outputCoder;
}

@Override
public PCollection<KV<DestinationT, OutputT>> expand(PCollection<InputT> input) {
public PCollection<KV<DestinationT, OutputT>> expand(PCollection<ElementT> input) {

final Coder destCoder;
final Coder<KV<DestinationT, OutputT>> destCoder;
try {
destCoder =
KvCoder.of(
dynamicDestinations.getDestinationCoderWithDefault(
input.getPipeline().getCoderRegistry()),
outputCoder);
} catch (Exception e) {
RuntimeException e1 = new RuntimeException("Unable to expand PrepareWrite");
RuntimeException e1 = new RuntimeException("Unable to expand AssignDestinations");
e1.addSuppressed(e);
throw e1;
}
return input
.apply(
ParDo.of(
new DoFn<InputT, KV<DestinationT, OutputT>>() {
new DoFn<ElementT, KV<DestinationT, OutputT>>() {

@ProcessElement
public void processElement(
ProcessContext c,
@Element InputT element,
@Element ElementT element,
@Timestamp Instant timestamp,
BoundedWindow window,
PaneInfo pane)
throws IOException {
ValueInSingleWindow<InputT> windowedElement =
ValueInSingleWindow<ElementT> windowedElement =
ValueInSingleWindow.of(element, timestamp, window, pane);
dynamicDestinations.setSideInputProcessContext(c);
DestinationT tableDestination =
Expand Down

0 comments on commit a06a187

Please sign in to comment.