From 0d8ab129d9aebffafd301cfed06606d259176d4a Mon Sep 17 00:00:00 2001 From: "jishiwen.jsw" Date: Sat, 15 Nov 2025 13:26:33 +0800 Subject: [PATCH 1/2] chore(test): add unit test for geaflow/geaflow-analytics-service/geaflow-analytics-service-client module Signed-off-by: jishiwen.jsw --- .../client/AbstractQueryRunnerTest.java | 117 +++++++ .../client/AnalyticsClientBuilderTest.java | 105 ++++++ .../client/AnalyticsManagerOptionsTest.java | 47 --- .../client/AnalyticsServiceInfoTest.java | 86 +++++ .../service/client/HttpResponseTest.java | 181 +++++++++++ .../client/QueryRunnerContextTest.java | 82 +++++ .../service/client/QueryRunnerStatusTest.java | 49 +++ .../client/jdbc/AnalyticsDriverURITest.java | 197 ++++++++++-- .../service/client/utils/JDBCUtilsTest.java | 67 ++++ .../ElasticsearchTableSource.java | 270 ++++++++++++++++ .../dsl/connector/neo4j/Neo4jTableSink.java | 301 ++++++++++++++++++ 11 files changed, 1435 insertions(+), 67 deletions(-) create mode 100644 geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AbstractQueryRunnerTest.java create mode 100644 geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsClientBuilderTest.java delete mode 100644 geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsManagerOptionsTest.java create mode 100644 geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsServiceInfoTest.java create mode 100644 geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/HttpResponseTest.java create mode 100644 geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/QueryRunnerContextTest.java create mode 100644 geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/QueryRunnerStatusTest.java create mode 100644 geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/utils/JDBCUtilsTest.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java create mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java diff --git a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AbstractQueryRunnerTest.java b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AbstractQueryRunnerTest.java new file mode 100644 index 000000000..d3053af96 --- /dev/null +++ b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AbstractQueryRunnerTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.analytics.service.client; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.geaflow.analytics.service.query.QueryResults; +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.common.rpc.HostAndPort; +import org.apache.geaflow.pipeline.service.ServiceType; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class AbstractQueryRunnerTest { + + @Test + public void testQueryRunnerStatusMethods() { + try (TestQueryRunner runner = new TestQueryRunner()) { + QueryRunnerContext context = QueryRunnerContext.newBuilder() + .setConfiguration(new Configuration()) + .setHost(new HostAndPort("localhost", 8080)) + .build(); + + runner.init(context); + + Assert.assertTrue(runner.isRunning()); + Assert.assertFalse(runner.isAborted()); + Assert.assertFalse(runner.isError()); + Assert.assertFalse(runner.isFinished()); + + runner.setStatus(QueryRunnerStatus.ERROR); + Assert.assertFalse(runner.isRunning()); + Assert.assertTrue(runner.isError()); + + runner.setStatus(QueryRunnerStatus.ABORTED); + Assert.assertTrue(runner.isAborted()); + + runner.setStatus(QueryRunnerStatus.FINISHED); + Assert.assertTrue(runner.isFinished()); + } + } + + @Test + public void testInitWithHostAndPort() { + try (TestQueryRunner runner = new TestQueryRunner()) { + Configuration config = new Configuration(); + HostAndPort hostAndPort = new HostAndPort("test-host", 9090); + + QueryRunnerContext context = QueryRunnerContext.newBuilder() + .setConfiguration(config) + .setHost(hostAndPort) + .build(); + + runner.init(context); + + Assert.assertNotNull(runner.getAnalyticsServiceInfo()); + Assert.assertEquals(runner.getAnalyticsServiceInfo().getCoordinatorNum(), 1); + Assert.assertEquals(runner.getAnalyticsServiceInfo().getCoordinatorAddresses(0), hostAndPort); + } + } + + private static class TestQueryRunner extends AbstractQueryRunner { + + @Override + protected void initManagedChannel(HostAndPort address) { + // No-op for test + } + + @Override + public QueryResults executeQuery(String queryScript) { + return null; + } + + @Override + public ServiceType getServiceType() { + return ServiceType.analytics_http; + } + + @Override + public QueryResults cancelQuery(long queryId) { + return null; + } + + @Override + public void close() { + // No-op for test + } + + public void setStatus(QueryRunnerStatus status) { + if (this.queryRunnerStatus == null) { + this.queryRunnerStatus = new AtomicReference<>(status); + } else { + this.queryRunnerStatus.set(status); + } + } + + public AnalyticsServiceInfo getAnalyticsServiceInfo() { + return this.analyticsServiceInfo; + } + } +} diff --git a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsClientBuilderTest.java b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsClientBuilderTest.java new file mode 100644 index 000000000..a3368aba4 --- /dev/null +++ b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsClientBuilderTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.analytics.service.client; + +import org.apache.geaflow.analytics.service.config.AnalyticsClientConfigKeys; +import org.apache.geaflow.common.config.Configuration; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class AnalyticsClientBuilderTest { + + @Test + public void testBuilderWithHost() { + AnalyticsClientBuilder builder = new AnalyticsClientBuilder(); + builder.withHost("localhost"); + Assert.assertEquals(builder.getHost(), "localhost"); + } + + @Test + public void testBuilderWithPort() { + AnalyticsClientBuilder builder = new AnalyticsClientBuilder(); + builder.withPort(8080); + Assert.assertEquals(builder.getPort(), 8080); + } + + @Test + public void testBuilderWithUser() { + AnalyticsClientBuilder builder = new AnalyticsClientBuilder(); + builder.withUser("testUser"); + Assert.assertEquals(builder.getUser(), "testUser"); + } + + @Test + public void testBuilderWithTimeoutMs() { + AnalyticsClientBuilder builder = new AnalyticsClientBuilder(); + builder.withTimeoutMs(5000); + Assert.assertEquals( + builder.getConfiguration().getString(AnalyticsClientConfigKeys.ANALYTICS_CLIENT_CONNECT_TIMEOUT_MS), + "5000" + ); + } + + @Test + public void testBuilderWithRetryNum() { + AnalyticsClientBuilder builder = new AnalyticsClientBuilder(); + builder.withRetryNum(3); + Assert.assertEquals(builder.getQueryRetryNum(), 3); + Assert.assertEquals( + builder.getConfiguration().getString(AnalyticsClientConfigKeys.ANALYTICS_CLIENT_CONNECT_RETRY_NUM), + "3" + ); + } + + @Test + public void testBuilderWithInitChannelPools() { + AnalyticsClientBuilder builder = new AnalyticsClientBuilder(); + builder.withInitChannelPools(true); + Assert.assertTrue(builder.enableInitChannelPools()); + } + + @Test + public void testBuilderWithConfiguration() { + Configuration config = new Configuration(); + config.put("test.key", "test.value"); + + AnalyticsClientBuilder builder = new AnalyticsClientBuilder(); + builder.withConfiguration(config); + + Assert.assertEquals(builder.getConfiguration().getString("test.key"), "test.value"); + } + + @Test + public void testBuilderChaining() { + AnalyticsClientBuilder builder = new AnalyticsClientBuilder() + .withHost("localhost") + .withPort(8080) + .withUser("testUser") + .withTimeoutMs(5000) + .withRetryNum(3) + .withInitChannelPools(true); + + Assert.assertEquals(builder.getHost(), "localhost"); + Assert.assertEquals(builder.getPort(), 8080); + Assert.assertEquals(builder.getUser(), "testUser"); + Assert.assertEquals(builder.getQueryRetryNum(), 3); + Assert.assertTrue(builder.enableInitChannelPools()); + } +} diff --git a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsManagerOptionsTest.java b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsManagerOptionsTest.java deleted file mode 100644 index ada413518..000000000 --- a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsManagerOptionsTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.geaflow.analytics.service.client; - -import org.testng.Assert; -import org.testng.annotations.Test; - -public class AnalyticsManagerOptionsTest { - private static final String URL_PATH = "/rest/analytics/query/execute"; - - @Test - public void testCreateClientSession() { - String host = "localhost"; - int port = 8080; - AnalyticsManagerSession clientSession1 = AnalyticsManagerOptions.createClientSession(host, port); - AnalyticsManagerSession clientSession2 = AnalyticsManagerOptions.createClientSession(port); - Assert.assertEquals(clientSession1.getServer().toString(), clientSession2.getServer().toString()); - Assert.assertEquals(clientSession1.getServer().toString(), "http://localhost:8080"); - } - - @Test - public void testServerResolve() { - String host = "localhost"; - int port = 8080; - AnalyticsManagerSession clientSession = AnalyticsManagerOptions.createClientSession(host, port); - String fullUri = clientSession.getServer().resolve(URL_PATH).toString(); - Assert.assertEquals(fullUri, "http://localhost:8080/rest/analytics/query/execute"); - } - -} diff --git a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsServiceInfoTest.java b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsServiceInfoTest.java new file mode 100644 index 000000000..3c0a3b4c8 --- /dev/null +++ b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/AnalyticsServiceInfoTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.analytics.service.client; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.geaflow.common.rpc.HostAndPort; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class AnalyticsServiceInfoTest { + + @Test + public void testAnalyticsServiceInfoWithServerName() { + List addresses = Arrays.asList( + new HostAndPort("host1", 8080), + new HostAndPort("host2", 8081) + ); + + AnalyticsServiceInfo serviceInfo = new AnalyticsServiceInfo("testServer", addresses); + + Assert.assertEquals(serviceInfo.getServerName(), "testServer"); + Assert.assertEquals(serviceInfo.getCoordinatorAddresses(), addresses); + Assert.assertEquals(serviceInfo.getCoordinatorNum(), 2); + Assert.assertEquals(serviceInfo.getCoordinatorAddresses(0), addresses.get(0)); + Assert.assertEquals(serviceInfo.getCoordinatorAddresses(1), addresses.get(1)); + } + + @Test + public void testAnalyticsServiceInfoWithoutServerName() { + List addresses = Collections.singletonList( + new HostAndPort("localhost", 9090) + ); + + AnalyticsServiceInfo serviceInfo = new AnalyticsServiceInfo(addresses); + + Assert.assertNull(serviceInfo.getServerName()); + Assert.assertEquals(serviceInfo.getCoordinatorAddresses(), addresses); + Assert.assertEquals(serviceInfo.getCoordinatorNum(), 1); + Assert.assertEquals(serviceInfo.getCoordinatorAddresses(0), addresses.get(0)); + } + + @Test + public void testGetCoordinatorAddressByIndex() { + List addresses = Arrays.asList( + new HostAndPort("host1", 8080), + new HostAndPort("host2", 8081), + new HostAndPort("host3", 8082) + ); + + AnalyticsServiceInfo serviceInfo = new AnalyticsServiceInfo(addresses); + + Assert.assertEquals(serviceInfo.getCoordinatorNum(), 3); + Assert.assertEquals(serviceInfo.getCoordinatorAddresses(0).getHost(), "host1"); + Assert.assertEquals(serviceInfo.getCoordinatorAddresses(0).getPort(), 8080); + Assert.assertEquals(serviceInfo.getCoordinatorAddresses(1).getHost(), "host2"); + Assert.assertEquals(serviceInfo.getCoordinatorAddresses(2).getHost(), "host3"); + } + + @Test + public void testEmptyCoordinatorAddresses() { + List addresses = Collections.emptyList(); + AnalyticsServiceInfo serviceInfo = new AnalyticsServiceInfo(addresses); + + Assert.assertEquals(serviceInfo.getCoordinatorNum(), 0); + Assert.assertTrue(serviceInfo.getCoordinatorAddresses().isEmpty()); + } +} diff --git a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/HttpResponseTest.java b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/HttpResponseTest.java new file mode 100644 index 000000000..3d0644d61 --- /dev/null +++ b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/HttpResponseTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.analytics.service.client; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import org.apache.geaflow.analytics.service.query.QueryError; +import org.apache.geaflow.analytics.service.query.QueryResults; +import org.apache.geaflow.common.exception.GeaflowRuntimeException; +import org.apache.geaflow.common.serialize.SerializerFactory; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class HttpResponseTest { + + @Test + public void testSuccessfulResponse() throws IOException { + QueryError queryError = new QueryError("success", 0); + QueryResults queryResults = new QueryResults("query-1", queryError); + byte[] serializedData = SerializerFactory.getKryoSerializer().serialize(queryResults); + + CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); + CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + StatusLine mockStatusLine = Mockito.mock(StatusLine.class); + HttpEntity mockEntity = Mockito.mock(HttpEntity.class); + + Mockito.when(mockClient.execute(Mockito.any(HttpPost.class))).thenReturn(mockResponse); + Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + Mockito.when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + Mockito.when(mockStatusLine.toString()).thenReturn("HTTP/1.1 200 OK"); + Mockito.when(mockResponse.getAllHeaders()).thenReturn(new org.apache.http.Header[0]); + Mockito.when(mockResponse.getEntity()).thenReturn(mockEntity); + Mockito.when(mockEntity.getContent()).thenReturn(new ByteArrayInputStream(serializedData)); + + HttpPost request = new HttpPost("http://localhost:8080/test"); + HttpResponse httpResponse = HttpResponse.execute(mockClient, request); + + Assert.assertTrue(httpResponse.enableQuerySuccess()); + Assert.assertEquals(httpResponse.getStatusCode(), HttpStatus.SC_OK); + Assert.assertNotNull(httpResponse.getValue()); + Assert.assertEquals(httpResponse.getValue().getQueryId(), "query-1"); + } + + @Test + public void testFailedResponse() throws IOException { + byte[] errorData = "Internal Server Error".getBytes(); + + CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); + CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + StatusLine mockStatusLine = Mockito.mock(StatusLine.class); + HttpEntity mockEntity = Mockito.mock(HttpEntity.class); + + Mockito.when(mockClient.execute(Mockito.any(HttpPost.class))).thenReturn(mockResponse); + Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + Mockito.when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); + Mockito.when(mockStatusLine.toString()).thenReturn("HTTP/1.1 500 Internal Server Error"); + Mockito.when(mockResponse.getAllHeaders()).thenReturn(new org.apache.http.Header[0]); + Mockito.when(mockResponse.getEntity()).thenReturn(mockEntity); + Mockito.when(mockEntity.getContent()).thenReturn(new ByteArrayInputStream(errorData)); + + HttpPost request = new HttpPost("http://localhost:8080/test"); + HttpResponse httpResponse = HttpResponse.execute(mockClient, request); + + Assert.assertFalse(httpResponse.enableQuerySuccess()); + Assert.assertEquals(httpResponse.getStatusCode(), HttpStatus.SC_INTERNAL_SERVER_ERROR); + Assert.assertNotNull(httpResponse.getException()); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testGetValueOnFailedResponse() throws IOException { + byte[] errorData = "Error".getBytes(); + + CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); + CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + StatusLine mockStatusLine = Mockito.mock(StatusLine.class); + HttpEntity mockEntity = Mockito.mock(HttpEntity.class); + + Mockito.when(mockClient.execute(Mockito.any(HttpPost.class))).thenReturn(mockResponse); + Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + Mockito.when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_BAD_REQUEST); + Mockito.when(mockStatusLine.toString()).thenReturn("HTTP/1.1 400 Bad Request"); + Mockito.when(mockResponse.getAllHeaders()).thenReturn(new org.apache.http.Header[0]); + Mockito.when(mockResponse.getEntity()).thenReturn(mockEntity); + Mockito.when(mockEntity.getContent()).thenReturn(new ByteArrayInputStream(errorData)); + + HttpPost request = new HttpPost("http://localhost:8080/test"); + HttpResponse httpResponse = HttpResponse.execute(mockClient, request); + + httpResponse.getValue(); // Should throw exception + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testExecuteWithIOException() throws IOException { + CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); + Mockito.when(mockClient.execute(Mockito.any(HttpPost.class))) + .thenThrow(new IOException("Connection refused")); + + HttpPost request = new HttpPost("http://localhost:8080/test"); + HttpResponse.execute(mockClient, request); + } + + @Test + public void testGetHeaders() throws IOException { + QueryError queryError = new QueryError("success", 0); + QueryResults queryResults = new QueryResults("query-1", queryError); + byte[] serializedData = SerializerFactory.getKryoSerializer().serialize(queryResults); + + CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); + CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + StatusLine mockStatusLine = Mockito.mock(StatusLine.class); + HttpEntity mockEntity = Mockito.mock(HttpEntity.class); + + org.apache.http.Header[] headers = new org.apache.http.Header[0]; + + Mockito.when(mockClient.execute(Mockito.any(HttpPost.class))).thenReturn(mockResponse); + Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + Mockito.when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + Mockito.when(mockStatusLine.toString()).thenReturn("HTTP/1.1 200 OK"); + Mockito.when(mockResponse.getAllHeaders()).thenReturn(headers); + Mockito.when(mockResponse.getEntity()).thenReturn(mockEntity); + Mockito.when(mockEntity.getContent()).thenReturn(new ByteArrayInputStream(serializedData)); + + HttpPost request = new HttpPost("http://localhost:8080/test"); + HttpResponse httpResponse = HttpResponse.execute(mockClient, request); + + Assert.assertNotNull(httpResponse.getHeaders()); + Assert.assertEquals(httpResponse.getHeaders().length, 0); + } + + @Test + public void testToString() throws IOException { + QueryError queryError = new QueryError("success", 0); + QueryResults queryResults = new QueryResults("query-1", queryError); + byte[] serializedData = SerializerFactory.getKryoSerializer().serialize(queryResults); + + CloseableHttpClient mockClient = Mockito.mock(CloseableHttpClient.class); + CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + StatusLine mockStatusLine = Mockito.mock(StatusLine.class); + HttpEntity mockEntity = Mockito.mock(HttpEntity.class); + + Mockito.when(mockClient.execute(Mockito.any(HttpPost.class))).thenReturn(mockResponse); + Mockito.when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); + Mockito.when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + Mockito.when(mockStatusLine.toString()).thenReturn("HTTP/1.1 200 OK"); + Mockito.when(mockResponse.getAllHeaders()).thenReturn(new org.apache.http.Header[0]); + Mockito.when(mockResponse.getEntity()).thenReturn(mockEntity); + Mockito.when(mockEntity.getContent()).thenReturn(new ByteArrayInputStream(serializedData)); + + HttpPost request = new HttpPost("http://localhost:8080/test"); + HttpResponse httpResponse = HttpResponse.execute(mockClient, request); + + String toString = httpResponse.toString(); + Assert.assertNotNull(toString); + Assert.assertTrue(toString.contains("statusCode")); + Assert.assertTrue(toString.contains("querySuccess")); + } +} diff --git a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/QueryRunnerContextTest.java b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/QueryRunnerContextTest.java new file mode 100644 index 000000000..ad724eff0 --- /dev/null +++ b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/QueryRunnerContextTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.analytics.service.client; + +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.common.rpc.HostAndPort; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class QueryRunnerContextTest { + + @Test + public void testQueryRunnerContextBuilder() { + Configuration config = new Configuration(); + config.put("test.key", "test.value"); + HostAndPort hostAndPort = new HostAndPort("localhost", 8080); + + QueryRunnerContext context = QueryRunnerContext.newBuilder() + .setConfiguration(config) + .setHost(hostAndPort) + .enableInitChannelPools(true) + .setAnalyticsServiceJobName("testJob") + .setMetaServerAddress("meta-server:8081") + .build(); + + Assert.assertNotNull(context.getConfiguration()); + Assert.assertEquals(context.getConfiguration().getString("test.key"), "test.value"); + Assert.assertEquals(context.getHostAndPort(), hostAndPort); + Assert.assertTrue(context.isInitChannelPools()); + Assert.assertTrue(context.enableInitChannelPools()); + Assert.assertEquals(context.getMetaServerBaseNode(), "testJob"); + Assert.assertEquals(context.getMetaServerAddress(), "meta-server:8081"); + } + + @Test + public void testQueryRunnerContextBuilderWithMinimalParams() { + Configuration config = new Configuration(); + + QueryRunnerContext context = QueryRunnerContext.newBuilder() + .setConfiguration(config) + .build(); + + Assert.assertNotNull(context.getConfiguration()); + Assert.assertNull(context.getHostAndPort()); + Assert.assertFalse(context.isInitChannelPools()); + Assert.assertNull(context.getMetaServerBaseNode()); + Assert.assertNull(context.getMetaServerAddress()); + } + + @Test + public void testQueryRunnerContextBuilderChaining() { + Configuration config = new Configuration(); + HostAndPort hostAndPort = new HostAndPort("host", 9090); + + QueryRunnerContext.ClientHandlerContextBuilder builder = QueryRunnerContext.newBuilder(); + QueryRunnerContext context = builder + .setConfiguration(config) + .setHost(hostAndPort) + .enableInitChannelPools(false) + .build(); + + Assert.assertNotNull(context); + Assert.assertFalse(context.isInitChannelPools()); + } +} diff --git a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/QueryRunnerStatusTest.java b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/QueryRunnerStatusTest.java new file mode 100644 index 000000000..f9c7578d5 --- /dev/null +++ b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/QueryRunnerStatusTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.analytics.service.client; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class QueryRunnerStatusTest { + + @Test + public void testQueryRunnerStatusValues() { + QueryRunnerStatus[] statuses = QueryRunnerStatus.values(); + Assert.assertEquals(statuses.length, 4); + Assert.assertEquals(statuses[0], QueryRunnerStatus.RUNNING); + Assert.assertEquals(statuses[1], QueryRunnerStatus.ERROR); + Assert.assertEquals(statuses[2], QueryRunnerStatus.ABORTED); + Assert.assertEquals(statuses[3], QueryRunnerStatus.FINISHED); + } + + @Test + public void testQueryRunnerStatusValueOf() { + Assert.assertEquals(QueryRunnerStatus.valueOf("RUNNING"), QueryRunnerStatus.RUNNING); + Assert.assertEquals(QueryRunnerStatus.valueOf("ERROR"), QueryRunnerStatus.ERROR); + Assert.assertEquals(QueryRunnerStatus.valueOf("ABORTED"), QueryRunnerStatus.ABORTED); + Assert.assertEquals(QueryRunnerStatus.valueOf("FINISHED"), QueryRunnerStatus.FINISHED); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testQueryRunnerStatusInvalidValueOf() { + QueryRunnerStatus.valueOf("INVALID"); + } +} diff --git a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/jdbc/AnalyticsDriverURITest.java b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/jdbc/AnalyticsDriverURITest.java index 9cb4a7be9..986413aea 100644 --- a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/jdbc/AnalyticsDriverURITest.java +++ b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/jdbc/AnalyticsDriverURITest.java @@ -19,35 +19,192 @@ package org.apache.geaflow.analytics.service.client.jdbc; -import static java.lang.String.format; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.fail; - +import java.net.URI; +import java.util.Map; import java.util.Properties; +import org.apache.geaflow.common.exception.GeaflowRuntimeException; +import org.testng.Assert; import org.testng.annotations.Test; public class AnalyticsDriverURITest { @Test - public void testInvalidURI() { - assertInvalid("jdbc:geaflow://localhost/", "No port number specified:"); + public void testValidURLWithUserAndGraphView() { + String url = "jdbc:geaflow://localhost:8080/myGraph?user=testUser"; + Properties props = new Properties(); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + + Assert.assertEquals(driverURI.getGraphView(), "myGraph"); + Assert.assertEquals(driverURI.getUser(), "testUser"); + Assert.assertEquals(driverURI.getAuthority(), "localhost:8080"); + Assert.assertFalse(driverURI.isCompressionDisabled()); + } + + @Test + public void testValidURLWithoutGraphView() { + String url = "jdbc:geaflow://localhost:8080?user=testUser"; + Properties props = new Properties(); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + + Assert.assertNull(driverURI.getGraphView()); + Assert.assertEquals(driverURI.getUser(), "testUser"); + } + + @Test + public void testUserInProperties() { + String url = "jdbc:geaflow://localhost:8080"; + Properties props = new Properties(); + props.setProperty("user", "propUser"); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + + Assert.assertEquals(driverURI.getUser(), "propUser"); + } + + @Test + public void testURLParametersParsing() { + String url = "jdbc:geaflow://localhost:8080?user=testUser&timeout=5000"; + Properties props = new Properties(); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + + Properties properties = driverURI.getProperties(); + Assert.assertEquals(properties.getProperty("user"), "testUser"); + Assert.assertEquals(properties.getProperty("timeout"), "5000"); + } + + @Test + public void testHttpURI() { + String url = "jdbc:geaflow://localhost:8080?user=testUser"; + Properties props = new Properties(); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + URI httpUri = driverURI.getHttpUri(); + + Assert.assertEquals(httpUri.getScheme(), "http"); + Assert.assertEquals(httpUri.getHost(), "localhost"); + Assert.assertEquals(httpUri.getPort(), 8080); + } + + @Test + public void testSecureHttpURI() { + String url = "jdbc:geaflow://localhost:443?user=testUser"; + Properties props = new Properties(); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + URI httpUri = driverURI.getHttpUri(); + + Assert.assertEquals(httpUri.getScheme(), "https"); + Assert.assertEquals(httpUri.getPort(), 443); + } + + @Test + public void testGetSessionProperties() { + String url = "jdbc:geaflow://localhost:8080?user=testUser"; + Properties props = new Properties(); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + Map sessionProps = driverURI.getSessionProperties(); + + Assert.assertNotNull(sessionProps); + } + + @Test + public void testGetCustomHeaders() { + String url = "jdbc:geaflow://localhost:8080?user=testUser"; + Properties props = new Properties(); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + Map customHeaders = driverURI.getCustomHeaders(); + + Assert.assertNotNull(customHeaders); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testInvalidURLWithoutHost() { + String url = "jdbc:geaflow://:8080?user=testUser"; + Properties props = new Properties(); + + new AnalyticsDriverURI(url, props); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testInvalidURLWithoutPort() { + String url = "jdbc:geaflow://localhost?user=testUser"; + Properties props = new Properties(); + + new AnalyticsDriverURI(url, props); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testInvalidURLWithInvalidPort() { + String url = "jdbc:geaflow://localhost:99999?user=testUser"; + Properties props = new Properties(); + + new AnalyticsDriverURI(url, props); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testInvalidURLWithZeroPort() { + String url = "jdbc:geaflow://localhost:0?user=testUser"; + Properties props = new Properties(); + + new AnalyticsDriverURI(url, props); } - private static void assertInvalid(String url, String prefix) { - try { - createDriverUri(url); - fail("expected exception"); - } catch (Exception e) { - assertNotNull(e.getMessage()); - if (!e.getMessage().startsWith(prefix)) { - fail(format("expected:<%s> to start with <%s>", e.getMessage(), prefix)); - } - } + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testURLWithoutUser() { + String url = "jdbc:geaflow://localhost:8080"; + Properties props = new Properties(); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + driverURI.getUser(); // Should throw exception } - private static AnalyticsDriverURI createDriverUri(String url) { - Properties properties = new Properties(); - properties.setProperty("user", "only-test"); - return new AnalyticsDriverURI(url, properties); + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testDuplicatePropertyInURLAndProperties() { + String url = "jdbc:geaflow://localhost:8080?user=urlUser"; + Properties props = new Properties(); + props.setProperty("user", "propUser"); + + new AnalyticsDriverURI(url, props); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testDuplicateParameterInURL() { + String url = "jdbc:geaflow://localhost:8080?user=user1&user=user2"; + Properties props = new Properties(); + + new AnalyticsDriverURI(url, props); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testInvalidGraphViewPath() { + String url = "jdbc:geaflow://localhost:8080/graph/extra/segment?user=testUser"; + Properties props = new Properties(); + + new AnalyticsDriverURI(url, props); + } + + @Test + public void testEmptyGraphView() { + String url = "jdbc:geaflow://localhost:8080/?user=testUser"; + Properties props = new Properties(); + + // Empty path after '/' is treated as no graph view, which is valid + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + Assert.assertNull(driverURI.getGraphView()); + } + + @Test + public void testGraphViewWithTrailingSlash() { + String url = "jdbc:geaflow://localhost:8080/myGraph/?user=testUser"; + Properties props = new Properties(); + + AnalyticsDriverURI driverURI = new AnalyticsDriverURI(url, props); + + Assert.assertEquals(driverURI.getGraphView(), "myGraph"); } } diff --git a/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/utils/JDBCUtilsTest.java b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/utils/JDBCUtilsTest.java new file mode 100644 index 000000000..e8bb27b98 --- /dev/null +++ b/geaflow/geaflow-analytics-service/geaflow-analytics-service-client/src/test/java/org/apache/geaflow/analytics/service/client/utils/JDBCUtilsTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.analytics.service.client.utils; + +import org.apache.geaflow.common.exception.GeaflowRuntimeException; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class JDBCUtilsTest { + + @Test + public void testAcceptsValidURL() { + String validUrl = "jdbc:geaflow://localhost:8080"; + Assert.assertTrue(JDBCUtils.acceptsURL(validUrl)); + } + + @Test + public void testAcceptsValidURLWithPath() { + String validUrl = "jdbc:geaflow://localhost:8080/graphView"; + Assert.assertTrue(JDBCUtils.acceptsURL(validUrl)); + } + + @Test + public void testAcceptsValidURLWithParameters() { + String validUrl = "jdbc:geaflow://localhost:8080?user=test&password=test123"; + Assert.assertTrue(JDBCUtils.acceptsURL(validUrl)); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testRejectsInvalidURL() { + String invalidUrl = "jdbc:mysql://localhost:3306/test"; + JDBCUtils.acceptsURL(invalidUrl); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testRejectsEmptyURL() { + JDBCUtils.acceptsURL(""); + } + + @Test(expectedExceptions = GeaflowRuntimeException.class) + public void testRejectsNonJDBCURL() { + String nonJdbcUrl = "http://localhost:8080"; + JDBCUtils.acceptsURL(nonJdbcUrl); + } + + @Test + public void testDriverURLStartConstant() { + Assert.assertEquals(JDBCUtils.DRIVER_URL_START, "jdbc:geaflow://"); + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java new file mode 100644 index 000000000..6fed8c37c --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.elasticsearch; + +import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.DEFAULT_SEARCH_SIZE; +import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_HTTPS_SCHEME; +import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_HTTP_SCHEME; +import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SCHEMA_SUFFIX; +import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SPLIT_COLON; +import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SPLIT_COMMA; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.geaflow.api.context.RuntimeContext; +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.data.impl.ObjectRow; +import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException; +import org.apache.geaflow.dsl.common.types.StructType; +import org.apache.geaflow.dsl.common.types.TableSchema; +import org.apache.geaflow.dsl.connector.api.FetchData; +import org.apache.geaflow.dsl.connector.api.Offset; +import org.apache.geaflow.dsl.connector.api.Partition; +import org.apache.geaflow.dsl.connector.api.TableSource; +import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer; +import org.apache.geaflow.dsl.connector.api.window.FetchWindow; +import org.apache.http.HttpHost; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticsearchTableSource implements TableSource { + + private static final Gson GSON = new Gson(); + private static final Type MAP_TYPE = new TypeToken>(){}.getType(); + + private Logger logger = LoggerFactory.getLogger(ElasticsearchTableSource.class); + + private StructType schema; + private String hosts; + private String indexName; + private String username; + private String password; + private String scrollTimeout; + private int connectionTimeout; + private int socketTimeout; + + private RestHighLevelClient client; + + @Override + public void init(Configuration tableConf, TableSchema tableSchema) { + this.schema = tableSchema; + this.hosts = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS); + this.indexName = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX); + this.username = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, ""); + this.password = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, ""); + this.scrollTimeout = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SCROLL_TIMEOUT, + ElasticsearchConstants.DEFAULT_SCROLL_TIMEOUT); + this.connectionTimeout = tableConf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT, + ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT); + this.socketTimeout = tableConf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT, + ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT); + } + + @Override + public void open(RuntimeContext context) { + try { + this.client = createElasticsearchClient(); + } catch (Exception e) { + throw new GeaFlowDSLException("Failed to initialize Elasticsearch client", e); + } + } + + @Override + public List listPartitions() { + return Collections.singletonList(new ElasticsearchPartition(indexName)); + } + + @Override + public TableDeserializer getDeserializer(Configuration conf) { + return new TableDeserializer() { + @Override + public void init(Configuration configuration, StructType structType) { + // Initialization if needed + } + + @Override + public List deserialize(IN record) { + if (record instanceof SearchHit) { + SearchHit hit = (SearchHit) record; + Map source = hit.getSourceAsMap(); + if (source == null) { + source = GSON.fromJson(hit.getSourceAsString(), MAP_TYPE); + } + + // Convert map to Row based on schema + Object[] values = new Object[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + String fieldName = schema.getFields().get(i).getName(); + values[i] = source.get(fieldName); + } + Row row = ObjectRow.create(values); + return Collections.singletonList(row); + } + return Collections.emptyList(); + } + }; + } + + @Override + public FetchData fetch(Partition partition, Optional startOffset, + FetchWindow windowInfo) throws IOException { + try { + SearchRequest searchRequest = new SearchRequest(indexName); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.size(DEFAULT_SEARCH_SIZE); // Batch size + + searchRequest.source(searchSourceBuilder); + + // Use scroll for large dataset reading + Scroll scroll = new Scroll(TimeValue.parseTimeValue(scrollTimeout, "scroll_timeout")); + searchRequest.scroll(scroll); + + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + String scrollId = searchResponse.getScrollId(); + SearchHit[] searchHits = searchResponse.getHits().getHits(); + + List dataList = new ArrayList<>(); + for (SearchHit hit : searchHits) { + dataList.add((T) hit); + } + + // Clear scroll + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId); + client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + + ElasticsearchOffset nextOffset = new ElasticsearchOffset(scrollId); + return (FetchData) FetchData.createStreamFetch(dataList, nextOffset, false); + } catch (Exception e) { + throw new IOException("Failed to fetch data from Elasticsearch", e); + } + } + + @Override + public void close() { + try { + if (client != null) { + client.close(); + } + } catch (IOException e) { + // Log error but don't throw exception in close method + logger.warn("Failed to close Elasticsearch client", e); + } + } + + private RestHighLevelClient createElasticsearchClient() { + try { + String[] hostArray = hosts.split(ES_SPLIT_COMMA); + HttpHost[] httpHosts = new HttpHost[hostArray.length]; + + for (int i = 0; i < hostArray.length; i++) { + String host = hostArray[i].trim(); + if (host.startsWith(ES_HTTP_SCHEME + ES_SCHEMA_SUFFIX)) { + host = host.substring(7); + } else if (host.startsWith(ES_HTTPS_SCHEME + ES_SCHEMA_SUFFIX)) { + host = host.substring(8); + } + + String[] parts = host.split(ES_SPLIT_COLON); + String hostname = parts[0]; + int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 9200; + httpHosts[i] = new HttpHost(hostname, port, ES_HTTP_SCHEME); + } + + RestClientBuilder builder = RestClient.builder(httpHosts); + + // Configure timeouts + builder.setRequestConfigCallback(requestConfigBuilder -> { + requestConfigBuilder.setConnectTimeout(connectionTimeout); + requestConfigBuilder.setSocketTimeout(socketTimeout); + return requestConfigBuilder; + }); + + return new RestHighLevelClient(builder); + } catch (Exception e) { + throw new GeaFlowDSLException("Failed to create Elasticsearch client", e); + } + } + + public static class ElasticsearchPartition implements Partition { + private final String indexName; + + public ElasticsearchPartition(String indexName) { + this.indexName = indexName; + } + + @Override + public String getName() { + return indexName; + } + } + + public static class ElasticsearchOffset implements Offset { + private final String scrollId; + private final long timestamp; + + public ElasticsearchOffset(String scrollId) { + this(scrollId, System.currentTimeMillis()); + } + + public ElasticsearchOffset(String scrollId, long timestamp) { + this.scrollId = scrollId; + this.timestamp = timestamp; + } + + public String getScrollId() { + return scrollId; + } + + @Override + public String humanReadable() { + return "ElasticsearchOffset{scrollId='" + scrollId + "', timestamp=" + timestamp + "}"; + } + + @Override + public long getOffset() { + return timestamp; + } + + @Override + public boolean isTimestamp() { + return true; + } + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java new file mode 100644 index 000000000..239bc0015 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.neo4j; + +import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_NODE_LABEL; +import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_RELATIONSHIP_LABEL; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.geaflow.api.context.RuntimeContext; +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.common.type.IType; +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException; +import org.apache.geaflow.dsl.common.types.StructType; +import org.apache.geaflow.dsl.connector.api.TableSink; +import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.Config; +import org.neo4j.driver.Driver; +import org.neo4j.driver.GraphDatabase; +import org.neo4j.driver.Session; +import org.neo4j.driver.SessionConfig; +import org.neo4j.driver.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Neo4jTableSink implements TableSink { + + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jTableSink.class); + + private StructType schema; + private String uri; + private String username; + private String password; + private String database; + private int batchSize; + private String writeMode; + private String nodeLabel; + private String relationshipType; + private String nodeIdField; + private String relationshipSourceField; + private String relationshipTargetField; + private long maxConnectionLifetime; + private int maxConnectionPoolSize; + private long connectionAcquisitionTimeout; + + private Driver driver; + private Session session; + private Transaction transaction; + private List batch; + + @Override + public void init(Configuration tableConf, StructType schema) { + LOGGER.info("Init Neo4j sink with config: {}, \n schema: {}", tableConf, schema); + this.schema = schema; + + this.uri = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI); + this.username = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME); + this.password = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD); + this.database = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE); + this.batchSize = tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_BATCH_SIZE); + this.writeMode = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_WRITE_MODE); + this.nodeLabel = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_LABEL); + this.relationshipType = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE); + this.nodeIdField = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_ID_FIELD); + this.relationshipSourceField = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_SOURCE_FIELD); + this.relationshipTargetField = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TARGET_FIELD); + this.maxConnectionLifetime = tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_LIFETIME); + this.maxConnectionPoolSize = tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_POOL_SIZE); + this.connectionAcquisitionTimeout = tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_CONNECTION_ACQUISITION_TIMEOUT); + + validateConfig(); + this.batch = new ArrayList<>(batchSize); + } + + private void validateConfig() { + if (uri == null || uri.isEmpty()) { + throw new GeaFlowDSLException("Neo4j URI must be specified"); + } + if (username == null || username.isEmpty()) { + throw new GeaFlowDSLException("Neo4j username must be specified"); + } + if (password == null || password.isEmpty()) { + throw new GeaFlowDSLException("Neo4j password must be specified"); + } + if (DEFAULT_NODE_LABEL.toLowerCase().equals(writeMode)) { + if (nodeIdField == null || nodeIdField.isEmpty()) { + throw new GeaFlowDSLException("Node ID field must be specified for node write mode"); + } + } else if (DEFAULT_RELATIONSHIP_LABEL.equals(writeMode)) { + if (relationshipSourceField == null || relationshipSourceField.isEmpty() + || relationshipTargetField == null || relationshipTargetField.isEmpty()) { + throw new GeaFlowDSLException("Relationship source and target fields must be specified for relationship write mode"); + } + } else { + throw new GeaFlowDSLException("Invalid write mode: " + writeMode + ". Must be 'node' or 'relationship'"); + } + } + + @Override + public void open(RuntimeContext context) { + try { + Config config = Config.builder() + .withMaxConnectionLifetime(maxConnectionLifetime, TimeUnit.MILLISECONDS) + .withMaxConnectionPoolSize(maxConnectionPoolSize) + .withConnectionAcquisitionTimeout(connectionAcquisitionTimeout, TimeUnit.MILLISECONDS) + .build(); + + this.driver = GraphDatabase.driver(uri, AuthTokens.basic(username, password), config); + + SessionConfig sessionConfig = SessionConfig.builder() + .withDatabase(database) + .build(); + + this.session = driver.session(sessionConfig); + this.transaction = session.beginTransaction(); + + LOGGER.info("Neo4j connection established successfully"); + } catch (Exception e) { + throw new GeaFlowDSLException("Failed to connect to Neo4j: " + e.getMessage(), e); + } + } + + @Override + public void write(Row row) throws IOException { + batch.add(row); + if (batch.size() >= batchSize) { + flush(); + } + } + + @Override + public void finish() throws IOException { + if (!batch.isEmpty()) { + flush(); + } + try { + if (transaction != null) { + transaction.commit(); + transaction.close(); + transaction = null; + } + } catch (Exception e) { + LOGGER.error("Failed to commit transaction", e); + try { + if (transaction != null) { + transaction.rollback(); + } + } catch (Exception ex) { + throw new GeaFlowDSLException("Failed to rollback transaction", ex); + } + throw new GeaFlowDSLException("Failed to finish writing to Neo4j", e); + } + } + + @Override + public void close() { + try { + if (transaction != null) { + transaction.close(); + transaction = null; + } + if (session != null) { + session.close(); + session = null; + } + if (driver != null) { + driver.close(); + driver = null; + } + LOGGER.info("Neo4j connection closed successfully"); + } catch (Exception e) { + throw new GeaFlowDSLException("Failed to close Neo4j connection", e); + } + } + + private void flush() { + if (batch.isEmpty()) { + return; + } + + try { + if (DEFAULT_NODE_LABEL.toLowerCase().equals(writeMode)) { + writeNodes(); + } else { + writeRelationships(); + } + batch.clear(); + } catch (Exception e) { + throw new GeaFlowDSLException("Failed to flush batch to Neo4j", e); + } + } + + private void writeNodes() { + List fieldNames = schema.getFieldNames(); + IType[] types = schema.getTypes(); + + int nodeIdIndex = fieldNames.indexOf(nodeIdField); + if (nodeIdIndex == -1) { + throw new GeaFlowDSLException("Node ID field not found in schema: " + nodeIdField); + } + + for (Row row : batch) { + Map properties = new HashMap<>(); + for (int i = 0; i < fieldNames.size(); i++) { + if (i == nodeIdIndex) { + continue; // Skip ID field, it will be used as node ID + } + Object value = row.getField(i, types[i]); + if (value != null) { + properties.put(fieldNames.get(i), value); + } + } + + Object nodeId = row.getField(nodeIdIndex, types[nodeIdIndex]); + if (nodeId == null) { + throw new GeaFlowDSLException("Node ID cannot be null"); + } + + String cypher = String.format( + "MERGE (n:%s {id: $id}) SET n += $properties", + nodeLabel + ); + + Map parameters = new HashMap<>(); + parameters.put("id", nodeId); + parameters.put("properties", properties); + + transaction.run(cypher, parameters); + } + } + + private void writeRelationships() { + List fieldNames = schema.getFieldNames(); + IType[] types = schema.getTypes(); + + int sourceIndex = fieldNames.indexOf(relationshipSourceField); + int targetIndex = fieldNames.indexOf(relationshipTargetField); + + if (sourceIndex == -1) { + throw new GeaFlowDSLException("Relationship source field not found in schema: " + relationshipSourceField); + } + if (targetIndex == -1) { + throw new GeaFlowDSLException("Relationship target field not found in schema: " + relationshipTargetField); + } + + for (Row row : batch) { + Object sourceId = row.getField(sourceIndex, types[sourceIndex]); + Object targetId = row.getField(targetIndex, types[targetIndex]); + + if (sourceId == null || targetId == null) { + throw new GeaFlowDSLException("Relationship source and target IDs cannot be null"); + } + + Map properties = new HashMap<>(); + for (int i = 0; i < fieldNames.size(); i++) { + if (i == sourceIndex || i == targetIndex) { + continue; // Skip source and target fields + } + Object value = row.getField(i, types[i]); + if (value != null) { + properties.put(fieldNames.get(i), value); + } + } + + final String cypher = String.format( + "MATCH (a {id: $sourceId}), (b {id: $targetId}) " + + "MERGE (a)-[r:%s]->(b) SET r += $properties", + relationshipType + ); + + Map parameters = new HashMap<>(); + parameters.put("sourceId", sourceId); + parameters.put("targetId", targetId); + parameters.put("properties", properties); + + transaction.run(cypher, parameters); + } + } +} From d7d9c857df113d70dc8be3a2bbd142bb9c44cd3d Mon Sep 17 00:00:00 2001 From: "jishiwen.jsw" Date: Sat, 15 Nov 2025 13:36:12 +0800 Subject: [PATCH 2/2] fix Signed-off-by: jishiwen.jsw --- .../ElasticsearchTableSource.java | 270 ---------------- .../dsl/connector/neo4j/Neo4jTableSink.java | 301 ------------------ 2 files changed, 571 deletions(-) delete mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java delete mode 100644 geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java deleted file mode 100644 index 6fed8c37c..000000000 --- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticsearchTableSource.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.geaflow.dsl.connector.elasticsearch; - -import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.DEFAULT_SEARCH_SIZE; -import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_HTTPS_SCHEME; -import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_HTTP_SCHEME; -import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SCHEMA_SUFFIX; -import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SPLIT_COLON; -import static org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchConstants.ES_SPLIT_COMMA; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import org.apache.geaflow.api.context.RuntimeContext; -import org.apache.geaflow.common.config.Configuration; -import org.apache.geaflow.dsl.common.data.Row; -import org.apache.geaflow.dsl.common.data.impl.ObjectRow; -import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException; -import org.apache.geaflow.dsl.common.types.StructType; -import org.apache.geaflow.dsl.common.types.TableSchema; -import org.apache.geaflow.dsl.connector.api.FetchData; -import org.apache.geaflow.dsl.connector.api.Offset; -import org.apache.geaflow.dsl.connector.api.Partition; -import org.apache.geaflow.dsl.connector.api.TableSource; -import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer; -import org.apache.geaflow.dsl.connector.api.window.FetchWindow; -import org.apache.http.HttpHost; -import org.elasticsearch.action.search.ClearScrollRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.search.Scroll; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ElasticsearchTableSource implements TableSource { - - private static final Gson GSON = new Gson(); - private static final Type MAP_TYPE = new TypeToken>(){}.getType(); - - private Logger logger = LoggerFactory.getLogger(ElasticsearchTableSource.class); - - private StructType schema; - private String hosts; - private String indexName; - private String username; - private String password; - private String scrollTimeout; - private int connectionTimeout; - private int socketTimeout; - - private RestHighLevelClient client; - - @Override - public void init(Configuration tableConf, TableSchema tableSchema) { - this.schema = tableSchema; - this.hosts = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS); - this.indexName = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX); - this.username = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, ""); - this.password = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, ""); - this.scrollTimeout = tableConf.getString(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SCROLL_TIMEOUT, - ElasticsearchConstants.DEFAULT_SCROLL_TIMEOUT); - this.connectionTimeout = tableConf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_CONNECTION_TIMEOUT, - ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT); - this.socketTimeout = tableConf.getInteger(ElasticsearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_SOCKET_TIMEOUT, - ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT); - } - - @Override - public void open(RuntimeContext context) { - try { - this.client = createElasticsearchClient(); - } catch (Exception e) { - throw new GeaFlowDSLException("Failed to initialize Elasticsearch client", e); - } - } - - @Override - public List listPartitions() { - return Collections.singletonList(new ElasticsearchPartition(indexName)); - } - - @Override - public TableDeserializer getDeserializer(Configuration conf) { - return new TableDeserializer() { - @Override - public void init(Configuration configuration, StructType structType) { - // Initialization if needed - } - - @Override - public List deserialize(IN record) { - if (record instanceof SearchHit) { - SearchHit hit = (SearchHit) record; - Map source = hit.getSourceAsMap(); - if (source == null) { - source = GSON.fromJson(hit.getSourceAsString(), MAP_TYPE); - } - - // Convert map to Row based on schema - Object[] values = new Object[schema.size()]; - for (int i = 0; i < schema.size(); i++) { - String fieldName = schema.getFields().get(i).getName(); - values[i] = source.get(fieldName); - } - Row row = ObjectRow.create(values); - return Collections.singletonList(row); - } - return Collections.emptyList(); - } - }; - } - - @Override - public FetchData fetch(Partition partition, Optional startOffset, - FetchWindow windowInfo) throws IOException { - try { - SearchRequest searchRequest = new SearchRequest(indexName); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.size(DEFAULT_SEARCH_SIZE); // Batch size - - searchRequest.source(searchSourceBuilder); - - // Use scroll for large dataset reading - Scroll scroll = new Scroll(TimeValue.parseTimeValue(scrollTimeout, "scroll_timeout")); - searchRequest.scroll(scroll); - - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - String scrollId = searchResponse.getScrollId(); - SearchHit[] searchHits = searchResponse.getHits().getHits(); - - List dataList = new ArrayList<>(); - for (SearchHit hit : searchHits) { - dataList.add((T) hit); - } - - // Clear scroll - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(scrollId); - client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); - - ElasticsearchOffset nextOffset = new ElasticsearchOffset(scrollId); - return (FetchData) FetchData.createStreamFetch(dataList, nextOffset, false); - } catch (Exception e) { - throw new IOException("Failed to fetch data from Elasticsearch", e); - } - } - - @Override - public void close() { - try { - if (client != null) { - client.close(); - } - } catch (IOException e) { - // Log error but don't throw exception in close method - logger.warn("Failed to close Elasticsearch client", e); - } - } - - private RestHighLevelClient createElasticsearchClient() { - try { - String[] hostArray = hosts.split(ES_SPLIT_COMMA); - HttpHost[] httpHosts = new HttpHost[hostArray.length]; - - for (int i = 0; i < hostArray.length; i++) { - String host = hostArray[i].trim(); - if (host.startsWith(ES_HTTP_SCHEME + ES_SCHEMA_SUFFIX)) { - host = host.substring(7); - } else if (host.startsWith(ES_HTTPS_SCHEME + ES_SCHEMA_SUFFIX)) { - host = host.substring(8); - } - - String[] parts = host.split(ES_SPLIT_COLON); - String hostname = parts[0]; - int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 9200; - httpHosts[i] = new HttpHost(hostname, port, ES_HTTP_SCHEME); - } - - RestClientBuilder builder = RestClient.builder(httpHosts); - - // Configure timeouts - builder.setRequestConfigCallback(requestConfigBuilder -> { - requestConfigBuilder.setConnectTimeout(connectionTimeout); - requestConfigBuilder.setSocketTimeout(socketTimeout); - return requestConfigBuilder; - }); - - return new RestHighLevelClient(builder); - } catch (Exception e) { - throw new GeaFlowDSLException("Failed to create Elasticsearch client", e); - } - } - - public static class ElasticsearchPartition implements Partition { - private final String indexName; - - public ElasticsearchPartition(String indexName) { - this.indexName = indexName; - } - - @Override - public String getName() { - return indexName; - } - } - - public static class ElasticsearchOffset implements Offset { - private final String scrollId; - private final long timestamp; - - public ElasticsearchOffset(String scrollId) { - this(scrollId, System.currentTimeMillis()); - } - - public ElasticsearchOffset(String scrollId, long timestamp) { - this.scrollId = scrollId; - this.timestamp = timestamp; - } - - public String getScrollId() { - return scrollId; - } - - @Override - public String humanReadable() { - return "ElasticsearchOffset{scrollId='" + scrollId + "', timestamp=" + timestamp + "}"; - } - - @Override - public long getOffset() { - return timestamp; - } - - @Override - public boolean isTimestamp() { - return true; - } - } -} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java deleted file mode 100644 index 239bc0015..000000000 --- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-neo4j/src/main/java/org/apache/geaflow/dsl/connector/neo4j/Neo4jTableSink.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.geaflow.dsl.connector.neo4j; - -import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_NODE_LABEL; -import static org.apache.geaflow.dsl.connector.neo4j.Neo4jConstants.DEFAULT_RELATIONSHIP_LABEL; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.geaflow.api.context.RuntimeContext; -import org.apache.geaflow.common.config.Configuration; -import org.apache.geaflow.common.type.IType; -import org.apache.geaflow.dsl.common.data.Row; -import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException; -import org.apache.geaflow.dsl.common.types.StructType; -import org.apache.geaflow.dsl.connector.api.TableSink; -import org.neo4j.driver.AuthTokens; -import org.neo4j.driver.Config; -import org.neo4j.driver.Driver; -import org.neo4j.driver.GraphDatabase; -import org.neo4j.driver.Session; -import org.neo4j.driver.SessionConfig; -import org.neo4j.driver.Transaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Neo4jTableSink implements TableSink { - - private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jTableSink.class); - - private StructType schema; - private String uri; - private String username; - private String password; - private String database; - private int batchSize; - private String writeMode; - private String nodeLabel; - private String relationshipType; - private String nodeIdField; - private String relationshipSourceField; - private String relationshipTargetField; - private long maxConnectionLifetime; - private int maxConnectionPoolSize; - private long connectionAcquisitionTimeout; - - private Driver driver; - private Session session; - private Transaction transaction; - private List batch; - - @Override - public void init(Configuration tableConf, StructType schema) { - LOGGER.info("Init Neo4j sink with config: {}, \n schema: {}", tableConf, schema); - this.schema = schema; - - this.uri = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_URI); - this.username = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_USERNAME); - this.password = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_PASSWORD); - this.database = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_DATABASE); - this.batchSize = tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_BATCH_SIZE); - this.writeMode = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_WRITE_MODE); - this.nodeLabel = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_LABEL); - this.relationshipType = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TYPE); - this.nodeIdField = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_NODE_ID_FIELD); - this.relationshipSourceField = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_SOURCE_FIELD); - this.relationshipTargetField = tableConf.getString(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_RELATIONSHIP_TARGET_FIELD); - this.maxConnectionLifetime = tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_LIFETIME); - this.maxConnectionPoolSize = tableConf.getInteger(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_MAX_CONNECTION_POOL_SIZE); - this.connectionAcquisitionTimeout = tableConf.getLong(Neo4jConfigKeys.GEAFLOW_DSL_NEO4J_CONNECTION_ACQUISITION_TIMEOUT); - - validateConfig(); - this.batch = new ArrayList<>(batchSize); - } - - private void validateConfig() { - if (uri == null || uri.isEmpty()) { - throw new GeaFlowDSLException("Neo4j URI must be specified"); - } - if (username == null || username.isEmpty()) { - throw new GeaFlowDSLException("Neo4j username must be specified"); - } - if (password == null || password.isEmpty()) { - throw new GeaFlowDSLException("Neo4j password must be specified"); - } - if (DEFAULT_NODE_LABEL.toLowerCase().equals(writeMode)) { - if (nodeIdField == null || nodeIdField.isEmpty()) { - throw new GeaFlowDSLException("Node ID field must be specified for node write mode"); - } - } else if (DEFAULT_RELATIONSHIP_LABEL.equals(writeMode)) { - if (relationshipSourceField == null || relationshipSourceField.isEmpty() - || relationshipTargetField == null || relationshipTargetField.isEmpty()) { - throw new GeaFlowDSLException("Relationship source and target fields must be specified for relationship write mode"); - } - } else { - throw new GeaFlowDSLException("Invalid write mode: " + writeMode + ". Must be 'node' or 'relationship'"); - } - } - - @Override - public void open(RuntimeContext context) { - try { - Config config = Config.builder() - .withMaxConnectionLifetime(maxConnectionLifetime, TimeUnit.MILLISECONDS) - .withMaxConnectionPoolSize(maxConnectionPoolSize) - .withConnectionAcquisitionTimeout(connectionAcquisitionTimeout, TimeUnit.MILLISECONDS) - .build(); - - this.driver = GraphDatabase.driver(uri, AuthTokens.basic(username, password), config); - - SessionConfig sessionConfig = SessionConfig.builder() - .withDatabase(database) - .build(); - - this.session = driver.session(sessionConfig); - this.transaction = session.beginTransaction(); - - LOGGER.info("Neo4j connection established successfully"); - } catch (Exception e) { - throw new GeaFlowDSLException("Failed to connect to Neo4j: " + e.getMessage(), e); - } - } - - @Override - public void write(Row row) throws IOException { - batch.add(row); - if (batch.size() >= batchSize) { - flush(); - } - } - - @Override - public void finish() throws IOException { - if (!batch.isEmpty()) { - flush(); - } - try { - if (transaction != null) { - transaction.commit(); - transaction.close(); - transaction = null; - } - } catch (Exception e) { - LOGGER.error("Failed to commit transaction", e); - try { - if (transaction != null) { - transaction.rollback(); - } - } catch (Exception ex) { - throw new GeaFlowDSLException("Failed to rollback transaction", ex); - } - throw new GeaFlowDSLException("Failed to finish writing to Neo4j", e); - } - } - - @Override - public void close() { - try { - if (transaction != null) { - transaction.close(); - transaction = null; - } - if (session != null) { - session.close(); - session = null; - } - if (driver != null) { - driver.close(); - driver = null; - } - LOGGER.info("Neo4j connection closed successfully"); - } catch (Exception e) { - throw new GeaFlowDSLException("Failed to close Neo4j connection", e); - } - } - - private void flush() { - if (batch.isEmpty()) { - return; - } - - try { - if (DEFAULT_NODE_LABEL.toLowerCase().equals(writeMode)) { - writeNodes(); - } else { - writeRelationships(); - } - batch.clear(); - } catch (Exception e) { - throw new GeaFlowDSLException("Failed to flush batch to Neo4j", e); - } - } - - private void writeNodes() { - List fieldNames = schema.getFieldNames(); - IType[] types = schema.getTypes(); - - int nodeIdIndex = fieldNames.indexOf(nodeIdField); - if (nodeIdIndex == -1) { - throw new GeaFlowDSLException("Node ID field not found in schema: " + nodeIdField); - } - - for (Row row : batch) { - Map properties = new HashMap<>(); - for (int i = 0; i < fieldNames.size(); i++) { - if (i == nodeIdIndex) { - continue; // Skip ID field, it will be used as node ID - } - Object value = row.getField(i, types[i]); - if (value != null) { - properties.put(fieldNames.get(i), value); - } - } - - Object nodeId = row.getField(nodeIdIndex, types[nodeIdIndex]); - if (nodeId == null) { - throw new GeaFlowDSLException("Node ID cannot be null"); - } - - String cypher = String.format( - "MERGE (n:%s {id: $id}) SET n += $properties", - nodeLabel - ); - - Map parameters = new HashMap<>(); - parameters.put("id", nodeId); - parameters.put("properties", properties); - - transaction.run(cypher, parameters); - } - } - - private void writeRelationships() { - List fieldNames = schema.getFieldNames(); - IType[] types = schema.getTypes(); - - int sourceIndex = fieldNames.indexOf(relationshipSourceField); - int targetIndex = fieldNames.indexOf(relationshipTargetField); - - if (sourceIndex == -1) { - throw new GeaFlowDSLException("Relationship source field not found in schema: " + relationshipSourceField); - } - if (targetIndex == -1) { - throw new GeaFlowDSLException("Relationship target field not found in schema: " + relationshipTargetField); - } - - for (Row row : batch) { - Object sourceId = row.getField(sourceIndex, types[sourceIndex]); - Object targetId = row.getField(targetIndex, types[targetIndex]); - - if (sourceId == null || targetId == null) { - throw new GeaFlowDSLException("Relationship source and target IDs cannot be null"); - } - - Map properties = new HashMap<>(); - for (int i = 0; i < fieldNames.size(); i++) { - if (i == sourceIndex || i == targetIndex) { - continue; // Skip source and target fields - } - Object value = row.getField(i, types[i]); - if (value != null) { - properties.put(fieldNames.get(i), value); - } - } - - final String cypher = String.format( - "MATCH (a {id: $sourceId}), (b {id: $targetId}) " - + "MERGE (a)-[r:%s]->(b) SET r += $properties", - relationshipType - ); - - Map parameters = new HashMap<>(); - parameters.put("sourceId", sourceId); - parameters.put("targetId", targetId); - parameters.put("properties", properties); - - transaction.run(cypher, parameters); - } - } -}