diff --git a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java index 56ee04d0..6bada6a3 100644 --- a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java +++ b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java @@ -181,4 +181,18 @@ private WriteMessage processDeleteStaleRequest(Write.Request request) messageDeleteStale.getSourceName(), new Date(messageDeleteStale.getSyncTime().getSeconds() * 1000)); } + + @Override + public void getSpecSchema( + io.cloudquery.plugin.v3.GetSpecSchema.Request request, + StreamObserver responseObserver) { + io.cloudquery.plugin.v3.GetSpecSchema.Response.Builder builder = + io.cloudquery.plugin.v3.GetSpecSchema.Response.newBuilder(); + String schema = this.plugin.getJsonSchema(); + if (schema != null && !schema.isBlank()) { + builder.setJsonSchema(schema); + } + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + } } diff --git a/lib/src/main/java/io/cloudquery/plugin/Plugin.java b/lib/src/main/java/io/cloudquery/plugin/Plugin.java index 5c027016..bb46a0bb 100644 --- a/lib/src/main/java/io/cloudquery/plugin/Plugin.java +++ b/lib/src/main/java/io/cloudquery/plugin/Plugin.java @@ -18,6 +18,7 @@ public abstract class Plugin { @NonNull protected final String name; @NonNull protected final String version; @Setter protected Logger logger; + @Setter protected String jsonSchema; protected ClientMeta client; public void init(String spec, NewClientOptions options) throws Exception { diff --git a/lib/src/test/java/io/cloudquery/internal/servers/plugin/v3/PluginServerTest.java b/lib/src/test/java/io/cloudquery/internal/servers/plugin/v3/PluginServerTest.java index f75019f6..e1e4d66f 100644 --- a/lib/src/test/java/io/cloudquery/internal/servers/plugin/v3/PluginServerTest.java +++ b/lib/src/test/java/io/cloudquery/internal/servers/plugin/v3/PluginServerTest.java @@ -1,5 +1,6 @@ package io.cloudquery.internal.servers.plugin.v3; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; @@ -9,6 +10,7 @@ import io.cloudquery.messages.WriteInsert; import io.cloudquery.messages.WriteMigrateTable; import io.cloudquery.plugin.Plugin; +import io.cloudquery.plugin.v3.GetSpecSchema; import io.cloudquery.plugin.v3.PluginGrpc; import io.cloudquery.plugin.v3.PluginGrpc.PluginStub; import io.cloudquery.plugin.v3.Write; @@ -25,12 +27,14 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; +import lombok.Getter; import org.apache.arrow.vector.types.pojo.ArrowType; import org.junit.Rule; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) @@ -91,6 +95,31 @@ public void shouldSendWriteDeleteStaleMessage() throws Exception { verify(plugin).write(any(WriteDeleteStale.class)); } + @Test + public void shouldSendNullJSONSchema() throws Exception { + NullResponseStream responseObserver = new NullResponseStream<>(); + + pluginStub.getSpecSchema(GetSpecSchema.Request.getDefaultInstance(), responseObserver); + responseObserver.await(); + + verify(plugin).getJsonSchema(); + assertFalse(responseObserver.getValue().hasJsonSchema()); + } + + @Test + public void shouldSendNonNullJSONSchema() throws Exception { + Mockito.doReturn("{}").when(plugin).getJsonSchema(); + + NullResponseStream responseObserver = new NullResponseStream<>(); + + pluginStub.getSpecSchema(GetSpecSchema.Request.getDefaultInstance(), responseObserver); + responseObserver.await(); + + verify(plugin).getJsonSchema(); + assertTrue(responseObserver.getValue().hasJsonSchema()); + assertEquals("{}", responseObserver.getValue().getJsonSchema()); + } + private static Write.Request generateMigrateTableMessage() throws IOException { Table table = Table.builder().name("test").build(); return Write.Request.newBuilder() @@ -121,12 +150,18 @@ private Write.Request generateDeleteStaleMessage() { private static class NullResponseStream implements StreamObserver { private final CountDownLatch countDownLatch = new CountDownLatch(1); + @Getter private T value; + @Getter private Throwable error; @Override - public void onNext(T value) {} + public void onNext(T value) { + this.value = value; + } @Override - public void onError(Throwable t) {} + public void onError(Throwable t) { + this.error = t; + } @Override public void onCompleted() {