Skip to content

Commit

Permalink
Server now use aks provided by client side (#2132)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Oct 17, 2022
1 parent 5c963f5 commit 759b7a5
Show file tree
Hide file tree
Showing 19 changed files with 250 additions and 71 deletions.
Expand Up @@ -40,9 +40,4 @@ public class StoreConfig {

public static final Config<Integer> EXECUTOR_ENGINE_PORT =
Config.intConfig("executor.engine.port", 0);

public static final Config<String> OSS_ACCESS_ID = Config.stringConfig("oss.access.id", "");

public static final Config<String> OSS_ACCESS_SECRET =
Config.stringConfig("oss.access.secret", "");
}
Expand Up @@ -2,6 +2,7 @@

import com.alibaba.maxgraph.compiler.api.schema.GraphSchema;
import com.alibaba.maxgraph.dataload.databuild.ColumnMappingInfo;
import com.alibaba.maxgraph.dataload.util.HttpClient;
import com.alibaba.maxgraph.sdkcommon.schema.GraphSchemaMapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -14,6 +15,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.file.Paths;
import java.util.*;

Expand All @@ -26,6 +28,9 @@ public abstract class DataCommand {
protected String metaData;
protected String username;
protected String password;

protected String ossAccessID;
protected String ossAccessKey;
protected String uniquePath;

protected final String metaFileName = "META";
Expand All @@ -34,6 +39,7 @@ public abstract class DataCommand {
protected final String OSS_ACCESS_KEY = "oss.access.key";
protected final String OSS_BUCKET_NAME = "oss.bucket.name";
protected final String OSS_OBJECT_NAME = "oss.object.name";
protected final String OSS_INFO_URL = "oss.info.url";
protected final String USER_NAME = "auth.username";
protected final String PASS_WORD = "auth.password";

Expand All @@ -43,6 +49,24 @@ public DataCommand(String configPath, boolean isFromOSS, String uniquePath) thro
initialize(isFromOSS);
}

private HashMap<String, String> getOSSInfoFromURL(String URL) throws IOException {
HttpClient client = new HttpClient();
HttpURLConnection conn = null;
try {
conn = client.createConnection(URL);
conn.setConnectTimeout(5000);
conn.setReadTimeout(5000);
ObjectMapper mapper = new ObjectMapper();
TypeReference<HashMap<String, String>> typeRef =
new TypeReference<HashMap<String, String>>() {};
return mapper.readValue(conn.getInputStream(), typeRef);
} finally {
if (conn != null) {
conn.disconnect();
}
}
}

private void initialize(boolean isFromOSS) throws IOException {
if (isFromOSS) {
Properties properties = new Properties();
Expand All @@ -51,23 +75,33 @@ private void initialize(boolean isFromOSS) throws IOException {
} catch (IOException e) {
throw e;
}
String ossEndPoint = properties.getProperty(OSS_ENDPOINT);
String ossAccessId = properties.getProperty(OSS_ACCESS_ID);
String ossAccessKey = properties.getProperty(OSS_ACCESS_KEY);
this.ossAccessID = properties.getProperty(OSS_ACCESS_ID);
this.ossAccessKey = properties.getProperty(OSS_ACCESS_KEY);
String ossEndpoint = properties.getProperty(OSS_ENDPOINT);
String ossBucketName = properties.getProperty(OSS_BUCKET_NAME);
String ossObjectName = properties.getProperty(OSS_OBJECT_NAME);
if (this.ossAccessID == null || this.ossAccessID.isEmpty()) {
String URL = properties.getProperty(OSS_INFO_URL);
HashMap<String, String> o = getOSSInfoFromURL(URL);
this.ossAccessID = o.get("ossAccessID");
this.ossAccessKey = o.get("ossAccessKey");
ossEndpoint = o.get("ossEndpoint");
ossBucketName = o.get("ossBucketName");
ossObjectName = o.get("ossObjectName");
}

username = properties.getProperty(USER_NAME);
password = properties.getProperty(PASS_WORD);

configPath =
"oss://"
+ Paths.get(
Paths.get(ossEndPoint, ossBucketName).toString(),
Paths.get(ossEndpoint, ossBucketName).toString(),
ossObjectName);

Map<String, String> ossInfo = new HashMap<>();
ossInfo.put(OSS_ENDPOINT, ossEndPoint);
ossInfo.put(OSS_ACCESS_ID, ossAccessId);
ossInfo.put(OSS_ENDPOINT, ossEndpoint);
ossInfo.put(OSS_ACCESS_ID, ossAccessID);
ossInfo.put(OSS_ACCESS_KEY, ossAccessKey);
OSSFileObj ossFileObj = new OSSFileObj(ossInfo);
ossObjectName = Paths.get(ossObjectName, uniquePath).toString();
Expand Down
Expand Up @@ -2,9 +2,14 @@

import com.alibaba.graphscope.groot.sdk.MaxGraphClient;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;

public class IngestDataCommand extends DataCommand {
private static final Logger logger = LoggerFactory.getLogger(IngestDataCommand.class);

public IngestDataCommand(String dataPath, boolean isFromOSS, String uniquePath)
throws IOException {
Expand All @@ -18,8 +23,15 @@ public void run() {
.setUsername(username)
.setPassword(password)
.build();
// dataPath = Paths.get(dataPath, uniquePath).toString();
configPath = configPath + "/" + uniquePath;
client.ingestData(configPath);
if (ossAccessID == null || ossAccessKey == null) {
logger.warn("ossAccessID or ossAccessKey is null, using default configuration.");
client.ingestData(configPath);
} else {
HashMap<String, String> config = new HashMap<>();
config.put("ossAccessID", ossAccessID);
config.put("ossAccessKey", ossAccessKey);
client.ingestData(configPath, config);
}
}
}
Expand Up @@ -21,23 +21,23 @@ public class OSSFileObj {
protected final String OSS_ACCESS_ID = "oss.access.id";
protected final String OSS_ACCESS_KEY = "oss.access.key";

protected String ossEndPoint = null;
protected String ossAccessId = null;
protected String ossEndpoint = null;
protected String ossAccessID = null;
protected String ossAccessKey = null;

protected OSS ossClient = null;

public OSSFileObj(Map<String, String> ossInfo) throws IOException {
this.ossEndPoint = ossInfo.get(OSS_ENDPOINT);
this.ossAccessId = ossInfo.get(OSS_ACCESS_ID);
this.ossEndpoint = ossInfo.get(OSS_ENDPOINT);
this.ossAccessID = ossInfo.get(OSS_ACCESS_ID);
this.ossAccessKey = ossInfo.get(OSS_ACCESS_KEY);

if (!ossEndPoint.startsWith("http")) {
ossEndPoint = "https://" + ossEndPoint;
if (!ossEndpoint.startsWith("http")) {
ossEndpoint = "https://" + ossEndpoint;
}

try {
this.ossClient = new OSSClientBuilder().build(ossEndPoint, ossAccessId, ossAccessKey);
this.ossClient = new OSSClientBuilder().build(ossEndpoint, ossAccessID, ossAccessKey);
} catch (OSSException oe) {
throw new IOException(oe);
} catch (ClientException ce) {
Expand Down
Expand Up @@ -33,9 +33,9 @@
import java.util.Map;

public class DataBuildReducerOdps extends ReducerBase {
private String ossAccessId = null;
private String ossAccessID = null;
private String ossAccessKey = null;
private String ossEndPoint = null;
private String ossEndpoint = null;
private String ossBucketName = null;
private String ossObjectName = null;

Expand All @@ -51,16 +51,16 @@ public class DataBuildReducerOdps extends ReducerBase {

@Override
public void setup(TaskContext context) throws IOException {
this.ossAccessId = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_ID);
this.ossAccessID = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_ID);
this.ossAccessKey = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_KEY);
this.ossEndPoint = context.getJobConf().get(OfflineBuildOdps.OSS_ENDPOINT);
this.ossEndpoint = context.getJobConf().get(OfflineBuildOdps.OSS_ENDPOINT);
this.ossBucketName = context.getJobConf().get(OfflineBuildOdps.OSS_BUCKET_NAME);
this.ossObjectName = context.getJobConf().get(OfflineBuildOdps.OSS_OBJECT_NAME);
this.metaData = context.getJobConf().get(OfflineBuildOdps.META_INFO);
this.uniquePath = context.getJobConf().get(OfflineBuildOdps.UNIQUE_PATH);

if (!ossEndPoint.startsWith("http")) {
ossEndPoint = "https://" + ossEndPoint;
if (!ossEndpoint.startsWith("http")) {
ossEndpoint = "https://" + ossEndpoint;
}

this.taskId = context.getTaskID().toString();
Expand All @@ -69,8 +69,8 @@ public void setup(TaskContext context) throws IOException {
chkFileName = "part-r-" + taskId + ".chk";

Map<String, String> ossInfo = new HashMap<String, String>();
ossInfo.put(OfflineBuildOdps.OSS_ENDPOINT, ossEndPoint);
ossInfo.put(OfflineBuildOdps.OSS_ACCESS_ID, ossAccessId);
ossInfo.put(OfflineBuildOdps.OSS_ENDPOINT, ossEndpoint);
ossInfo.put(OfflineBuildOdps.OSS_ACCESS_ID, ossAccessID);
ossInfo.put(OfflineBuildOdps.OSS_ACCESS_KEY, ossAccessKey);

this.ossFileObj = new OSSFileObj(ossInfo);
Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.alibaba.maxgraph.compiler.api.schema.GraphEdge;
import com.alibaba.maxgraph.compiler.api.schema.GraphElement;
import com.alibaba.maxgraph.compiler.api.schema.GraphSchema;
import com.alibaba.maxgraph.dataload.util.HttpClient;
import com.alibaba.maxgraph.sdkcommon.common.DataLoadTarget;
import com.alibaba.maxgraph.sdkcommon.schema.GraphSchemaMapper;
import com.alibaba.maxgraph.sdkcommon.util.UuidUtils;
Expand All @@ -36,6 +37,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.file.Paths;
import java.util.*;

Expand All @@ -57,13 +59,32 @@ public class OfflineBuildOdps {
public static final String OSS_ENDPOINT = "oss.endpoint";
public static final String OSS_BUCKET_NAME = "oss.bucket.name";
public static final String OSS_OBJECT_NAME = "oss.object.name";
public static final String OSS_INFO_URL = "oss.info.url";

public static final String META_INFO = "meta.info";
public static final String USER_NAME = "auth.username";
public static final String PASS_WORD = "auth.password";
public static final String UNIQUE_PATH = "unique.path";

public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
private static HashMap<String, String> getOSSInfoFromURL(String URL) throws IOException {
HttpClient client = new HttpClient();
HttpURLConnection conn = null;
try {
conn = client.createConnection(URL);
conn.setConnectTimeout(5000);
conn.setReadTimeout(5000);
ObjectMapper mapper = new ObjectMapper();
TypeReference<HashMap<String, String>> typeRef =
new TypeReference<HashMap<String, String>>() {};
return mapper.readValue(conn.getInputStream(), typeRef);
} finally {
if (conn != null) {
conn.disconnect();
}
}
}

public static void main(String[] args) throws IOException {
String propertiesFile = args[0];
String uniquePath = UuidUtils.getBase64UUIDString();
// User can assign a unique path manually.
Expand All @@ -77,12 +98,22 @@ public static void main(String[] args)
}

String outputTable = properties.getProperty(OUTPUT_TABLE);
String ossAccessId = properties.getProperty(OSS_ACCESS_ID);
String ossAccessID = properties.getProperty(OSS_ACCESS_ID);
String ossAccessKey = properties.getProperty(OSS_ACCESS_KEY);
String ossEndPoint = properties.getProperty(OSS_ENDPOINT);
String ossEndpoint = properties.getProperty(OSS_ENDPOINT);
String ossBucketName = properties.getProperty(OSS_BUCKET_NAME);
String ossObjectName = properties.getProperty(OSS_OBJECT_NAME);

if (ossAccessID == null || ossAccessID.isEmpty()) {
String URL = properties.getProperty(OSS_INFO_URL);
HashMap<String, String> o = getOSSInfoFromURL(URL);
ossAccessID = o.get("ossAccessID");
ossAccessKey = o.get("ossAccessKey");
ossEndpoint = o.get("ossEndpoint");
ossBucketName = o.get("ossBucketName");
ossObjectName = o.get("ossObjectName");
}

// The table format is `project.table` or `table`;
// For partitioned table, the format is `project.table|p1=1/p2=2` or `table|p1=1/p2=2`
String columnMappingConfigStr = properties.getProperty(COLUMN_MAPPING_CONFIG);
Expand Down Expand Up @@ -137,9 +168,9 @@ public static void main(String[] args)
job.setBoolean(SKIP_HEADER, skipHeader);
job.set(GRAPH_ENDPOINT, graphEndpoint);

job.set(OSS_ACCESS_ID, ossAccessId);
job.set(OSS_ACCESS_ID, ossAccessID);
job.set(OSS_ACCESS_KEY, ossAccessKey);
job.set(OSS_ENDPOINT, ossEndPoint);
job.set(OSS_ENDPOINT, ossEndpoint);
job.set(OSS_BUCKET_NAME, ossBucketName);
job.set(OSS_OBJECT_NAME, ossObjectName);

Expand Down
@@ -0,0 +1,77 @@
package com.alibaba.maxgraph.dataload.util;

import org.apache.commons.codec.binary.Base64;

import java.io.IOException;
import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.PasswordAuthentication;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
* Simple HTTPClient which refer from <a href="https://github.com/eugenp/tutorials/blob/master/core-java-modules/core-java-networking-2/src/main/java/com/baeldung/url/auth/HttpClient.java">...</a>
*/
public class HttpClient {

private final String user;
private final String password;

public HttpClient() {
this("", "");
}

public HttpClient(String user, String password) {
this.user = user;
this.password = password;
}

public int sendRequestWithAuthHeader(String url) throws IOException {
HttpURLConnection connection = null;
try {
connection = createConnection(url);
connection.setRequestProperty("Authorization", createBasicAuthHeaderValue());
return connection.getResponseCode();
} finally {
if (connection != null) {
connection.disconnect();
}
}
}

public int sendRequestWithAuthenticator(String url) throws IOException {
setAuthenticator();
HttpURLConnection connection = null;
try {
connection = createConnection(url);
return connection.getResponseCode();
} finally {
if (connection != null) {
connection.disconnect();
}
}
}

public HttpURLConnection createConnection(String urlString) throws IOException {
URL url = new URL(urlString);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
return connection;
}

private String createBasicAuthHeaderValue() {
String auth = user + ":" + password;
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encodedAuth);
}

private void setAuthenticator() {
Authenticator.setDefault(new BasicAuthenticator());
}

private final class BasicAuthenticator extends Authenticator {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(user, password.toCharArray());
}
}
}
Expand Up @@ -331,6 +331,7 @@ public void getLoggerInfo(
public void ingestData(
IngestDataRequest request, StreamObserver<IngestDataResponse> responseObserver) {
String dataPath = request.getDataPath();
Map<String, String> config = request.getConfigMap();
logger.info("ingestData. path [" + dataPath + "]");
int storeCount = this.metaService.getStoreCount();
AtomicInteger counter = new AtomicInteger(storeCount);
Expand All @@ -339,6 +340,7 @@ public void ingestData(
this.storeIngestor.ingest(
i,
dataPath,
config,
new CompletionCallback<Void>() {
@Override
public void onCompleted(Void res) {
Expand Down

0 comments on commit 759b7a5

Please sign in to comment.