Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(interactive): Parse Graph Schema from Yaml Configuration #3145

Merged
merged 12 commits into from
Sep 4, 2023
2 changes: 1 addition & 1 deletion flex/tests/hqps/hqps_cypher_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ GRAPH_SCHEMA_YAML=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_graph_schema.yam
GRAPH_BULK_LOAD_YAML=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_bulk_load.yaml
COMPILER_GRAPH_SCHEMA=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/ldbc_schema_csr_ic.json
GRAPH_CSR_DATA_DIR=${HOME}/csr-data-dir/
HQPS_IR_CONF=/tmp/hqps.ir.conf
HQPS_IR_CONF=/tmp/hqps.ir.properties
# check if GRAPH_SCHEMA_YAML exists
if [ ! -f ${GRAPH_SCHEMA_YAML} ]; then
echo "GRAPH_SCHEMA_YAML: ${GRAPH_SCHEMA_YAML} not found"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.alibaba.graphscope.common.config;

import com.alibaba.graphscope.common.utils.FileUtils;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -81,10 +83,15 @@ public String toString() {

public static class Factory {
public static Configs create(String file) throws Exception {
if (file.endsWith(".yaml")) {
return new YamlConfigs(file);
} else {
return new Configs(file);
switch (FileUtils.getFormatType(file)) {
case YAML:
return new YamlConfigs(file);
case PROPERTIES:
return new Configs(file);
case JSON:
default:
throw new UnsupportedOperationException(
"can not initiate Configs from the file " + file);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public GraphStoredProcedures(MetaDataReader reader) throws Exception {
for (InputStream inputStream : reader.getStoredProcedures()) {
StoredProcedureMeta createdMeta = StoredProcedureMeta.Deserializer.perform(inputStream);
this.storedProcedureMetaMap.put(createdMeta.getName(), createdMeta);
inputStream.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.common.ir.meta.reader;

public enum FileFormatType {
YAML,
JSON,
PROPERTIES
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.GraphConfig;
import com.alibaba.graphscope.common.config.Utils;
import com.alibaba.graphscope.common.utils.FileUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.nio.file.Paths;
import java.io.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// a local file system implementation of MetaDataReader
Expand All @@ -44,7 +45,7 @@ public LocalMetaDataReader(Configs configs) {
}

@Override
public List<InputStream> getStoredProcedures() throws FileNotFoundException {
public List<InputStream> getStoredProcedures() throws IOException {
String procedurePath = GraphConfig.GRAPH_STORED_PROCEDURES.get(configs);
File procedureDir = new File(procedurePath);
if (!procedureDir.exists() || !procedureDir.isDirectory()) {
Expand All @@ -60,27 +61,49 @@ public List<InputStream> getStoredProcedures() throws FileNotFoundException {
procedureInputs.add(new FileInputStream(file));
}
} else {
Map<String, InputStream> procedureInputMap =
getProcedureNameWithInputStream(procedureDir);
for (String enableProcedure : enableProcedureList) {
File procedureFile =
new File(Paths.get(procedurePath, enableProcedure + ".yaml").toString());
if (!procedureFile.exists()) {
logger.warn(
"procedure {} not exist in directory {}",
procedureFile.getName(),
procedurePath);
} else {
procedureInputs.add(new FileInputStream(procedureFile));
}
InputStream enableInput = procedureInputMap.get(enableProcedure);
Preconditions.checkArgument(
enableInput != null,
"can not find procedure with name=%s under directory=%s, candidates are %s",
enableProcedure,
procedureDir,
procedureInputMap.keySet());
procedureInputs.add(enableInput);
}
}
return Collections.unmodifiableList(procedureInputs);
}

private Map<String, InputStream> getProcedureNameWithInputStream(File procedureDir)
throws IOException {
Map<String, InputStream> procedureInputMap = Maps.newHashMap();
for (File file : procedureDir.listFiles()) {
String procedureName = getProcedureName(file);
procedureInputMap.put(procedureName, new FileInputStream(file));
}
return procedureInputMap;
}

private String getProcedureName(File file) throws IOException {
try (InputStream inputStream = new FileInputStream(file)) {
Yaml yaml = new Yaml();
Map<String, Object> map = yaml.load(inputStream);
Object procedureName = map.get("name");
Preconditions.checkArgument(
procedureName != null, "procedure name not exist in %s", file.getName());
return procedureName.toString();
}
}

@Override
public InputStream getGraphSchema() throws FileNotFoundException {
public SchemaInputStream getGraphSchema() throws IOException {
String schemaPath =
Objects.requireNonNull(
GraphConfig.GRAPH_SCHEMA.get(configs), "schema path not exist");
return new FileInputStream(schemaPath);
return new SchemaInputStream(
new FileInputStream(schemaPath), FileUtils.getFormatType(schemaPath));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ public interface MetaDataReader {
// if enableProcedures is null, return all stored procedures
List<InputStream> getStoredProcedures() throws Exception;

InputStream getGraphSchema() throws Exception;
SchemaInputStream getGraphSchema() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@
* limitations under the License.
*/

package com.alibaba.graphscope.common.ir.meta.schema;
package com.alibaba.graphscope.common.ir.meta.reader;

import com.alibaba.graphscope.groot.common.schema.api.GraphSchema;
import java.io.InputStream;

import org.apache.calcite.schema.Statistic;
public class SchemaInputStream {
private final InputStream inputStream;
private final FileFormatType formatType;

import java.util.List;
public SchemaInputStream(InputStream inputStream, FileFormatType formatType) {
this.inputStream = inputStream;
this.formatType = formatType;
}

/**
* Extends {@link GraphSchema} to add {@link Statistic}
*/
public interface StatisticSchema extends GraphSchema {
// get meta for CBO
Statistic getStatistic(List<String> tableName);

// if the property name need to be converted to id
boolean isColumnId();
public InputStream getInputStream() {
return inputStream;
}

// schema json for ir core
String schemaJson();
public FileFormatType getFormatType() {
return formatType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.util.Static;
import org.apache.commons.lang3.ObjectUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -37,9 +36,9 @@
*/
public class GraphOptSchema implements RelOptSchema {
private RelOptCluster optCluster;
private StatisticSchema rootSchema;
private IrGraphSchema rootSchema;

public GraphOptSchema(@Nullable RelOptCluster optCluster, StatisticSchema rootSchema) {
public GraphOptSchema(@Nullable RelOptCluster optCluster, IrGraphSchema rootSchema) {
this.optCluster = optCluster;
this.rootSchema = Objects.requireNonNull(rootSchema);
}
Expand All @@ -57,15 +56,14 @@ public RelOptTable getTableForMember(List<String> tableName) {
String labelName = tableName.get(0);
try {
GraphElement element = rootSchema.getElement(labelName);
return createRelOptTable(tableName, element, rootSchema.getStatistic(tableName));
return createRelOptTable(tableName, element);
} catch (GraphElementNotFoundException e) {
throw Static.RESOURCE.tableNotFound(labelName).ex();
}
}

private RelOptTable createRelOptTable(
List<String> tableName, GraphElement element, Statistic statistic) {
return new GraphOptTable(this, tableName, element, statistic);
private RelOptTable createRelOptTable(List<String> tableName, GraphElement element) {
return new GraphOptTable(this, tableName, element);
}

/**
Expand All @@ -78,7 +76,7 @@ public RelDataTypeFactory getTypeFactory() {
return this.optCluster.getTypeFactory();
}

public StatisticSchema getRootSchema() {
public IrGraphSchema getRootSchema() {
return this.rootSchema;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,10 @@ public class GraphOptTable implements RelOptTable {
private List<String> tableName;
private RelOptSchema schema;
private RelDataType dataType;
private Statistic statistic;

protected GraphOptTable(
RelOptSchema schema,
List<String> tableName,
GraphElement element,
Statistic statistic) {
protected GraphOptTable(RelOptSchema schema, List<String> tableName, GraphElement element) {
this.schema = schema;
this.tableName = tableName;
this.statistic = statistic;
this.dataType = deriveType(element);
}

Expand Down Expand Up @@ -155,12 +149,12 @@ public RelDataType getRowType() {

@Override
public boolean isKey(ImmutableBitSet immutableBitSet) {
return this.statistic.isKey(immutableBitSet);
throw new UnsupportedOperationException("is key is unsupported yet in statistics");
}

@Override
public @Nullable List<ImmutableBitSet> getKeys() {
return this.statistic.getKeys();
throw new UnsupportedOperationException("get keys is unsupported yet in statistics");
}

@Override
Expand Down