From 8d9b931d69db0b127d20074047cd5bf0c4cc1c6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Fri, 17 Jan 2025 21:29:16 +0100 Subject: [PATCH 01/16] Add parquet operators to the Python API --- python/.gitignore | 1 + python/src/pywy/basic/data/__init__.py | 16 ++++++ python/src/pywy/basic/data/record.py | 55 +++++++++++++++++++ python/src/pywy/core/serializer.py | 6 ++ python/src/pywy/dataquanta.py | 10 +++- python/src/pywy/operators/__init__.py | 3 +- python/src/pywy/operators/source.py | 20 +++++++ .../wayang-api-json/src/main/scala/Main.scala | 4 -- .../main/scala/builder/JsonPlanBuilder.scala | 17 +++--- .../OperatorFromDrawflowConverter.scala | 1 + .../operatorfromjson/OperatorFromJson.scala | 6 +- .../scala/operatorfromjson/PlanFromJson.scala | 8 --- .../input/ParquetInputFromJson.scala | 40 ++++++++++++++ 13 files changed, 162 insertions(+), 25 deletions(-) create mode 100644 python/src/pywy/basic/data/__init__.py create mode 100644 python/src/pywy/basic/data/record.py create mode 100644 wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala diff --git a/python/.gitignore b/python/.gitignore index cd85f7a5f..ceb1ca3e1 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -1,3 +1,4 @@ # Build files dist +__pycache__ diff --git a/python/src/pywy/basic/data/__init__.py b/python/src/pywy/basic/data/__init__.py new file mode 100644 index 000000000..d9e26de2a --- /dev/null +++ b/python/src/pywy/basic/data/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/src/pywy/basic/data/record.py b/python/src/pywy/basic/data/record.py new file mode 100644 index 000000000..55358808a --- /dev/null +++ b/python/src/pywy/basic/data/record.py @@ -0,0 +1,55 @@ +from typing import List, cast + +from python.src.pywy.types import GenericTco, ListT, T + + +class Record: + """ + A Type that represents a record with a schema. + """ + values: ListT + + def __init__(self, values: ListT) -> None: + self.values = values + + def copy(self) -> 'Record': + return Record(self.values.copy()) + + def equals(self, o: T) -> bool: + if self == 0: + return True + if o is None or type(self) != type(o): + return False + + record_2 = cast(Record, o) + return self.values == o.values + + def hash_code(self) -> int: + return hash(self.values) + + def get_field(self, index: int) -> T: + return self.values[index] + + def get_double(self, index: int) -> float: + return float(self.values[index]) + + def get_int(self, index: int) -> int: + return int(self.values[index]) + + def get_string(self, index: int) -> str: + return str(self.values[index]) + + def set_field(self, index: int, field: T): + self.values[index] = field + + def addField(self, field: T): + self.values.append(field) + + def size(self) -> int: + return len(self.values) + + def __str__(self): + return "Record" + str(self.values) + + def __repr__(self): + return self.__str__() diff --git a/python/src/pywy/core/serializer.py b/python/src/pywy/core/serializer.py index 48dd6033f..ede6eab7b 100644 --- a/python/src/pywy/core/serializer.py +++ b/python/src/pywy/core/serializer.py @@ -86,6 +86,12 @@ def serialize(self, operator): if hasattr(operator, "path"): json_operator["data"]["filename"] = operator.path + if hasattr(operator, "projection"): + json_operator["data"]["projection"] = operator.projection + + if hasattr(operator, "column_names"): + json_operator["data"]["projection"] = operator.column_names + return json_operator def serialize_pipeline(self, pipeline): diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py index 59b8b5bb3..0c88dd2f1 100644 --- a/python/src/pywy/dataquanta.py +++ b/python/src/pywy/dataquanta.py @@ -15,16 +15,17 @@ # limitations under the License. # -from typing import Dict, Set, List, cast +from typing import Dict, Set, List, Optional, cast from pywy.core.core import Plugin, PywyPlan from pywy.operators.base import PO_T from pywy.types import (GenericTco, Predicate, Function, BiFunction, FlatmapFunction, IterableOut, T, In, Out) from pywy.operators import * -from pywy.basic.model.ops import Op from pywy.basic.model.option import Option from pywy.basic.model.models import Model +from python.src.pywy.basic.data.record import Record + class Configuration: entries: Dict[str, str] @@ -68,6 +69,11 @@ def unregister(self, *plugins: Plugin): def textfile(self, file_path: str) -> 'DataQuanta[str]': return DataQuanta(self, TextFileSource(file_path)) + def parquet( + self, file_path: str, projection: Optional[List[str]] = None, column_names: Optional[List[str]] = None + ) -> 'DataQuanta[Record]': + return DataQuanta(self, ParquetSource(file_path, projection, column_names)) + def __str__(self): return "Plugins: {}".format(str(self.plugins)) diff --git a/python/src/pywy/operators/__init__.py b/python/src/pywy/operators/__init__.py index 99ad6f0b9..66c0ccdc0 100644 --- a/python/src/pywy/operators/__init__.py +++ b/python/src/pywy/operators/__init__.py @@ -17,7 +17,7 @@ from pywy.operators.base import PywyOperator from pywy.operators.sink import TextFileSink, SinkOperator -from pywy.operators.source import TextFileSource, SourceUnaryOperator +from pywy.operators.source import TextFileSource, ParquetSource, SourceUnaryOperator from pywy.operators.unary import UnaryToUnaryOperator, FilterOperator, MapOperator, FlatmapOperator, ReduceByKeyOperator, SortOperator from pywy.operators.binary import BinaryToUnaryOperator, JoinOperator, DLTrainingOperator, PredictOperator @@ -27,6 +27,7 @@ BinaryToUnaryOperator, TextFileSink, TextFileSource, + ParquetSource, FilterOperator, SinkOperator, SortOperator, diff --git a/python/src/pywy/operators/source.py b/python/src/pywy/operators/source.py index 479e5c5ab..0c8d9a13a 100644 --- a/python/src/pywy/operators/source.py +++ b/python/src/pywy/operators/source.py @@ -15,6 +15,8 @@ # limitations under the License. # +from typing import List, Optional + from pywy.operators.base import PywyOperator @@ -55,3 +57,21 @@ def __str__(self): def __repr__(self): return super().__repr__() +class ParquetSource(SourceUnaryOperator): + path: str + projection: Optional[List[str]] + column_names: Optional[List[str]] + json_name: str + + def __init__(self, path: str, projection: Optional[List[str]] = None, column_names: Optional[List[str]] = None): + super(ParquetSource, self).__init__('Parquet') + self.path = path + self.projection = projection + self.column_names = column_names + self.json_name = "parquetInput" + + def __str__(self): + return super().__str__() + + def __repr__(self): + return super().__repr__() diff --git a/wayang-api/wayang-api-json/src/main/scala/Main.scala b/wayang-api/wayang-api-json/src/main/scala/Main.scala index 33a67a2cf..adbf2c02a 100644 --- a/wayang-api/wayang-api-json/src/main/scala/Main.scala +++ b/wayang-api/wayang-api-json/src/main/scala/Main.scala @@ -18,18 +18,14 @@ package org.apache.wayang.api.json import zio._ -import zio.IO import zio.http._ -import zio.Console._ import scala.util.Try import org.apache.wayang.api.json.builder.JsonPlanBuilder import org.apache.wayang.api.json.operatorfromdrawflow.OperatorFromDrawflowConverter import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson import org.apache.wayang.api.json.parserutil.ParseOperatorsFromDrawflow -import org.apache.wayang.api.json.parserutil.ParseOperatorsFromJson import org.apache.wayang.api.json.parserutil.ParsePlanFromJson -import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson object Main extends ZIOAppDefault { val drawRoute = diff --git a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala index f4e7a2b2f..27cb32d25 100644 --- a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala +++ b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala @@ -22,7 +22,7 @@ import org.apache.wayang.api.json.parserutil.{SerializableIterable, Serializable import org.apache.wayang.api.json.operatorfromjson.{ComposedOperatorFromJson, OperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, IntersectOperatorFromJson, JoinOperatorFromJson, PredictOperatorFromJson, DLTrainingOperatorFromJson, UnionOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson -import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TableInputFromJson, TextFileInputFromJson} +import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TableInputFromJson, TextFileInputFromJson, ParquetInputFromJson} import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, ReduceOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} @@ -42,15 +42,8 @@ import org.apache.wayang.flink.Flink import org.apache.wayang.tensorflow.Tensorflow import org.apache.wayang.genericjdbc.GenericJdbc import org.apache.wayang.sqlite3.Sqlite3 -import org.apache.wayang.postgres.Postgres import org.apache.wayang.core.plugin.Plugin import org.apache.wayang.basic.model.DLModel; -import org.apache.wayang.basic.model.optimizer.Adam; -import org.apache.wayang.basic.model.op.nn._; -import org.apache.wayang.basic.model.op._; -import org.apache.wayang.basic.model.optimizer._; -import org.apache.wayang.api.json.operatorfromjson.binary.{Op => JsonOp} -import org.apache.wayang.api.util.NDimArray import java.nio.file.{Files, Paths} import scala.collection.JavaConverters._ @@ -160,6 +153,7 @@ class JsonPlanBuilder() { operator match { // input case inputOperator: TextFileInputFromJson => visit(inputOperator, planBuilder) + case inputOperator: ParquetInputFromJson => visit(inputOperator, planBuilder) case inputOperator: InputCollectionFromJson => visit(inputOperator, planBuilder) case inputOperator: TableInputFromJson => visit(inputOperator, planBuilder) case inputOperator: JDBCRemoteInputFromJson => visit(inputOperator, planBuilder) @@ -210,6 +204,13 @@ class JsonPlanBuilder() { planBuilder.readTextFile(operator.data.filename).asInstanceOf[DataQuanta[Any]].withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)) } + private def visit(operator: ParquetInputFromJson, planBuilder: PlanBuilder): DataQuanta[Any] = { + if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) + planBuilder.readParquet(new ParquetSource(operator.data.filename, operator.data.projection, operator.data.columnNames: _*)).asInstanceOf[DataQuanta[Any]] + else + planBuilder.readParquet(new ParquetSource(operator.data.filename, operator.data.projection, operator.data.columnNames: _*)).withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)).asInstanceOf[DataQuanta[Any]] + } + private def visit(operator: InputCollectionFromJson, planBuilder: PlanBuilder): DataQuanta[Any] = { val iterable = SerializableIterable.create(operator.data.udf) if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala index f748af627..f93eb0e47 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala @@ -81,6 +81,7 @@ object OperatorFromDrawflowConverter { // input case "iBinary" => InputCollectionFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.InputCollection, InputCollectionFromJson.Data(operatorFromDrawflow.data("collectionGeneratorFunction").asInstanceOf[String]), executionPlatform) case "iTextFile" => TextFileInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.TextFileInput, TextFileInputFromJson.Data(operatorFromDrawflow.data("inputFileURL").asInstanceOf[String]), executionPlatform) + case "iParquet" => ParquetInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.ParquetInput, ParquetInputFromJson.Data(operatorFromDrawflow.data("inputFileURL").asInstanceOf[String], operatorFromDrawflow.data("projection").asInstanceOf[Array[String]], operatorFromDrawflow.data("columnNames").asInstanceOf[List[String]]), executionPlatform) case "iCsvFile" => TableInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Table, TableInputFromJson.Data(operatorFromDrawflow.data("tableName").asInstanceOf[String], operatorFromDrawflow.data("columnNames").asInstanceOf[String].split(",").map(s => s.trim()).toList), executionPlatform) case "iJdbc" => JDBCRemoteInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.JDBCRemoteInput, JDBCRemoteInputFromJson.Data( diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala index 6514610b3..42bcf0be2 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala @@ -24,9 +24,9 @@ import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode} import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode} import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson.OperatorNames -import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, IntersectOperatorFromJson, JoinOperatorFromJson, PredictOperatorFromJson, DLTrainingOperatorFromJson,UnionOperatorFromJson} +import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, DLTrainingOperatorFromJson, IntersectOperatorFromJson, JoinOperatorFromJson, PredictOperatorFromJson, UnionOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson -import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TextFileInputFromJson} +import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TextFileInputFromJson, ParquetInputFromJson} import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} @@ -37,6 +37,7 @@ import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, // Input operators new JsonSubTypes.Type(value = classOf[TextFileInputFromJson], name = OperatorNames.TextFileInput), + new JsonSubTypes.Type(value = classOf[ParquetInputFromJson], name = OperatorNames.ParquetInput), new JsonSubTypes.Type(value = classOf[InputCollectionFromJson], name = OperatorNames.InputCollection), new JsonSubTypes.Type(value = classOf[InputCollectionFromJson], name = OperatorNames.Table), new JsonSubTypes.Type(value = classOf[JDBCRemoteInputFromJson], name = OperatorNames.JDBCRemoteInput), @@ -137,6 +138,7 @@ object OperatorFromJson { object OperatorNames { // Input final val TextFileInput = "textFileInput" + final val ParquetInput = "parquetInput" final val InputCollection = "inputCollection" final val Table = "table" final val JDBCRemoteInput = "jdbcRemoteInput" diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/PlanFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/PlanFromJson.scala index e4cd7b3bf..8693b5673 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/PlanFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/PlanFromJson.scala @@ -17,19 +17,11 @@ */ package org.apache.wayang.api.json.operatorfromjson -import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode} import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode} import com.fasterxml.jackson.module.scala.DefaultScalaModule -import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson.OperatorNames -import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, IntersectOperatorFromJson, JoinOperatorFromJson, UnionOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson -import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TextFileInputFromJson} -import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson -import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} class PlanFromJson(val context: ContextFromJson, val operators: List[OperatorFromJson] diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala new file mode 100644 index 000000000..b3bffc96e --- /dev/null +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.api.json.operatorfromjson.input + +import com.fasterxml.jackson.annotation.JsonTypeName +import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson +import com.fasterxml.jackson.annotation.JsonIgnoreProperties + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonTypeName(OperatorFromJson.OperatorNames.TextFileInput) +case class ParquetInputFromJson(override val id: Long, + override val input: Array[Long], + override val output: Array[Long], + override val cat: String, + override val operatorName: String, + val data: ParquetInputFromJson.Data, + override val executionPlatform: String = null) + extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { +} + +@JsonIgnoreProperties(ignoreUnknown = true) +object ParquetInputFromJson { + @JsonIgnoreProperties(ignoreUnknown = true) + case class Data(filename: String, projection: Array[String] = null, columnNames: List[String] = Nil) +} From 2a201bb5547e2e1b611b639505793f10cce77b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Fri, 17 Jan 2025 21:29:26 +0100 Subject: [PATCH 02/16] Update .gitignore --- python/.gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/python/.gitignore b/python/.gitignore index ceb1ca3e1..d8d1f9b60 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -1,4 +1,3 @@ # Build files dist __pycache__ - From 06e4a2c85bf9c1ca25129372d854546f3e257f7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Fri, 17 Jan 2025 21:30:01 +0100 Subject: [PATCH 03/16] Update core.py --- python/src/pywy/core/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/src/pywy/core/core.py b/python/src/pywy/core/core.py index 17e197d5f..74cf008a8 100644 --- a/python/src/pywy/core/core.py +++ b/python/src/pywy/core/core.py @@ -1,4 +1,3 @@ - # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. From ae9d7e1f67e0a254b8772ae37bec66b3cbd6924e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Thu, 3 Apr 2025 09:37:32 +0200 Subject: [PATCH 04/16] Expose Parquet operators to the Python API --- python/src/pywy/basic/data/record.py | 38 +++++---- python/src/pywy/core/core.py | 10 +-- python/src/pywy/core/serializer.py | 31 ++++---- python/src/pywy/dataquanta.py | 77 +++++++++++-------- python/src/pywy/execution/util.py | 1 - python/src/pywy/execution/worker.py | 17 ++-- python/src/pywy/graph/graph.py | 22 +++--- python/src/pywy/operators/__init__.py | 40 +++++----- python/src/pywy/operators/base.py | 24 +++--- python/src/pywy/operators/binary.py | 65 ++++++++++------ python/src/pywy/operators/sink.py | 4 +- python/src/pywy/operators/source.py | 13 +++- python/src/pywy/operators/unary.py | 77 +++++++++++-------- python/src/pywy/tests/filter_test.py | 4 +- python/src/pywy/tests/test.py | 3 +- python/src/pywy/types.py | 70 +++++++++++++---- .../main/scala/builder/JsonPlanBuilder.scala | 14 ++-- .../OperatorFromDrawflowConverter.scala | 14 ++-- .../operatorfromjson/ContextFromJson.scala | 2 +- .../operatorfromjson/OperatorFromJson.scala | 33 ++++---- .../binary/IntersectOperatorFromJson.scala | 1 - .../binary/JoinOperatorFromJson.scala | 3 +- .../binary/UnionOperatorFromJson.scala | 2 - .../input/ParquetInputFromJson.scala | 2 +- .../unary/DistinctOperatorFromJson.scala | 1 - .../unary/FilterOperatorFromJson.scala | 3 +- ...on.scala => GroupByOperatorFromJson.scala} | 6 +- .../unary/SortOperatorFromJson.scala | 3 +- .../WrappedTransformationDescriptor.java | 8 +- .../wayang-api-python-defaults.properties | 7 +- .../apache/wayang/api/util/NDimArray.scala | 3 + .../org/apache/wayang/basic/data/Record.java | 4 +- 32 files changed, 353 insertions(+), 249 deletions(-) rename wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/{GroupByOpeartorFromJson.scala => GroupByOperatorFromJson.scala} (90%) diff --git a/python/src/pywy/basic/data/record.py b/python/src/pywy/basic/data/record.py index 55358808a..199063aa1 100644 --- a/python/src/pywy/basic/data/record.py +++ b/python/src/pywy/basic/data/record.py @@ -1,33 +1,45 @@ -from typing import List, cast - -from python.src.pywy.types import GenericTco, ListT, T +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Any, List class Record: """ A Type that represents a record with a schema. """ - values: ListT + values: List[Any] - def __init__(self, values: ListT) -> None: + def __init__(self, values: List[Any]) -> None: self.values = values def copy(self) -> 'Record': return Record(self.values.copy()) - def equals(self, o: T) -> bool: - if self == 0: - return True + def equals(self, o: Any) -> bool: if o is None or type(self) != type(o): return False - record_2 = cast(Record, o) return self.values == o.values def hash_code(self) -> int: return hash(self.values) - def get_field(self, index: int) -> T: + def get_field(self, index: int) -> Any: return self.values[index] def get_double(self, index: int) -> float: @@ -39,17 +51,17 @@ def get_int(self, index: int) -> int: def get_string(self, index: int) -> str: return str(self.values[index]) - def set_field(self, index: int, field: T): + def set_field(self, index: int, field: Any) -> None: self.values[index] = field - def addField(self, field: T): + def add_field(self, field: Any) -> None: self.values.append(field) def size(self) -> int: return len(self.values) def __str__(self): - return "Record" + str(self.values) + return str(self.values).replace("[", "").replace("]", "").replace(" ", "") def __repr__(self): return self.__str__() diff --git a/python/src/pywy/core/core.py b/python/src/pywy/core/core.py index 74cf008a8..7e346baf5 100644 --- a/python/src/pywy/core/core.py +++ b/python/src/pywy/core/core.py @@ -16,18 +16,13 @@ from typing import Set, Iterable, Dict import json -import base64 -import cloudpickle import requests -import subprocess -import time -import os from pywy.core.platform import Platform from pywy.core.serializer import JSONSerializer from pywy.graph.graph import WayangGraph -from pywy.graph.types import WGraphOfVec, NodeOperator, NodeVec -from pywy.operators import SinkOperator, UnaryToUnaryOperator, SourceUnaryOperator +from pywy.graph.types import WGraphOfVec +from pywy.operators import SinkOperator, UnaryToUnaryOperator class Plugin: @@ -121,7 +116,6 @@ def execute(self): for node in nodes: operator = node.current[0] - if isinstance(operator, UnaryToUnaryOperator): pipeline.append(operator) else: diff --git a/python/src/pywy/core/serializer.py b/python/src/pywy/core/serializer.py index ede6eab7b..4e3d21866 100644 --- a/python/src/pywy/core/serializer.py +++ b/python/src/pywy/core/serializer.py @@ -30,6 +30,7 @@ from pywy.types import get_java_type, NDimArray, ndim_from_type from pywy.operators import SinkOperator, UnaryToUnaryOperator, SourceUnaryOperator + class JSONSerializer: id_table: Iterable[int] @@ -53,44 +54,42 @@ def serialize(self, operator): json_operator["data"] = {} - if hasattr(operator, "input_type"): - if operator.input_type is not None: - json_operator["data"]["inputType"] = ndim_from_type(operator.input_type).to_json() - if hasattr(operator, "output_type"): - if operator.output_type is not None: - json_operator["data"]["outputType"] = ndim_from_type(operator.output_type).to_json() + if hasattr(operator, "input_type") and operator.input_type is not None: + json_operator["data"]["inputType"] = ndim_from_type(operator.input_type).to_json() + + if hasattr(operator, "output_type") and operator.output_type is not None: + json_operator["data"]["outputType"] = ndim_from_type(operator.output_type).to_json() if operator.json_name == "filter": json_operator["data"]["udf"] = base64.b64encode(cloudpickle.dumps(operator.use_predicate)).decode('utf-8') - return json_operator - if operator.json_name == "reduceBy": + elif operator.json_name == "reduceBy": json_operator["data"]["keyUdf"] = base64.b64encode(cloudpickle.dumps(operator.key_function)).decode('utf-8') json_operator["data"]["udf"] = base64.b64encode(cloudpickle.dumps(operator.reduce_function)).decode('utf-8') - return json_operator elif operator.json_name == "join": - json_operator["data"]["thisKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.this_key_function)).decode('utf-8') - json_operator["data"]["thatKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.that_key_function)).decode('utf-8') + json_operator["data"]["thisKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.get_left_key_udf)).decode('utf-8') + json_operator["data"]["thatKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.get_right_key_udf)).decode('utf-8') + + elif operator.json_name == "cartesian": + del json_operator["data"] - return json_operator elif operator.json_name == "dlTraining": json_operator["data"]["model"] = {"modelType": "DLModel", "op": operator.model.get_out().to_dict()} json_operator["data"]["option"] = operator.option.to_dict() - return json_operator else: if hasattr(operator, "get_udf"): json_operator["data"]["udf"] = base64.b64encode(cloudpickle.dumps(operator.get_udf)).decode('utf-8') if hasattr(operator, "path"): - json_operator["data"]["filename"] = operator.path + json_operator["data"]["filename"] = operator.path if hasattr(operator, "projection"): json_operator["data"]["projection"] = operator.projection if hasattr(operator, "column_names"): - json_operator["data"]["projection"] = operator.column_names + json_operator["data"]["column_names"] = operator.column_names return json_operator @@ -103,7 +102,7 @@ def serialize_pipeline(self, pipeline): json_operator["cat"] = "unary" json_operator["data"] = {} json_operator["input"] = list(map(lambda x: self.id_table[x], pipeline[0].inputOperator)) - json_operator["output"] = list(map(lambda x: self.id_table[x], pipeline[len(pipeline) - 1].outputOperator)) + json_operator["output"] = list(map(lambda x: self.id_table[x], pipeline[-1].outputOperator)) if len(pipeline) == 1: return self.serialize(pipeline[0]) diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py index 0c88dd2f1..99a545afe 100644 --- a/python/src/pywy/dataquanta.py +++ b/python/src/pywy/dataquanta.py @@ -21,11 +21,10 @@ from pywy.operators.base import PO_T from pywy.types import (GenericTco, Predicate, Function, BiFunction, FlatmapFunction, IterableOut, T, In, Out) from pywy.operators import * +from pywy.basic.data.record import Record from pywy.basic.model.option import Option from pywy.basic.model.models import Model -from python.src.pywy.basic.data.record import Record - class Configuration: entries: Dict[str, str] @@ -51,7 +50,6 @@ def __init__(self, configuration: Configuration = Configuration()): """ add a :class:`Plugin` to the :class:`Context` """ - def register(self, *plugins: Plugin): for p in plugins: self.plugins.update(p) @@ -60,18 +58,17 @@ def register(self, *plugins: Plugin): """ remove a :class:`Plugin` from the :class:`Context` """ - def unregister(self, *plugins: Plugin): for p in plugins: self.plugins.remove(p) return self - def textfile(self, file_path: str) -> 'DataQuanta[str]': + def textfile(self, file_path: str) -> "DataQuanta[str]": return DataQuanta(self, TextFileSource(file_path)) def parquet( self, file_path: str, projection: Optional[List[str]] = None, column_names: Optional[List[str]] = None - ) -> 'DataQuanta[Record]': + ) -> "DataQuanta[Record]": return DataQuanta(self, ParquetSource(file_path, projection, column_names)) def __str__(self): @@ -94,25 +91,31 @@ def __init__(self, context: WayangContext, operator: PywyOperator): def filter(self: "DataQuanta[T]", p: Predicate, input_type: GenericTco = None) -> "DataQuanta[T]": return DataQuanta(self.context, self._connect(FilterOperator(p, input_type))) - def map(self: "DataQuanta[In]", f: Function, input_type: GenericTco = None, output_type: GenericTco = None) -> "DataQuanta[Out]": + def map( + self: "DataQuanta[In]", + f: Function, + input_type: GenericTco = None, + output_type: GenericTco = None + ) -> "DataQuanta[Out]": return DataQuanta(self.context, self._connect(MapOperator(f, input_type, output_type))) - def flatmap(self: "DataQuanta[In]", f: FlatmapFunction, input_type: GenericTco = None, output_type: GenericTco = None) -> "DataQuanta[IterableOut]": + def flatmap( + self: "DataQuanta[In]", + f: FlatmapFunction, + input_type: GenericTco = None, + output_type: GenericTco = None + ) -> "DataQuanta[IterableOut]": return DataQuanta(self.context, self._connect(FlatmapOperator(f, input_type, output_type))) - def reduce_by_key(self: "DataQuanta[In]", - key_f: Function, - f: BiFunction, - input_type: GenericTco = None - ) -> "DataQuanta[IterableOut]": - + def reduce_by_key( + self: "DataQuanta[In]", + key_f: Function, + f: BiFunction, + input_type: GenericTco = None + ) -> "DataQuanta[IterableOut]": return DataQuanta(self.context, self._connect(ReduceByKeyOperator(key_f, f, input_type))) - def sort(self: "DataQuanta[In]", - key_f: Function, - input_type: GenericTco = None - ) -> "DataQuanta[IterableOut]": - + def sort(self: "DataQuanta[In]", key_f: Function, input_type: GenericTco = None) -> "DataQuanta[IterableOut]": return DataQuanta(self.context, self._connect(SortOperator(key_f, input_type))) def join( @@ -121,23 +124,39 @@ def join( that: "DataQuanta[In]", that_key_f: Function, input_type: GenericTco = None, - output_type: GenericTco = None - ) -> "DataQuanta[Out]": - + ) -> "DataQuanta[Out]": op = JoinOperator( this_key_f, that, that_key_f, - input_type, - output_type + input_type + ) + + self._connect(op), + return DataQuanta( + self.context, + that._connect(op, 1) + ) + + def cartesian( + self: "DataQuanta[In]", + that: "DataQuanta[In]", + input_type: GenericTco = None, + ) -> "DataQuanta[Out]": + op = CartesianOperator( + that, + input_type ) self._connect(op), return DataQuanta( self.context, - that._connect(op,1) + that._connect(op, 1) ) + def distinct(self: "DataQuanta[In]", input_type: GenericTco = None) -> "DataQuanta[IterableOut]": + return DataQuanta(self.context, self._connect(DistinctOperator(input_type))) + def dlTraining( self: "DataQuanta[In]", model: Model, @@ -146,7 +165,6 @@ def dlTraining( input_type: GenericTco, output_type: GenericTco ) -> "DataQuanta[Out]": - op = DLTrainingOperator( model, option, @@ -157,7 +175,7 @@ def dlTraining( return DataQuanta( self.context, - that._connect(op,1) + that._connect(op, 1) ) def predict( @@ -175,10 +193,10 @@ def predict( return DataQuanta( self.context, - that._connect(op,1) + that._connect(op, 1) ) - def store_textfile(self: "DataQuanta[In]", path: str, input_type: GenericTco = None): + def store_textfile(self: "DataQuanta[In]", path: str, input_type: GenericTco = None) -> None: last: List[SinkOperator] = [ cast( SinkOperator, @@ -190,7 +208,6 @@ def store_textfile(self: "DataQuanta[In]", path: str, input_type: GenericTco = N ) ) ] - #print(PywyPlan(self.context.plugins, last)) PywyPlan(self.context.plugins, self.context.configuration.entries, last).execute() def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator: diff --git a/python/src/pywy/execution/util.py b/python/src/pywy/execution/util.py index 3fd3e2e1b..32c7a5b96 100644 --- a/python/src/pywy/execution/util.py +++ b/python/src/pywy/execution/util.py @@ -23,4 +23,3 @@ class SpecialLengths(object): END_OF_STREAM = -4 NULL = -5 START_ARROW_STREAM = -6 - diff --git a/python/src/pywy/execution/worker.py b/python/src/pywy/execution/worker.py index ed94d45a6..44b5941b8 100755 --- a/python/src/pywy/execution/worker.py +++ b/python/src/pywy/execution/worker.py @@ -15,20 +15,16 @@ # limitations under the License. # +import base64 import os +import pickle import socket import struct -import base64 -import re -import sys -import ast -import copy -import pickle -import cloudpickle -import numpy as np +import numpy as np from pywy.execution.util import SpecialLengths + def read_int(stream): length = stream.read(4) if not length: @@ -51,6 +47,7 @@ def loads(self, stream): raise EOFError elif length == SpecialLengths.NULL: return None + s = stream.read(length) return s.decode("utf-8") if self.use_unicode else s @@ -100,6 +97,7 @@ def dump_stream(iterator, stream): ## write_with_length(obj, stream) write_int(SpecialLengths.END_OF_DATA_SECTION, stream) + def process(infile, outfile): udf_length = read_int(infile) serialized_udf = infile.read(udf_length) @@ -109,6 +107,7 @@ def process(infile, outfile): out_iter = func(iterator) dump_stream(iterator=out_iter, stream=outfile) + def local_connect(port): sock = None errors = [] @@ -118,7 +117,7 @@ def local_connect(port): af, socktype, proto, _, sa = res try: sock = socket.socket(af, socktype, proto) - sock.settimeout(30) + sock.settimeout(60) sock.connect(sa) sockfile = sock.makefile("rwb", 65536) return (sockfile, sock) diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py index bc9d6682e..2bcd87647 100644 --- a/python/src/pywy/graph/graph.py +++ b/python/src/pywy/graph/graph.py @@ -50,13 +50,15 @@ def wrap(op: T) -> Optional['GraphNode[K, T]'] | None: return map(wrap, adjacent) - def visit(self, - parent: 'GraphNode[K, T]', - udf: Callable[['GraphNode[K, T]', 'GraphNode[K, T]'], Any], - visit_status: bool = True): + def visit( + self, + parent: 'GraphNode[K, T]', + udf: Callable[['GraphNode[K, T]', 'GraphNode[K, T]'], Any], + visit_status: bool = True + ): if self.visited == visit_status: return - self.visited = ~ visit_status + self.visited = ~visit_status return udf(self, parent) @@ -77,11 +79,11 @@ def build_node(self, t: T) -> GraphNode[K, T]: pass def traversal( - self, - nodes: Iterable[GraphNode[K, T]], - udf: Callable[[GraphNode[K, T], GraphNode[K, T]], Any], - origin: Optional[GraphNode[K, T]] = None, - visit_status: bool = True + self, + nodes: Iterable[GraphNode[K, T]], + udf: Callable[[GraphNode[K, T], GraphNode[K, T]], Any], + origin: Optional[GraphNode[K, T]] = None, + visit_status: bool = True ): for node in nodes: adjacent = node.walk(self.created_nodes) diff --git a/python/src/pywy/operators/__init__.py b/python/src/pywy/operators/__init__.py index 66c0ccdc0..fd4dc2c90 100644 --- a/python/src/pywy/operators/__init__.py +++ b/python/src/pywy/operators/__init__.py @@ -16,26 +16,30 @@ # from pywy.operators.base import PywyOperator +from pywy.operators.binary import BinaryToUnaryOperator, JoinOperator, DLTrainingOperator, PredictOperator, \ + CartesianOperator from pywy.operators.sink import TextFileSink, SinkOperator from pywy.operators.source import TextFileSource, ParquetSource, SourceUnaryOperator -from pywy.operators.unary import UnaryToUnaryOperator, FilterOperator, MapOperator, FlatmapOperator, ReduceByKeyOperator, SortOperator -from pywy.operators.binary import BinaryToUnaryOperator, JoinOperator, DLTrainingOperator, PredictOperator +from pywy.operators.unary import UnaryToUnaryOperator, FilterOperator, MapOperator, FlatmapOperator, \ + ReduceByKeyOperator, SortOperator, DistinctOperator __ALL__ = [ - PywyOperator, - UnaryToUnaryOperator, - BinaryToUnaryOperator, - TextFileSink, - TextFileSource, - ParquetSource, - FilterOperator, - SinkOperator, - SortOperator, - SourceUnaryOperator, - MapOperator, - ReduceByKeyOperator, - FlatmapOperator, - JoinOperator, - DLTrainingOperator, - PredictOperator + PywyOperator, + UnaryToUnaryOperator, + BinaryToUnaryOperator, + TextFileSink, + TextFileSource, + ParquetSource, + FilterOperator, + SinkOperator, + SortOperator, + DistinctOperator, + SourceUnaryOperator, + MapOperator, + ReduceByKeyOperator, + FlatmapOperator, + JoinOperator, + CartesianOperator, + DLTrainingOperator, + PredictOperator, ] diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py index 3c863337c..5d047635f 100644 --- a/python/src/pywy/operators/base.py +++ b/python/src/pywy/operators/base.py @@ -16,7 +16,7 @@ # from typing import (TypeVar, Optional, List) -from pywy.types import (typecheck, ConstrainedOperatorType) +from pywy.types import (typecheck, ConstrainedOperatorType, serialize_iterator_to_operator_type) class PywyOperator: @@ -29,15 +29,15 @@ class PywyOperator: output_type: ConstrainedOperatorType def __init__(self, - name: str, - cat: str, - input_type: ConstrainedOperatorType = None, - output_type: ConstrainedOperatorType = None, - input_length: Optional[int] = 1, - output_length: Optional[int] = 1, - *args, - **kwargs - ): + name: str, + cat: str, + input_type: ConstrainedOperatorType = None, + output_type: ConstrainedOperatorType = None, + input_length: Optional[int] = 1, + output_length: Optional[int] = 1, + *args, + **kwargs + ): typecheck(input_type) typecheck(output_type) self.name = (self.prefix() + name + self.postfix()).strip() @@ -49,6 +49,9 @@ def __init__(self, self.input_type = input_type self.output_type = output_type + def serialize_iterator(self, iterator): + return serialize_iterator_to_operator_type(self.input_type, iterator) + def validate_inputs(self, vec): if len(vec) != self.inputs: raise Exception( @@ -92,4 +95,5 @@ def __str__(self): def __repr__(self): return self.__str__() + PO_T = TypeVar('PO_T', bound=PywyOperator) diff --git a/python/src/pywy/operators/binary.py b/python/src/pywy/operators/binary.py index 3141f59c7..b3be04d4e 100644 --- a/python/src/pywy/operators/binary.py +++ b/python/src/pywy/operators/binary.py @@ -15,24 +15,14 @@ # limitations under the License. # -from itertools import chain, groupby -from collections import defaultdict -import ast - -from pywy.operators.base import PywyOperator from pywy.basic.model.models import Model from pywy.basic.model.option import Option +from pywy.operators.base import PywyOperator from pywy.types import ( - GenericTco, - GenericUco, - Predicate, - get_type_predicate, - Function, - BiFunction, - get_type_function, - FlatmapFunction, - get_type_flatmap_function - ) + GenericTco, + Function +) + class BinaryToUnaryOperator(PywyOperator): @@ -48,6 +38,7 @@ def __str__(self): def __repr__(self): return super().__repr__() + class JoinOperator(BinaryToUnaryOperator): this_key_function: Function that: PywyOperator @@ -55,19 +46,43 @@ class JoinOperator(BinaryToUnaryOperator): json_name: str def __init__( - self, - this_key_function: Function, - that: PywyOperator, - that_key_function: Function, - input_type: GenericTco, - output_type: GenericTco - ): - super().__init__("Join", input_type, output_type) - self.this_key_function = lambda g: this_key_function(next(g)) + self, + this_key_function: Function, + that: PywyOperator, + that_key_function: Function, + input_type: GenericTco, + ): + super().__init__("Join", input_type, (input_type, input_type)) + self.this_key_function = this_key_function self.that = that - self.that_key_function = lambda g: that_key_function(next(g)) + self.that_key_function = that_key_function self.json_name = "join" + def get_left_key_udf(self, iterator): + iterator = self.serialize_iterator(iterator) + return map(lambda x: self.this_key_function(x), iterator) + + def get_right_key_udf(self, iterator): + iterator = self.serialize_iterator(iterator) + print("right") + print(iterator) + print(list(map(lambda x: self.that_key_function(x), iterator))) + return map(lambda x: self.that_key_function(x), iterator) + + +class CartesianOperator(BinaryToUnaryOperator): + that: PywyOperator + json_name: str + + def __init__( + self, + that: PywyOperator, + input_type: GenericTco, + ): + super().__init__("Cartesian", input_type, input_type) + self.that = that + self.json_name = "cartesian" + class DLTrainingOperator(BinaryToUnaryOperator): model: Model diff --git a/python/src/pywy/operators/sink.py b/python/src/pywy/operators/sink.py index b3fdbaba6..371a5ed8f 100644 --- a/python/src/pywy/operators/sink.py +++ b/python/src/pywy/operators/sink.py @@ -23,7 +23,7 @@ class SinkOperator(PywyOperator): def postfix(self) -> str: - return 'Sink' + return "Sink" class SinkUnaryOperator(SinkOperator): @@ -43,7 +43,7 @@ class TextFileSink(SinkUnaryOperator): json_name: str def __init__(self, path: str, input_type: GenericTco): - super().__init__('TextFile', input_type) + super().__init__("TextFile", input_type) self.path = path self.json_name = "textFileOutput" diff --git a/python/src/pywy/operators/source.py b/python/src/pywy/operators/source.py index 0c8d9a13a..39e11bfbc 100644 --- a/python/src/pywy/operators/source.py +++ b/python/src/pywy/operators/source.py @@ -19,14 +19,18 @@ from pywy.operators.base import PywyOperator +from pywy.basic.data.record import Record + +from pywy.types import GenericTco + class SourceUnaryOperator(PywyOperator): - def __init__(self, name: str): + def __init__(self, name: str, output_type: GenericTco): super(SourceUnaryOperator, self).__init__( name=name, input_type=None, - output_type=str, + output_type=output_type, cat="input", input_length=0, output_length=1 @@ -47,7 +51,7 @@ class TextFileSource(SourceUnaryOperator): json_name: str def __init__(self, path: str): - super(TextFileSource, self).__init__('TextFile') + super(TextFileSource, self).__init__('TextFile', output_type=str) self.path = path self.json_name = "textFileInput" @@ -57,6 +61,7 @@ def __str__(self): def __repr__(self): return super().__repr__() + class ParquetSource(SourceUnaryOperator): path: str projection: Optional[List[str]] @@ -64,7 +69,7 @@ class ParquetSource(SourceUnaryOperator): json_name: str def __init__(self, path: str, projection: Optional[List[str]] = None, column_names: Optional[List[str]] = None): - super(ParquetSource, self).__init__('Parquet') + super(ParquetSource, self).__init__('Parquet', output_type=Record) self.path = path self.projection = projection self.column_names = column_names diff --git a/python/src/pywy/operators/unary.py b/python/src/pywy/operators/unary.py index fd736df6b..7e2f286e5 100644 --- a/python/src/pywy/operators/unary.py +++ b/python/src/pywy/operators/unary.py @@ -16,22 +16,19 @@ # from itertools import chain, groupby -from collections import defaultdict -import ast from pywy.operators.base import PywyOperator from pywy.types import ( - GenericTco, - GenericUco, - Predicate, - get_type_predicate, - Function, - BiFunction, - get_type_function, - get_type_bifunction, - FlatmapFunction, - get_type_flatmap_function - ) + GenericTco, + Predicate, + get_type_predicate, + Function, + BiFunction, + get_type_function, + get_type_bifunction, + FlatmapFunction, + get_type_flatmap_function +) class UnaryToUnaryOperator(PywyOperator): @@ -40,7 +37,7 @@ def __init__(self, name: str, input_type: GenericTco = None, output_type: Generi super().__init__(name, "unary", input_type, output_type, 1, 1) def postfix(self) -> str: - return 'OperatorUnary' + return "OperatorUnary" def __str__(self): return super().__str__() @@ -50,7 +47,6 @@ def __repr__(self): class FilterOperator(UnaryToUnaryOperator): - predicate: Predicate json_name: str @@ -65,6 +61,7 @@ def use_predicate(self, iterator) -> bool: return self.predicate(next(iterator)) def get_udf(self, iterator): + iterator = self.serialize_iterator(iterator) return filter(self.predicate, iterator) def __str__(self): @@ -75,7 +72,6 @@ def __repr__(self): class MapOperator(UnaryToUnaryOperator): - function: Function json_name: str @@ -87,6 +83,7 @@ def __init__(self, function: Function, input_type: GenericTco = None, output_typ self.json_name = "map" def get_udf(self, iterator): + iterator = self.serialize_iterator(iterator) return map(lambda x: self.function(x), iterator) def __str__(self): @@ -95,8 +92,8 @@ def __str__(self): def __repr__(self): return super().__repr__() -class MapPartitionsOperator(UnaryToUnaryOperator): +class MapPartitionsOperator(UnaryToUnaryOperator): function: Function json_name: str @@ -108,6 +105,7 @@ def __init__(self, function: Function, input_type: GenericTco = None, output_typ self.json_name = "mapPartitions" def get_udf(self, iterator): + iterator = self.serialize_iterator(iterator) return map(lambda x: self.function(x), iterator) def __str__(self): @@ -118,11 +116,9 @@ def __repr__(self): class FlatmapOperator(UnaryToUnaryOperator): - fm_function: FlatmapFunction json_name: str - def __init__(self, fm_function: FlatmapFunction, input_type: GenericTco = None, output_type: GenericTco = None): if input_type is None or output_type is None: input_type, output_type = get_type_flatmap_function(fm_function) if fm_function else (None, None) @@ -131,6 +127,7 @@ def __init__(self, fm_function: FlatmapFunction, input_type: GenericTco = None, self.json_name = "flatMap" def get_udf(self, iterator): + iterator = self.serialize_iterator(iterator) return chain.from_iterable(map(lambda x: self.fm_function(x), iterator)) def __str__(self): @@ -139,17 +136,18 @@ def __str__(self): def __repr__(self): return super().__repr__() + class ReduceByKeyOperator(UnaryToUnaryOperator): key_function: Function reduce_function: BiFunction json_name: str def __init__( - self, - key_function: Function, - reduce_function: BiFunction, - input_type: GenericTco = None, - ): + self, + key_function: Function, + reduce_function: BiFunction, + input_type: GenericTco = None, + ): if input_type is None: input_type = get_type_bifunction(reduce_function) if reduce_function else (None, None, None) super().__init__("ReduceByKey", (input_type[0], input_type[1])) @@ -158,14 +156,9 @@ def __init__( self.json_name = "reduceBy" def get_udf(self, iterator): - # Use ast.literal_eval() to safely evaluate the string as a Python literal - #print(", ".join(iterator)) - #list_of_tuples = ast.literal_eval("[" + ", ".join(iterator) + "]") - - tuples = [(str(item[0]), str(item[1])) for item in iterator] - grouped_data = groupby(sorted(tuples, key=self.key_function), key=self.key_function) + iterator = self.serialize_iterator(iterator) + grouped_data = groupby(sorted(iterator, key=self.key_function), key=self.key_function) - # Create a defaultdict to store the sums sums = {} for key, group in grouped_data: @@ -183,18 +176,18 @@ def __repr__(self): class SortOperator(UnaryToUnaryOperator): - key_udf: Function json_name: str def __init__(self, function: Function, input_type: GenericTco = None): if input_type is None: input_type, output_type = get_type_function(function) if function else (None, None) - super().__init__("Sort", input_type, None) + super().__init__("Sort", input_type, input_type) self.key_udf = function self.json_name = "sort" def get_udf(self, iterator): + iterator = self.serialize_iterator(iterator) return sorted(iterator, key=self.key_udf) def __str__(self): @@ -203,3 +196,21 @@ def __str__(self): def __repr__(self): return super().__repr__() + +class DistinctOperator(UnaryToUnaryOperator): + json_name: str + + def __init__(self, input_type: GenericTco = None): + super().__init__("Distinct", input_type, input_type) + self.json_name = "distinct" + + def get_udf(self, iterator): + iterator = self.serialize_iterator(iterator) + return iter(set(iterator)) + + def __str__(self): + return super().__str__() + + def __repr__(self): + return super().__repr__() + diff --git a/python/src/pywy/tests/filter_test.py b/python/src/pywy/tests/filter_test.py index aa04f0796..82e8812db 100644 --- a/python/src/pywy/tests/filter_test.py +++ b/python/src/pywy/tests/filter_test.py @@ -16,9 +16,7 @@ # import unittest -#from typing import Tuple, Callable, Iterable from pywy.dataquanta import WayangContext -from unittest.mock import Mock from pywy.platforms.java import JavaPlugin from pywy.platforms.spark import SparkPlugin @@ -33,7 +31,7 @@ def test_to_json(self): right = ctx.textfile("file:///var/www/html/README.md") \ .filter(lambda w: "Wayang" in w, str) \ .map(lambda w: (len(w), w), str, (int, str)) - join = left.join(lambda w: w[0], right, lambda w: w[0], (int, str), ((int, str), (int, str))) \ + join = left.join(lambda w: w[0], right, lambda w: w[0], (int, str)) \ .store_textfile("file:///var/www/html/data/wordcount-out-python.txt") self.assertEqual(True, True) diff --git a/python/src/pywy/tests/test.py b/python/src/pywy/tests/test.py index 2cc0af22e..8fbbcfed2 100644 --- a/python/src/pywy/tests/test.py +++ b/python/src/pywy/tests/test.py @@ -16,9 +16,8 @@ # import unittest -from typing import Tuple, Callable, Iterable, List +from typing import List from pywy.dataquanta import WayangContext -from unittest.mock import Mock from pywy.platforms.java import JavaPlugin from pywy.platforms.spark import SparkPlugin from pywy.platforms.tensorflow import TensorflowPlugin diff --git a/python/src/pywy/types.py b/python/src/pywy/types.py index 47ccc3763..2c1d6721d 100644 --- a/python/src/pywy/types.py +++ b/python/src/pywy/types.py @@ -15,11 +15,14 @@ # limitations under the License. # -from typing import (Generic, TypeVar, Callable, Hashable, Iterable, Type, Union, Tuple, get_args, get_origin, List, Dict, Any) +import re +from ast import literal_eval from inspect import signature +from typing import ( + Generic, TypeVar, Callable, Hashable, Iterable, Type, Union, Tuple, get_args, get_origin, List, Dict, Any +) from numpy import int32, int64, float32, float64, ndarray -import re - +from pywy.basic.data.record import Record from pywy.exception import PywyException T = TypeVar("T") # Type @@ -45,7 +48,7 @@ "NumberOrArray", float, int, complex, int32, int64, float32, float64, ndarray ) -ConstrainedOperatorType = Union[PrimitiveType, NumberOrArray, IterableT, ListT] +ConstrainedOperatorType = Union[PrimitiveType, NumberOrArray, IterableT, ListT, Record] Predicate = Callable[[ConstrainedOperatorType], bool] Function = Callable[[ConstrainedOperatorType], ConstrainedOperatorType] @@ -70,6 +73,8 @@ 1 {origin: int, depth: 0} """ + + class NDimArray: origin: Type depth: int @@ -84,6 +89,7 @@ def __str__(self) -> str: def to_json(self) -> dict: return {"origin": get_java_type(self.origin), "depth": self.depth} + def ndim_from_type(py_type: ConstrainedOperatorType, depth: int = 0) -> NDimArray: # Handle basic types and direct typing module classes if hasattr(py_type, '__name__'): @@ -102,7 +108,8 @@ def ndim_from_type(py_type: ConstrainedOperatorType, depth: int = 0) -> NDimArra return NDimArray(py_type, depth) -#Define the mappings + +# Define the mappings type_mappings: Dict[Type, str] = { 'int': 'Integer', 'float': 'Float', @@ -114,9 +121,11 @@ def ndim_from_type(py_type: ConstrainedOperatorType, depth: int = 0) -> NDimArra 'Dict': 'Map', 'tuple': 'Tuple', 'Tuple': 'Tuple', - 'Any': 'Object' + 'Any': 'Object', + 'Record': 'Record', } + def get_type_predicate(call: Predicate) -> type: sig = signature(call) if len(sig.parameters) != 1: @@ -176,19 +185,15 @@ def get_type_flatmap_function(call: FlatmapFunction) -> (type, type): keys = list(sig.parameters.keys()) return sig.parameters[keys[0]].annotation, sig.return_annotation.__args__[0] + def typecheck(input_type: Type[ConstrainedOperatorType]): allowed_types = get_args(ConstrainedOperatorType) - print(allowed_types) - print(input_type) if input_type in allowed_types or input_type is None: return origin = get_origin(input_type) args = get_args(input_type) - print(origin) - print(args) - if isinstance(input_type, List) and args: typecheck(args[0]) elif isinstance(input_type, Tuple): @@ -197,10 +202,7 @@ def typecheck(input_type: Type[ConstrainedOperatorType]): else: raise TypeError(f"Unsupported Operator type: {input_type}") else: - #print(get_args(ConstrainedOperatorType)) - #if candT not in get_args(ConstrainedOperatorType) and candT is not None: - #if candT is not PrimitiveType and candT is not IterableT and candT is not NumberOrArray and candT is not None: - raise TypeError(f"Unsupported Operator type: {input_type}, {args}, {isinstance(input_type, Tuple)}") + raise TypeError(f"Unsupported Operator type: {input_type}, {origin}, {args}") def get_java_type(input_type: ConstrainedOperatorType) -> str: str_type = get_type_str(input_type) @@ -208,6 +210,7 @@ def get_java_type(input_type: ConstrainedOperatorType) -> str: py_type = str_type.replace("typing.", "") return convert_type(py_type) + def convert_type(py_type: str) -> str: # Regex to find generic types like List[float], Dict[str, int], etc. generic_type_pattern = re.compile(r'(\w+)\[(.+)\]') @@ -221,6 +224,7 @@ def convert_type(py_type: str) -> str: else: return type_mappings.get(py_type, py_type) + def get_type_str(py_type: Any) -> str: # Handle basic types and direct typing module classes if hasattr(py_type, '__name__'): @@ -238,3 +242,39 @@ def get_type_str(py_type: Any) -> str: return origin_str return str(py_type) + + +def serialize_iterator_to_operator_type( + operator_type: ConstrainedOperatorType, iterator: Iterable[Union[str, ConstrainedOperatorType]] +) -> Iterable[ConstrainedOperatorType]: + def _cast_value_to_type(value: Any, type: ConstrainedOperatorType): + if type is ndarray: + return ndarray(value) + elif type is IterableT: + return iter(value) + elif type is Record: + return Record(list(value)) + else: + return value + + if operator_type is str: + return iterator + + typed_iterator = list() + for x in iterator: + if not isinstance(x, str): + typed_iterator.append(x) + continue + + x = literal_eval(x) + + if isinstance(operator_type, Tuple): + args = get_args(operator_type) + for i, arg_type in enumerate(args): + x[i] = _cast_value_to_type(value=x[i], type=arg_type) + else: + x = _cast_value_to_type(x, operator_type) + + typed_iterator.append(x) + + return iter(typed_iterator) diff --git a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala index 27cb32d25..13cfa2fa8 100644 --- a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala +++ b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala @@ -25,7 +25,7 @@ import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TableInputFromJson, TextFileInputFromJson, ParquetInputFromJson} import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson -import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, ReduceOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} +import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOperatorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, ReduceOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.PlanFromJson import org.apache.wayang.api._ import org.apache.wayang.basic.operators._ @@ -44,6 +44,7 @@ import org.apache.wayang.genericjdbc.GenericJdbc import org.apache.wayang.sqlite3.Sqlite3 import org.apache.wayang.core.plugin.Plugin import org.apache.wayang.basic.model.DLModel; +import org.apache.wayang.basic.data.Record; import java.nio.file.{Files, Paths} import scala.collection.JavaConverters._ @@ -167,7 +168,7 @@ class JsonPlanBuilder() { case operator: FlatMapOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: ReduceByOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: CountOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) - case operator: GroupByOpeartorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) + case operator: GroupByOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: SortOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: DistinctOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: ReduceOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) @@ -190,6 +191,8 @@ class JsonPlanBuilder() { // Other case operator: KMeansFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) + + // TODO: case operator: CollectSinkOperator => return dataquanta of last operator! } } @@ -330,7 +333,7 @@ class JsonPlanBuilder() { dataQuanta.count.asInstanceOf[DataQuanta[Any]].withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)) } - private def visit(operator: GroupByOpeartorFromJson, dataQuanta: DataQuanta[Any]): DataQuanta[Any] = { + private def visit(operator: GroupByOperatorFromJson, dataQuanta: DataQuanta[Any]): DataQuanta[Any] = { val lambda = SerializableLambda.createLambda[Any, Any](operator.data.keyUdf) if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) dataQuanta.groupByKey(lambda).map(arrayList => arrayList.asScala.toList) @@ -348,9 +351,10 @@ class JsonPlanBuilder() { } private def visit(operator: DistinctOperatorFromJson, dataQuanta: DataQuanta[Any]): DataQuanta[Any] = { - if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) + if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) { + print("hello") dataQuanta.distinct - else + } else dataQuanta.distinct.withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)) } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala index f93eb0e47..754d161d0 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala @@ -20,10 +20,10 @@ package org.apache.wayang.api.json.operatorfromdrawflow import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson -import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, IntersectOperatorFromJson, JoinOperatorFromJson, UnionOperatorFromJson} +import org.apache.wayang.api.json.operatorfromjson.binary._ import org.apache.wayang.api.json.operatorfromjson.input._ import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.output._ +import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson import org.apache.wayang.api.json.operatorfromjson.unary._ import org.apache.wayang.api.json.parserutil.ParseOperatorsFromDrawflow @@ -93,11 +93,11 @@ object OperatorFromDrawflowConverter { )) // unary - case "filter" => FilterOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Filter, FilterOperatorFromJson.Data(operatorFromDrawflow.data("booleanFunction").asInstanceOf[String]), executionPlatform) + case "filter" => FilterOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Filter, FilterOperatorFromJson.Data(operatorFromDrawflow.data("booleanFunction").asInstanceOf[String], None, None), executionPlatform) case "reduceBy" => ReduceByOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.ReduceBy, ReduceByOperatorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String], operatorFromDrawflow.data("reduceFunction").asInstanceOf[String]), executionPlatform) case "count" => CountOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Count, executionPlatform) - case "groupBy" => GroupByOpeartorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.GroupBy, GroupByOpeartorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String]), executionPlatform) - case "sort" => SortOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Sort, SortOperatorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String]), executionPlatform) + case "groupBy" => GroupByOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.GroupBy, GroupByOperatorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String]), executionPlatform) + case "sort" => SortOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Sort, SortOperatorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String], None, None), executionPlatform) case "flatMap" => FlatMapOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.FlatMap, FlatMapOperatorFromJson.Data(operatorFromDrawflow.data("flatMapFunction").asInstanceOf[String], None, None), executionPlatform) case "map" => MapOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Map, MapOperatorFromJson.Data(operatorFromDrawflow.data("mapFunction").asInstanceOf[String], None, None), executionPlatform) case "reduce" => ReduceOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Reduce, ReduceOperatorFromJson.Data(operatorFromDrawflow.data("reduceFunction").asInstanceOf[String], None, None), executionPlatform) @@ -109,7 +109,7 @@ object OperatorFromDrawflowConverter { case "union" => UnionOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Union, executionPlatform) case "coGroup" => CoGroupOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.CoGroup, CoGroupOperatorFromJson.Data(operatorFromDrawflow.data("groupKey1").asInstanceOf[String], operatorFromDrawflow.data("groupKey2").asInstanceOf[String]), executionPlatform) case "cartesian" => CartesianOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Cartesian, executionPlatform) - case "join" => JoinOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Join, JoinOperatorFromJson.Data(operatorFromDrawflow.data("joinKey1").asInstanceOf[String], operatorFromDrawflow.data("joinKey2").asInstanceOf[String]), executionPlatform) + case "join" => JoinOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Join, JoinOperatorFromJson.Data(operatorFromDrawflow.data("joinKey1").asInstanceOf[String], operatorFromDrawflow.data("joinKey2").asInstanceOf[String], None, None), executionPlatform) case "intersect" => IntersectOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Intersect, executionPlatform) // loop @@ -120,7 +120,5 @@ object OperatorFromDrawflowConverter { // output case "oTextFile" => TextFileOutputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.TextFileOutput, TextFileOutputFromJson.Data(operatorFromDrawflow.data("outputFileURL").asInstanceOf[String]), executionPlatform) } - } - } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/ContextFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/ContextFromJson.scala index f4cacfc31..334b80717 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/ContextFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/ContextFromJson.scala @@ -29,7 +29,7 @@ import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TextFileInputFromJson} import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson -import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} +import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOperatorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} class ContextFromJson(val platforms: List[String], val origin: String, diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala index 42bcf0be2..58d41b5c1 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala @@ -20,21 +20,20 @@ package org.apache.wayang.api.json.operatorfromjson import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.json.JsonMapper -import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode} import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode} +import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode} import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson.OperatorNames -import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, DLTrainingOperatorFromJson, IntersectOperatorFromJson, JoinOperatorFromJson, PredictOperatorFromJson, UnionOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson -import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TextFileInputFromJson, ParquetInputFromJson} -import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson -import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} +import org.apache.wayang.api.json.operatorfromjson.binary._ +import org.apache.wayang.api.json.operatorfromjson.input._ +import org.apache.wayang.api.json.operatorfromjson.loop._ +import org.apache.wayang.api.json.operatorfromjson.other._ +import org.apache.wayang.api.json.operatorfromjson.output._ +import org.apache.wayang.api.json.operatorfromjson.unary._ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "operatorName", visible = true) @JsonSubTypes( Array( - // Input operators new JsonSubTypes.Type(value = classOf[TextFileInputFromJson], name = OperatorNames.TextFileInput), new JsonSubTypes.Type(value = classOf[ParquetInputFromJson], name = OperatorNames.ParquetInput), @@ -51,7 +50,7 @@ import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, new JsonSubTypes.Type(value = classOf[FilterOperatorFromJson], name = OperatorNames.Filter), new JsonSubTypes.Type(value = classOf[ReduceByOperatorFromJson], name = OperatorNames.ReduceBy), new JsonSubTypes.Type(value = classOf[CountOperatorFromJson], name = OperatorNames.Count), - new JsonSubTypes.Type(value = classOf[GroupByOpeartorFromJson], name = OperatorNames.GroupBy), + new JsonSubTypes.Type(value = classOf[GroupByOperatorFromJson], name = OperatorNames.GroupBy), new JsonSubTypes.Type(value = classOf[SortOperatorFromJson], name = OperatorNames.Sort), new JsonSubTypes.Type(value = classOf[DistinctOperatorFromJson], name = OperatorNames.Distinct), new JsonSubTypes.Type(value = classOf[ReduceByOperatorFromJson], name = OperatorNames.Reduce), @@ -80,14 +79,14 @@ import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, ) ) -class OperatorFromJson(val id: Long, - val input: Array[Long], - val output: Array[Long], - val cat: String, - val operatorName: String, - val executionPlatform: String = null - ) extends Serializable { - +class OperatorFromJson( + val id: Long, + val input: Array[Long], + val output: Array[Long], + val cat: String, + val operatorName: String, + val executionPlatform: String = null +) extends Serializable { // // Because case classes combined with inheritance were kind of difficult to change a field, // we use a workaround with json serialization -> modification -> deserialization. diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/IntersectOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/IntersectOperatorFromJson.scala index cc3dd64af..4fb29f184 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/IntersectOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/IntersectOperatorFromJson.scala @@ -29,4 +29,3 @@ case class IntersectOperatorFromJson(override val id: Long, override val executionPlatform: String = null) extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { } - diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/JoinOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/JoinOperatorFromJson.scala index b0b905ed4..df288afb8 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/JoinOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/JoinOperatorFromJson.scala @@ -19,6 +19,7 @@ package org.apache.wayang.api.json.operatorfromjson.binary import com.fasterxml.jackson.annotation.JsonTypeName import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson +import org.apache.wayang.api.util.NDimArray @JsonTypeName(OperatorFromJson.OperatorNames.Join) case class JoinOperatorFromJson(override val id: Long, @@ -32,5 +33,5 @@ case class JoinOperatorFromJson(override val id: Long, } object JoinOperatorFromJson { - case class Data(thisKeyUdf: String, thatKeyUdf: String) + case class Data(thisKeyUdf: String, thatKeyUdf: String, val inputType: scala.Option[NDimArray], val outputType: scala.Option[NDimArray]) } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/UnionOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/UnionOperatorFromJson.scala index e4e138b6b..ce04b22f1 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/UnionOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/UnionOperatorFromJson.scala @@ -29,5 +29,3 @@ case class UnionOperatorFromJson(override val id: Long, override val executionPlatform: String = null) extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { } - - diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala index b3bffc96e..f663e5728 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala @@ -22,7 +22,7 @@ import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson import com.fasterxml.jackson.annotation.JsonIgnoreProperties @JsonIgnoreProperties(ignoreUnknown = true) -@JsonTypeName(OperatorFromJson.OperatorNames.TextFileInput) +@JsonTypeName(OperatorFromJson.OperatorNames.ParquetInput) case class ParquetInputFromJson(override val id: Long, override val input: Array[Long], override val output: Array[Long], diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/DistinctOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/DistinctOperatorFromJson.scala index 316c69551..4ef204507 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/DistinctOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/DistinctOperatorFromJson.scala @@ -29,4 +29,3 @@ case class DistinctOperatorFromJson(override val id: Long, override val executionPlatform: String = null) extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { } - diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/FilterOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/FilterOperatorFromJson.scala index 93e49698d..e2fd764c9 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/FilterOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/FilterOperatorFromJson.scala @@ -19,6 +19,7 @@ package org.apache.wayang.api.json.operatorfromjson.unary import com.fasterxml.jackson.annotation.JsonTypeName import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson +import org.apache.wayang.api.util.NDimArray @JsonTypeName(OperatorFromJson.OperatorNames.Filter) case class FilterOperatorFromJson(override val id: Long, @@ -32,5 +33,5 @@ case class FilterOperatorFromJson(override val id: Long, } object FilterOperatorFromJson { - case class Data(udf: String) + case class Data(udf: String, val inputType: scala.Option[NDimArray], val outputType: scala.Option[NDimArray]) } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOpeartorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOperatorFromJson.scala similarity index 90% rename from wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOpeartorFromJson.scala rename to wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOperatorFromJson.scala index 53db57136..a230a7393 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOpeartorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOperatorFromJson.scala @@ -22,17 +22,17 @@ import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson @JsonTypeName(OperatorFromJson.OperatorNames.GroupBy) -case class GroupByOpeartorFromJson(override val id: Long, +case class GroupByOperatorFromJson(override val id: Long, override val input: Array[Long], override val output: Array[Long], override val cat: String, override val operatorName: String, - val data: GroupByOpeartorFromJson.Data, + val data: GroupByOperatorFromJson.Data, override val executionPlatform: String = null) extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { } -object GroupByOpeartorFromJson { +object GroupByOperatorFromJson { case class Data(keyUdf: String) } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/SortOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/SortOperatorFromJson.scala index 7ce91c49c..7c7c9d95a 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/SortOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/SortOperatorFromJson.scala @@ -19,6 +19,7 @@ package org.apache.wayang.api.json.operatorfromjson.unary import com.fasterxml.jackson.annotation.JsonTypeName import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson +import org.apache.wayang.api.util.NDimArray @JsonTypeName(OperatorFromJson.OperatorNames.Sort) case class SortOperatorFromJson(override val id: Long, @@ -32,5 +33,5 @@ case class SortOperatorFromJson(override val id: Long, } object SortOperatorFromJson { - case class Data(keyUdf: String) + case class Data(keyUdf: String, val inputType: scala.Option[NDimArray], val outputType: scala.Option[NDimArray]) } diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedTransformationDescriptor.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedTransformationDescriptor.java index 92c3977cd..534fa8e37 100644 --- a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedTransformationDescriptor.java +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedTransformationDescriptor.java @@ -21,7 +21,6 @@ import org.apache.wayang.core.function.TransformationDescriptor; import org.apache.wayang.core.types.BasicDataUnitType; import org.apache.wayang.api.python.executor.PythonWorkerManager; -import org.apache.wayang.core.function.FunctionDescriptor; import com.google.protobuf.ByteString; import java.util.ArrayList; @@ -39,7 +38,12 @@ public WrappedTransformationDescriptor( input.add(item); final PythonWorkerManager manager = new PythonWorkerManager<>(serializedUDF, input); final Iterable output = manager.execute(); - return output.iterator().next(); + + if (output.iterator().hasNext()) { + return output.iterator().next(); + } + + return null; }, inputTypeClass, outputTypeClass diff --git a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties index 55c6ce726..f19e71055 100644 --- a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties +++ b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -wayang.api.python.worker = /var/www/html/python/src/pywy/execution/worker.py -wayang.api.python.path = python3 -wayang.api.python.env.path = /usr/local/lib/python3.8/dist-packages - +wayang.api.python.worker = /Users/AURELIEN/Documents/University/Master/Master 2/ITU/Advanced Data Systems/Assignments/Assignment 3/incubator-wayang/python/src/pywy/execution/worker.py +wayang.api.python.path = /usr/local/Caskroom/miniconda/base/envs/wayang/bin/python3 +wayang.api.python.env.path = /usr/local/Caskroom/miniconda/base/envs/wayang/lib/python3.8/site-packages diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/NDimArray.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/NDimArray.scala index c20e500bf..9f0c1d037 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/NDimArray.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/NDimArray.scala @@ -17,6 +17,8 @@ */ package org.apache.wayang.api.util +import org.apache.wayang.basic.data.Record + import scala.reflect.ClassTag class NDimArray(val origin: String, val depth: Integer) extends Serializable { @@ -42,6 +44,7 @@ class NDimArray(val origin: String, val depth: Integer) extends Serializable { case "Character" => classOf[Char] case "Byte" => classOf[Byte] case "Short" => classOf[Short] + case "Record" => classOf[Record] case _ => classOf[Object] } diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java index 687aaa92c..fc897cab7 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java @@ -61,7 +61,7 @@ public int hashCode() { @Override public String toString() { - return "Record" + Arrays.toString(this.values); + return Arrays.toString(this.values).replace("[", "").replace("]", "").replace(" ", ""); } public Object getField(int index) { @@ -77,7 +77,7 @@ public Object getField(int index) { public double getDouble(int index) { Object field = this.values[index]; return ReflectionUtils.toDouble(field); - } + } /** * Retrieve a field as a {@code long}. It must be castable as such. From eaefe68fe9005c043cf38875e48820b5ed7a1288 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Thu, 3 Apr 2025 09:46:36 +0200 Subject: [PATCH 05/16] Add projection to the serializer --- python/src/pywy/core/serializer.py | 6 ++++++ .../src/main/scala/builder/JsonPlanBuilder.scala | 3 --- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/python/src/pywy/core/serializer.py b/python/src/pywy/core/serializer.py index 4e3d21866..8aa58723e 100644 --- a/python/src/pywy/core/serializer.py +++ b/python/src/pywy/core/serializer.py @@ -91,6 +91,12 @@ def serialize(self, operator): if hasattr(operator, "column_names"): json_operator["data"]["column_names"] = operator.column_names + if hasattr(operator, "projection"): + json_operator["data"]["projection"] = operator.projection + + if hasattr(operator, "column_names"): + json_operator["data"]["projection"] = operator.column_names + return json_operator def serialize_pipeline(self, pipeline): diff --git a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala index 13cfa2fa8..d54c541df 100644 --- a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala +++ b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala @@ -191,8 +191,6 @@ class JsonPlanBuilder() { // Other case operator: KMeansFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) - - // TODO: case operator: CollectSinkOperator => return dataquanta of last operator! } } @@ -352,7 +350,6 @@ class JsonPlanBuilder() { private def visit(operator: DistinctOperatorFromJson, dataQuanta: DataQuanta[Any]): DataQuanta[Any] = { if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) { - print("hello") dataQuanta.distinct } else dataQuanta.distinct.withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)) From 789c886c6c5d0cf62a78af36d6fbadaa6f8ca400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 19:41:48 +0200 Subject: [PATCH 06/16] update Record to string methods --- python/src/pywy/basic/data/record.py | 2 +- .../src/main/java/org/apache/wayang/basic/data/Record.java | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/python/src/pywy/basic/data/record.py b/python/src/pywy/basic/data/record.py index 199063aa1..3579a140a 100644 --- a/python/src/pywy/basic/data/record.py +++ b/python/src/pywy/basic/data/record.py @@ -61,7 +61,7 @@ def size(self) -> int: return len(self.values) def __str__(self): - return str(self.values).replace("[", "").replace("]", "").replace(" ", "") + return "Record" + str(self.values) def __repr__(self): return self.__str__() diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java index f4efa1f30..ed92e813c 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java @@ -67,9 +67,7 @@ public int hashCode() { } @Override - public String toString() { - return Arrays.toString(this.values).replace("[", "").replace("]", "").replace(" ", ""); - } + public String toString() { return "Record" + Arrays.toString(this.values); } public Object getField(final int index) { return this.values[index]; From 2e227971391bafa6c23eec281a01855551cc3de2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 19:49:08 +0200 Subject: [PATCH 07/16] add parquet source operator to Python API --- python/src/pywy/operators/source.py | 6 +----- .../scala/operatorfromjson/input/ParquetInputFromJson.scala | 2 +- .../main/scala/org/apache/wayang/api/JavaPlanBuilder.scala | 3 +-- .../src/main/scala/org/apache/wayang/api/PlanBuilder.scala | 2 +- 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/python/src/pywy/operators/source.py b/python/src/pywy/operators/source.py index 39e11bfbc..d55caba44 100644 --- a/python/src/pywy/operators/source.py +++ b/python/src/pywy/operators/source.py @@ -18,9 +18,7 @@ from typing import List, Optional from pywy.operators.base import PywyOperator - from pywy.basic.data.record import Record - from pywy.types import GenericTco @@ -65,14 +63,12 @@ def __repr__(self): class ParquetSource(SourceUnaryOperator): path: str projection: Optional[List[str]] - column_names: Optional[List[str]] json_name: str - def __init__(self, path: str, projection: Optional[List[str]] = None, column_names: Optional[List[str]] = None): + def __init__(self, path: str, projection: Optional[List[str]] = None): super(ParquetSource, self).__init__('Parquet', output_type=Record) self.path = path self.projection = projection - self.column_names = column_names self.json_name = "parquetInput" def __str__(self): diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala index f663e5728..d8c498433 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala @@ -36,5 +36,5 @@ case class ParquetInputFromJson(override val id: Long, @JsonIgnoreProperties(ignoreUnknown = true) object ParquetInputFromJson { @JsonIgnoreProperties(ignoreUnknown = true) - case class Data(filename: String, projection: Array[String] = null, columnNames: List[String] = Nil) + case class Data(filename: String, projection: Array[String] = null) } diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala index 69e2a4b48..d1f9a118a 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala @@ -31,7 +31,6 @@ import org.apache.wayang.core.api.WayangContext import org.apache.wayang.core.plan.wayangplan._ import org.apache.wayang.core.types.DataSetType -import java.util import scala.reflect.ClassTag /** @@ -70,7 +69,7 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) { * @param projection the projection, if any * @return [[DataQuantaBuilder]] for the file */ - def readParquet(url: String, projection: Array[String]): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = + def readParquet(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = createSourceBuilder(ParquetSource.create(url, projection))(ClassTag(classOf[Record])) /** diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala index f6df3e344..7a7c6e324 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala @@ -133,7 +133,7 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job * @param projection the projection, if any * @return [[DataQuanta]] of [[Record]]s representing the file */ - def readParquet(url: String, projection: Array[String]): RecordDataQuanta = load(ParquetSource.create(url, projection)) + def readParquet(url: String, projection: Array[String] = null): RecordDataQuanta = load(ParquetSource.create(url, projection)) /** * Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line. From b3025846323063d76112245922a48a6095005c48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 19:56:25 +0200 Subject: [PATCH 08/16] remove print statements --- python/src/pywy/operators/binary.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/src/pywy/operators/binary.py b/python/src/pywy/operators/binary.py index b3be04d4e..2c5be2d8b 100644 --- a/python/src/pywy/operators/binary.py +++ b/python/src/pywy/operators/binary.py @@ -15,6 +15,8 @@ # limitations under the License. # +import ast + from pywy.basic.model.models import Model from pywy.basic.model.option import Option from pywy.operators.base import PywyOperator @@ -53,9 +55,9 @@ def __init__( input_type: GenericTco, ): super().__init__("Join", input_type, (input_type, input_type)) - self.this_key_function = this_key_function + self.this_key_function = lambda g: this_key_function(ast.literal_eval(next(g))) self.that = that - self.that_key_function = that_key_function + self.that_key_function = lambda g: that_key_function(ast.literal_eval(next(g))) self.json_name = "join" def get_left_key_udf(self, iterator): @@ -64,9 +66,6 @@ def get_left_key_udf(self, iterator): def get_right_key_udf(self, iterator): iterator = self.serialize_iterator(iterator) - print("right") - print(iterator) - print(list(map(lambda x: self.that_key_function(x), iterator))) return map(lambda x: self.that_key_function(x), iterator) From 605d27ed7bdb497af5b252da010006e279e8ec14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 19:56:49 +0200 Subject: [PATCH 09/16] remove distinct operator from commit --- python/src/pywy/dataquanta.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py index 99a545afe..a208cd2f8 100644 --- a/python/src/pywy/dataquanta.py +++ b/python/src/pywy/dataquanta.py @@ -154,9 +154,6 @@ def cartesian( that._connect(op, 1) ) - def distinct(self: "DataQuanta[In]", input_type: GenericTco = None) -> "DataQuanta[IterableOut]": - return DataQuanta(self.context, self._connect(DistinctOperator(input_type))) - def dlTraining( self: "DataQuanta[In]", model: Model, From 9c86277106ffaa200305a51487279c6123a045bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 20:01:11 +0200 Subject: [PATCH 10/16] remove serialization of iterator for commit --- python/src/pywy/core/serializer.py | 4 ++-- python/src/pywy/operators/base.py | 7 ++----- python/src/pywy/operators/binary.py | 8 -------- python/src/pywy/operators/unary.py | 7 ------- 4 files changed, 4 insertions(+), 22 deletions(-) diff --git a/python/src/pywy/core/serializer.py b/python/src/pywy/core/serializer.py index 4e3d21866..7e727c4bf 100644 --- a/python/src/pywy/core/serializer.py +++ b/python/src/pywy/core/serializer.py @@ -68,8 +68,8 @@ def serialize(self, operator): json_operator["data"]["udf"] = base64.b64encode(cloudpickle.dumps(operator.reduce_function)).decode('utf-8') elif operator.json_name == "join": - json_operator["data"]["thisKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.get_left_key_udf)).decode('utf-8') - json_operator["data"]["thatKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.get_right_key_udf)).decode('utf-8') + json_operator["data"]["thisKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.this_key_function)).decode('utf-8') + json_operator["data"]["thatKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.that_key_function)).decode('utf-8') elif operator.json_name == "cartesian": del json_operator["data"] diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py index 5d047635f..ee57d421c 100644 --- a/python/src/pywy/operators/base.py +++ b/python/src/pywy/operators/base.py @@ -15,8 +15,8 @@ # limitations under the License. # -from typing import (TypeVar, Optional, List) -from pywy.types import (typecheck, ConstrainedOperatorType, serialize_iterator_to_operator_type) +from typing import TypeVar, Optional, List +from pywy.types import typecheck, ConstrainedOperatorType class PywyOperator: @@ -49,9 +49,6 @@ def __init__(self, self.input_type = input_type self.output_type = output_type - def serialize_iterator(self, iterator): - return serialize_iterator_to_operator_type(self.input_type, iterator) - def validate_inputs(self, vec): if len(vec) != self.inputs: raise Exception( diff --git a/python/src/pywy/operators/binary.py b/python/src/pywy/operators/binary.py index 2c5be2d8b..a1a09cbb2 100644 --- a/python/src/pywy/operators/binary.py +++ b/python/src/pywy/operators/binary.py @@ -60,14 +60,6 @@ def __init__( self.that_key_function = lambda g: that_key_function(ast.literal_eval(next(g))) self.json_name = "join" - def get_left_key_udf(self, iterator): - iterator = self.serialize_iterator(iterator) - return map(lambda x: self.this_key_function(x), iterator) - - def get_right_key_udf(self, iterator): - iterator = self.serialize_iterator(iterator) - return map(lambda x: self.that_key_function(x), iterator) - class CartesianOperator(BinaryToUnaryOperator): that: PywyOperator diff --git a/python/src/pywy/operators/unary.py b/python/src/pywy/operators/unary.py index 7e2f286e5..97e568a09 100644 --- a/python/src/pywy/operators/unary.py +++ b/python/src/pywy/operators/unary.py @@ -61,7 +61,6 @@ def use_predicate(self, iterator) -> bool: return self.predicate(next(iterator)) def get_udf(self, iterator): - iterator = self.serialize_iterator(iterator) return filter(self.predicate, iterator) def __str__(self): @@ -83,7 +82,6 @@ def __init__(self, function: Function, input_type: GenericTco = None, output_typ self.json_name = "map" def get_udf(self, iterator): - iterator = self.serialize_iterator(iterator) return map(lambda x: self.function(x), iterator) def __str__(self): @@ -105,7 +103,6 @@ def __init__(self, function: Function, input_type: GenericTco = None, output_typ self.json_name = "mapPartitions" def get_udf(self, iterator): - iterator = self.serialize_iterator(iterator) return map(lambda x: self.function(x), iterator) def __str__(self): @@ -127,7 +124,6 @@ def __init__(self, fm_function: FlatmapFunction, input_type: GenericTco = None, self.json_name = "flatMap" def get_udf(self, iterator): - iterator = self.serialize_iterator(iterator) return chain.from_iterable(map(lambda x: self.fm_function(x), iterator)) def __str__(self): @@ -156,7 +152,6 @@ def __init__( self.json_name = "reduceBy" def get_udf(self, iterator): - iterator = self.serialize_iterator(iterator) grouped_data = groupby(sorted(iterator, key=self.key_function), key=self.key_function) sums = {} @@ -187,7 +182,6 @@ def __init__(self, function: Function, input_type: GenericTco = None): self.json_name = "sort" def get_udf(self, iterator): - iterator = self.serialize_iterator(iterator) return sorted(iterator, key=self.key_udf) def __str__(self): @@ -205,7 +199,6 @@ def __init__(self, input_type: GenericTco = None): self.json_name = "distinct" def get_udf(self, iterator): - iterator = self.serialize_iterator(iterator) return iter(set(iterator)) def __str__(self): From 84d8bbd0c9bc964208e9ed5097247c1357556007 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 20:02:59 +0200 Subject: [PATCH 11/16] revert properties --- .../main/resources/wayang-api-python-defaults.properties | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties index f19e71055..a24bf4ad1 100644 --- a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties +++ b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties @@ -14,6 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -wayang.api.python.worker = /Users/AURELIEN/Documents/University/Master/Master 2/ITU/Advanced Data Systems/Assignments/Assignment 3/incubator-wayang/python/src/pywy/execution/worker.py -wayang.api.python.path = /usr/local/Caskroom/miniconda/base/envs/wayang/bin/python3 -wayang.api.python.env.path = /usr/local/Caskroom/miniconda/base/envs/wayang/lib/python3.8/site-packages +wayang.api.python.worker = /var/www/html/python/src/pywy/execution/worker.py +wayang.api.python.path = python3 +wayang.api.python.env.path = /usr/local/lib/python3.8/dist-packages From 976c709f2767d016d9ead6a0922e82e363fc6c7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 20:11:55 +0200 Subject: [PATCH 12/16] update parquet input from drawflow --- .../operatorfromdrawflow/OperatorFromDrawflowConverter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala index 754d161d0..b159a7bb7 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala @@ -81,7 +81,7 @@ object OperatorFromDrawflowConverter { // input case "iBinary" => InputCollectionFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.InputCollection, InputCollectionFromJson.Data(operatorFromDrawflow.data("collectionGeneratorFunction").asInstanceOf[String]), executionPlatform) case "iTextFile" => TextFileInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.TextFileInput, TextFileInputFromJson.Data(operatorFromDrawflow.data("inputFileURL").asInstanceOf[String]), executionPlatform) - case "iParquet" => ParquetInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.ParquetInput, ParquetInputFromJson.Data(operatorFromDrawflow.data("inputFileURL").asInstanceOf[String], operatorFromDrawflow.data("projection").asInstanceOf[Array[String]], operatorFromDrawflow.data("columnNames").asInstanceOf[List[String]]), executionPlatform) + case "iParquet" => ParquetInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.ParquetInput, ParquetInputFromJson.Data(operatorFromDrawflow.data("inputFileURL").asInstanceOf[String], operatorFromDrawflow.data("projection").asInstanceOf[Array[String]]), executionPlatform) case "iCsvFile" => TableInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Table, TableInputFromJson.Data(operatorFromDrawflow.data("tableName").asInstanceOf[String], operatorFromDrawflow.data("columnNames").asInstanceOf[String].split(",").map(s => s.trim()).toList), executionPlatform) case "iJdbc" => JDBCRemoteInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.JDBCRemoteInput, JDBCRemoteInputFromJson.Data( From 54706911bc743050486dc27a5e61bab25348c581 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 20:13:14 +0200 Subject: [PATCH 13/16] remove distinct operator for commit --- python/src/pywy/operators/unary.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/python/src/pywy/operators/unary.py b/python/src/pywy/operators/unary.py index 97e568a09..f442e2cab 100644 --- a/python/src/pywy/operators/unary.py +++ b/python/src/pywy/operators/unary.py @@ -189,21 +189,3 @@ def __str__(self): def __repr__(self): return super().__repr__() - - -class DistinctOperator(UnaryToUnaryOperator): - json_name: str - - def __init__(self, input_type: GenericTco = None): - super().__init__("Distinct", input_type, input_type) - self.json_name = "distinct" - - def get_udf(self, iterator): - return iter(set(iterator)) - - def __str__(self): - return super().__str__() - - def __repr__(self): - return super().__repr__() - From 905087f23f9cd80304e000aba109b24a262a77d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 20:14:30 +0200 Subject: [PATCH 14/16] remove type deserialization for commit --- python/src/pywy/types.py | 36 ------------------------------------ 1 file changed, 36 deletions(-) diff --git a/python/src/pywy/types.py b/python/src/pywy/types.py index 2c1d6721d..e3802a0c7 100644 --- a/python/src/pywy/types.py +++ b/python/src/pywy/types.py @@ -242,39 +242,3 @@ def get_type_str(py_type: Any) -> str: return origin_str return str(py_type) - - -def serialize_iterator_to_operator_type( - operator_type: ConstrainedOperatorType, iterator: Iterable[Union[str, ConstrainedOperatorType]] -) -> Iterable[ConstrainedOperatorType]: - def _cast_value_to_type(value: Any, type: ConstrainedOperatorType): - if type is ndarray: - return ndarray(value) - elif type is IterableT: - return iter(value) - elif type is Record: - return Record(list(value)) - else: - return value - - if operator_type is str: - return iterator - - typed_iterator = list() - for x in iterator: - if not isinstance(x, str): - typed_iterator.append(x) - continue - - x = literal_eval(x) - - if isinstance(operator_type, Tuple): - args = get_args(operator_type) - for i, arg_type in enumerate(args): - x[i] = _cast_value_to_type(value=x[i], type=arg_type) - else: - x = _cast_value_to_type(x, operator_type) - - typed_iterator.append(x) - - return iter(typed_iterator) From 84684822887ea8734070f426f3492d2e4c13f59c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 20:16:45 +0200 Subject: [PATCH 15/16] remove column names --- .../src/main/scala/builder/JsonPlanBuilder.scala | 4 ++-- .../src/main/scala/org/apache/wayang/api/PlanBuilder.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala index d54c541df..1ab38cd2c 100644 --- a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala +++ b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala @@ -207,9 +207,9 @@ class JsonPlanBuilder() { private def visit(operator: ParquetInputFromJson, planBuilder: PlanBuilder): DataQuanta[Any] = { if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) - planBuilder.readParquet(new ParquetSource(operator.data.filename, operator.data.projection, operator.data.columnNames: _*)).asInstanceOf[DataQuanta[Any]] + planBuilder.readParquet(operator.data.filename, operator.data.projection).asInstanceOf[DataQuanta[Any]] else - planBuilder.readParquet(new ParquetSource(operator.data.filename, operator.data.projection, operator.data.columnNames: _*)).withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)).asInstanceOf[DataQuanta[Any]] + planBuilder.readParquet(operator.data.filename, operator.data.projection).withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)).asInstanceOf[DataQuanta[Any]] } private def visit(operator: InputCollectionFromJson, planBuilder: PlanBuilder): DataQuanta[Any] = { diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala index 7a7c6e324..ca984282a 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala @@ -129,11 +129,11 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job /** * Read a parquet file and provide it as a dataset of [[Record]]s. * - * @param url the URL of the Parquet file + * @param url the URL of the Parquet file * @param projection the projection, if any * @return [[DataQuanta]] of [[Record]]s representing the file */ - def readParquet(url: String, projection: Array[String] = null): RecordDataQuanta = load(ParquetSource.create(url, projection)) + def readParquet(url: String, projection: Array[String] = null): DataQuanta[Record] = load(ParquetSource.create(url, projection)) /** * Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line. From 00e9ad87752ed7cff0437091414796cafe5312ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 4 May 2025 20:17:59 +0200 Subject: [PATCH 16/16] update comment --- .../src/main/scala/org/apache/wayang/api/PlanBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala index ca984282a..b13eb793c 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala @@ -129,7 +129,7 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job /** * Read a parquet file and provide it as a dataset of [[Record]]s. * - * @param url the URL of the Parquet file + * @param url the URL of the Parquet file * @param projection the projection, if any * @return [[DataQuanta]] of [[Record]]s representing the file */