Commit 2899903
Data load tools supports partition (#1900)
siyuan0322 committed Aug 1, 2022
1 parent cd08a1a commit 2899903
Showing 3 changed files with 56 additions and 42 deletions.
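With this change, table identifiers in the data-load configuration may carry an ODPS project qualifier and a partition spec. A minimal sketch of the accepted formats, using hypothetical project, table, and partition names (the parsing itself is in OfflineBuildOdps below):

    // Hypothetical identifiers accepted after this change:
    //   person                          table in the default project
    //   my_project.person               table qualified by project
    //   person|ds=20220801              table restricted to one partition
    //   my_project.person|p1=1/p2=2     project + multi-level partition spec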
File 1 of 3:

@@ -79,7 +79,7 @@ protected void map(LongWritable key, Text value, Context context)
         String fileName = fullPath.substring(fullPath.lastIndexOf('/') + 1);
         ColumnMappingInfo columnMappingInfo = this.fileToColumnMappingInfo.get(fileName);
         if (columnMappingInfo == null) {
-            logger.warn("ignore [" + fileName + "]");
+            logger.warn("Mapper: ignore [" + fileName + "]");
             return;
         }
 
@@ -175,7 +175,7 @@ private Map<Integer, PropertyValue> buildPropertiesMap(
             switch (name) {
                 case "creationDate":
                 case "joinDate":
-                    val = converteDate(val);
+                    val = convertDate(val);
                     break;
                 case "birthday":
                     val = val.replace("-", "");
@@ -188,7 +188,7 @@ private Map<Integer, PropertyValue> buildPropertiesMap(
         return operationProperties;
     }
 
-    public static String converteDate(String input) {
+    public static String convertDate(String input) {
         try {
             return DST_FMT.format(SRC_FMT.parse(input));
         } catch (ParseException e) {
File 2 of 3 (DataBuildMapperOdps):

@@ -39,13 +39,10 @@ public class DataBuildMapperOdps extends MapperBase {
 
     private GraphSchema graphSchema;
     private DataEncoder dataEncoder;
-    private String separator;
    private Map<String, ColumnMappingInfo> fileToColumnMappingInfo;
 
     private ObjectMapper objectMapper;
     private boolean ldbcCustomize;
-    private boolean skipHeader;
-
     private Record outKey;
     private Record outVal;
 
@@ -55,7 +52,6 @@ public void setup(TaskContext context) throws IOException {
         this.outVal = context.createMapOutputValueRecord();
 
         this.objectMapper = new ObjectMapper();
-        this.separator = context.getJobConf().get(OfflineBuildOdps.SEPARATOR);
         String schemaJson = context.getJobConf().get(OfflineBuildOdps.SCHEMA_JSON);
         this.graphSchema = GraphSchemaMapper.parseFromJson(schemaJson).toGraphSchema();
         this.dataEncoder = new DataEncoder(this.graphSchema);
@@ -65,17 +61,15 @@ public void setup(TaskContext context) throws IOException {
                         columnMappingsJson, new TypeReference<Map<String, ColumnMappingInfo>>() {});
         this.ldbcCustomize =
                 context.getJobConf().getBoolean(OfflineBuildOdps.LDBC_CUSTOMIZE, false);
-        this.skipHeader = context.getJobConf().getBoolean(OfflineBuildOdps.SKIP_HEADER, true);
         DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00"));
     }
 
     @Override
     public void map(long recordNum, Record record, TaskContext context) throws IOException {
-
         String tableName = context.getInputTableInfo().getTableName();
         ColumnMappingInfo columnMappingInfo = this.fileToColumnMappingInfo.get(tableName);
         if (columnMappingInfo == null) {
-            System.out.println("ignore [" + tableName + "]");
+            System.out.println("Mapper: ignore [" + tableName + "]");
             return;
         }
 
@@ -176,7 +170,7 @@ private Map<Integer, PropertyValue> buildPropertiesMap(
             switch (name) {
                 case "creationDate":
                 case "joinDate":
-                    val = converteDate(val);
+                    val = convertDate(val);
                     break;
                 case "birthday":
                     val = val.replace("-", "");
@@ -191,7 +185,7 @@ private Map<Integer, PropertyValue> buildPropertiesMap(
         return operationProperties;
     }
 
-    public static String converteDate(String input) {
+    public static String convertDate(String input) {
         try {
             return DST_FMT.format(SRC_FMT.parse(input));
         } catch (ParseException e) {
File 3 of 3 (OfflineBuildOdps):

@@ -85,6 +85,8 @@ public static void main(String[] args)
         String ossBucketName = properties.getProperty(OSS_BUCKET_NAME);
         String ossObjectName = properties.getProperty(OSS_OBJECT_NAME);
 
+        // The table format is `project.table` or `table`;
+        // for a partitioned table, the format is `project.table|p1=1/p2=2` or `table|p1=1/p2=2`.
         String columnMappingConfigStr = properties.getProperty(COLUMN_MAPPING_CONFIG);
         String graphEndpoint = properties.getProperty(GRAPH_ENDPOINT);
         String username = properties.getProperty(USER_NAME);
@@ -122,13 +124,12 @@ public static void main(String[] args)
                 (fileName, fileColumnMapping) -> {
                     ColumnMappingInfo columnMappingInfo =
                             fileColumnMapping.toColumnMappingInfo(schema);
-                    columnMappingInfos.put(fileName, columnMappingInfo);
+                    // Note that the project name and partition spec are stripped (if present)
+                    columnMappingInfos.put(getTableName(fileName), columnMappingInfo);
                     tableType.put(fileName, schema.getElement(columnMappingInfo.getLabelId()));
                 });
         String ldbcCustomize = properties.getProperty(LDBC_CUSTOMIZE, "true");
         long splitSize = Long.valueOf(properties.getProperty(SPLIT_SIZE, "256"));
         boolean loadAfterBuild =
                 properties.getProperty(LOAD_AFTER_BUILD, "false").equalsIgnoreCase("true");
-        boolean skipHeader = properties.getProperty(SKIP_HEADER, "true").equalsIgnoreCase("true");
 
         JobConf job = new JobConf();
@@ -147,27 +148,9 @@ public static void main(String[] args)
         job.set(OSS_OBJECT_NAME, ossObjectName);
 
         for (Map.Entry<String, GraphElement> entry : tableType.entrySet()) {
-            if (entry.getValue() instanceof GraphVertex) {
+            if (entry.getValue() instanceof GraphVertex || entry.getValue() instanceof GraphEdge) {
                 String name = entry.getKey();
-                if (name.contains(".")) {
-                    String[] items = name.split("\\.");
-                    InputUtils.addTable(
-                            TableInfo.builder().projectName(items[0]).tableName(items[1]).build(),
-                            job);
-                } else {
-                    InputUtils.addTable(TableInfo.builder().tableName(name).build(), job);
-                }
-            }
-            if (entry.getValue() instanceof GraphEdge) {
-                String name = entry.getKey();
-                if (name.contains(".")) {
-                    String[] items = name.split("\\.");
-                    InputUtils.addTable(
-                            TableInfo.builder().projectName(items[0]).tableName(items[1]).build(),
-                            job);
-                } else {
-                    InputUtils.addTable(TableInfo.builder().tableName(name).build(), job);
-                }
+                InputUtils.addTable(parseTableURL(name), job);
             }
         }

@@ -179,13 +162,7 @@ public static void main(String[] args)
         job.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
         job.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
 
-        if (outputTable.contains(".")) {
-            String[] items = outputTable.split("\\.");
-            OutputUtils.addTable(
-                    TableInfo.builder().projectName(items[0]).tableName(items[1]).build(), job);
-        } else {
-            OutputUtils.addTable(TableInfo.builder().tableName(outputTable).build(), job);
-        }
+        OutputUtils.addTable(parseTableURL(outputTable), job);
 
         String dataPath = Paths.get(ossBucketName, ossObjectName).toString();
         Map<String, String> outputMeta = new HashMap<>();
@@ -206,4 +183,47 @@ public static void main(String[] args)
             throw new IOException(e);
         }
     }
+
+    private static String getTableName(String tableFullName) {
+        String tableName;
+        if (tableFullName.contains(".")) {
+            String[] items = tableFullName.split("\\.");
+            tableName = items[1];
+        } else {
+            tableName = tableFullName;
+        }
+        if (tableName.contains("|")) {
+            String[] items = tableName.split("\\|");
+            tableName = items[0];
+        }
+        return tableName;
+    }
+
+    private static TableInfo parseTableURL(String tableFullName) {
+        String projectName = null;
+        String tableName = null;
+        String partitionSpec = null;
+        if (tableFullName.contains(".")) {
+            String[] items = tableFullName.split("\\.");
+            projectName = items[0];
+            tableName = items[1];
+        } else {
+            tableName = tableFullName;
+        }
+        if (tableName.contains("|")) {
+            String[] items = tableName.split("\\|");
+            tableName = items[0];
+            partitionSpec = items[1];
+        }
+
+        TableInfo.TableInfoBuilder builder = TableInfo.builder();
+        if (projectName != null) {
+            builder.projectName(projectName);
+        }
+        builder.tableName(tableName);
+        if (partitionSpec != null) {
+            builder.partSpec(partitionSpec);
+        }
+        return builder.build();
+    }
 }
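For illustration, a minimal usage sketch of the two new helpers, assuming a hypothetical table proj.person with partition ds=20220801 (TableInfo, InputUtils, OutputUtils, and job are the ODPS SDK classes and JobConf already used above):

    // getTableName strips the project and partition, keeping only the bare table name:
    //   getTableName("proj.person|ds=20220801")  -> "person"
    //   getTableName("person")                   -> "person"
    // parseTableURL keeps all three parts in a TableInfo:
    //   parseTableURL("proj.person|ds=20220801") -> projectName="proj", tableName="person",
    //                                               partSpec="ds=20220801"
    InputUtils.addTable(parseTableURL("proj.person|ds=20220801"), job); // read one partition
    OutputUtils.addTable(parseTableURL("proj.person_out"), job);        // unpartitioned output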
