Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,34 @@ public class DSLConfigKeys implements Serializable {
.key("geaflow.dsl.source.parallelism")
.noDefaultValue()
.description("Set source parallelism");

/** Key {@code geaflow.dsl.gcn.hops}: hop count for the built-in GCN; declared default is 2. */
public static final ConfigKey GEAFLOW_DSL_GCN_HOPS = ConfigKeys
.key("geaflow.dsl.gcn.hops")
.defaultValue(2)
.description("The hop count for built-in gcn.");

/**
 * Key {@code geaflow.dsl.gcn.fanout}: max sampled neighbors per visited vertex for the
 * built-in GCN; declared default is -1.
 * NOTE(review): presumably -1 means "no fanout limit" — confirm against the GCN implementation.
 */
public static final ConfigKey GEAFLOW_DSL_GCN_FANOUT = ConfigKeys
.key("geaflow.dsl.gcn.fanout")
.defaultValue(-1)
.description("The max sampled neighbors per visited vertex for built-in gcn.");

/**
 * Key {@code geaflow.dsl.gcn.edge.direction}: edge direction for the built-in GCN.
 * Documented values are IN, OUT and BOTH; declared default is "BOTH".
 */
public static final ConfigKey GEAFLOW_DSL_GCN_EDGE_DIRECTION = ConfigKeys
.key("geaflow.dsl.gcn.edge.direction")
.defaultValue("BOTH")
.description("The edge direction for built-in gcn. Optional values: IN, OUT, BOTH.");

/**
 * Key {@code geaflow.dsl.gcn.vertex.feature.fields}: comma-separated vertex feature field
 * names for the built-in GCN. No default value is declared.
 */
public static final ConfigKey GEAFLOW_DSL_GCN_VERTEX_FEATURE_FIELDS = ConfigKeys
.key("geaflow.dsl.gcn.vertex.feature.fields")
.noDefaultValue()
.description("Comma-separated vertex feature field names for built-in gcn.");

/** Key {@code geaflow.dsl.gcn.batch.size}: max infer batch size for the built-in GCN; declared default is 64. */
public static final ConfigKey GEAFLOW_DSL_GCN_BATCH_SIZE = ConfigKeys
.key("geaflow.dsl.gcn.batch.size")
.defaultValue(64)
.description("The max infer batch size for built-in gcn.");

/**
 * Key {@code geaflow.dsl.gcn.edge.weight.field}: optional edge weight field name for the
 * built-in GCN. No default value is declared.
 */
public static final ConfigKey GEAFLOW_DSL_GCN_EDGE_WEIGHT_FIELD = ConfigKeys
.key("geaflow.dsl.gcn.edge.weight.field")
.noDefaultValue()
.description("Optional edge weight field name for built-in gcn.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ public class FrameworkConfigKeys implements Serializable {
.defaultValue(true)
.description("infer env suppress log enable, default is true");

/**
 * Key {@code geaflow.infer.context.pool.max.size}: max infer context count for the same
 * config key; declared default is 8.
 */
public static final ConfigKey INFER_CONTEXT_POOL_MAX_SIZE = ConfigKeys
.key("geaflow.infer.context.pool.max.size")
.defaultValue(8)
.description("max infer context count for the same config key, default is 8");

/**
 * Key {@code geaflow.infer.context.pool.borrow.timeout.sec}: max wait time, in seconds,
 * for borrowing an infer context; declared default is 30.
 */
public static final ConfigKey INFER_CONTEXT_POOL_BORROW_TIMEOUT_SEC = ConfigKeys
.key("geaflow.infer.context.pool.borrow.timeout.sec")
.defaultValue(30)
.description("max wait time for borrowing infer context, default is 30 seconds");

public static final ConfigKey INFER_USER_DEFINE_LIB_PATH = ConfigKeys
.key("geaflow.infer.user.define.lib.path")
.noDefaultValue()
Expand Down Expand Up @@ -169,4 +179,3 @@ public class FrameworkConfigKeys implements Serializable {
.description("in dynmic graph, whether udf function materialize graph in finish");

}

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ public static int getPort(int minPort, int maxPort) {
num++;
}
}
throw new RuntimeException(String.format("no available port in [%d,%d]", minPort, maxPort));
// Fallback to an ephemeral port chosen by OS when the configured range is unavailable
// (for example, in constrained CI/network environments).
try (ServerSocket serverSocket = new ServerSocket(0)) {
return serverSocket.getLocalPort();
} catch (Exception e) {
throw new RuntimeException(String.format("no available port in [%d,%d]", minPort, maxPort));
}
}

public static int getPort(int port) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.geaflow.dsl.common.algo;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.geaflow.dsl.common.data.Row;
import org.apache.geaflow.dsl.common.data.RowEdge;
import org.apache.geaflow.model.graph.edge.EdgeDirection;

/**
 * Runtime context handed to model-backed algorithms. On top of the base
 * {@link AlgorithmRuntimeContext} contract it exposes inference calls and lookups of a
 * vertex's dynamic value and edges in the current batch.
 *
 * @param <K> The type of vertex IDs.
 * @param <M> The type of messages that can be sent between vertices.
 */
public interface AlgorithmModelRuntimeContext<K, M> extends AlgorithmRuntimeContext<K, M> {

    /**
     * Runs inference for a single payload.
     *
     * @param payload the inference input, keyed by name.
     * @return the inference result as produced by the implementation.
     */
    Object infer(Map<String, Object> payload);

