diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Neo4jRowWriterTransform.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Neo4jRowWriterTransform.java index fe7d2c9ff0b..dd8d024566c 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Neo4jRowWriterTransform.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Neo4jRowWriterTransform.java @@ -31,7 +31,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Map; import java.util.Set; -import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -104,8 +104,7 @@ public PCollection expand(@NonNull PCollection input) { return input .apply("Create KV pairs", CreateKvTransform.of(parallelism)) - .apply("Group by keys", GroupByKey.create()) - .apply("Split into batches", ParDo.of(SplitIntoBatches.of(batchSize))) + .apply("Group into batches", GroupIntoBatches.ofSize(batchSize)) .apply(target.getSequence() + ": Neo4j write " + target.getName(), ParDo.of(neo4jUnwindFn)) .setRowSchema(input.getSchema()); } diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Partitioner.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Partitioner.java deleted file mode 100644 index 9240116d3be..00000000000 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/Partitioner.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (C) 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.v2.neo4j.transforms; - -import java.util.ArrayList; -import java.util.List; - -class Partitioner { - - public static Iterable> partition(Iterable elements, int size) { - List> batches = new ArrayList<>(); - List currentBatch = new ArrayList<>(size); - for (T element : elements) { - currentBatch.add(element); - if (currentBatch.size() == size) { - batches.add(currentBatch); - currentBatch = new ArrayList<>(size); - } - } - if (!currentBatch.isEmpty()) { - batches.add(currentBatch); - } - return batches; - } -} diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/SplitIntoBatches.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/SplitIntoBatches.java deleted file mode 100644 index 38ca9cf52b5..00000000000 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/transforms/SplitIntoBatches.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (C) 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.v2.neo4j.transforms; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.KV; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class SplitIntoBatches extends DoFn>, KV>> { - - private static final Logger LOG = LoggerFactory.getLogger(CreateKvTransform.class); - private final int batchSize; - - private SplitIntoBatches(int batchSize) { - this.batchSize = batchSize; - } - - public static SplitIntoBatches of(int batchSize) { - if (batchSize <= 0) { - throw new IllegalArgumentException( - String.format("negative batch sizes are not permitted, %d given", batchSize)); - } - return new SplitIntoBatches<>(batchSize); - } - - @ProcessElement - public void processElement(ProcessContext context) { - LOG.info("Splitting into batches of {} rows", batchSize); - KV> input = context.element(); - Integer key = input.getKey(); - Partitioner.partition(input.getValue(), batchSize) - .forEach(batch -> context.output(KV.of(key, batch))); - } -} diff --git a/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/transforms/PartitionerTest.java b/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/transforms/PartitionerTest.java deleted file mode 100644 index 24f0553709b..00000000000 --- a/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/transforms/PartitionerTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.v2.neo4j.transforms; - -import static com.google.common.truth.Truth.assertThat; - -import java.util.List; -import org.junit.Test; - -public class PartitionerTest { - - @Test - public void partitions_empty_iterable_yields_empty_partitions() { - List elements = List.of(); - - Iterable> partitions = Partitioner.partition(elements, 1); - - assertThat(partitions).isEmpty(); - } - - @Test - public void partitions_single_partition_when_size_equals_input_size() { - List elements = List.of("1", "2", "3", "4"); - - Iterable> partitions = Partitioner.partition(elements, 4); - - assertThat(partitions).isEqualTo(List.of(List.of("1", "2", "3", "4"))); - } - - @Test - public void partitions_single_partition_when_size_is_larger_than_input_size() { - List elements = List.of("1", "2", "3", "4"); - - Iterable> partitions = Partitioner.partition(elements, 42); - - assertThat(partitions).isEqualTo(List.of(List.of("1", "2", "3", "4"))); - } - - @Test - public void partitions_into_same_sized_partitions() { - List elements = List.of("1", "2", "3", "4"); - - Iterable> partitions = Partitioner.partition(elements, 2); - - assertThat(partitions).isEqualTo(List.of(List.of("1", "2"), List.of("3", "4"))); - } - - @Test - public void partitions_includes_last_elements_when_size_is_not_multiple_of_input_size() { - List elements = List.of("1", "2", "3", "4", "5"); - - Iterable> partitions = Partitioner.partition(elements, 2); - - assertThat(partitions).isEqualTo(List.of(List.of("1", "2"), List.of("3", "4"), List.of("5"))); - } -} diff --git a/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/transforms/SplitIntoBatchesTest.java b/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/transforms/SplitIntoBatchesTest.java deleted file mode 100644 index d5d1f3277f8..00000000000 --- a/v2/googlecloud-to-neo4j/src/test/java/com/google/cloud/teleport/v2/neo4j/transforms/SplitIntoBatchesTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (C) 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.v2.neo4j.transforms; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertThrows; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.Row; -import org.junit.Rule; -import org.junit.Test; - -public class SplitIntoBatchesTest { - - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - - private final Schema schema = Schema.builder().addInt32Field("value").build(); - - @Test - public void batches_elements() { - List input = Arrays.asList(row(1), row(2), row(3), row(4), row(5), row(6), row(7)); - - PCollection>> batches = - pipeline - .apply(Create.of(input)) - .apply(CreateKvTransform.of(1)) - .apply(GroupByKey.create()) - .apply(ParDo.of(SplitIntoBatches.of(3))); - - PAssert.that(batches) - .satisfies( - (pairs) -> { - List rows = new ArrayList<>(); - pairs.forEach( - pair -> { - Iterable batch = pair.getValue(); - assertThat(batch).isNotNull(); - assertThat(input).containsAtLeastElementsIn(batch); - for (Row row : batch) { - rows.add(row); - } - }); - assertThat(rows).containsExactlyElementsIn(input); - return null; - }); - - pipeline.run(); - } - - @Test - public void rejects_invalid_batch_size() { - IllegalArgumentException exception = - assertThrows(IllegalArgumentException.class, () -> SplitIntoBatches.of(0)); - assertThat(exception) - .hasMessageThat() - .isEqualTo("negative batch sizes are not permitted, 0 given"); - exception = assertThrows(IllegalArgumentException.class, () -> SplitIntoBatches.of(-1)); - assertThat(exception) - .hasMessageThat() - .isEqualTo("negative batch sizes are not permitted, -1 given"); - } - - private Row row(int value) { - return Row.withSchema(schema).addValues(value).build(); - } -}