Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Data Streamer with Receiver #3611

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.table;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.function.Function;

/**
* Data streamer receiver.
*
* @param <T> Payload type.
* @param <R> Result type.
*/
@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface DataStreamerReceiver<T, R> {
/**
* Receives an item from the data streamer (see {@link DataStreamerTarget#streamData(Publisher, DataStreamerOptions,
* Function, Function, Subscriber, List, String, Object...)}).
*
* <p>The receiver is called for each page (batch) in the data streamer and is responsible for processing the items,
* updating zero or more tables, and returning a result.
*
* @param page Item batch.
* @param ctx Receiver context.
* @param args Additional arguments.
* @return Future with the result.
*/
CompletableFuture<List<R>> receive(
List<T> page,
DataStreamerReceiverContext ctx,
Object... args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.table;

import org.apache.ignite.Ignite;

/**
* Context of the {@link DataStreamerReceiver} execution.
*/
@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface DataStreamerReceiverContext {
/**
* Gets the Ignite instance.
*
* @return Ignite instance.
*/
Ignite ignite();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@

package org.apache.ignite.table;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Function;
import org.apache.ignite.compute.DeploymentUnit;
import org.jetbrains.annotations.Nullable;

/**
* Represents an entity that can be used as a target for streaming data.
*
* @param <T> Entry type.
*/
@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface DataStreamerTarget<T> {
/**
* Streams data into the underlying table.
Expand All @@ -38,4 +40,30 @@ public interface DataStreamerTarget<T> {
CompletableFuture<Void> streamData(
Flow.Publisher<DataStreamerItem<T>> publisher,
@Nullable DataStreamerOptions options);

/**
* Streams data with receiver. The receiver is responsible for processing the data and updating zero or more tables.
*
* @param publisher Producer.
* @param options Options (can be null).
* @param keyFunc Key function. The key is only used locally for colocation.
* @param payloadFunc Payload function. The payload is sent to the receiver.
* @param resultSubscriber Optional subscriber for the receiver results.
* @param deploymentUnits Target deployment units. Can be empty.
* @param receiverClassName Receiver class name.
* @param receiverArgs Receiver arguments.
* @return Future that will be completed when the stream is finished.
* @param <E> Producer item type.
* @param <V> Payload type.
* @param <R> Result type.
*/
<E, V, R> CompletableFuture<Void> streamData(
Flow.Publisher<E> publisher,
@Nullable DataStreamerOptions options,
Function<E, T> keyFunc,
Function<E, V> payloadFunc,
@Nullable Flow.Subscriber<R> resultSubscriber,
List<DeploymentUnit> deploymentUnits,
String receiverClassName,
Object... receiverArgs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.lang.IgniteBiTuple;
Expand Down Expand Up @@ -484,6 +487,13 @@ public CompletableFuture<Void> streamData(
return ClientDataStreamer.streamData(publisher, opts, batchSender, provider, tbl);
}

@Override
public <E, V, R> CompletableFuture<Void> streamData(Publisher<E> publisher, @Nullable DataStreamerOptions options,
Function<E, Entry<Tuple, Tuple>> keyFunc, Function<E, V> payloadFunc, @Nullable Flow.Subscriber<R> resultSubscriber,
List<DeploymentUnit> deploymentUnits, String receiverClassName, Object... receiverArgs) {
return null;
}

/** {@inheritDoc} */
@Override
protected Function<SqlRow, Entry<Tuple, Tuple>> queryMapper(ResultSetMetadata meta, ClientSchema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.client.PayloadInputChannel;
Expand Down Expand Up @@ -604,6 +607,13 @@ public CompletableFuture<Void> streamData(Publisher<DataStreamerItem<Entry<K, V>
return ClientDataStreamer.streamData(publisher, opts, batchSender, provider, tbl);
}

@Override
public <E, T, R> CompletableFuture<Void> streamData(Publisher<E> publisher, @Nullable DataStreamerOptions options,
Function<E, Entry<K, V>> keyFunc, Function<E, T> payloadFunc, @Nullable Flow.Subscriber<R> resultSubscriber,
List<DeploymentUnit> deploymentUnits, String receiverClassName, Object... receiverArgs) {
throw new UnsupportedOperationException("Not implemented yet.");
}

/** {@inheritDoc} */
@Override
protected Function<SqlRow, Entry<K, V>> queryMapper(ResultSetMetadata meta, ClientSchema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
Expand Down Expand Up @@ -413,4 +416,12 @@ public CompletableFuture<Void> streamData(Publisher<DataStreamerItem<Tuple>> pub

return ClientDataStreamer.streamData(publisher, opts, batchSender, provider, tbl);
}

/** {@inheritDoc} */
@Override
public <E, V, R> CompletableFuture<Void> streamData(Publisher<E> publisher, @Nullable DataStreamerOptions options,
Function<E, Tuple> keyFunc, Function<E, V> payloadFunc, @Nullable Flow.Subscriber<R> resultSubscriber,
List<DeploymentUnit> deploymentUnits, String receiverClassName, Object... receiverArgs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.TuplePart;
import org.apache.ignite.internal.client.sql.ClientSql;
Expand Down Expand Up @@ -419,6 +421,13 @@ public CompletableFuture<Void> streamData(Publisher<DataStreamerItem<R>> publish
return ClientDataStreamer.streamData(publisher, opts, batchSender, provider, tbl);
}

@Override
public <E, V, R1> CompletableFuture<Void> streamData(Publisher<E> publisher, @Nullable DataStreamerOptions options,
Function<E, R> keyFunc, Function<E, V> payloadFunc, @Nullable Flow.Subscriber<R1> resultSubscriber,
List<DeploymentUnit> deploymentUnits, String receiverClassName, Object... receiverArgs) {
throw new UnsupportedOperationException("Not implemented yet.");
}

/** {@inheritDoc} */
@Override
protected Function<SqlRow, R> queryMapper(ResultSetMetadata meta, ClientSchema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.DataStreamerReceiver;
import org.apache.ignite.table.DataStreamerReceiverContext;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
Expand Down Expand Up @@ -335,6 +338,28 @@ public void testSchemaUpdateWhileStreaming() throws InterruptedException {
assertEquals("bar", view.get(null, tupleKey(2)).stringValue("name"));
}

@Test
public void testReceiver() {
CompletableFuture<Void> streamerFut;
RecordView<Tuple> view = defaultTable().recordView();

try (var publisher = new SubmissionPublisher<CustomData>()) {
streamerFut = view.<CustomData, String, Boolean>streamData(
publisher,
null,
item -> Tuple.create().set("id", item.getId()),
item -> item.serializeToString(),
null,
List.of(new DeploymentUnit("test", "1.0.0")),
DemoReceiver.class.getName(),
"receiverArg1");

publisher.submit(new CustomData(1, "x"));
}

streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
}

private void waitForKey(RecordView<Tuple> view, Tuple key) throws InterruptedException {
assertTrue(waitForCondition(() -> {
@SuppressWarnings("resource")
Expand Down Expand Up @@ -399,4 +424,48 @@ private PersonValPojo() {
this.name = name;
}
}

private static class CustomData {
private final int id;
private final String info;

public CustomData(int id, String info) {
this.id = id;
this.info = info;
}

public int getId() {
return id;
}

public String getInfo() {
return info;
}

public String serializeToString() {
return id + ":" + info;
}

public static CustomData deserializeFromString(String str) {
String[] parts = str.split(":");
return new CustomData(Integer.parseInt(parts[0]), parts[1]);
}
}

private static class DemoReceiver implements DataStreamerReceiver<String, Boolean> {
@Override
public CompletableFuture<List<Boolean>> receive(List<String> page, DataStreamerReceiverContext ctx, Object... args) {
List<Tuple> dataItems = page.stream()
.map(CustomData::deserializeFromString)
.map(data -> tuple(data.getId(), data.getInfo()))
.collect(Collectors.toList());

return ctx.ignite()
.tables()
.table(TABLE_NAME)
.recordView()
.insertAllAsync(null, dataItems)
.thenApply(ignored -> List.of(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.marshaller.MarshallersProvider;
import org.apache.ignite.internal.schema.BinaryRow;
Expand Down Expand Up @@ -571,6 +573,13 @@ public CompletableFuture<Void> streamData(
return convertToPublicFuture(future);
}

@Override
public <E, V, R> CompletableFuture<Void> streamData(Publisher<E> publisher, @Nullable DataStreamerOptions options,
Function<E, Entry<Tuple, Tuple>> keyFunc, Function<E, V> payloadFunc, @Nullable Flow.Subscriber<R> resultSubscriber,
List<DeploymentUnit> deploymentUnits, String receiverClassName, Object... receiverArgs) {
throw new UnsupportedOperationException("Not implemented yet.");
}

private List<BinaryRowEx> marshalPairs(Collection<Entry<Tuple, Tuple>> pairs, int schemaVersion, @Nullable BitSet deleted) {
List<BinaryRowEx> rows = new ArrayList<>(pairs.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.marshaller.Marshaller;
import org.apache.ignite.internal.marshaller.MarshallerException;
Expand Down Expand Up @@ -710,6 +712,13 @@ public CompletableFuture<Void> streamData(Publisher<DataStreamerItem<Entry<K, V>
return convertToPublicFuture(future);
}

@Override
public <E, T, R> CompletableFuture<Void> streamData(Publisher<E> publisher, @Nullable DataStreamerOptions options,
Function<E, Entry<K, V>> keyFunc, Function<E, T> payloadFunc, @Nullable Flow.Subscriber<R> resultSubscriber,
List<DeploymentUnit> deploymentUnits, String receiverClassName, Object... receiverArgs) {
throw new UnsupportedOperationException("Not implemented yet.");
}

/** {@inheritDoc} */
@Override
protected Function<SqlRow, Entry<K, V>> queryMapper(ResultSetMetadata meta, SchemaDescriptor schema) {
Expand Down