diff --git a/interactive_engine/common/src/main/java/com/alibaba/maxgraph/common/config/StoreConfig.java b/interactive_engine/common/src/main/java/com/alibaba/maxgraph/common/config/StoreConfig.java index d2fdddb9f8a3..a762a58636da 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/maxgraph/common/config/StoreConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/maxgraph/common/config/StoreConfig.java @@ -40,9 +40,4 @@ public class StoreConfig { public static final Config EXECUTOR_ENGINE_PORT = Config.intConfig("executor.engine.port", 0); - - public static final Config OSS_ACCESS_ID = Config.stringConfig("oss.access.id", ""); - - public static final Config OSS_ACCESS_SECRET = - Config.stringConfig("oss.access.secret", ""); } diff --git a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/DataCommand.java b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/DataCommand.java index 21f298a3dfc7..a5fde4756ccf 100644 --- a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/DataCommand.java +++ b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/DataCommand.java @@ -2,6 +2,7 @@ import com.alibaba.maxgraph.compiler.api.schema.GraphSchema; import com.alibaba.maxgraph.dataload.databuild.ColumnMappingInfo; +import com.alibaba.maxgraph.dataload.util.HttpClient; import com.alibaba.maxgraph.sdkcommon.schema.GraphSchemaMapper; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -14,6 +15,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.HttpURLConnection; import java.nio.file.Paths; import java.util.*; @@ -26,6 +28,9 @@ public abstract class DataCommand { protected String metaData; protected String username; protected String password; + + protected String ossAccessID; + protected String ossAccessKey; protected String uniquePath; protected final String metaFileName = "META"; @@ -34,6 +39,7 @@ public abstract class DataCommand { protected final String OSS_ACCESS_KEY = "oss.access.key"; protected final String OSS_BUCKET_NAME = "oss.bucket.name"; protected final String OSS_OBJECT_NAME = "oss.object.name"; + protected final String OSS_INFO_URL = "oss.info.url"; protected final String USER_NAME = "auth.username"; protected final String PASS_WORD = "auth.password"; @@ -43,6 +49,24 @@ public DataCommand(String configPath, boolean isFromOSS, String uniquePath) thro initialize(isFromOSS); } + private HashMap getOSSInfoFromURL(String URL) throws IOException { + HttpClient client = new HttpClient(); + HttpURLConnection conn = null; + try { + conn = client.createConnection(URL); + conn.setConnectTimeout(5000); + conn.setReadTimeout(5000); + ObjectMapper mapper = new ObjectMapper(); + TypeReference> typeRef = + new TypeReference>() {}; + return mapper.readValue(conn.getInputStream(), typeRef); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + } + private void initialize(boolean isFromOSS) throws IOException { if (isFromOSS) { Properties properties = new Properties(); @@ -51,23 +75,33 @@ private void initialize(boolean isFromOSS) throws IOException { } catch (IOException e) { throw e; } - String ossEndPoint = properties.getProperty(OSS_ENDPOINT); - String ossAccessId = properties.getProperty(OSS_ACCESS_ID); - String ossAccessKey = properties.getProperty(OSS_ACCESS_KEY); + this.ossAccessID = properties.getProperty(OSS_ACCESS_ID); + this.ossAccessKey = properties.getProperty(OSS_ACCESS_KEY); + String ossEndpoint = properties.getProperty(OSS_ENDPOINT); String ossBucketName = properties.getProperty(OSS_BUCKET_NAME); String ossObjectName = properties.getProperty(OSS_OBJECT_NAME); + if (this.ossAccessID == null || this.ossAccessID.isEmpty()) { + String URL = properties.getProperty(OSS_INFO_URL); + HashMap o = getOSSInfoFromURL(URL); + this.ossAccessID = o.get("ossAccessID"); + this.ossAccessKey = o.get("ossAccessKey"); + ossEndpoint = o.get("ossEndpoint"); + ossBucketName = o.get("ossBucketName"); + ossObjectName = o.get("ossObjectName"); + } + username = properties.getProperty(USER_NAME); password = properties.getProperty(PASS_WORD); configPath = "oss://" + Paths.get( - Paths.get(ossEndPoint, ossBucketName).toString(), + Paths.get(ossEndpoint, ossBucketName).toString(), ossObjectName); Map ossInfo = new HashMap<>(); - ossInfo.put(OSS_ENDPOINT, ossEndPoint); - ossInfo.put(OSS_ACCESS_ID, ossAccessId); + ossInfo.put(OSS_ENDPOINT, ossEndpoint); + ossInfo.put(OSS_ACCESS_ID, ossAccessID); ossInfo.put(OSS_ACCESS_KEY, ossAccessKey); OSSFileObj ossFileObj = new OSSFileObj(ossInfo); ossObjectName = Paths.get(ossObjectName, uniquePath).toString(); diff --git a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/IngestDataCommand.java b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/IngestDataCommand.java index c120b23ccb23..9cc9a08f67b9 100644 --- a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/IngestDataCommand.java +++ b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/IngestDataCommand.java @@ -2,9 +2,14 @@ import com.alibaba.graphscope.groot.sdk.MaxGraphClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; +import java.util.HashMap; public class IngestDataCommand extends DataCommand { + private static final Logger logger = LoggerFactory.getLogger(IngestDataCommand.class); public IngestDataCommand(String dataPath, boolean isFromOSS, String uniquePath) throws IOException { @@ -18,8 +23,15 @@ public void run() { .setUsername(username) .setPassword(password) .build(); - // dataPath = Paths.get(dataPath, uniquePath).toString(); configPath = configPath + "/" + uniquePath; - client.ingestData(configPath); + if (ossAccessID == null || ossAccessKey == null) { + logger.warn("ossAccessID or ossAccessKey is null, using default configuration."); + client.ingestData(configPath); + } else { + HashMap config = new HashMap<>(); + config.put("ossAccessID", ossAccessID); + config.put("ossAccessKey", ossAccessKey); + client.ingestData(configPath, config); + } } } diff --git a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/OSSFileObj.java b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/OSSFileObj.java index d9dfd43b88a1..cf6c1c0e8439 100644 --- a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/OSSFileObj.java +++ b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/OSSFileObj.java @@ -21,23 +21,23 @@ public class OSSFileObj { protected final String OSS_ACCESS_ID = "oss.access.id"; protected final String OSS_ACCESS_KEY = "oss.access.key"; - protected String ossEndPoint = null; - protected String ossAccessId = null; + protected String ossEndpoint = null; + protected String ossAccessID = null; protected String ossAccessKey = null; protected OSS ossClient = null; public OSSFileObj(Map ossInfo) throws IOException { - this.ossEndPoint = ossInfo.get(OSS_ENDPOINT); - this.ossAccessId = ossInfo.get(OSS_ACCESS_ID); + this.ossEndpoint = ossInfo.get(OSS_ENDPOINT); + this.ossAccessID = ossInfo.get(OSS_ACCESS_ID); this.ossAccessKey = ossInfo.get(OSS_ACCESS_KEY); - if (!ossEndPoint.startsWith("http")) { - ossEndPoint = "https://" + ossEndPoint; + if (!ossEndpoint.startsWith("http")) { + ossEndpoint = "https://" + ossEndpoint; } try { - this.ossClient = new OSSClientBuilder().build(ossEndPoint, ossAccessId, ossAccessKey); + this.ossClient = new OSSClientBuilder().build(ossEndpoint, ossAccessID, ossAccessKey); } catch (OSSException oe) { throw new IOException(oe); } catch (ClientException ce) { diff --git a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/databuild/DataBuildReducerOdps.java b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/databuild/DataBuildReducerOdps.java index bc33e19a6a2c..b5b9b0d43719 100644 --- a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/databuild/DataBuildReducerOdps.java +++ b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/databuild/DataBuildReducerOdps.java @@ -33,9 +33,9 @@ import java.util.Map; public class DataBuildReducerOdps extends ReducerBase { - private String ossAccessId = null; + private String ossAccessID = null; private String ossAccessKey = null; - private String ossEndPoint = null; + private String ossEndpoint = null; private String ossBucketName = null; private String ossObjectName = null; @@ -51,16 +51,16 @@ public class DataBuildReducerOdps extends ReducerBase { @Override public void setup(TaskContext context) throws IOException { - this.ossAccessId = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_ID); + this.ossAccessID = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_ID); this.ossAccessKey = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_KEY); - this.ossEndPoint = context.getJobConf().get(OfflineBuildOdps.OSS_ENDPOINT); + this.ossEndpoint = context.getJobConf().get(OfflineBuildOdps.OSS_ENDPOINT); this.ossBucketName = context.getJobConf().get(OfflineBuildOdps.OSS_BUCKET_NAME); this.ossObjectName = context.getJobConf().get(OfflineBuildOdps.OSS_OBJECT_NAME); this.metaData = context.getJobConf().get(OfflineBuildOdps.META_INFO); this.uniquePath = context.getJobConf().get(OfflineBuildOdps.UNIQUE_PATH); - if (!ossEndPoint.startsWith("http")) { - ossEndPoint = "https://" + ossEndPoint; + if (!ossEndpoint.startsWith("http")) { + ossEndpoint = "https://" + ossEndpoint; } this.taskId = context.getTaskID().toString(); @@ -69,8 +69,8 @@ public void setup(TaskContext context) throws IOException { chkFileName = "part-r-" + taskId + ".chk"; Map ossInfo = new HashMap(); - ossInfo.put(OfflineBuildOdps.OSS_ENDPOINT, ossEndPoint); - ossInfo.put(OfflineBuildOdps.OSS_ACCESS_ID, ossAccessId); + ossInfo.put(OfflineBuildOdps.OSS_ENDPOINT, ossEndpoint); + ossInfo.put(OfflineBuildOdps.OSS_ACCESS_ID, ossAccessID); ossInfo.put(OfflineBuildOdps.OSS_ACCESS_KEY, ossAccessKey); this.ossFileObj = new OSSFileObj(ossInfo); diff --git a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/databuild/OfflineBuildOdps.java b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/databuild/OfflineBuildOdps.java index 29b883c5bfb3..cb5f9f53c54f 100644 --- a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/databuild/OfflineBuildOdps.java +++ b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/databuild/OfflineBuildOdps.java @@ -18,6 +18,7 @@ import com.alibaba.maxgraph.compiler.api.schema.GraphEdge; import com.alibaba.maxgraph.compiler.api.schema.GraphElement; import com.alibaba.maxgraph.compiler.api.schema.GraphSchema; +import com.alibaba.maxgraph.dataload.util.HttpClient; import com.alibaba.maxgraph.sdkcommon.common.DataLoadTarget; import com.alibaba.maxgraph.sdkcommon.schema.GraphSchemaMapper; import com.alibaba.maxgraph.sdkcommon.util.UuidUtils; @@ -36,6 +37,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.HttpURLConnection; import java.nio.file.Paths; import java.util.*; @@ -57,13 +59,32 @@ public class OfflineBuildOdps { public static final String OSS_ENDPOINT = "oss.endpoint"; public static final String OSS_BUCKET_NAME = "oss.bucket.name"; public static final String OSS_OBJECT_NAME = "oss.object.name"; + public static final String OSS_INFO_URL = "oss.info.url"; + public static final String META_INFO = "meta.info"; public static final String USER_NAME = "auth.username"; public static final String PASS_WORD = "auth.password"; public static final String UNIQUE_PATH = "unique.path"; - public static void main(String[] args) - throws IOException, ClassNotFoundException, InterruptedException { + private static HashMap getOSSInfoFromURL(String URL) throws IOException { + HttpClient client = new HttpClient(); + HttpURLConnection conn = null; + try { + conn = client.createConnection(URL); + conn.setConnectTimeout(5000); + conn.setReadTimeout(5000); + ObjectMapper mapper = new ObjectMapper(); + TypeReference> typeRef = + new TypeReference>() {}; + return mapper.readValue(conn.getInputStream(), typeRef); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + } + + public static void main(String[] args) throws IOException { String propertiesFile = args[0]; String uniquePath = UuidUtils.getBase64UUIDString(); // User can assign a unique path manually. @@ -77,12 +98,22 @@ public static void main(String[] args) } String outputTable = properties.getProperty(OUTPUT_TABLE); - String ossAccessId = properties.getProperty(OSS_ACCESS_ID); + String ossAccessID = properties.getProperty(OSS_ACCESS_ID); String ossAccessKey = properties.getProperty(OSS_ACCESS_KEY); - String ossEndPoint = properties.getProperty(OSS_ENDPOINT); + String ossEndpoint = properties.getProperty(OSS_ENDPOINT); String ossBucketName = properties.getProperty(OSS_BUCKET_NAME); String ossObjectName = properties.getProperty(OSS_OBJECT_NAME); + if (ossAccessID == null || ossAccessID.isEmpty()) { + String URL = properties.getProperty(OSS_INFO_URL); + HashMap o = getOSSInfoFromURL(URL); + ossAccessID = o.get("ossAccessID"); + ossAccessKey = o.get("ossAccessKey"); + ossEndpoint = o.get("ossEndpoint"); + ossBucketName = o.get("ossBucketName"); + ossObjectName = o.get("ossObjectName"); + } + // The table format is `project.table` or `table`; // For partitioned table, the format is `project.table|p1=1/p2=2` or `table|p1=1/p2=2` String columnMappingConfigStr = properties.getProperty(COLUMN_MAPPING_CONFIG); @@ -137,9 +168,9 @@ public static void main(String[] args) job.setBoolean(SKIP_HEADER, skipHeader); job.set(GRAPH_ENDPOINT, graphEndpoint); - job.set(OSS_ACCESS_ID, ossAccessId); + job.set(OSS_ACCESS_ID, ossAccessID); job.set(OSS_ACCESS_KEY, ossAccessKey); - job.set(OSS_ENDPOINT, ossEndPoint); + job.set(OSS_ENDPOINT, ossEndpoint); job.set(OSS_BUCKET_NAME, ossBucketName); job.set(OSS_OBJECT_NAME, ossObjectName); diff --git a/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/util/HttpClient.java b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/util/HttpClient.java new file mode 100644 index 000000000000..3a0f0ff7883b --- /dev/null +++ b/interactive_engine/data_load_tools/src/main/java/com/alibaba/maxgraph/dataload/util/HttpClient.java @@ -0,0 +1,77 @@ +package com.alibaba.maxgraph.dataload.util; + +import org.apache.commons.codec.binary.Base64; + +import java.io.IOException; +import java.net.Authenticator; +import java.net.HttpURLConnection; +import java.net.PasswordAuthentication; +import java.net.URL; +import java.nio.charset.StandardCharsets; + +/** + * Simple HTTPClient which refer from ... + */ +public class HttpClient { + + private final String user; + private final String password; + + public HttpClient() { + this("", ""); + } + + public HttpClient(String user, String password) { + this.user = user; + this.password = password; + } + + public int sendRequestWithAuthHeader(String url) throws IOException { + HttpURLConnection connection = null; + try { + connection = createConnection(url); + connection.setRequestProperty("Authorization", createBasicAuthHeaderValue()); + return connection.getResponseCode(); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + public int sendRequestWithAuthenticator(String url) throws IOException { + setAuthenticator(); + HttpURLConnection connection = null; + try { + connection = createConnection(url); + return connection.getResponseCode(); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + public HttpURLConnection createConnection(String urlString) throws IOException { + URL url = new URL(urlString); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + return connection; + } + + private String createBasicAuthHeaderValue() { + String auth = user + ":" + password; + byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encodedAuth); + } + + private void setAuthenticator() { + Authenticator.setDefault(new BasicAuthenticator()); + } + + private final class BasicAuthenticator extends Authenticator { + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(user, password.toCharArray()); + } + } +} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index e8eef7a8dbc8..8fb4aa12c714 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -331,6 +331,7 @@ public void getLoggerInfo( public void ingestData( IngestDataRequest request, StreamObserver responseObserver) { String dataPath = request.getDataPath(); + Map config = request.getConfigMap(); logger.info("ingestData. path [" + dataPath + "]"); int storeCount = this.metaService.getStoreCount(); AtomicInteger counter = new AtomicInteger(storeCount); @@ -339,6 +340,7 @@ public void ingestData( this.storeIngestor.ingest( i, dataPath, + config, new CompletionCallback() { @Override public void onCompleted(Void res) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java index bd92fae45752..5041dea47707 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClient.java @@ -24,6 +24,8 @@ import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; +import java.util.Map; + public class StoreIngestClient extends RpcClient { private StoreIngestGrpc.StoreIngestStub stub; @@ -33,10 +35,13 @@ public StoreIngestClient(ManagedChannel channel) { this.stub = StoreIngestGrpc.newStub(channel); } - public void storeIngest(String dataPath, CompletionCallback callback) { - StoreIngestRequest req = StoreIngestRequest.newBuilder().setDataPath(dataPath).build(); + public void storeIngest( + String dataPath, Map config, CompletionCallback callback) { + StoreIngestRequest.Builder builder = StoreIngestRequest.newBuilder(); + builder.setDataPath(dataPath); + builder.putAllConfig(config); this.stub.storeIngest( - req, + builder.build(), new StreamObserver() { @Override public void onNext(StoreIngestResponse value) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java index 5cb913ff8bd4..c2698e429cb1 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestClients.java @@ -20,6 +20,8 @@ import io.grpc.ManagedChannel; +import java.util.HashMap; +import java.util.Map; import java.util.function.Function; public class StoreIngestClients extends RoleClients implements StoreIngestor { @@ -33,7 +35,16 @@ public StoreIngestClients( @Override public void ingest(int storeId, String path, CompletionCallback callback) { - this.getClient(storeId).storeIngest(path, callback); + this.ingest(storeId, path, new HashMap(), callback); + } + + @Override + public void ingest( + int storeId, + String path, + Map config, + CompletionCallback callback) { + this.getClient(storeId).storeIngest(path, config, callback); } @Override diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestor.java index 63c8253c7b6f..8a99e8dd6b9d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/StoreIngestor.java @@ -15,8 +15,16 @@ import com.alibaba.graphscope.groot.CompletionCallback; +import java.util.Map; + public interface StoreIngestor { void ingest(int storeId, String path, CompletionCallback callback); + void ingest( + int storeId, + String path, + Map config, + CompletionCallback callback); + void clearIngest(int storeId, CompletionCallback callback); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java index b94f4d94abd8..47397b28a6ea 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java @@ -20,6 +20,7 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.Map; public class StoreIngestService extends StoreIngestGrpc.StoreIngestImplBase { @@ -33,8 +34,10 @@ public StoreIngestService(StoreService storeService) { public void storeIngest( StoreIngestRequest request, StreamObserver responseObserver) { String dataPath = request.getDataPath(); + Map config = request.getConfigMap(); this.storeService.ingestData( dataPath, + config, new CompletionCallback() { @Override public void onCompleted(Void res) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 41222f295b9f..d1b580ae634c 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -60,7 +60,7 @@ public class StoreService implements MetricsAgent { private static final String PARTITION_WRITE_PER_SECOND_MS = "partition.write.per.second.ms"; - private Configs configs; + private Configs storeConfigs; private int storeId; private int writeThreadCount; private int downloadThreadCount; @@ -77,11 +77,11 @@ public class StoreService implements MetricsAgent { private Map partitionToMetric; public StoreService( - Configs configs, MetaService metaService, MetricsCollector metricsCollector) { - this.configs = configs; - this.storeId = CommonConfig.NODE_IDX.get(configs); - this.enableGc = StoreConfig.STORE_GC_ENABLE.get(configs); - this.writeThreadCount = StoreConfig.STORE_WRITE_THREAD_COUNT.get(configs); + Configs storeConfigs, MetaService metaService, MetricsCollector metricsCollector) { + this.storeConfigs = storeConfigs; + this.storeId = CommonConfig.NODE_IDX.get(storeConfigs); + this.enableGc = StoreConfig.STORE_GC_ENABLE.get(storeConfigs); + this.writeThreadCount = StoreConfig.STORE_WRITE_THREAD_COUNT.get(storeConfigs); this.metaService = metaService; metricsCollector.register(this, () -> updateMetrics()); } @@ -92,7 +92,7 @@ public void start() throws IOException { this.idToPartition = new HashMap<>(partitionIds.size()); for (int partitionId : partitionIds) { try { - GraphPartition partition = makeGraphPartition(this.configs, partitionId); + GraphPartition partition = makeGraphPartition(this.storeConfigs, partitionId); this.idToPartition.put(partitionId, partition); } catch (IOException e) { throw new MaxGraphException(e); @@ -300,8 +300,9 @@ public MetaService getMetaService() { return this.metaService; } - public void ingestData(String path, CompletionCallback callback) { - String dataRoot = StoreConfig.STORE_DATA_PATH.get(configs); + public void ingestData( + String path, Map config, CompletionCallback callback) { + String dataRoot = StoreConfig.STORE_DATA_PATH.get(storeConfigs); String downloadPath = Paths.get(dataRoot, "download").toString(); String[] items = path.split("\\/"); // Get the unique path (uuid) @@ -320,7 +321,7 @@ public void ingestData(String path, CompletionCallback callback) { () -> { try { logger.info("ingesting data [{}]", path); - ingestDataInternal(path, callback); + ingestDataInternal(path, config, callback); } catch (Exception e) { logger.error("ingest data failed. path [" + path + "]", e); callback.onError(e); @@ -329,9 +330,10 @@ public void ingestData(String path, CompletionCallback callback) { }); } - private void ingestDataInternal(String path, CompletionCallback callback) + private void ingestDataInternal( + String path, Map config, CompletionCallback callback) throws IOException { - ExternalStorage externalStorage = ExternalStorage.getStorage(configs, path); + ExternalStorage externalStorage = ExternalStorage.getStorage(path, config); Set> entries = this.idToPartition.entrySet(); AtomicInteger counter = new AtomicInteger(entries.size()); AtomicBoolean finished = new AtomicBoolean(false); @@ -359,7 +361,7 @@ private void ingestDataInternal(String path, CompletionCallback callback) } public void clearIngest() throws IOException { - String dataRoot = StoreConfig.STORE_DATA_PATH.get(configs); + String dataRoot = StoreConfig.STORE_DATA_PATH.get(storeConfigs); Path downloadPath = Paths.get(dataRoot, "download"); try { logger.info("Clearing directory {}", downloadPath); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java index 75a3b8caf978..c0cca9c1966d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java @@ -1,19 +1,19 @@ package com.alibaba.graphscope.groot.store.external; -import com.alibaba.maxgraph.common.config.Configs; - import java.io.IOException; import java.net.URI; +import java.util.Map; public abstract class ExternalStorage { - public static ExternalStorage getStorage(Configs configs, String path) throws IOException { + public static ExternalStorage getStorage(String path, Map config) + throws IOException { URI uri = URI.create(path); String scheme = uri.getScheme(); switch (scheme) { case "hdfs": return new HdfsStorage(path); case "oss": - return new OssStorage(configs, path); + return new OssStorage(path, config); default: throw new IllegalArgumentException( "external storage scheme [" + scheme + "] not supported"); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/OssStorage.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/OssStorage.java index ebb85f9d4c02..c1b3c606f497 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/OssStorage.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/OssStorage.java @@ -1,24 +1,23 @@ package com.alibaba.graphscope.groot.store.external; -import com.alibaba.maxgraph.common.config.Configs; -import com.alibaba.maxgraph.common.config.StoreConfig; import com.aliyun.oss.OSS; import com.aliyun.oss.OSSClientBuilder; import com.aliyun.oss.model.GetObjectRequest; import java.io.File; import java.net.URI; +import java.util.Map; public class OssStorage extends ExternalStorage { private OSS ossClient; - public OssStorage(Configs configs, String path) { + public OssStorage(String path, Map config) { URI uri = URI.create(path); - String accessId = StoreConfig.OSS_ACCESS_ID.get(configs); - String accessSecret = StoreConfig.OSS_ACCESS_SECRET.get(configs); String endpoint = uri.getAuthority(); - this.ossClient = new OSSClientBuilder().build(endpoint, accessId, accessSecret); + String accessID = config.get("ossAccessID"); + String accessKey = config.get("ossAccessKey"); + this.ossClient = new OSSClientBuilder().build(endpoint, accessID, accessKey); } @Override diff --git a/interactive_engine/pom.xml b/interactive_engine/pom.xml index cfab3f9a37b3..af31aa04f89c 100644 --- a/interactive_engine/pom.xml +++ b/interactive_engine/pom.xml @@ -262,12 +262,6 @@ ${jackson.version} - - com.fasterxml.jackson.module - jackson-module-scala_2.11 - ${jackson.version} - - com.fasterxml.jackson.core jackson-annotations diff --git a/interactive_engine/proto/sdk/client.proto b/interactive_engine/proto/sdk/client.proto index a6b21b451005..cc6fe834280d 100644 --- a/interactive_engine/proto/sdk/client.proto +++ b/interactive_engine/proto/sdk/client.proto @@ -87,6 +87,7 @@ message GetSchemaResponse { message IngestDataRequest { string dataPath = 1; + map config = 2; } message IngestDataResponse { diff --git a/interactive_engine/proto/store_ingest_service.proto b/interactive_engine/proto/store_ingest_service.proto index 1a4884afe0a1..67b6e2a014f4 100644 --- a/interactive_engine/proto/store_ingest_service.proto +++ b/interactive_engine/proto/store_ingest_service.proto @@ -25,6 +25,7 @@ service StoreIngest { message StoreIngestRequest { string dataPath = 1; + map config = 2; } message StoreIngestResponse { diff --git a/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/MaxGraphClient.java b/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/MaxGraphClient.java index 1e4b7dda3660..3a71eb11c0f7 100644 --- a/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/MaxGraphClient.java +++ b/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/MaxGraphClient.java @@ -74,10 +74,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; public class MaxGraphClient implements Closeable { @@ -205,6 +202,13 @@ public void ingestData(String path) { this.stub.ingestData(IngestDataRequest.newBuilder().setDataPath(path).build()); } + public void ingestData(String path, Map config) { + IngestDataRequest.Builder builder = IngestDataRequest.newBuilder(); + builder.setDataPath(path); + builder.putAllConfig(config); + this.stub.ingestData(builder.build()); + } + public String loadJsonSchema(Path jsonFile) throws IOException { String json = new String(Files.readAllBytes(jsonFile), StandardCharsets.UTF_8); return loadJsonSchema(json);