diff --git a/python/.gitignore b/python/.gitignore index cd85f7a5f..d8d1f9b60 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -1,3 +1,3 @@ # Build files dist - +__pycache__ diff --git a/python/src/pywy/basic/data/__init__.py b/python/src/pywy/basic/data/__init__.py new file mode 100644 index 000000000..d9e26de2a --- /dev/null +++ b/python/src/pywy/basic/data/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/src/pywy/basic/data/record.py b/python/src/pywy/basic/data/record.py new file mode 100644 index 000000000..3579a140a --- /dev/null +++ b/python/src/pywy/basic/data/record.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
from typing import Any, List


class Record:
    """
    A Type that represents a record with a schema.

    The record is a thin wrapper around a list of positional field values;
    fields are addressed by their integer index.
    """
    values: List[Any]

    def __init__(self, values: List[Any]) -> None:
        """Wrap ``values`` (stored by reference, not copied)."""
        self.values = values

    def copy(self) -> 'Record':
        """Return a new record backed by a shallow copy of the value list."""
        return Record(self.values.copy())

    def equals(self, o: Any) -> bool:
        """
        Return True when ``o`` is of exactly the same type as this record and
        holds an equal value list; ``None`` and other types compare unequal.
        """
        # Exact type match on purpose (subclasses are not considered equal).
        if o is None or type(self) is not type(o):
            return False

        return self.values == o.values

    def hash_code(self) -> int:
        """
        Return a hash derived from the current field values.

        Lists are unhashable, so the values are snapshotted into a tuple
        first; hashing the list directly always raised ``TypeError``.
        Records for which ``equals`` holds yield the same hash code.

        :raises TypeError: if any individual field value is itself unhashable.
        """
        return hash(tuple(self.values))

    def get_field(self, index: int) -> Any:
        """Return the raw value stored at ``index``."""
        return self.values[index]

    def get_double(self, index: int) -> float:
        """Return the value at ``index`` converted with ``float()``."""
        return float(self.values[index])

    def get_int(self, index: int) -> int:
        """Return the value at ``index`` converted with ``int()``."""
        return int(self.values[index])

    def get_string(self, index: int) -> str:
        """Return the value at ``index`` converted with ``str()``."""
        return str(self.values[index])

    def set_field(self, index: int, field: Any) -> None:
        """Overwrite the value at the existing position ``index``."""
        self.values[index] = field

    def add_field(self, field: Any) -> None:
        """Append ``field`` as a new last value."""
        self.values.append(field)

    def size(self) -> int:
        """Return the number of fields in the record."""
        return len(self.values)

    def __str__(self):
        return "Record" + str(self.values)

    def __repr__(self):
        return self.__str__()
