Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server now use aks provided by client side #2132

Merged
merged 8 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,4 @@ public class StoreConfig {

public static final Config<Integer> EXECUTOR_ENGINE_PORT =
Config.intConfig("executor.engine.port", 0);

public static final Config<String> OSS_ACCESS_ID = Config.stringConfig("oss.access.id", "");

public static final Config<String> OSS_ACCESS_SECRET =
Config.stringConfig("oss.access.secret", "");
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.alibaba.maxgraph.compiler.api.schema.GraphSchema;
import com.alibaba.maxgraph.dataload.databuild.ColumnMappingInfo;
import com.alibaba.maxgraph.dataload.util.HttpClient;
import com.alibaba.maxgraph.sdkcommon.schema.GraphSchemaMapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -14,6 +15,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.file.Paths;
import java.util.*;

Expand All @@ -26,6 +28,9 @@ public abstract class DataCommand {
protected String metaData;
protected String username;
protected String password;

protected String ossAccessID;
protected String ossAccessKey;
protected String uniquePath;

protected final String metaFileName = "META";
Expand All @@ -34,6 +39,7 @@ public abstract class DataCommand {
protected final String OSS_ACCESS_KEY = "oss.access.key";
protected final String OSS_BUCKET_NAME = "oss.bucket.name";
protected final String OSS_OBJECT_NAME = "oss.object.name";
protected final String OSS_INFO_URL = "oss.info.url";
protected final String USER_NAME = "auth.username";
protected final String PASS_WORD = "auth.password";

Expand All @@ -43,6 +49,24 @@ public DataCommand(String configPath, boolean isFromOSS, String uniquePath) thro
initialize(isFromOSS);
}

private HashMap<String, String> getOSSInfoFromURL(String URL) throws IOException {
HttpClient client = new HttpClient();
HttpURLConnection conn = null;
try {
conn = client.createConnection(URL);
conn.setConnectTimeout(5000);
conn.setReadTimeout(5000);
ObjectMapper mapper = new ObjectMapper();
TypeReference<HashMap<String, String>> typeRef =
new TypeReference<HashMap<String, String>>() {};
return mapper.readValue(conn.getInputStream(), typeRef);
} finally {
if (conn != null) {
conn.disconnect();
}
}
}

private void initialize(boolean isFromOSS) throws IOException {
if (isFromOSS) {
Properties properties = new Properties();
Expand All @@ -51,23 +75,33 @@ private void initialize(boolean isFromOSS) throws IOException {
} catch (IOException e) {
throw e;
}
String ossEndPoint = properties.getProperty(OSS_ENDPOINT);
String ossAccessId = properties.getProperty(OSS_ACCESS_ID);
String ossAccessKey = properties.getProperty(OSS_ACCESS_KEY);
this.ossAccessID = properties.getProperty(OSS_ACCESS_ID);
this.ossAccessKey = properties.getProperty(OSS_ACCESS_KEY);
String ossEndpoint = properties.getProperty(OSS_ENDPOINT);
String ossBucketName = properties.getProperty(OSS_BUCKET_NAME);
String ossObjectName = properties.getProperty(OSS_OBJECT_NAME);
if (this.ossAccessID == null || this.ossAccessID.isEmpty()) {
String URL = properties.getProperty(OSS_INFO_URL);
HashMap<String, String> o = getOSSInfoFromURL(URL);
this.ossAccessID = o.get("ossAccessID");
this.ossAccessKey = o.get("ossAccessKey");
ossEndpoint = o.get("ossEndpoint");
ossBucketName = o.get("ossBucketName");
ossObjectName = o.get("ossObjectName");
}

username = properties.getProperty(USER_NAME);
password = properties.getProperty(PASS_WORD);

configPath =
"oss://"
+ Paths.get(
Paths.get(ossEndPoint, ossBucketName).toString(),
Paths.get(ossEndpoint, ossBucketName).toString(),
ossObjectName);

Map<String, String> ossInfo = new HashMap<>();
ossInfo.put(OSS_ENDPOINT, ossEndPoint);
ossInfo.put(OSS_ACCESS_ID, ossAccessId);
ossInfo.put(OSS_ENDPOINT, ossEndpoint);
ossInfo.put(OSS_ACCESS_ID, ossAccessID);
ossInfo.put(OSS_ACCESS_KEY, ossAccessKey);
OSSFileObj ossFileObj = new OSSFileObj(ossInfo);
ossObjectName = Paths.get(ossObjectName, uniquePath).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

import com.alibaba.graphscope.groot.sdk.MaxGraphClient;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;

public class IngestDataCommand extends DataCommand {
private static final Logger logger = LoggerFactory.getLogger(IngestDataCommand.class);

public IngestDataCommand(String dataPath, boolean isFromOSS, String uniquePath)
throws IOException {
Expand All @@ -18,8 +23,15 @@ public void run() {
.setUsername(username)
.setPassword(password)
.build();
// dataPath = Paths.get(dataPath, uniquePath).toString();
configPath = configPath + "/" + uniquePath;
client.ingestData(configPath);
if (ossAccessID == null || ossAccessKey == null) {
logger.warn("ossAccessID or ossAccessKey is null, using default configuration.");
client.ingestData(configPath);
} else {
HashMap<String, String> config = new HashMap<>();
config.put("ossAccessID", ossAccessID);
config.put("ossAccessKey", ossAccessKey);
client.ingestData(configPath, config);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,23 @@ public class OSSFileObj {
protected final String OSS_ACCESS_ID = "oss.access.id";
protected final String OSS_ACCESS_KEY = "oss.access.key";

protected String ossEndPoint = null;
protected String ossAccessId = null;
protected String ossEndpoint = null;
protected String ossAccessID = null;
protected String ossAccessKey = null;

protected OSS ossClient = null;

public OSSFileObj(Map<String, String> ossInfo) throws IOException {
this.ossEndPoint = ossInfo.get(OSS_ENDPOINT);
this.ossAccessId = ossInfo.get(OSS_ACCESS_ID);
this.ossEndpoint = ossInfo.get(OSS_ENDPOINT);
this.ossAccessID = ossInfo.get(OSS_ACCESS_ID);
this.ossAccessKey = ossInfo.get(OSS_ACCESS_KEY);

if (!ossEndPoint.startsWith("http")) {
ossEndPoint = "https://" + ossEndPoint;
if (!ossEndpoint.startsWith("http")) {
ossEndpoint = "https://" + ossEndpoint;
}

try {
this.ossClient = new OSSClientBuilder().build(ossEndPoint, ossAccessId, ossAccessKey);
this.ossClient = new OSSClientBuilder().build(ossEndpoint, ossAccessID, ossAccessKey);
} catch (OSSException oe) {
throw new IOException(oe);
} catch (ClientException ce) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import java.util.Map;

public class DataBuildReducerOdps extends ReducerBase {
private String ossAccessId = null;
private String ossAccessID = null;
private String ossAccessKey = null;
private String ossEndPoint = null;
private String ossEndpoint = null;
private String ossBucketName = null;
private String ossObjectName = null;

Expand All @@ -51,16 +51,16 @@ public class DataBuildReducerOdps extends ReducerBase {

@Override
public void setup(TaskContext context) throws IOException {
this.ossAccessId = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_ID);
this.ossAccessID = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_ID);
this.ossAccessKey = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_KEY);
this.ossEndPoint = context.getJobConf().get(OfflineBuildOdps.OSS_ENDPOINT);
this.ossEndpoint = context.getJobConf().get(OfflineBuildOdps.OSS_ENDPOINT);
this.ossBucketName = context.getJobConf().get(OfflineBuildOdps.OSS_BUCKET_NAME);
this.ossObjectName = context.getJobConf().get(OfflineBuildOdps.OSS_OBJECT_NAME);
this.metaData = context.getJobConf().get(OfflineBuildOdps.META_INFO);
this.uniquePath = context.getJobConf().get(OfflineBuildOdps.UNIQUE_PATH);

if (!ossEndPoint.startsWith("http")) {
ossEndPoint = "https://" + ossEndPoint;
if (!ossEndpoint.startsWith("http")) {
ossEndpoint = "https://" + ossEndpoint;
}

this.taskId = context.getTaskID().toString();
Expand All @@ -69,8 +69,8 @@ public void setup(TaskContext context) throws IOException {
chkFileName = "part-r-" + taskId + ".chk";

Map<String, String> ossInfo = new HashMap<String, String>();
ossInfo.put(OfflineBuildOdps.OSS_ENDPOINT, ossEndPoint);
ossInfo.put(OfflineBuildOdps.OSS_ACCESS_ID, ossAccessId);
ossInfo.put(OfflineBuildOdps.OSS_ENDPOINT, ossEndpoint);
ossInfo.put(OfflineBuildOdps.OSS_ACCESS_ID, ossAccessID);
ossInfo.put(OfflineBuildOdps.OSS_ACCESS_KEY, ossAccessKey);

this.ossFileObj = new OSSFileObj(ossInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.alibaba.maxgraph.compiler.api.schema.GraphEdge;
import com.alibaba.maxgraph.compiler.api.schema.GraphElement;
import com.alibaba.maxgraph.compiler.api.schema.GraphSchema;
import com.alibaba.maxgraph.dataload.util.HttpClient;
import com.alibaba.maxgraph.sdkcommon.common.DataLoadTarget;
import com.alibaba.maxgraph.sdkcommon.schema.GraphSchemaMapper;
import com.alibaba.maxgraph.sdkcommon.util.UuidUtils;
Expand All @@ -36,6 +37,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.file.Paths;
import java.util.*;

Expand All @@ -57,13 +59,32 @@ public class OfflineBuildOdps {
public static final String OSS_ENDPOINT = "oss.endpoint";
public static final String OSS_BUCKET_NAME = "oss.bucket.name";
public static final String OSS_OBJECT_NAME = "oss.object.name";
public static final String OSS_INFO_URL = "oss.info.url";

public static final String META_INFO = "meta.info";
public static final String USER_NAME = "auth.username";
public static final String PASS_WORD = "auth.password";
public static final String UNIQUE_PATH = "unique.path";

public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
private static HashMap<String, String> getOSSInfoFromURL(String URL) throws IOException {
HttpClient client = new HttpClient();
HttpURLConnection conn = null;
try {
conn = client.createConnection(URL);
conn.setConnectTimeout(5000);
conn.setReadTimeout(5000);
ObjectMapper mapper = new ObjectMapper();
TypeReference<HashMap<String, String>> typeRef =
new TypeReference<HashMap<String, String>>() {};
return mapper.readValue(conn.getInputStream(), typeRef);
} finally {
if (conn != null) {
conn.disconnect();
}
}
}

public static void main(String[] args) throws IOException {
String propertiesFile = args[0];
String uniquePath = UuidUtils.getBase64UUIDString();
// User can assign a unique path manually.
Expand All @@ -77,12 +98,22 @@ public static void main(String[] args)
}

String outputTable = properties.getProperty(OUTPUT_TABLE);
String ossAccessId = properties.getProperty(OSS_ACCESS_ID);
String ossAccessID = properties.getProperty(OSS_ACCESS_ID);
String ossAccessKey = properties.getProperty(OSS_ACCESS_KEY);
String ossEndPoint = properties.getProperty(OSS_ENDPOINT);
String ossEndpoint = properties.getProperty(OSS_ENDPOINT);
String ossBucketName = properties.getProperty(OSS_BUCKET_NAME);
String ossObjectName = properties.getProperty(OSS_OBJECT_NAME);

if (ossAccessID == null || ossAccessID.isEmpty()) {
String URL = properties.getProperty(OSS_INFO_URL);
HashMap<String, String> o = getOSSInfoFromURL(URL);
ossAccessID = o.get("ossAccessID");
ossAccessKey = o.get("ossAccessKey");
ossEndpoint = o.get("ossEndpoint");
ossBucketName = o.get("ossBucketName");
ossObjectName = o.get("ossObjectName");
}

// The table format is `project.table` or `table`;
// For partitioned table, the format is `project.table|p1=1/p2=2` or `table|p1=1/p2=2`
String columnMappingConfigStr = properties.getProperty(COLUMN_MAPPING_CONFIG);
Expand Down Expand Up @@ -137,9 +168,9 @@ public static void main(String[] args)
job.setBoolean(SKIP_HEADER, skipHeader);
job.set(GRAPH_ENDPOINT, graphEndpoint);

job.set(OSS_ACCESS_ID, ossAccessId);
job.set(OSS_ACCESS_ID, ossAccessID);
job.set(OSS_ACCESS_KEY, ossAccessKey);
job.set(OSS_ENDPOINT, ossEndPoint);
job.set(OSS_ENDPOINT, ossEndpoint);
job.set(OSS_BUCKET_NAME, ossBucketName);
job.set(OSS_OBJECT_NAME, ossObjectName);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.alibaba.maxgraph.dataload.util;

import org.apache.commons.codec.binary.Base64;

import java.io.IOException;
import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.PasswordAuthentication;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
* Simple HTTPClient which refer from <a href="https://github.com/eugenp/tutorials/blob/master/core-java-modules/core-java-networking-2/src/main/java/com/baeldung/url/auth/HttpClient.java">...</a>
*/
public class HttpClient {

private final String user;
private final String password;

public HttpClient() {
this("", "");
}

public HttpClient(String user, String password) {
this.user = user;
this.password = password;
}

public int sendRequestWithAuthHeader(String url) throws IOException {
HttpURLConnection connection = null;
try {
connection = createConnection(url);
connection.setRequestProperty("Authorization", createBasicAuthHeaderValue());
return connection.getResponseCode();
} finally {
if (connection != null) {
connection.disconnect();
}
}
}

public int sendRequestWithAuthenticator(String url) throws IOException {
setAuthenticator();
HttpURLConnection connection = null;
try {
connection = createConnection(url);
return connection.getResponseCode();
} finally {
if (connection != null) {
connection.disconnect();
}
}
}

public HttpURLConnection createConnection(String urlString) throws IOException {
URL url = new URL(urlString);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
return connection;
}

private String createBasicAuthHeaderValue() {
String auth = user + ":" + password;
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encodedAuth);
}

private void setAuthenticator() {
Authenticator.setDefault(new BasicAuthenticator());
}

private final class BasicAuthenticator extends Authenticator {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(user, password.toCharArray());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ public void getLoggerInfo(
public void ingestData(
IngestDataRequest request, StreamObserver<IngestDataResponse> responseObserver) {
String dataPath = request.getDataPath();
Map<String, String> config = request.getConfigMap();
logger.info("ingestData. path [" + dataPath + "]");
int storeCount = this.metaService.getStoreCount();
AtomicInteger counter = new AtomicInteger(storeCount);
Expand All @@ -339,6 +340,7 @@ public void ingestData(
this.storeIngestor.ingest(
i,
dataPath,
config,
new CompletionCallback<Void>() {
@Override
public void onCompleted(Void res) {
Expand Down
Loading