Skip to content

Commit

Permalink
Supports bulk load from ODPS table and OSS for GIE (#1672)
Browse files Browse the repository at this point in the history
  • Loading branch information
varinic committed Jun 13, 2022
1 parent 0b49a06 commit ded5b2e
Show file tree
Hide file tree
Showing 18 changed files with 1,099 additions and 27 deletions.
4 changes: 4 additions & 0 deletions interactive_engine/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions interactive_engine/coordinator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
81 changes: 80 additions & 1 deletion interactive_engine/data_load_tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,29 @@

<artifactId>data_load_tools</artifactId>

<properties>
<odps.sdk.public.version>0.36.4-public</odps.sdk.public.version>
</properties>

<dependencies>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-commons</artifactId>
<version>${odps.sdk.public.version}</version>
</dependency>

<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>${odps.sdk.public.version}</version>
</dependency>

<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>${odps.sdk.public.version}</version>
</dependency>

<dependency>
<groupId>com.alibaba.maxgraph</groupId>
<artifactId>sdk</artifactId>
Expand All @@ -27,6 +49,12 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
Expand All @@ -36,6 +64,40 @@
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>2.11.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
</dependency>
</dependencies>

<build>
Expand All @@ -49,6 +111,23 @@
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<args>
<arg>-nobootcp</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand Down Expand Up @@ -109,4 +188,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@

public class CommitDataCommand extends DataCommand {

/**
 * Creates a command that commits previously-ingested data to the graph store.
 *
 * @param dataPath  load metadata location: an HDFS build-output directory, or a local
 *                  OSS config (properties) file when {@code isFromOSS} is true
 * @param isFromOSS true when {@code dataPath} is an OSS config file rather than an HDFS path
 * @throws IOException if the load metadata cannot be read
 */
public CommitDataCommand(String dataPath, boolean isFromOSS) throws IOException {
    super(dataPath, isFromOSS);
}

public void run() {
MaxGraphClient client = MaxGraphClient.newBuilder().setHosts(graphEndpoint).build();
MaxGraphClient client =
MaxGraphClient.newBuilder()
.setHosts(graphEndpoint)
.setUsername(username)
.setPassword(password)
.build();
Map<Long, DataLoadTarget> tableToTarget = new HashMap<>();
for (ColumnMappingInfo columnMappingInfo : columnMappingInfos.values()) {
long tableId = columnMappingInfo.getTableId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,83 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.io.InputStream;
import java.nio.file.Paths;
import java.util.*;

public abstract class DataCommand {

// Location of the load metadata: an HDFS directory containing a META file, or
// (when loading from OSS) a local properties file describing the OSS location.
protected String dataPath;
// Endpoint of the graph service, read from the META "endpoint" entry.
protected String graphEndpoint;
protected GraphSchema schema;
protected Map<String, ColumnMappingInfo> columnMappingInfos;
// Raw JSON metadata read from the META file/object.
protected String metaData;
// Credentials read from the OSS config file; null when loading from HDFS.
protected String username;
protected String password;

// Name of the metadata object/file produced by the build job.
protected static final String metaFileName = "META";
// Property keys expected in the OSS config file.
protected static final String OSS_ENDPOINT = "oss.endpoint";
protected static final String OSS_ACCESS_ID = "oss.access.id";
protected static final String OSS_ACCESS_KEY = "oss.access.key";
protected static final String OSS_BUCKET_NAME = "oss.bucket.name";
protected static final String OSS_OBJECT_NAME = "oss.object.name";
protected static final String USER_NAME = "auth.username";
protected static final String PASS_WORD = "auth.password";

/**
 * @param dataPath  HDFS data directory, or OSS config file when {@code isFromOSS} is true
 * @param isFromOSS whether {@code dataPath} refers to an OSS config file
 * @throws IOException if the metadata cannot be read
 */
public DataCommand(String dataPath, boolean isFromOSS) throws IOException {
    this.dataPath = dataPath;
    initialize(isFromOSS);
}

private void initialize() throws IOException {
FileSystem fs = new Path(this.dataPath).getFileSystem(new Configuration());
try (FSDataInputStream inputStream = fs.open(new Path(this.dataPath, "META"))) {
String metaString = inputStream.readUTF();
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> metaMap =
objectMapper.readValue(metaString, new TypeReference<Map<String, String>>() {});
this.graphEndpoint = metaMap.get("endpoint");
this.schema = GraphSchemaMapper.parseFromJson(metaMap.get("schema")).toGraphSchema();
this.columnMappingInfos =
objectMapper.readValue(
metaMap.get("mappings"),
new TypeReference<Map<String, ColumnMappingInfo>>() {});
private void initialize(boolean isFromOSS) throws IOException {
if (isFromOSS) {
Properties properties = new Properties();
try (InputStream is = new FileInputStream(this.dataPath)) {
properties.load(is);
} catch (IOException e) {
throw e;
}
String ossEndPoint = properties.getProperty(OSS_ENDPOINT);
String ossAccessId = properties.getProperty(OSS_ACCESS_ID);
String ossAccessKey = properties.getProperty(OSS_ACCESS_KEY);
String ossBucketName = properties.getProperty(OSS_BUCKET_NAME);
String ossObjectName = properties.getProperty(OSS_OBJECT_NAME);
username = properties.getProperty(USER_NAME);
password = properties.getProperty(PASS_WORD);

dataPath =
"oss://"
+ Paths.get(
Paths.get(ossEndPoint, ossBucketName).toString(),
ossObjectName)
.toString();

Map<String, String> ossInfo = new HashMap<String, String>();
ossInfo.put(OSS_ENDPOINT, ossEndPoint);
ossInfo.put(OSS_ACCESS_ID, ossAccessId);
ossInfo.put(OSS_ACCESS_KEY, ossAccessKey);
OSSFileObj ossFileObj = new OSSFileObj(ossInfo);

this.metaData = ossFileObj.readBuffer(ossBucketName, ossObjectName, metaFileName);
ossFileObj.close();
} else {
FileSystem fs = new Path(this.dataPath).getFileSystem(new Configuration());
try (FSDataInputStream inputStream = fs.open(new Path(this.dataPath, "META"))) {
this.metaData = inputStream.readUTF();
}
}

ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> metaMap =
objectMapper.readValue(metaData, new TypeReference<Map<String, String>>() {});
this.graphEndpoint = metaMap.get("endpoint");
this.schema = GraphSchemaMapper.parseFromJson(metaMap.get("schema")).toGraphSchema();
this.columnMappingInfos =
objectMapper.readValue(
metaMap.get("mappings"),
new TypeReference<Map<String, ColumnMappingInfo>>() {});
}

/** Executes this command against the graph service. */
public abstract void run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@

public class IngestDataCommand extends DataCommand {

public IngestDataCommand(String dataPath) throws IOException {
super(dataPath);
public IngestDataCommand(String dataPath, boolean isFromOSS) throws IOException {
super(dataPath, isFromOSS);
}

public void run() {
MaxGraphClient client = MaxGraphClient.newBuilder().setHosts(graphEndpoint).build();
MaxGraphClient client =
MaxGraphClient.newBuilder()
.setHosts(graphEndpoint)
.setUsername(username)
.setPassword(password)
.build();
client.ingestData(dataPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@

public class LoadTool {

/**
 * Ingests build output into the graph store.
 *
 * @param path      HDFS data directory, or an OSS config file when {@code isFromOSS} is true
 * @param isFromOSS whether {@code path} refers to an OSS config file
 * @throws IOException if the load metadata cannot be read
 */
public static void ingest(String path, boolean isFromOSS) throws IOException {
    IngestDataCommand command = new IngestDataCommand(path, isFromOSS);
    command.run();
}

/**
 * Commits previously-ingested data to the graph store.
 *
 * @param path      HDFS data directory, or an OSS config file when {@code isFromOSS} is true
 * @param isFromOSS whether {@code path} refers to an OSS config file
 * @throws IOException if the load metadata cannot be read
 */
public static void commit(String path, boolean isFromOSS) throws IOException {
    CommitDataCommand command = new CommitDataCommand(path, isFromOSS);
    command.run();
}

public static void main(String[] args) throws ParseException, IOException {
Options options = new Options();
options.addOption(
Expand All @@ -22,17 +30,32 @@ public static void main(String[] args) throws ParseException, IOException {
.argName("HDFS_PATH")
.desc("data directory of HDFS. e.g., hdfs://1.2.3.4:9000/build_output")
.build());
options.addOption(
Option.builder("oss")
.longOpt("ossconfigfile")
.hasArg()
.argName("OSS_CONFIG_FILE")
.desc("OSS Config File. e.g., config.init")
.build());
options.addOption(Option.builder("h").longOpt("help").desc("print this message").build());
CommandLineParser parser = new DefaultParser();
CommandLine commandLine = parser.parse(options, args);
String command = commandLine.getOptionValue("command");
String dir = commandLine.getOptionValue("dir");
String path = null;
boolean isFromOSS = false;
if (commandLine.hasOption("oss")) {
isFromOSS = true;
path = commandLine.getOptionValue("oss");
} else {
path = commandLine.getOptionValue("dir");
}

if (commandLine.hasOption("help") || command == null) {
printHelp(options);
} else if (command.equalsIgnoreCase("ingest")) {
new IngestDataCommand(dir).run();
ingest(path, isFromOSS);
} else if (command.equalsIgnoreCase("commit")) {
new CommitDataCommand(dir).run();
commit(path, isFromOSS);
} else {
printHelp(options);
}
Expand Down
Loading

0 comments on commit ded5b2e

Please sign in to comment.