@@ -17,18 +16,13 @@ from typing import Set, Iterable, Dict import json -import base64 -import cloudpickle import requests -import subprocess -import time -import os from pywy.core.platform import Platform from pywy.core.serializer import JSONSerializer from pywy.graph.graph import WayangGraph -from pywy.graph.types import WGraphOfVec, NodeOperator, NodeVec -from pywy.operators import SinkOperator, UnaryToUnaryOperator, SourceUnaryOperator +from pywy.graph.types import WGraphOfVec +from pywy.operators import SinkOperator, UnaryToUnaryOperator class Plugin: @@ -122,7 +116,6 @@ def execute(self): for node in nodes: operator = node.current[0] - if isinstance(operator, UnaryToUnaryOperator): pipeline.append(operator) else: diff --git a/python/src/pywy/core/serializer.py b/python/src/pywy/core/serializer.py index 5846a0247..7e727c4bf 100644 --- a/python/src/pywy/core/serializer.py +++ b/python/src/pywy/core/serializer.py @@ -30,6 +30,7 @@ from pywy.types import get_java_type, NDimArray, ndim_from_type from pywy.operators import SinkOperator, UnaryToUnaryOperator, SourceUnaryOperator + class JSONSerializer: id_table: Iterable[int] @@ -53,39 +54,42 @@ def serialize(self, operator): json_operator["data"] = {} - if operator.json_name != "join": - if hasattr(operator, "input_type"): - if operator.input_type is not None: - json_operator["data"]["inputType"] = ndim_from_type(operator.input_type).to_json() - if hasattr(operator, "output_type"): - if operator.output_type is not None: - json_operator["data"]["outputType"] = ndim_from_type(operator.output_type).to_json() + if hasattr(operator, "input_type") and operator.input_type is not None: + json_operator["data"]["inputType"] = ndim_from_type(operator.input_type).to_json() + + if hasattr(operator, "output_type") and operator.output_type is not None: + json_operator["data"]["outputType"] = ndim_from_type(operator.output_type).to_json() if operator.json_name == "filter": json_operator["data"]["udf"] = 
base64.b64encode(cloudpickle.dumps(operator.use_predicate)).decode('utf-8') - return json_operator - if operator.json_name == "reduceBy": + elif operator.json_name == "reduceBy": json_operator["data"]["keyUdf"] = base64.b64encode(cloudpickle.dumps(operator.key_function)).decode('utf-8') json_operator["data"]["udf"] = base64.b64encode(cloudpickle.dumps(operator.reduce_function)).decode('utf-8') - return json_operator elif operator.json_name == "join": json_operator["data"]["thisKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.this_key_function)).decode('utf-8') json_operator["data"]["thatKeyUdf"] = base64.b64encode(cloudpickle.dumps(operator.that_key_function)).decode('utf-8') - return json_operator + elif operator.json_name == "cartesian": + del json_operator["data"] + elif operator.json_name == "dlTraining": json_operator["data"]["model"] = {"modelType": "DLModel", "op": operator.model.get_out().to_dict()} json_operator["data"]["option"] = operator.option.to_dict() - return json_operator else: if hasattr(operator, "get_udf"): json_operator["data"]["udf"] = base64.b64encode(cloudpickle.dumps(operator.get_udf)).decode('utf-8') if hasattr(operator, "path"): - json_operator["data"]["filename"] = operator.path + json_operator["data"]["filename"] = operator.path + + if hasattr(operator, "projection"): + json_operator["data"]["projection"] = operator.projection + + if hasattr(operator, "column_names"): + json_operator["data"]["column_names"] = operator.column_names return json_operator @@ -98,7 +102,7 @@ def serialize_pipeline(self, pipeline): json_operator["cat"] = "unary" json_operator["data"] = {} json_operator["input"] = list(map(lambda x: self.id_table[x], pipeline[0].inputOperator)) - json_operator["output"] = list(map(lambda x: self.id_table[x], pipeline[len(pipeline) - 1].outputOperator)) + json_operator["output"] = list(map(lambda x: self.id_table[x], pipeline[-1].outputOperator)) if len(pipeline) == 1: return self.serialize(pipeline[0]) diff --git 
def parquet(
    self,
    file_path: str,
    projection: Optional[List[str]] = None,
    column_names: Optional[List[str]] = None,
) -> "DataQuanta[Record]":
    """
    Start a plan from a Parquet source.

    Builds a :class:`ParquetSource` from the three arguments and wraps it in
    a :class:`DataQuanta` bound to this context.

    :param file_path: location handed to the source operator as its path.
    :param projection: optional list forwarded unchanged to the source.
    :param column_names: optional list forwarded unchanged to the source.
    :return: a ``DataQuanta`` whose operator is the new Parquet source.
    """
    source = ParquetSource(file_path, projection, column_names)
    return DataQuanta(self, source)
def cartesian(
    self: "DataQuanta[In]",
    that: "DataQuanta[In]",
    input_type: GenericTco = None,
) -> "DataQuanta[Out]":
    """
    Combine this data quanta with ``that`` through a :class:`CartesianOperator`.

    The operator is connected twice: first to this data quanta on the
    default port (0), then to ``that`` on port 1. The result of the second
    connection backs the returned ``DataQuanta``.

    :param that: the other data quanta feeding the operator.
    :param input_type: type passed on to the operator; may be ``None``.
    :return: a ``DataQuanta`` in the same context wrapping the connected operator.
    """
    product_op = CartesianOperator(that, input_type)

    # The left side must be wired before the right side, as in the join operator.
    self._connect(product_op)
    right_connection = that._connect(product_op, 1)

    return DataQuanta(self.context, right_connection)
# +import base64 import os +import pickle import socket import struct -import base64 -import re -import sys -import ast -import copy -import pickle -import cloudpickle -import numpy as np +import numpy as np from pywy.execution.util import SpecialLengths + def read_int(stream): length = stream.read(4) if not length: @@ -51,6 +47,7 @@ def loads(self, stream): raise EOFError elif length == SpecialLengths.NULL: return None + s = stream.read(length) return s.decode("utf-8") if self.use_unicode else s @@ -100,6 +97,7 @@ def dump_stream(iterator, stream): ## write_with_length(obj, stream) write_int(SpecialLengths.END_OF_DATA_SECTION, stream) + def process(infile, outfile): udf_length = read_int(infile) serialized_udf = infile.read(udf_length) @@ -109,6 +107,7 @@ def process(infile, outfile): out_iter = func(iterator) dump_stream(iterator=out_iter, stream=outfile) + def local_connect(port): sock = None errors = [] @@ -118,7 +117,7 @@ def local_connect(port): af, socktype, proto, _, sa = res try: sock = socket.socket(af, socktype, proto) - sock.settimeout(30) + sock.settimeout(60) sock.connect(sa) sockfile = sock.makefile("rwb", 65536) return (sockfile, sock) diff --git a/python/src/pywy/graph/graph.py b/python/src/pywy/graph/graph.py index bc9d6682e..2bcd87647 100644 --- a/python/src/pywy/graph/graph.py +++ b/python/src/pywy/graph/graph.py @@ -50,13 +50,15 @@ def wrap(op: T) -> Optional['GraphNode[K, T]'] | None: return map(wrap, adjacent) - def visit(self, - parent: 'GraphNode[K, T]', - udf: Callable[['GraphNode[K, T]', 'GraphNode[K, T]'], Any], - visit_status: bool = True): + def visit( + self, + parent: 'GraphNode[K, T]', + udf: Callable[['GraphNode[K, T]', 'GraphNode[K, T]'], Any], + visit_status: bool = True + ): if self.visited == visit_status: return - self.visited = ~ visit_status + self.visited = ~visit_status return udf(self, parent) @@ -77,11 +79,11 @@ def build_node(self, t: T) -> GraphNode[K, T]: pass def traversal( - self, - nodes: Iterable[GraphNode[K, T]], 
- udf: Callable[[GraphNode[K, T], GraphNode[K, T]], Any], - origin: Optional[GraphNode[K, T]] = None, - visit_status: bool = True + self, + nodes: Iterable[GraphNode[K, T]], + udf: Callable[[GraphNode[K, T], GraphNode[K, T]], Any], + origin: Optional[GraphNode[K, T]] = None, + visit_status: bool = True ): for node in nodes: adjacent = node.walk(self.created_nodes) diff --git a/python/src/pywy/operators/__init__.py b/python/src/pywy/operators/__init__.py index 99ad6f0b9..fd4dc2c90 100644 --- a/python/src/pywy/operators/__init__.py +++ b/python/src/pywy/operators/__init__.py @@ -16,25 +16,30 @@ # from pywy.operators.base import PywyOperator +from pywy.operators.binary import BinaryToUnaryOperator, JoinOperator, DLTrainingOperator, PredictOperator, \ + CartesianOperator from pywy.operators.sink import TextFileSink, SinkOperator -from pywy.operators.source import TextFileSource, SourceUnaryOperator -from pywy.operators.unary import UnaryToUnaryOperator, FilterOperator, MapOperator, FlatmapOperator, ReduceByKeyOperator, SortOperator -from pywy.operators.binary import BinaryToUnaryOperator, JoinOperator, DLTrainingOperator, PredictOperator +from pywy.operators.source import TextFileSource, ParquetSource, SourceUnaryOperator +from pywy.operators.unary import UnaryToUnaryOperator, FilterOperator, MapOperator, FlatmapOperator, \ + ReduceByKeyOperator, SortOperator, DistinctOperator __ALL__ = [ - PywyOperator, - UnaryToUnaryOperator, - BinaryToUnaryOperator, - TextFileSink, - TextFileSource, - FilterOperator, - SinkOperator, - SortOperator, - SourceUnaryOperator, - MapOperator, - ReduceByKeyOperator, - FlatmapOperator, - JoinOperator, - DLTrainingOperator, - PredictOperator + PywyOperator, + UnaryToUnaryOperator, + BinaryToUnaryOperator, + TextFileSink, + TextFileSource, + ParquetSource, + FilterOperator, + SinkOperator, + SortOperator, + DistinctOperator, + SourceUnaryOperator, + MapOperator, + ReduceByKeyOperator, + FlatmapOperator, + JoinOperator, + CartesianOperator, + 
DLTrainingOperator, + PredictOperator, ] diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py index 3c863337c..ee57d421c 100644 --- a/python/src/pywy/operators/base.py +++ b/python/src/pywy/operators/base.py @@ -15,8 +15,8 @@ # limitations under the License. # -from typing import (TypeVar, Optional, List) -from pywy.types import (typecheck, ConstrainedOperatorType) +from typing import TypeVar, Optional, List +from pywy.types import typecheck, ConstrainedOperatorType class PywyOperator: @@ -29,15 +29,15 @@ class PywyOperator: output_type: ConstrainedOperatorType def __init__(self, - name: str, - cat: str, - input_type: ConstrainedOperatorType = None, - output_type: ConstrainedOperatorType = None, - input_length: Optional[int] = 1, - output_length: Optional[int] = 1, - *args, - **kwargs - ): + name: str, + cat: str, + input_type: ConstrainedOperatorType = None, + output_type: ConstrainedOperatorType = None, + input_length: Optional[int] = 1, + output_length: Optional[int] = 1, + *args, + **kwargs + ): typecheck(input_type) typecheck(output_type) self.name = (self.prefix() + name + self.postfix()).strip() @@ -92,4 +92,5 @@ def __str__(self): def __repr__(self): return self.__str__() + PO_T = TypeVar('PO_T', bound=PywyOperator) diff --git a/python/src/pywy/operators/binary.py b/python/src/pywy/operators/binary.py index 1803bb92a..a1a09cbb2 100644 --- a/python/src/pywy/operators/binary.py +++ b/python/src/pywy/operators/binary.py @@ -15,24 +15,16 @@ # limitations under the License. 
class CartesianOperator(BinaryToUnaryOperator):
    """
    Binary operator registered under the name "Cartesian".

    It keeps a reference to the second input (``that``) and identifies
    itself to the JSON serializer through ``json_name == "cartesian"``.
    """
    that: PywyOperator
    json_name: str

    def __init__(self, that: PywyOperator, input_type: GenericTco):
        """
        :param that: the second input of the operator.
        :param input_type: type declared for the input; it is also passed
            as the operator's output type.
        """
        # The same type is handed over for both the input and the output slot.
        super().__init__("Cartesian", input_type, input_type)
        self.json_name = "cartesian"
        self.that = that