    /**
     * Runs inference for a list of payloads.
     *
     * <p>The default implementation invokes {@link #infer(Map)} once per payload, in list
     * order, and returns the results in that same order. Implementations may override it.
     *
     * @param payloads the inference inputs.
     * @return one result per payload, in the order of {@code payloads}.
     */
    default List<Object> inferBatch(List<Map<String, Object>> payloads) {
        final List<Object> outputs = new ArrayList<>(payloads.size());
        for (Map<String, Object> request : payloads) {
            final Object output = infer(request);
            outputs.add(output);
        }
        return outputs;
    }

    /**
     * Loads the dynamic value of the vertex with the given id.
     *
     * @param vertexId the id of the vertex to look up.
     * @return the vertex's dynamic value as a {@link Row}.
     */
    Row loadDynamicVertexValue(Object vertexId);

    /**
     * Loads the dynamic edges of the vertex with the given id.
     *
     * @param vertexId the id of the vertex to look up.
     * @param direction which edge direction to load.
     * @return the matching edges.
     */
    List<RowEdge> loadDynamicEdges(Object vertexId, EdgeDirection direction);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.geaflow.dsl.common.algo;

import java.util.List;
import java.util.Optional;
import org.apache.geaflow.dsl.common.data.Row;
import org.apache.geaflow.dsl.common.data.RowVertex;

/**
 * Optional extension for algorithms that can finalize results in batches.
 *
 * @param <K> type argument passed through to {@link AlgorithmUserFunction}.
 * @param <M> type argument passed through to {@link AlgorithmUserFunction}.
 */
public interface BatchAlgorithmUserFunction<K, M> extends AlgorithmUserFunction<K, M> {

/**
 * Finalizes a batch of vertices in a single call.
 *
 * <p>NOTE(review): presumably {@code updatedValues.get(i)} belongs to
 * {@code graphVertices.get(i)}, i.e. both lists have the same length and order —
 * confirm against the caller.
 *
 * @param graphVertices the vertices of the batch.
 * @param updatedValues the optional updated value rows for the batch.
 */
void finishBatch(List<RowVertex> graphVertices, List<Optional<Row>> updatedValues);
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public static <S, T> ITypeCast<S, T> getTypeCast(Class<?> sourceType, Class<?> t
if (sourceType == targetType) {
return identityCast;
}
if (sourceType.isArray() && targetType == String.class) {
return (ITypeCast<S, T>) new Array2String();
}
if (sourceType.isArray() && targetType.isArray()) {
ITypeCast componentTypeCast = getTypeCast(sourceType.getComponentType(), targetType.getComponentType());
return (ITypeCast<S, T>) new ArrayCast(componentTypeCast, targetType.getComponentType());
Expand Down Expand Up @@ -179,6 +182,30 @@ public Object castTo(Object objects) {
}
}

/**
 * Casts an array of any component type (primitive or reference) to its textual form:
 * elements separated by {@code ", "} and enclosed in square brackets, e.g. {@code "[1, 2, 3]"}.
 *
 * <p>Nested arrays are rendered recursively (e.g. {@code "[[1, 2], [3]]"}) instead of as
 * identity strings such as {@code "[I@1b6d3586"}. A self-referential array is rendered
 * as {@code "[...]"} by {@link java.util.Arrays#deepToString(Object[])}.
 */
private static class Array2String implements ITypeCast<Object, String> {

    /**
     * Converts the given value to a string.
     *
     * @param objects the value to convert; expected to be an array.
     * @return {@code null} for a {@code null} input, {@code String.valueOf(objects)} when the
     *     input is not an array, otherwise the bracketed element list.
     */
    @Override
    public String castTo(Object objects) {
        if (objects == null) {
            return null;
        }
        if (!objects.getClass().isArray()) {
            return String.valueOf(objects);
        }
        // deepToString only accepts Object[], so wrap the (possibly primitive) array in a
        // one-element Object[] and strip the wrapper's own brackets from the result.
        String wrapped = java.util.Arrays.deepToString(new Object[] {objects});
        return wrapped.substring(1, wrapped.length() - 1);
    }
}

private static class Int2Long implements ITypeCast<Integer, Long> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.geaflow.dsl.udf.graph.ClusterCoefficient;
import org.apache.geaflow.dsl.udf.graph.CommonNeighbors;
import org.apache.geaflow.dsl.udf.graph.ConnectedComponents;
import org.apache.geaflow.dsl.udf.graph.GCN;
import org.apache.geaflow.dsl.udf.graph.IncKHopAlgorithm;
import org.apache.geaflow.dsl.udf.graph.IncMinimumSpanningTree;
import org.apache.geaflow.dsl.udf.graph.IncWeakConnectedComponents;
Expand Down Expand Up @@ -226,6 +227,7 @@ public class BuildInSqlFunctionTable extends ListSqlOperatorTable {
.add(GeaFlowFunction.of(SingleSourceShortestPath.class))
.add(GeaFlowFunction.of(AllSourceShortestPath.class))
.add(GeaFlowFunction.of(PageRank.class))
.add(GeaFlowFunction.of(GCN.class))
.add(GeaFlowFunction.of(KHop.class))
.add(GeaFlowFunction.of(KCore.class))
.add(GeaFlowFunction.of(IncrementalKCore.class))
Expand Down
Loading