class ParquetSource(SourceUnaryOperator):
    """
    Source operator named "Parquet" that emits :class:`Record` values.

    ``path``, ``projection`` and ``column_names`` are plain attributes so the
    JSON serializer, which probes operators with ``hasattr``, can pick them up.
    """
    path: str
    projection: Optional[List[str]]
    column_names: Optional[List[str]]
    json_name: str

    def __init__(
            self,
            path: str,
            projection: Optional[List[str]] = None,
            column_names: Optional[List[str]] = None,
    ):
        """
        :param path: location of the Parquet input.
        :param projection: optional list stored as ``self.projection``.
        :param column_names: optional list stored as ``self.column_names``.
            ``WayangContext.parquet`` passes it as the third positional
            argument, which the previous two-parameter signature rejected
            with ``TypeError``. It defaults to ``None``, so existing
            two-argument callers keep working.
        """
        super(ParquetSource, self).__init__('Parquet', output_type=Record)
        self.path = path
        self.projection = projection
        self.column_names = column_names
        self.json_name = "parquetInput"

    def __str__(self):
        return super().__str__()

    def __repr__(self):
        return super().__repr__()
a/python/src/pywy/operators/unary.py +++ b/python/src/pywy/operators/unary.py @@ -16,22 +16,19 @@ # from itertools import chain, groupby -from collections import defaultdict -import ast from pywy.operators.base import PywyOperator from pywy.types import ( - GenericTco, - GenericUco, - Predicate, - get_type_predicate, - Function, - BiFunction, - get_type_function, - get_type_bifunction, - FlatmapFunction, - get_type_flatmap_function - ) + GenericTco, + Predicate, + get_type_predicate, + Function, + BiFunction, + get_type_function, + get_type_bifunction, + FlatmapFunction, + get_type_flatmap_function +) class UnaryToUnaryOperator(PywyOperator): @@ -40,7 +37,7 @@ def __init__(self, name: str, input_type: GenericTco = None, output_type: Generi super().__init__(name, "unary", input_type, output_type, 1, 1) def postfix(self) -> str: - return 'OperatorUnary' + return "OperatorUnary" def __str__(self): return super().__str__() @@ -50,7 +47,6 @@ def __repr__(self): class FilterOperator(UnaryToUnaryOperator): - predicate: Predicate json_name: str @@ -75,7 +71,6 @@ def __repr__(self): class MapOperator(UnaryToUnaryOperator): - function: Function json_name: str @@ -95,8 +90,8 @@ def __str__(self): def __repr__(self): return super().__repr__() -class MapPartitionsOperator(UnaryToUnaryOperator): +class MapPartitionsOperator(UnaryToUnaryOperator): function: Function json_name: str @@ -118,11 +113,9 @@ def __repr__(self): class FlatmapOperator(UnaryToUnaryOperator): - fm_function: FlatmapFunction json_name: str - def __init__(self, fm_function: FlatmapFunction, input_type: GenericTco = None, output_type: GenericTco = None): if input_type is None or output_type is None: input_type, output_type = get_type_flatmap_function(fm_function) if fm_function else (None, None) @@ -139,17 +132,18 @@ def __str__(self): def __repr__(self): return super().__repr__() + class ReduceByKeyOperator(UnaryToUnaryOperator): key_function: Function reduce_function: BiFunction json_name: str def __init__( - 
self, - key_function: Function, - reduce_function: BiFunction, - input_type: GenericTco = None, - ): + self, + key_function: Function, + reduce_function: BiFunction, + input_type: GenericTco = None, + ): if input_type is None: input_type = get_type_bifunction(reduce_function) if reduce_function else (None, None, None) super().__init__("ReduceByKey", (input_type[0], input_type[1])) @@ -158,14 +152,8 @@ def __init__( self.json_name = "reduceBy" def get_udf(self, iterator): - # Use ast.literal_eval() to safely evaluate the string as a Python literal - #print(", ".join(iterator)) - #list_of_tuples = ast.literal_eval("[" + ", ".join(iterator) + "]") - - tuples = [(str(item[0]), str(item[1])) for item in iterator] - grouped_data = groupby(sorted(tuples, key=self.key_function), key=self.key_function) + grouped_data = groupby(sorted(iterator, key=self.key_function), key=self.key_function) - # Create a defaultdict to store the sums sums = {} for key, group in grouped_data: @@ -183,14 +171,13 @@ def __repr__(self): class SortOperator(UnaryToUnaryOperator): - key_udf: Function json_name: str def __init__(self, function: Function, input_type: GenericTco = None): if input_type is None: input_type, output_type = get_type_function(function) if function else (None, None) - super().__init__("Sort", input_type, None) + super().__init__("Sort", input_type, input_type) self.key_udf = function self.json_name = "sort" @@ -202,4 +189,3 @@ def __str__(self): def __repr__(self): return super().__repr__() - diff --git a/python/src/pywy/tests/filter_test.py b/python/src/pywy/tests/filter_test.py index aa04f0796..82e8812db 100644 --- a/python/src/pywy/tests/filter_test.py +++ b/python/src/pywy/tests/filter_test.py @@ -16,9 +16,7 @@ # import unittest -#from typing import Tuple, Callable, Iterable from pywy.dataquanta import WayangContext -from unittest.mock import Mock from pywy.platforms.java import JavaPlugin from pywy.platforms.spark import SparkPlugin @@ -33,7 +31,7 @@ def 
test_to_json(self): right = ctx.textfile("file:///var/www/html/README.md") \ .filter(lambda w: "Wayang" in w, str) \ .map(lambda w: (len(w), w), str, (int, str)) - join = left.join(lambda w: w[0], right, lambda w: w[0], (int, str), ((int, str), (int, str))) \ + join = left.join(lambda w: w[0], right, lambda w: w[0], (int, str)) \ .store_textfile("file:///var/www/html/data/wordcount-out-python.txt") self.assertEqual(True, True) diff --git a/python/src/pywy/tests/test.py b/python/src/pywy/tests/test.py index 2cc0af22e..8fbbcfed2 100644 --- a/python/src/pywy/tests/test.py +++ b/python/src/pywy/tests/test.py @@ -16,9 +16,8 @@ # import unittest -from typing import Tuple, Callable, Iterable, List +from typing import List from pywy.dataquanta import WayangContext -from unittest.mock import Mock from pywy.platforms.java import JavaPlugin from pywy.platforms.spark import SparkPlugin from pywy.platforms.tensorflow import TensorflowPlugin diff --git a/python/src/pywy/types.py b/python/src/pywy/types.py index 47ccc3763..e3802a0c7 100644 --- a/python/src/pywy/types.py +++ b/python/src/pywy/types.py @@ -15,11 +15,14 @@ # limitations under the License. 
# -from typing import (Generic, TypeVar, Callable, Hashable, Iterable, Type, Union, Tuple, get_args, get_origin, List, Dict, Any) +import re +from ast import literal_eval from inspect import signature +from typing import ( + Generic, TypeVar, Callable, Hashable, Iterable, Type, Union, Tuple, get_args, get_origin, List, Dict, Any +) from numpy import int32, int64, float32, float64, ndarray -import re - +from pywy.basic.data.record import Record from pywy.exception import PywyException T = TypeVar("T") # Type @@ -45,7 +48,7 @@ "NumberOrArray", float, int, complex, int32, int64, float32, float64, ndarray ) -ConstrainedOperatorType = Union[PrimitiveType, NumberOrArray, IterableT, ListT] +ConstrainedOperatorType = Union[PrimitiveType, NumberOrArray, IterableT, ListT, Record] Predicate = Callable[[ConstrainedOperatorType], bool] Function = Callable[[ConstrainedOperatorType], ConstrainedOperatorType] @@ -70,6 +73,8 @@ 1 {origin: int, depth: 0} """ + + class NDimArray: origin: Type depth: int @@ -84,6 +89,7 @@ def __str__(self) -> str: def to_json(self) -> dict: return {"origin": get_java_type(self.origin), "depth": self.depth} + def ndim_from_type(py_type: ConstrainedOperatorType, depth: int = 0) -> NDimArray: # Handle basic types and direct typing module classes if hasattr(py_type, '__name__'): @@ -102,7 +108,8 @@ def ndim_from_type(py_type: ConstrainedOperatorType, depth: int = 0) -> NDimArra return NDimArray(py_type, depth) -#Define the mappings + +# Define the mappings type_mappings: Dict[Type, str] = { 'int': 'Integer', 'float': 'Float', @@ -114,9 +121,11 @@ def ndim_from_type(py_type: ConstrainedOperatorType, depth: int = 0) -> NDimArra 'Dict': 'Map', 'tuple': 'Tuple', 'Tuple': 'Tuple', - 'Any': 'Object' + 'Any': 'Object', + 'Record': 'Record', } + def get_type_predicate(call: Predicate) -> type: sig = signature(call) if len(sig.parameters) != 1: @@ -176,19 +185,15 @@ def get_type_flatmap_function(call: FlatmapFunction) -> (type, type): keys = 
list(sig.parameters.keys()) return sig.parameters[keys[0]].annotation, sig.return_annotation.__args__[0] + def typecheck(input_type: Type[ConstrainedOperatorType]): allowed_types = get_args(ConstrainedOperatorType) - print(allowed_types) - print(input_type) if input_type in allowed_types or input_type is None: return origin = get_origin(input_type) args = get_args(input_type) - print(origin) - print(args) - if isinstance(input_type, List) and args: typecheck(args[0]) elif isinstance(input_type, Tuple): @@ -197,10 +202,7 @@ def typecheck(input_type: Type[ConstrainedOperatorType]): else: raise TypeError(f"Unsupported Operator type: {input_type}") else: - #print(get_args(ConstrainedOperatorType)) - #if candT not in get_args(ConstrainedOperatorType) and candT is not None: - #if candT is not PrimitiveType and candT is not IterableT and candT is not NumberOrArray and candT is not None: - raise TypeError(f"Unsupported Operator type: {input_type}, {args}, {isinstance(input_type, Tuple)}") + raise TypeError(f"Unsupported Operator type: {input_type}, {origin}, {args}") def get_java_type(input_type: ConstrainedOperatorType) -> str: str_type = get_type_str(input_type) @@ -208,6 +210,7 @@ def get_java_type(input_type: ConstrainedOperatorType) -> str: py_type = str_type.replace("typing.", "") return convert_type(py_type) + def convert_type(py_type: str) -> str: # Regex to find generic types like List[float], Dict[str, int], etc. 
generic_type_pattern = re.compile(r'(\w+)\[(.+)\]') @@ -221,6 +224,7 @@ def convert_type(py_type: str) -> str: else: return type_mappings.get(py_type, py_type) + def get_type_str(py_type: Any) -> str: # Handle basic types and direct typing module classes if hasattr(py_type, '__name__'): diff --git a/wayang-api/wayang-api-json/src/main/scala/Main.scala b/wayang-api/wayang-api-json/src/main/scala/Main.scala index 33a67a2cf..adbf2c02a 100644 --- a/wayang-api/wayang-api-json/src/main/scala/Main.scala +++ b/wayang-api/wayang-api-json/src/main/scala/Main.scala @@ -18,18 +18,14 @@ package org.apache.wayang.api.json import zio._ -import zio.IO import zio.http._ -import zio.Console._ import scala.util.Try import org.apache.wayang.api.json.builder.JsonPlanBuilder import org.apache.wayang.api.json.operatorfromdrawflow.OperatorFromDrawflowConverter import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson import org.apache.wayang.api.json.parserutil.ParseOperatorsFromDrawflow -import org.apache.wayang.api.json.parserutil.ParseOperatorsFromJson import org.apache.wayang.api.json.parserutil.ParsePlanFromJson -import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson object Main extends ZIOAppDefault { val drawRoute = diff --git a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala index f4e7a2b2f..1ab38cd2c 100644 --- a/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala +++ b/wayang-api/wayang-api-json/src/main/scala/builder/JsonPlanBuilder.scala @@ -22,10 +22,10 @@ import org.apache.wayang.api.json.parserutil.{SerializableIterable, Serializable import org.apache.wayang.api.json.operatorfromjson.{ComposedOperatorFromJson, OperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, IntersectOperatorFromJson, JoinOperatorFromJson, PredictOperatorFromJson, 
DLTrainingOperatorFromJson, UnionOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson -import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TableInputFromJson, TextFileInputFromJson} +import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TableInputFromJson, TextFileInputFromJson, ParquetInputFromJson} import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson -import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, ReduceOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} +import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOperatorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, ReduceOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.PlanFromJson import org.apache.wayang.api._ import org.apache.wayang.basic.operators._ @@ -42,15 +42,9 @@ import org.apache.wayang.flink.Flink import org.apache.wayang.tensorflow.Tensorflow import org.apache.wayang.genericjdbc.GenericJdbc import org.apache.wayang.sqlite3.Sqlite3 -import org.apache.wayang.postgres.Postgres import org.apache.wayang.core.plugin.Plugin import org.apache.wayang.basic.model.DLModel; -import org.apache.wayang.basic.model.optimizer.Adam; -import org.apache.wayang.basic.model.op.nn._; -import org.apache.wayang.basic.model.op._; -import org.apache.wayang.basic.model.optimizer._; -import 
org.apache.wayang.api.json.operatorfromjson.binary.{Op => JsonOp} -import org.apache.wayang.api.util.NDimArray +import org.apache.wayang.basic.data.Record; import java.nio.file.{Files, Paths} import scala.collection.JavaConverters._ @@ -160,6 +154,7 @@ class JsonPlanBuilder() { operator match { // input case inputOperator: TextFileInputFromJson => visit(inputOperator, planBuilder) + case inputOperator: ParquetInputFromJson => visit(inputOperator, planBuilder) case inputOperator: InputCollectionFromJson => visit(inputOperator, planBuilder) case inputOperator: TableInputFromJson => visit(inputOperator, planBuilder) case inputOperator: JDBCRemoteInputFromJson => visit(inputOperator, planBuilder) @@ -173,7 +168,7 @@ class JsonPlanBuilder() { case operator: FlatMapOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: ReduceByOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: CountOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) - case operator: GroupByOpeartorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) + case operator: GroupByOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: SortOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: DistinctOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) case operator: ReduceOperatorFromJson => this.visit(operator, executeRecursive(this.operators(operator.input(0)), planBuilder)) @@ -210,6 +205,13 @@ class JsonPlanBuilder() { planBuilder.readTextFile(operator.data.filename).asInstanceOf[DataQuanta[Any]].withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)) } + private def 
visit(operator: ParquetInputFromJson, planBuilder: PlanBuilder): DataQuanta[Any] = { + if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) + planBuilder.readParquet(operator.data.filename, operator.data.projection).asInstanceOf[DataQuanta[Any]] + else + planBuilder.readParquet(operator.data.filename, operator.data.projection).withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)).asInstanceOf[DataQuanta[Any]] + } + private def visit(operator: InputCollectionFromJson, planBuilder: PlanBuilder): DataQuanta[Any] = { val iterable = SerializableIterable.create(operator.data.udf) if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) @@ -329,7 +331,7 @@ class JsonPlanBuilder() { dataQuanta.count.asInstanceOf[DataQuanta[Any]].withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)) } - private def visit(operator: GroupByOpeartorFromJson, dataQuanta: DataQuanta[Any]): DataQuanta[Any] = { + private def visit(operator: GroupByOperatorFromJson, dataQuanta: DataQuanta[Any]): DataQuanta[Any] = { val lambda = SerializableLambda.createLambda[Any, Any](operator.data.keyUdf) if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) dataQuanta.groupByKey(lambda).map(arrayList => arrayList.asScala.toList) @@ -347,9 +349,9 @@ class JsonPlanBuilder() { } private def visit(operator: DistinctOperatorFromJson, dataQuanta: DataQuanta[Any]): DataQuanta[Any] = { - if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) + if (!ExecutionPlatforms.All.contains(operator.executionPlatform)) { dataQuanta.distinct - else + } else dataQuanta.distinct.withTargetPlatforms(getExecutionPlatform(operator.executionPlatform)) } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala index f748af627..b159a7bb7 100644 --- 
a/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromdrawflow/OperatorFromDrawflowConverter.scala @@ -20,10 +20,10 @@ package org.apache.wayang.api.json.operatorfromdrawflow import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson -import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, IntersectOperatorFromJson, JoinOperatorFromJson, UnionOperatorFromJson} +import org.apache.wayang.api.json.operatorfromjson.binary._ import org.apache.wayang.api.json.operatorfromjson.input._ import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.output._ +import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson import org.apache.wayang.api.json.operatorfromjson.unary._ import org.apache.wayang.api.json.parserutil.ParseOperatorsFromDrawflow @@ -81,6 +81,7 @@ object OperatorFromDrawflowConverter { // input case "iBinary" => InputCollectionFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.InputCollection, InputCollectionFromJson.Data(operatorFromDrawflow.data("collectionGeneratorFunction").asInstanceOf[String]), executionPlatform) case "iTextFile" => TextFileInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.TextFileInput, TextFileInputFromJson.Data(operatorFromDrawflow.data("inputFileURL").asInstanceOf[String]), executionPlatform) + case "iParquet" => ParquetInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.ParquetInput, ParquetInputFromJson.Data(operatorFromDrawflow.data("inputFileURL").asInstanceOf[String], operatorFromDrawflow.data("projection").asInstanceOf[Array[String]]), executionPlatform) case 
"iCsvFile" => TableInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Table, TableInputFromJson.Data(operatorFromDrawflow.data("tableName").asInstanceOf[String], operatorFromDrawflow.data("columnNames").asInstanceOf[String].split(",").map(s => s.trim()).toList), executionPlatform) case "iJdbc" => JDBCRemoteInputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.JDBCRemoteInput, JDBCRemoteInputFromJson.Data( @@ -92,11 +93,11 @@ object OperatorFromDrawflowConverter { )) // unary - case "filter" => FilterOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Filter, FilterOperatorFromJson.Data(operatorFromDrawflow.data("booleanFunction").asInstanceOf[String]), executionPlatform) + case "filter" => FilterOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Filter, FilterOperatorFromJson.Data(operatorFromDrawflow.data("booleanFunction").asInstanceOf[String], None, None), executionPlatform) case "reduceBy" => ReduceByOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.ReduceBy, ReduceByOperatorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String], operatorFromDrawflow.data("reduceFunction").asInstanceOf[String]), executionPlatform) case "count" => CountOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Count, executionPlatform) - case "groupBy" => GroupByOpeartorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.GroupBy, GroupByOpeartorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String]), executionPlatform) - case "sort" => SortOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Sort, SortOperatorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String]), executionPlatform) + case "groupBy" => GroupByOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.GroupBy, 
GroupByOperatorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String]), executionPlatform) + case "sort" => SortOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Sort, SortOperatorFromJson.Data(operatorFromDrawflow.data("keyFunction").asInstanceOf[String], None, None), executionPlatform) case "flatMap" => FlatMapOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.FlatMap, FlatMapOperatorFromJson.Data(operatorFromDrawflow.data("flatMapFunction").asInstanceOf[String], None, None), executionPlatform) case "map" => MapOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Map, MapOperatorFromJson.Data(operatorFromDrawflow.data("mapFunction").asInstanceOf[String], None, None), executionPlatform) case "reduce" => ReduceOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Reduce, ReduceOperatorFromJson.Data(operatorFromDrawflow.data("reduceFunction").asInstanceOf[String], None, None), executionPlatform) @@ -108,7 +109,7 @@ object OperatorFromDrawflowConverter { case "union" => UnionOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Union, executionPlatform) case "coGroup" => CoGroupOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.CoGroup, CoGroupOperatorFromJson.Data(operatorFromDrawflow.data("groupKey1").asInstanceOf[String], operatorFromDrawflow.data("groupKey2").asInstanceOf[String]), executionPlatform) case "cartesian" => CartesianOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Cartesian, executionPlatform) - case "join" => JoinOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Join, JoinOperatorFromJson.Data(operatorFromDrawflow.data("joinKey1").asInstanceOf[String], operatorFromDrawflow.data("joinKey2").asInstanceOf[String]), executionPlatform) + case "join" => JoinOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Join, 
JoinOperatorFromJson.Data(operatorFromDrawflow.data("joinKey1").asInstanceOf[String], operatorFromDrawflow.data("joinKey2").asInstanceOf[String], None, None), executionPlatform) case "intersect" => IntersectOperatorFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.Intersect, executionPlatform) // loop @@ -119,7 +120,5 @@ object OperatorFromDrawflowConverter { // output case "oTextFile" => TextFileOutputFromJson(id, input, output, cat, OperatorFromJson.OperatorNames.TextFileOutput, TextFileOutputFromJson.Data(operatorFromDrawflow.data("outputFileURL").asInstanceOf[String]), executionPlatform) } - } - } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/ContextFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/ContextFromJson.scala index f4cacfc31..334b80717 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/ContextFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/ContextFromJson.scala @@ -29,7 +29,7 @@ import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TextFileInputFromJson} import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson -import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} +import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOperatorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, 
ReduceByOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} class ContextFromJson(val platforms: List[String], val origin: String, diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala index 6514610b3..58d41b5c1 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/OperatorFromJson.scala @@ -20,23 +20,23 @@ package org.apache.wayang.api.json.operatorfromjson import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.json.JsonMapper -import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode} import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode} +import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode} import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson.OperatorNames -import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, IntersectOperatorFromJson, JoinOperatorFromJson, PredictOperatorFromJson, DLTrainingOperatorFromJson,UnionOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson -import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TextFileInputFromJson} -import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson -import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, 
MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} +import org.apache.wayang.api.json.operatorfromjson.binary._ +import org.apache.wayang.api.json.operatorfromjson.input._ +import org.apache.wayang.api.json.operatorfromjson.loop._ +import org.apache.wayang.api.json.operatorfromjson.other._ +import org.apache.wayang.api.json.operatorfromjson.output._ +import org.apache.wayang.api.json.operatorfromjson.unary._ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "operatorName", visible = true) @JsonSubTypes( Array( - // Input operators new JsonSubTypes.Type(value = classOf[TextFileInputFromJson], name = OperatorNames.TextFileInput), + new JsonSubTypes.Type(value = classOf[ParquetInputFromJson], name = OperatorNames.ParquetInput), new JsonSubTypes.Type(value = classOf[InputCollectionFromJson], name = OperatorNames.InputCollection), new JsonSubTypes.Type(value = classOf[InputCollectionFromJson], name = OperatorNames.Table), new JsonSubTypes.Type(value = classOf[JDBCRemoteInputFromJson], name = OperatorNames.JDBCRemoteInput), @@ -50,7 +50,7 @@ import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, new JsonSubTypes.Type(value = classOf[FilterOperatorFromJson], name = OperatorNames.Filter), new JsonSubTypes.Type(value = classOf[ReduceByOperatorFromJson], name = OperatorNames.ReduceBy), new JsonSubTypes.Type(value = classOf[CountOperatorFromJson], name = OperatorNames.Count), - new JsonSubTypes.Type(value = classOf[GroupByOpeartorFromJson], name = OperatorNames.GroupBy), + new JsonSubTypes.Type(value = classOf[GroupByOperatorFromJson], name = OperatorNames.GroupBy), new JsonSubTypes.Type(value = classOf[SortOperatorFromJson], name = OperatorNames.Sort), new JsonSubTypes.Type(value = classOf[DistinctOperatorFromJson], name = OperatorNames.Distinct), new JsonSubTypes.Type(value = classOf[ReduceByOperatorFromJson], name = OperatorNames.Reduce), @@ -79,14 +79,14 @@ import 
org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, ) ) -class OperatorFromJson(val id: Long, - val input: Array[Long], - val output: Array[Long], - val cat: String, - val operatorName: String, - val executionPlatform: String = null - ) extends Serializable { - +class OperatorFromJson( + val id: Long, + val input: Array[Long], + val output: Array[Long], + val cat: String, + val operatorName: String, + val executionPlatform: String = null +) extends Serializable { // // Because case classes combined with inheritance were kind of difficult to change a field, // we use a workaround with json serialization -> modification -> deserialization. @@ -137,6 +137,7 @@ object OperatorFromJson { object OperatorNames { // Input final val TextFileInput = "textFileInput" + final val ParquetInput = "parquetInput" final val InputCollection = "inputCollection" final val Table = "table" final val JDBCRemoteInput = "jdbcRemoteInput" diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/PlanFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/PlanFromJson.scala index e4cd7b3bf..8693b5673 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/PlanFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/PlanFromJson.scala @@ -17,19 +17,11 @@ */ package org.apache.wayang.api.json.operatorfromjson -import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode} import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode} import com.fasterxml.jackson.module.scala.DefaultScalaModule -import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson.OperatorNames -import org.apache.wayang.api.json.operatorfromjson.binary.{CartesianOperatorFromJson, CoGroupOperatorFromJson, IntersectOperatorFromJson, 
JoinOperatorFromJson, UnionOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.other.KMeansFromJson -import org.apache.wayang.api.json.operatorfromjson.input.{InputCollectionFromJson, JDBCRemoteInputFromJson, TextFileInputFromJson} -import org.apache.wayang.api.json.operatorfromjson.loop.{DoWhileOperatorFromJson, ForeachOperatorFromJson, RepeatOperatorFromJson} -import org.apache.wayang.api.json.operatorfromjson.output.TextFileOutputFromJson -import org.apache.wayang.api.json.operatorfromjson.unary.{CountOperatorFromJson, DistinctOperatorFromJson, FilterOperatorFromJson, FlatMapOperatorFromJson, GroupByOpeartorFromJson, MapOperatorFromJson, MapPartitionsOperatorFromJson, ReduceByOperatorFromJson, SampleOperatorFromJson, SortOperatorFromJson} class PlanFromJson(val context: ContextFromJson, val operators: List[OperatorFromJson] diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/IntersectOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/IntersectOperatorFromJson.scala index cc3dd64af..4fb29f184 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/IntersectOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/IntersectOperatorFromJson.scala @@ -29,4 +29,3 @@ case class IntersectOperatorFromJson(override val id: Long, override val executionPlatform: String = null) extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { } - diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/JoinOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/JoinOperatorFromJson.scala index b0b905ed4..df288afb8 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/JoinOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/JoinOperatorFromJson.scala @@ -19,6 +19,7 @@ package 
org.apache.wayang.api.json.operatorfromjson.binary import com.fasterxml.jackson.annotation.JsonTypeName import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson +import org.apache.wayang.api.util.NDimArray @JsonTypeName(OperatorFromJson.OperatorNames.Join) case class JoinOperatorFromJson(override val id: Long, @@ -32,5 +33,5 @@ case class JoinOperatorFromJson(override val id: Long, } object JoinOperatorFromJson { - case class Data(thisKeyUdf: String, thatKeyUdf: String) + case class Data(thisKeyUdf: String, thatKeyUdf: String, val inputType: scala.Option[NDimArray], val outputType: scala.Option[NDimArray]) } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/UnionOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/UnionOperatorFromJson.scala index e4e138b6b..ce04b22f1 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/UnionOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/binary/UnionOperatorFromJson.scala @@ -29,5 +29,3 @@ case class UnionOperatorFromJson(override val id: Long, override val executionPlatform: String = null) extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { } - - diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala new file mode 100644 index 000000000..d8c498433 --- /dev/null +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/input/ParquetInputFromJson.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.api.json.operatorfromjson.input + +import com.fasterxml.jackson.annotation.JsonTypeName +import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson +import com.fasterxml.jackson.annotation.JsonIgnoreProperties + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonTypeName(OperatorFromJson.OperatorNames.ParquetInput) +case class ParquetInputFromJson(override val id: Long, + override val input: Array[Long], + override val output: Array[Long], + override val cat: String, + override val operatorName: String, + val data: ParquetInputFromJson.Data, + override val executionPlatform: String = null) + extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { +} + +@JsonIgnoreProperties(ignoreUnknown = true) +object ParquetInputFromJson { + @JsonIgnoreProperties(ignoreUnknown = true) + case class Data(filename: String, projection: Array[String] = null) +} diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/DistinctOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/DistinctOperatorFromJson.scala index 316c69551..4ef204507 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/DistinctOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/DistinctOperatorFromJson.scala @@ -29,4 +29,3 @@ case class 
DistinctOperatorFromJson(override val id: Long, override val executionPlatform: String = null) extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { } - diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/FilterOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/FilterOperatorFromJson.scala index 93e49698d..e2fd764c9 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/FilterOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/FilterOperatorFromJson.scala @@ -19,6 +19,7 @@ package org.apache.wayang.api.json.operatorfromjson.unary import com.fasterxml.jackson.annotation.JsonTypeName import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson +import org.apache.wayang.api.util.NDimArray @JsonTypeName(OperatorFromJson.OperatorNames.Filter) case class FilterOperatorFromJson(override val id: Long, @@ -32,5 +33,5 @@ case class FilterOperatorFromJson(override val id: Long, } object FilterOperatorFromJson { - case class Data(udf: String) + case class Data(udf: String, val inputType: scala.Option[NDimArray], val outputType: scala.Option[NDimArray]) } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOpeartorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOperatorFromJson.scala similarity index 90% rename from wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOpeartorFromJson.scala rename to wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOperatorFromJson.scala index 53db57136..a230a7393 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOpeartorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/GroupByOperatorFromJson.scala @@ -22,17 +22,17 @@ import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson 
@JsonTypeName(OperatorFromJson.OperatorNames.GroupBy) -case class GroupByOpeartorFromJson(override val id: Long, +case class GroupByOperatorFromJson(override val id: Long, override val input: Array[Long], override val output: Array[Long], override val cat: String, override val operatorName: String, - val data: GroupByOpeartorFromJson.Data, + val data: GroupByOperatorFromJson.Data, override val executionPlatform: String = null) extends OperatorFromJson(id, input, output, cat, operatorName, executionPlatform) { } -object GroupByOpeartorFromJson { +object GroupByOperatorFromJson { case class Data(keyUdf: String) } diff --git a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/SortOperatorFromJson.scala b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/SortOperatorFromJson.scala index 7ce91c49c..7c7c9d95a 100644 --- a/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/SortOperatorFromJson.scala +++ b/wayang-api/wayang-api-json/src/main/scala/operatorfromjson/unary/SortOperatorFromJson.scala @@ -19,6 +19,7 @@ package org.apache.wayang.api.json.operatorfromjson.unary import com.fasterxml.jackson.annotation.JsonTypeName import org.apache.wayang.api.json.operatorfromjson.OperatorFromJson +import org.apache.wayang.api.util.NDimArray @JsonTypeName(OperatorFromJson.OperatorNames.Sort) case class SortOperatorFromJson(override val id: Long, @@ -32,5 +33,5 @@ case class SortOperatorFromJson(override val id: Long, } object SortOperatorFromJson { - case class Data(keyUdf: String) + case class Data(keyUdf: String, val inputType: scala.Option[NDimArray], val outputType: scala.Option[NDimArray]) } diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedTransformationDescriptor.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedTransformationDescriptor.java index 4568a0745..534fa8e37 100644 --- 
a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedTransformationDescriptor.java +++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/WrappedTransformationDescriptor.java @@ -21,7 +21,6 @@ import org.apache.wayang.core.function.TransformationDescriptor; import org.apache.wayang.core.types.BasicDataUnitType; import org.apache.wayang.api.python.executor.PythonWorkerManager; -import org.apache.wayang.core.function.FunctionDescriptor; import com.google.protobuf.ByteString; import java.util.ArrayList; diff --git a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties index 55c6ce726..a24bf4ad1 100644 --- a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties +++ b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties @@ -17,4 +17,3 @@ wayang.api.python.worker = /var/www/html/python/src/pywy/execution/worker.py wayang.api.python.path = python3 wayang.api.python.env.path = /usr/local/lib/python3.8/dist-packages - diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala index 69e2a4b48..d1f9a118a 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala @@ -31,7 +31,6 @@ import org.apache.wayang.core.api.WayangContext import org.apache.wayang.core.plan.wayangplan._ import org.apache.wayang.core.types.DataSetType -import java.util import scala.reflect.ClassTag /** @@ -70,7 +69,7 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) { * @param projection the projection, if any * @return [[DataQuantaBuilder]] for the file */ - 
def readParquet(url: String, projection: Array[String]): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = + def readParquet(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = createSourceBuilder(ParquetSource.create(url, projection))(ClassTag(classOf[Record])) /** diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala index f6df3e344..b13eb793c 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala @@ -133,7 +133,7 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job * @param projection the projection, if any * @return [[DataQuanta]] of [[Record]]s representing the file */ - def readParquet(url: String, projection: Array[String]): RecordDataQuanta = load(ParquetSource.create(url, projection)) + def readParquet(url: String, projection: Array[String] = null): DataQuanta[Record] = load(ParquetSource.create(url, projection)) /** * Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line. 
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/NDimArray.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/NDimArray.scala index c20e500bf..9f0c1d037 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/NDimArray.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/NDimArray.scala @@ -17,6 +17,8 @@ */ package org.apache.wayang.api.util +import org.apache.wayang.basic.data.Record + import scala.reflect.ClassTag class NDimArray(val origin: String, val depth: Integer) extends Serializable { @@ -42,6 +44,7 @@ class NDimArray(val origin: String, val depth: Integer) extends Serializable { case "Character" => classOf[Char] case "Byte" => classOf[Byte] case "Short" => classOf[Short] + case "Record" => classOf[Record] case _ => classOf[Object] } diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java index 00c76943a..ed92e813c 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/data/Record.java @@ -67,9 +67,7 @@ public int hashCode() { } @Override - public String toString() { - return "Record" + Arrays.toString(this.values); - } + public String toString() { return "Record" + Arrays.toString(this.values); } public Object getField(final int index) { return this.values[index];