From 443668ac27892907381e3048f914640710c52953 Mon Sep 17 00:00:00 2001 From: xzdandy Date: Wed, 24 Feb 2021 21:58:36 -0500 Subject: [PATCH 01/16] every thing missing array support --- src/expression/abstract_expression.py | 60 +++++++++++---------- src/expression/comparison_expression.py | 55 ++++++++++--------- src/expression/constant_value_expression.py | 4 +- src/parser/evaql/evaql_parser.g4 | 1 + src/parser/parser_visitor/_expressions.py | 4 ++ test/expression/test_comparison.py | 42 +++++++++++++++ test/integration_tests/test_udf_executor.py | 7 ++- test/parser/test_parser_visitor.py | 24 +++++++-- test/util.py | 2 +- 9 files changed, 136 insertions(+), 63 deletions(-) diff --git a/src/expression/abstract_expression.py b/src/expression/abstract_expression.py index 2e6594f654..00112567d1 100644 --- a/src/expression/abstract_expression.py +++ b/src/expression/abstract_expression.py @@ -13,52 +13,54 @@ # See the License for the specific language governing permissions and # limitations under the License. 
from abc import ABC, abstractmethod -from enum import IntEnum, unique +from enum import IntEnum, unique, auto from src.utils import generic_utils @unique class ExpressionType(IntEnum): - INVALID = 0, - CONSTANT_VALUE = 1, - TUPLE_VALUE = 2, + INVALID = auto() + CONSTANT_VALUE = auto() + TUPLE_VALUE = auto() # Compare operators - COMPARE_EQUAL = 3, - COMPARE_GREATER = 4, - COMPARE_LESSER = 5, - COMPARE_GEQ = 6, - COMPARE_LEQ = 7, - COMPARE_NEQ = 8, + COMPARE_EQUAL = auto() + COMPARE_GREATER = auto() + COMPARE_LESSER = auto() + COMPARE_GEQ = auto() + COMPARE_LEQ = auto() + COMPARE_NEQ = auto() + COMPARE_CONTAINS = auto() + COMPARE_IS_CONTAINED = auto() # Logical operators - LOGICAL_AND = 9, - LOGICAL_OR = 10, - LOGICAL_NOT = 11, + LOGICAL_AND = auto() + LOGICAL_OR = auto() + LOGICAL_NOT = auto() # Arithmetic operators - ARITHMETIC_ADD = 12, - ARITHMETIC_SUBTRACT = 13, - ARITHMETIC_MULTIPLY = 14, - ARITHMETIC_DIVIDE = 15, + ARITHMETIC_ADD = auto() + ARITHMETIC_SUBTRACT = auto() + ARITHMETIC_MULTIPLY = auto() + ARITHMETIC_DIVIDE = auto() - FUNCTION_EXPRESSION = 16, + FUNCTION_EXPRESSION = auto() - AGGREGATION_COUNT = 17, - AGGREGATION_SUM = 18, - AGGREGATION_MIN = 19, - AGGREGATION_MAX = 20, - AGGREGATION_AVG = 21, + AGGREGATION_COUNT = auto() + AGGREGATION_SUM = auto() + AGGREGATION_MIN = auto() + AGGREGATION_MAX = auto() + AGGREGATION_AVG = auto() - CASE = 22, + CASE = auto() # add other types @unique class ExpressionReturnType(IntEnum): - INVALID = 0, - BOOLEAN = 1, - INTEGER = 2, - VARCHAR = 3, - FLOAT = 4, + INVALID = auto() + BOOLEAN = auto() + INTEGER = auto() + VARCHAR = auto() + FLOAT = auto() # add others diff --git a/src/expression/comparison_expression.py b/src/expression/comparison_expression.py index 378f9d0858..0ea449a770 100644 --- a/src/expression/comparison_expression.py +++ b/src/expression/comparison_expression.py @@ -12,12 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. +import pandas as pd +import numpy as np + from src.expression.abstract_expression import AbstractExpression, \ ExpressionType, \ ExpressionReturnType from src.models.storage.batch import Batch -import pandas as pd -import numpy as np class ComparisonExpression(AbstractExpression): @@ -32,41 +33,43 @@ def __init__(self, exp_type: ExpressionType, left: AbstractExpression, children=children) def evaluate(self, *args): - # evaluate always return batch - left_values = self.get_child(0).evaluate(*args).frames - right_values = self.get_child(1).evaluate(*args).frames + # cast in to numpy array + lvalues = self.get_child(0).evaluate(*args).frames.values + rvalues = self.get_child(1).evaluate(*args).frames.values - if len(left_values) != len(right_values): - if len(left_values) == 1: - left_values = pd.DataFrame(np.repeat(left_values.values, - len(right_values), - axis=0)) - elif len(right_values) == 1: - right_values = pd.DataFrame(np.repeat(right_values.values, - len(left_values), - axis=0)) + if len(lvalues) != len(rvalues): + if len(lvalues) == 1: + lvalues = np.repeat(lvalues, len(rvalues), axis=0) + elif len(rvalues) == 1: + rvalues = np.repeat(rvalues, len(lvalues), axis=0) else: raise Exception( "Left and Right batch does not have equal elements") if self.etype == ExpressionType.COMPARE_EQUAL: - return Batch(pd.DataFrame( - left_values.values == right_values.values)) + return Batch(pd.DataFrame(lvalues == rvalues)) elif self.etype == ExpressionType.COMPARE_GREATER: - return Batch(pd.DataFrame( - left_values.values > right_values.values)) + return Batch(pd.DataFrame(lvalues > rvalues)) elif self.etype == ExpressionType.COMPARE_LESSER: - return Batch(pd.DataFrame( - left_values.values < right_values.values)) + return Batch(pd.DataFrame(lvalues < rvalues)) elif self.etype == ExpressionType.COMPARE_GEQ: - return Batch(pd.DataFrame( - left_values.values >= 
right_values.values)) + return Batch(pd.DataFrame(lvalues >= rvalues)) elif self.etype == ExpressionType.COMPARE_LEQ: - return Batch(pd.DataFrame( - left_values.values <= right_values.values)) + return Batch(pd.DataFrame(lvalues <= rvalues)) elif self.etype == ExpressionType.COMPARE_NEQ: - return Batch(pd.DataFrame( - left_values.values != right_values.values)) + return Batch(pd.DataFrame(lvalues != rvalues)) + elif self.etype == ExpressionType.COMPARE_CONTAINS: + res = [[all(x in p for x in q) \ + for p, q in zip(left, right)] \ + for left, right in zip(lvalues, rvalues)] + return Batch(pd.DataFrame(res)) + elif self.etype == ExpressionType.COMPARE_IS_CONTAINED: + res = [[all(x in q for x in p) \ + for p, q in zip(left, right)] \ + for left, right in zip(lvalues, rvalues)] + return Batch(pd.DataFrame(res)) + else: + raise NotImplementedError def __eq__(self, other): is_subtree_equal = super().__eq__(other) diff --git a/src/expression/constant_value_expression.py b/src/expression/constant_value_expression.py index e28bf58186..6d72013769 100644 --- a/src/expression/constant_value_expression.py +++ b/src/expression/constant_value_expression.py @@ -25,10 +25,12 @@ class ConstantValueExpression(AbstractExpression): # return type not set, handle that based on value def __init__(self, value): super().__init__(ExpressionType.CONSTANT_VALUE) + print('In constant:') + print(value) self._value = value def evaluate(self, *args): - return Batch(pd.DataFrame([self._value])) + return Batch(pd.DataFrame({0: [self._value]})) @property def value(self): diff --git a/src/parser/evaql/evaql_parser.g4 b/src/parser/evaql/evaql_parser.g4 index 4e9f2b1e82..f45cc95293 100644 --- a/src/parser/evaql/evaql_parser.g4 +++ b/src/parser/evaql/evaql_parser.g4 @@ -503,6 +503,7 @@ unaryOperator comparisonOperator : '=' | '>' | '<' | '<' '=' | '>' '=' | '<' '>' | '!' 
'=' | '<' '=' '>' + | '@' '>' | '<' '@' ; logicalOperator diff --git a/src/parser/parser_visitor/_expressions.py b/src/parser/parser_visitor/_expressions.py index c1ad1fdbd2..12174c6b78 100644 --- a/src/parser/parser_visitor/_expressions.py +++ b/src/parser/parser_visitor/_expressions.py @@ -82,6 +82,10 @@ def visitComparisonOperator( return ExpressionType.COMPARE_LEQ elif op == '!=': return ExpressionType.COMPARE_NEQ + elif op == '@>': + return ExpressionType.COMPARE_CONTAINS + elif op == '<@': + return ExpressionType.COMPARE_IS_CONTAINED else: return ExpressionType.INVALID diff --git a/test/expression/test_comparison.py b/test/expression/test_comparison.py index e6f3964a11..101ebfa174 100644 --- a/test/expression/test_comparison.py +++ b/test/expression/test_comparison.py @@ -111,3 +111,45 @@ def test_comparison_compare_neq(self): ) self.assertEqual([True], cmpr_exp.evaluate(None).frames[0].tolist()) + + def test_comparison_compare_contains(self): + const_exp1 = ConstantValueExpression([1,2]) + const_exp2 = ConstantValueExpression([1,5]) + const_exp3 = ConstantValueExpression([1,2,3,4]) + + cmpr_exp1 = ComparisonExpression( + ExpressionType.COMPARE_CONTAINS, + const_exp3, + const_exp1 + ) + + self.assertEqual([True], cmpr_exp1.evaluate(None).frames[0].tolist()) + + cmpr_exp2 = ComparisonExpression( + ExpressionType.COMPARE_CONTAINS, + const_exp3, + const_exp2 + ) + + self.assertEqual([False], cmpr_exp2.evaluate(None).frames[0].tolist()) + + def test_comparison_compare_is_contained(self): + const_exp1 = ConstantValueExpression([1,2]) + const_exp2 = ConstantValueExpression([1,5]) + const_exp3 = ConstantValueExpression([1,2,3,4]) + + cmpr_exp1 = ComparisonExpression( + ExpressionType.COMPARE_IS_CONTAINED, + const_exp1, + const_exp3 + ) + + self.assertEqual([True], cmpr_exp1.evaluate(None).frames[0].tolist()) + + cmpr_exp2 = ComparisonExpression( + ExpressionType.COMPARE_IS_CONTAINED, + const_exp2, + const_exp3 + ) + + self.assertEqual([False], 
cmpr_exp2.evaluate(None).frames[0].tolist()) diff --git a/test/integration_tests/test_udf_executor.py b/test/integration_tests/test_udf_executor.py index aba7c087cc..28ac41611c 100644 --- a/test/integration_tests/test_udf_executor.py +++ b/test/integration_tests/test_udf_executor.py @@ -50,7 +50,7 @@ def test_should_load_and_select_using_udf_video_in_table(self): actual_batch = perform_query(select_query) actual_batch.sort() labels = DummyObjectDetector().labels - expected = [{'id': i, 'label': labels[1 + i % 2]} + expected = [{'id': i, 'label': [labels[1 + i % 2]]} for i in range(NUM_FRAMES)] expected_batch = Batch(frames=pd.DataFrame(expected)) self.assertEqual(actual_batch, expected_batch) @@ -68,14 +68,17 @@ def test_should_load_and_select_using_udf_video(self): perform_query(create_udf_query) select_query = "SELECT id,DummyObjectDetector(data) FROM MyVideo \ - WHERE DummyObjectDetector(data).label = 'person';" + WHERE DummyObjectDetector(data).label @> ['person'];" actual_batch = perform_query(select_query) actual_batch.sort() expected = [{'id': i * 2, 'label': 'person'} for i in range(NUM_FRAMES // 2)] expected_batch = Batch(frames=pd.DataFrame(expected)) + print(expected_batch) + print(actual_batch) self.assertEqual(actual_batch, expected_batch) + return nested_select_query = """SELECT id, data FROM (SELECT id, data, DummyObjectDetector(data) FROM MyVideo WHERE id >= 2 diff --git a/test/parser/test_parser_visitor.py b/test/parser/test_parser_visitor.py index 3784c8ed86..f2affac58e 100644 --- a/test/parser/test_parser_visitor.py +++ b/test/parser/test_parser_visitor.py @@ -95,22 +95,38 @@ def test_comparison_operator(self): self.assertEqual( visitor.visitComparisonOperator(ctx), - ExpressionType.INVALID) + ExpressionType.INVALID + ) ctx.getText.return_value = '=' self.assertEqual( visitor.visitComparisonOperator(ctx), - ExpressionType.COMPARE_EQUAL) + ExpressionType.COMPARE_EQUAL + ) ctx.getText.return_value = '<' self.assertEqual( 
visitor.visitComparisonOperator(ctx), - ExpressionType.COMPARE_LESSER) + ExpressionType.COMPARE_LESSER + ) ctx.getText.return_value = '>' self.assertEqual( visitor.visitComparisonOperator(ctx), - ExpressionType.COMPARE_GREATER) + ExpressionType.COMPARE_GREATER + ) + + ctx.getText.return_value = '@>' + self.assertEqual( + visitor.visitComparisonOperator(ctx), + ExpressionType.COMPARE_CONTAINS + ) + + ctx.getText.return_value = '<@' + self.assertEqual( + visitor.visitComparisonOperator(ctx), + ExpressionType.COMPARE_IS_CONTAINED + ) # To be fixed # def test_visit_full_column_name_none(self): diff --git a/test/util.py b/test/util.py index f988fd5aeb..f0a406ac0e 100644 --- a/test/util.py +++ b/test/util.py @@ -125,4 +125,4 @@ def classify_one(self, frames: np.ndarray): # odd are labeled bicycle and even person i = int(frames[0][0][0][0] * 25) - 1 label = self.labels[i % 2 + 1] - return label + return [label] From e63f20edc065eaf0a5a93aae92e58587d384e30d Mon Sep 17 00:00:00 2001 From: xzdandy Date: Wed, 24 Feb 2021 22:00:47 -0500 Subject: [PATCH 02/16] enable both test --- test/integration_tests/test_udf_executor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/integration_tests/test_udf_executor.py b/test/integration_tests/test_udf_executor.py index 28ac41611c..98ae670daf 100644 --- a/test/integration_tests/test_udf_executor.py +++ b/test/integration_tests/test_udf_executor.py @@ -78,12 +78,11 @@ def test_should_load_and_select_using_udf_video(self): print(actual_batch) self.assertEqual(actual_batch, expected_batch) - return nested_select_query = """SELECT id, data FROM (SELECT id, data, DummyObjectDetector(data) FROM MyVideo WHERE id >= 2 ) - WHERE label = 'person'; + WHERE ['person'] <@ label; """ actual_batch = perform_query(nested_select_query) actual_batch.sort() From 8372644a38ef21a9824709fcd6fce0f0205519cc Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 25 Feb 2021 01:13:50 -0500 Subject: [PATCH 03/16] array support 
added in parser --- src/parser/evaql/evaql_lexer.g4 | 18 ++++++++++-------- src/parser/evaql/evaql_parser.g4 | 6 ++++++ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/parser/evaql/evaql_lexer.g4 b/src/parser/evaql/evaql_lexer.g4 index 52902214f4..bb1660222e 100644 --- a/src/parser/evaql/evaql_lexer.g4 +++ b/src/parser/evaql/evaql_lexer.g4 @@ -9,8 +9,8 @@ SPACE: [ \t\r\n]+ -> channel(HIDDEN); SPEC_EVAQL_COMMENT: '/*!' .+? '*/' -> channel(EVAQLCOMMENT); COMMENT_INPUT: '/*' .*? '*/' -> channel(HIDDEN); LINE_COMMENT: ( - ('-- ' | '#') ~[\r\n]* ('\r'? '\n' | EOF) - | '--' ('\r'? '\n' | EOF) + ('-- ' | '#') ~[\r\n]* ('\r'? '\n' | EOF) + | '--' ('\r'? '\n' | EOF) ) -> channel(HIDDEN); // Keywords @@ -178,6 +178,8 @@ BIT_XOR_OP: '^'; DOT: '.'; LR_BRACKET: '('; RR_BRACKET: ')'; +LR_SQ_BRACKET: '['; +RR_SQ_BRACKET: ']'; COMMA: ','; SEMI: ';'; AT_SIGN: '@'; @@ -215,23 +217,23 @@ ID: ID_LITERAL; // DOUBLE_QUOTE_ID: '"' ~'"'+ '"'; REVERSE_QUOTE_ID: '`' ~'`'+ '`'; STRING_USER_NAME: ( - SQUOTA_STRING | DQUOTA_STRING + SQUOTA_STRING | DQUOTA_STRING | BQUOTA_STRING | ID_LITERAL - ) '@' + ) '@' ( - SQUOTA_STRING | DQUOTA_STRING + SQUOTA_STRING | DQUOTA_STRING | BQUOTA_STRING | ID_LITERAL ); LOCAL_ID: '@' ( - [A-Z0-9._$]+ + [A-Z0-9._$]+ | SQUOTA_STRING | DQUOTA_STRING | BQUOTA_STRING ); -GLOBAL_ID: '@' '@' +GLOBAL_ID: '@' '@' ( - [A-Z0-9._$]+ + [A-Z0-9._$]+ | BQUOTA_STRING ); diff --git a/src/parser/evaql/evaql_parser.g4 b/src/parser/evaql/evaql_parser.g4 index f45cc95293..5b9741d579 100644 --- a/src/parser/evaql/evaql_parser.g4 +++ b/src/parser/evaql/evaql_parser.g4 @@ -360,12 +360,18 @@ nullNotnull : NOT? (NULL_LITERAL | NULL_SPEC_LITERAL) ; +arrayLiteral + : LR_SQ_BRACKET constant (',' constant)* RR_SQ_BRACKET + | LR_SQ_BRACKET RR_SQ_BRACKET + ; + constant : stringLiteral | decimalLiteral | '-' decimalLiteral | booleanLiteral | REAL_LITERAL | NOT? 
nullLiteral=(NULL_LITERAL | NULL_SPEC_LITERAL) + | arrayLiteral ; From 600207e51fbe972f54df542985e57910f0de0d5d Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 25 Feb 2021 01:32:49 -0500 Subject: [PATCH 04/16] insert support + array primitive datatype --- src/executor/insert_executor.py | 41 ++++----------------- src/expression/constant_value_expression.py | 2 +- src/models/storage/batch.py | 4 +- src/parser/parser_visitor/_expressions.py | 8 ++++ 4 files changed, 18 insertions(+), 37 deletions(-) diff --git a/src/executor/insert_executor.py b/src/executor/insert_executor.py index f05a11e485..5503e08bef 100644 --- a/src/executor/insert_executor.py +++ b/src/executor/insert_executor.py @@ -13,15 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from numpy import ndarray - from src.catalog.catalog_manager import CatalogManager -from src.catalog.column_type import ColumnType from src.planner.insert_plan import InsertPlan from src.executor.abstract_executor import AbstractExecutor from src.storage.storage_engine import StorageEngine -from src.utils.logging_manager import LoggingLevel -from src.utils.logging_manager import LoggingManager +from src.models.storage.batch import Batch class InsertExecutor(AbstractExecutor): @@ -39,34 +35,13 @@ def exec(self): Right now we assume there are no missing values """ table_id = self.node.video_id - col_id_to_val = {} + data_tuple = [] for col, val in zip(self.node.column_list, self.node.value_list): - col_id_to_val[col.col_metadata_id] = val.evaluate() + val = val.evaluate() + val.frames.columns = [col.col_name] + data_tuple.append(val) + batch = Batch.merge_column_wise(data_tuple) metadata = CatalogManager().get_metadata(table_id) - - column_list = metadata.schema.column_list - - data_tuple = [] - for column in column_list: - col_id, col_type = column.id, column.type - if col_id in col_id_to_val.keys(): - val = col_id_to_val[col_id] - try: - if 
col_type == ColumnType.INTEGER: - data_tuple.append(int(val)) - elif col_type == ColumnType.FLOAT: - data_tuple.append(float(val)) - elif col_type == ColumnType.BOOLEAN: - data_tuple.append(bool(val)) - elif col_type == ColumnType.TEXT: - data_tuple.append(str(val)) - elif col_type == ColumnType.NDARRAY: - data_tuple.append(ndarray(val)) - except Exception as e: - LoggingManager().log( - f'Insert Executor failed bcz of invalid value {e}', - LoggingLevel.ERROR) - return - - StorageEngine.write_row(metadata, [data_tuple]) + # verify value types are consistent + StorageEngine.write(metadata, batch) diff --git a/src/expression/constant_value_expression.py b/src/expression/constant_value_expression.py index 6d72013769..84120643f9 100644 --- a/src/expression/constant_value_expression.py +++ b/src/expression/constant_value_expression.py @@ -30,7 +30,7 @@ def __init__(self, value): self._value = value def evaluate(self, *args): - return Batch(pd.DataFrame({0: [self._value]})) + return Batch(pd.DataFrame([[self._value]])) @property def value(self): diff --git a/src/models/storage/batch.py b/src/models/storage/batch.py index 66929c13d9..56f4a87006 100644 --- a/src/models/storage/batch.py +++ b/src/models/storage/batch.py @@ -51,9 +51,7 @@ def __init__(self, identifier_column='id'): super().__init__() # store the batch with columns sorted - if isinstance(frames, DataFrame): - self._frames = frames[sorted(frames.columns)] - else: + if not isinstance(frames, DataFrame): LoggingManager().log('Batch constructor not properly called!', LoggingLevel.DEBUG) raise ValueError('Batch constructor not properly called. \ diff --git a/src/parser/parser_visitor/_expressions.py b/src/parser/parser_visitor/_expressions.py index 12174c6b78..309ba326c0 100644 --- a/src/parser/parser_visitor/_expressions.py +++ b/src/parser/parser_visitor/_expressions.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import json +import numpy as np + from src.parser.evaql.evaql_parserVisitor import evaql_parserVisitor from src.expression.abstract_expression import ExpressionType from src.expression.comparison_expression import ComparisonExpression @@ -36,6 +39,11 @@ def visitStringLiteral(self, ctx: evaql_parser.StringLiteralContext): # todo handle other types return self.visitChildren(ctx) + def visitArrayLiteral(self, ctx: evaql_parser.ArrayLiteralContext): + # change the dtype when we add support for np.float + return ConstantValueExpression(np.array(json.loads(ctx.getText()), + dtype=np.uint8)) + def visitConstant(self, ctx: evaql_parser.ConstantContext): if ctx.REAL_LITERAL() is not None: return ConstantValueExpression(float(ctx.getText())) From 084099d110e0f710110742c1069854647cacd96f Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 25 Feb 2021 02:38:32 -0500 Subject: [PATCH 05/16] added insert test-cases --- src/models/storage/batch.py | 4 +- .../integration_tests/test_insert_executor.py | 60 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 test/integration_tests/test_insert_executor.py diff --git a/src/models/storage/batch.py b/src/models/storage/batch.py index 56f4a87006..3c1d1da10c 100644 --- a/src/models/storage/batch.py +++ b/src/models/storage/batch.py @@ -51,7 +51,9 @@ def __init__(self, identifier_column='id'): super().__init__() # store the batch with columns sorted - if not isinstance(frames, DataFrame): + if isinstance(frames, DataFrame): + self._frames = frames + else: LoggingManager().log('Batch constructor not properly called!', LoggingLevel.DEBUG) raise ValueError('Batch constructor not properly called. 
\ diff --git a/test/integration_tests/test_insert_executor.py b/test/integration_tests/test_insert_executor.py new file mode 100644 index 0000000000..13d9a60ab2 --- /dev/null +++ b/test/integration_tests/test_insert_executor.py @@ -0,0 +1,60 @@ +# coding=utf-8 +# Copyright 2018-2020 EVA +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest +import os +import numpy as np + +from src.catalog.catalog_manager import CatalogManager +from test.util import create_sample_video, perform_query + + +class InsertExecutorTest(unittest.TestCase): + + def setUp(self): + # reset the catalog manager before running each test + CatalogManager().reset() + create_sample_video() + + def tearDown(self): + os.remove('dummy.avi') + + # integration test + def test_should_load_video_in_table(self): + query = """LOAD DATA INFILE 'dummy.avi' INTO MyVideo;""" + perform_query(query) + + insert_query = """ INSERT INTO MyVideo (id, data) VALUES (40, + [[[40, 40, 40] , [40, 40, 40]], + [[40, 40, 40], [40, 40, 40]]]);""" + perform_query(insert_query) + + insert_query_2 = """ INSERT INTO MyVideo (id, data) VALUES (41, + [[[41, 41, 41] , [41, 41, 41]], + [[41, 41, 41], [41, 41, 41]]]);""" + perform_query(insert_query_2) + + query = 'SELECT id, data FROM MyVideo WHERE id = 40;' + batch = perform_query(query) + self.assertIsNone(np.testing.assert_array_equal( + batch.frames['data'][0], + np.array([[[40, 40, 40], [40, 40, 40]], + [[40, 40, 40], [40, 40, 40]]]))) + + query = 'SELECT id, data FROM 
MyVideo WHERE id = 41;' + batch = perform_query(query) + self.assertIsNone(np.testing.assert_array_equal( + batch.frames['data'][0], + np.array([[[41, 41, 41], [41, 41, 41]], + [[41, 41, 41], [41, 41, 41]]]))) From 2ed5868e56a1fb735dd071a012b662ce83022566 Mon Sep 17 00:00:00 2001 From: xzdandy Date: Thu, 25 Feb 2021 03:08:00 -0500 Subject: [PATCH 06/16] fix some errors. Array of string not working. --- src/expression/constant_value_expression.py | 4 +--- src/models/storage/batch.py | 1 + src/parser/parser_visitor/_expressions.py | 4 +++- test/integration_tests/test_udf_executor.py | 5 ++++- test/parser/test_parser_visitor.py | 14 ++++++++++++++ test/util.py | 1 + 6 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/expression/constant_value_expression.py b/src/expression/constant_value_expression.py index 84120643f9..bc8c828c3a 100644 --- a/src/expression/constant_value_expression.py +++ b/src/expression/constant_value_expression.py @@ -25,12 +25,10 @@ class ConstantValueExpression(AbstractExpression): # return type not set, handle that based on value def __init__(self, value): super().__init__(ExpressionType.CONSTANT_VALUE) - print('In constant:') - print(value) self._value = value def evaluate(self, *args): - return Batch(pd.DataFrame([[self._value]])) + return Batch(pd.DataFrame({0: [self._value]})) @property def value(self): diff --git a/src/models/storage/batch.py b/src/models/storage/batch.py index 56f4a87006..1e91143ef3 100644 --- a/src/models/storage/batch.py +++ b/src/models/storage/batch.py @@ -56,6 +56,7 @@ def __init__(self, LoggingLevel.DEBUG) raise ValueError('Batch constructor not properly called. 
\ Expected pandas.DataFrame') + self._frames = frames[sorted(frames.columns)] self._batch_size = len(frames) self._identifier_column = identifier_column diff --git a/src/parser/parser_visitor/_expressions.py b/src/parser/parser_visitor/_expressions.py index 309ba326c0..88ff9d7513 100644 --- a/src/parser/parser_visitor/_expressions.py +++ b/src/parser/parser_visitor/_expressions.py @@ -41,8 +41,9 @@ def visitStringLiteral(self, ctx: evaql_parser.StringLiteralContext): def visitArrayLiteral(self, ctx: evaql_parser.ArrayLiteralContext): # change the dtype when we add support for np.float - return ConstantValueExpression(np.array(json.loads(ctx.getText()), + res = ConstantValueExpression(np.array(json.loads(ctx.getText()), dtype=np.uint8)) + return res def visitConstant(self, ctx: evaql_parser.ConstantContext): if ctx.REAL_LITERAL() is not None: @@ -50,6 +51,7 @@ def visitConstant(self, ctx: evaql_parser.ConstantContext): if ctx.decimalLiteral() is not None: return ConstantValueExpression(self.visit(ctx.decimalLiteral())) + print(ctx.getText()) return self.visitChildren(ctx) def visitLogicalExpression( diff --git a/test/integration_tests/test_udf_executor.py b/test/integration_tests/test_udf_executor.py index 98ae670daf..3e3fd7cbb6 100644 --- a/test/integration_tests/test_udf_executor.py +++ b/test/integration_tests/test_udf_executor.py @@ -34,6 +34,8 @@ def tearDown(self): os.remove('dummy.avi') # integration test + + @unittest.skip('Too slow when batch size is small.') def test_should_load_and_select_using_udf_video_in_table(self): load_query = """LOAD DATA INFILE 'dummy.avi' INTO MyVideo;""" perform_query(load_query) @@ -68,7 +70,7 @@ def test_should_load_and_select_using_udf_video(self): perform_query(create_udf_query) select_query = "SELECT id,DummyObjectDetector(data) FROM MyVideo \ - WHERE DummyObjectDetector(data).label @> ['person'];" + WHERE DummyObjectDetector(data).label = ['person'];" actual_batch = perform_query(select_query) actual_batch.sort() expected 
= [{'id': i * 2, 'label': 'person'} @@ -78,6 +80,7 @@ def test_should_load_and_select_using_udf_video(self): print(actual_batch) self.assertEqual(actual_batch, expected_batch) + return nested_select_query = """SELECT id, data FROM (SELECT id, data, DummyObjectDetector(data) FROM MyVideo WHERE id >= 2 diff --git a/test/parser/test_parser_visitor.py b/test/parser/test_parser_visitor.py index f2affac58e..08c0bf092d 100644 --- a/test/parser/test_parser_visitor.py +++ b/test/parser/test_parser_visitor.py @@ -15,6 +15,7 @@ import unittest import pandas as pd +import numpy as np from unittest import mock from unittest.mock import MagicMock, call @@ -203,6 +204,19 @@ def test_visit_constant(self): expected.evaluate(), Batch(pd.DataFrame([float(ctx.getText())]))) + def test_visit_array_literal(self): + ''' Testing when array literal + Function: visitArrayLiteral + ''' + ctx = MagicMock() + visitor = ParserVisitor() + ctx.getText.return_value = '[1,2,3,4]' + expected = visitor.visitArrayLiteral(ctx) + self.assertEqual( + expected.evaluate(), + Batch(pd.DataFrame({0: [np.array([1,2,3,4])]})) + ) + def test_visit_query_specification_base_exception(self): ''' Testing Base Exception error handling Function: visitQuerySpecification diff --git a/test/util.py b/test/util.py index f0a406ac0e..65b09c53dd 100644 --- a/test/util.py +++ b/test/util.py @@ -94,6 +94,7 @@ def create_dummy_batches(num_frames=NUM_FRAMES, def perform_query(query): stmt = Parser().parse(query)[0] + print(stmt) l_plan = StatementToPlanConvertor().visit(stmt) p_plan = PlanGenerator().build(l_plan) return PlanExecutor(p_plan).execute_plan() From 9a5a1ddb248b8e8dcbac375ecb1b1afcb1a4beb0 Mon Sep 17 00:00:00 2001 From: xzdandy Date: Thu, 25 Feb 2021 03:12:36 -0500 Subject: [PATCH 07/16] remove print --- test/util.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/util.py b/test/util.py index 65b09c53dd..f0a406ac0e 100644 --- a/test/util.py +++ b/test/util.py @@ -94,7 +94,6 @@ def 
create_dummy_batches(num_frames=NUM_FRAMES, def perform_query(query): stmt = Parser().parse(query)[0] - print(stmt) l_plan = StatementToPlanConvertor().visit(stmt) p_plan = PlanGenerator().build(l_plan) return PlanExecutor(p_plan).execute_plan() From 36055fcdefbcd80e54a0517bfde00ca99cc45249 Mon Sep 17 00:00:00 2001 From: xzdandy Date: Thu, 25 Feb 2021 03:14:18 -0500 Subject: [PATCH 08/16] remove double fix --- src/models/storage/batch.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/models/storage/batch.py b/src/models/storage/batch.py index 9874971c5c..3c1d1da10c 100644 --- a/src/models/storage/batch.py +++ b/src/models/storage/batch.py @@ -58,7 +58,6 @@ def __init__(self, LoggingLevel.DEBUG) raise ValueError('Batch constructor not properly called. \ Expected pandas.DataFrame') - self._frames = frames[sorted(frames.columns)] self._batch_size = len(frames) self._identifier_column = identifier_column From c01f585221f18b356cba20b6470dbe56cae938e4 Mon Sep 17 00:00:00 2001 From: xzdandy Date: Sat, 27 Feb 2021 22:39:37 -0500 Subject: [PATCH 09/16] add array_type --- eva.yml | 2 +- src/catalog/catalog_manager.py | 10 +-- src/catalog/column_type.py | 51 ++++++++++++- src/catalog/models/df_column.py | 12 +++- src/catalog/schema_utils.py | 6 +- src/models/storage/batch.py | 2 +- src/optimizer/optimizer_utils.py | 45 ++++-------- src/parser/create_statement.py | 18 +++-- src/parser/evaql/evaql_lexer.g4 | 12 ++++ src/parser/evaql/evaql_parser.g4 | 9 ++- .../parser_visitor/_create_statements.py | 72 ++++++++++++++----- src/parser/types.py | 12 ---- 12 files changed, 170 insertions(+), 81 deletions(-) diff --git a/eva.yml b/eva.yml index 1dc397d227..13fd495025 100644 --- a/eva.yml +++ b/eva.yml @@ -1,6 +1,6 @@ core: location: "eva_datasets" - sqlalchemy_database_uri: 'mysql+pymysql://root@localhost/eva_catalog' + sqlalchemy_database_uri: 'mysql+pymysql://root:root@localhost/eva_catalog' application: "eva" executor: diff --git a/src/catalog/catalog_manager.py 
b/src/catalog/catalog_manager.py index 2b713301ef..a26bc5cba5 100644 --- a/src/catalog/catalog_manager.py +++ b/src/catalog/catalog_manager.py @@ -15,7 +15,7 @@ from typing import List, Tuple -from src.catalog.column_type import ColumnType +from src.catalog.column_type import ColumnType, NdArrayType from src.catalog.models.base_model import init_db, drop_db from src.catalog.models.df_column import DataFrameColumn from src.catalog.models.df_metadata import DataFrameMetadata @@ -195,8 +195,9 @@ def get_column_ids(self, table_metadata_id: int) -> List[int]: return col_ids def create_column_metadata( - self, column_name: str, data_type: ColumnType, - dimensions: List[int]): + self, column_name: str, data_type: ColumnType, array_type: NdArrayType, + dimensions: List[int] + ) -> DataFrameColumn: """Create a dataframe column object this column. This function won't commit this object in the catalog database. If you want to commit it into catalog table call create_metadata with @@ -205,9 +206,10 @@ def create_column_metadata( Arguments: column_name {str} -- column name to be created data_type {ColumnType} -- type of column created + array_type {NdArrayType} -- type of ndarray dimensions {List[int]} -- dimensions of the column created """ - return DataFrameColumn(column_name, data_type, + return DataFrameColumn(column_name, data_type, array_type=array_type, array_dimensions=dimensions) def get_dataset_metadata(self, database_name: str, dataset_name: str) -> \ diff --git a/src/catalog/column_type.py b/src/catalog/column_type.py index c9df5528c7..96322dd1f7 100644 --- a/src/catalog/column_type.py +++ b/src/catalog/column_type.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from enum import Enum - +from enum import Enum, auto class ColumnType(Enum): BOOLEAN = 1 @@ -21,3 +20,51 @@ class ColumnType(Enum): FLOAT = 3 TEXT = 4 NDARRAY = 5 + +class NdArrayType(Enum): + INT8 = auto() + UINT8 = auto() + INT16 = auto() + INT32 = auto() + INT64 = auto() + UNICODE = auto() + BOOL = auto() + FLOAT32 = auto() + FLOAT64 = auto() + DECIMAL = auto() + STR = auto() + DATETIME = auto() + + @classmethod + def to_numpy_type(cls, t): + import numpy as np + from decimal import Decimal + + if t == cls.INT8: + np_type = np.int8 + elif t is NdArrayType.UINT8: + np_type = np.uint8 + elif t == cls.INT16: + np_type = np.int16 + elif t == cls.INT32: + np_type = np.int32 + elif t == cls.INT64: + np_type = np.int64 + elif t == cls.UNICODE: + np_type = np.unicode_ + elif t == cls.BOOL: + np_type = np.bool_ + elif t == cls.FLOAT32: + np_type = np.float32 + elif t == cls.FLOAT64: + np_type = np.float64 + elif t == cls.DECIMAL: + np_type = Decimal + elif t == cls.STR: + np_type = np.str_ + elif t == cls.DATETIME: + np_type = np.datetime64 + else: + raise ValueError('Can not auto convert %s to numpy type' % t) + + return np_type diff --git a/src/catalog/models/df_column.py b/src/catalog/models/df_column.py index 8f943e9af9..e1134fa42b 100644 --- a/src/catalog/models/df_column.py +++ b/src/catalog/models/df_column.py @@ -20,7 +20,7 @@ from sqlalchemy.types import Enum from ast import literal_eval -from src.catalog.column_type import ColumnType +from src.catalog.column_type import ColumnType, NdArrayType from src.catalog.models.base_model import BaseModel @@ -30,6 +30,7 @@ class DataFrameColumn(BaseModel): _name = Column('name', String(100)) _type = Column('type', Enum(ColumnType), default=Enum) _is_nullable = Column('is_nullable', Boolean, default=False) + _array_type = Column('array_type', Enum(NdArrayType), nullable=True) _array_dimensions = Column('array_dimensions', String(100)) _metadata_id = Column('metadata_id', Integer, ForeignKey('df_metadata.id')) @@ -44,11 
+45,13 @@ def __init__(self, name: str, type: ColumnType, is_nullable: bool = False, + array_type: NdArrayType = None, array_dimensions: List[int] = [], metadata_id: int = None): self._name = name self._type = type self._is_nullable = is_nullable + self._array_type = array_type self._array_dimensions = str(array_dimensions) self._metadata_id = metadata_id @@ -68,6 +71,10 @@ def type(self): def is_nullable(self): return self._is_nullable + @property + def array_type(self): + return self._array_type + @property def array_dimensions(self): return literal_eval(self._array_dimensions) @@ -89,7 +96,7 @@ def __str__(self): self._type.name, self._is_nullable) - column_str += "[" + column_str += "%s[" % self.array_type column_str += ', '.join(['%d'] * len(self.array_dimensions)) \ % tuple(self.array_dimensions) column_str += "])" @@ -100,6 +107,7 @@ def __eq__(self, other): return self.id == other.id and \ self.metadata_id == other.metadata_id and \ self.is_nullable == other.is_nullable and \ + self.array_type == other.array_type and \ self.array_dimensions == other.array_dimensions and \ self.name == other.name and \ self.type == other.type diff --git a/src/catalog/schema_utils.py b/src/catalog/schema_utils.py index bfb89d7593..8d6a467065 100644 --- a/src/catalog/schema_utils.py +++ b/src/catalog/schema_utils.py @@ -19,7 +19,7 @@ from petastorm.unischema import UnischemaField from pyspark.sql.types import IntegerType, FloatType, StringType -from src.catalog.column_type import ColumnType +from src.catalog.column_type import ColumnType, NdArrayType from src.utils.logging_manager import LoggingLevel from src.utils.logging_manager import LoggingManager @@ -32,6 +32,7 @@ def get_petastorm_column(df_column): column_type = df_column.type column_name = df_column.name column_is_nullable = df_column.is_nullable + column_array_type = df_column.array_type column_array_dimensions = df_column.array_dimensions # Reference: @@ -58,8 +59,9 @@ def get_petastorm_column(df_column): 
ScalarCodec(StringType()), column_is_nullable) elif column_type == ColumnType.NDARRAY: + np_type = NdArrayType.to_numpy_type(column_array_type) petastorm_column = UnischemaField(column_name, - np.uint8, + np_type, column_array_dimensions, NdarrayCodec(), column_is_nullable) diff --git a/src/models/storage/batch.py b/src/models/storage/batch.py index 3c1d1da10c..66929c13d9 100644 --- a/src/models/storage/batch.py +++ b/src/models/storage/batch.py @@ -52,7 +52,7 @@ def __init__(self, super().__init__() # store the batch with columns sorted if isinstance(frames, DataFrame): - self._frames = frames + self._frames = frames[sorted(frames.columns)] else: LoggingManager().log('Batch constructor not properly called!', LoggingLevel.DEBUG) diff --git a/src/optimizer/optimizer_utils.py b/src/optimizer/optimizer_utils.py index d7d30fbb0a..fdbb9c126c 100644 --- a/src/optimizer/optimizer_utils.py +++ b/src/optimizer/optimizer_utils.py @@ -12,12 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import List + from src.catalog.models.df_metadata import DataFrameMetadata from src.expression.function_expression import FunctionExpression from src.parser.table_ref import TableInfo from src.catalog.catalog_manager import CatalogManager -from src.catalog.column_type import ColumnType -from typing import List +from src.catalog.column_type import ColumnType, NdArrayType from src.expression.abstract_expression import AbstractExpression from src.expression.tuple_value_expression import ExpressionType, \ @@ -25,7 +26,6 @@ from src.parser.create_statement import ColumnDefinition, \ ColConstraintInfo -from src.parser.types import ParserColumnDataType from src.utils.generic_utils import path_to_class, generate_file_path from src.utils.logging_manager import LoggingLevel @@ -151,10 +151,11 @@ def create_column_metadata(col_list: List[ColumnDefinition]): "Empty column while creating column metadata", LoggingLevel.ERROR) result_list.append(col) - col_type = xform_parser_column_type_to_catalog_type(col.type) result_list.append( CatalogManager().create_column_metadata( - col.name, col_type, col.dimension)) + col.name, col.type, col.array_type, col.dimension + ) + ) return result_list @@ -177,9 +178,8 @@ def column_definition_to_udf_io( "Empty column definition while creating udf io", LoggingLevel.ERROR) result_list.append(col) - col_type = xform_parser_column_type_to_catalog_type(col.type) result_list.append( - CatalogManager().udf_io(col.name, col_type, + CatalogManager().udf_io(col.name, col.type, col.dimension, is_input) ) return result_list @@ -198,39 +198,18 @@ def create_video_metadata(name: str) -> DataFrameMetadata: DataFrameMetadata: corresponding metadata for the input table info """ catalog = CatalogManager() - columns = [ColumnDefinition('id', ParserColumnDataType.INTEGER, [], - ColConstraintInfo(unique=True))] + columns = [ColumnDefinition('id', ColumnType.INTEGER, None, + [], ColConstraintInfo(unique=True))] # the ndarray dimensions are set as None. 
We need to fix this as we # cannot assume. Either ask the user to provide this with load or # we infer this from the provided video. columns.append( ColumnDefinition( - 'data', ParserColumnDataType.NDARRAY, [ - None, None, None])) + 'data', ColumnType.NDARRAY, NdArrayType.UINT8, [None, None, None] + ) + ) col_metadata = create_column_metadata(columns) uri = str(generate_file_path(name)) metadata = catalog.create_metadata( name, uri, col_metadata, identifier_column='id') return metadata - - -def xform_parser_column_type_to_catalog_type( - col_type: ParserColumnDataType) -> ColumnType: - """translate parser defined column type to the catalog type - - Arguments: - col_type {ParserColumnDataType} -- input parser column type - - Returns: - ColumnType -- catalog column type - """ - if col_type == ParserColumnDataType.BOOLEAN: - return ColumnType.BOOLEAN - elif col_type == ParserColumnDataType.FLOAT: - return ColumnType.FLOAT - elif col_type == ParserColumnDataType.INTEGER: - return ColumnType.INTEGER - elif col_type == ParserColumnDataType.TEXT: - return ColumnType.TEXT - elif col_type == ParserColumnDataType.NDARRAY: - return ColumnType.NDARRAY diff --git a/src/parser/create_statement.py b/src/parser/create_statement.py index bca4dee85d..236be09b2e 100644 --- a/src/parser/create_statement.py +++ b/src/parser/create_statement.py @@ -12,14 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import List from src.parser.statement import AbstractStatement from src.parser.types import StatementType from src.parser.table_ref import TableRef -from typing import List -from src.parser.types import ParserColumnDataType - +from src.catalog.column_type import ColumnType, NdArrayType class ColConstraintInfo: def __init__(self, nullable=False, default_value=None, @@ -39,11 +38,12 @@ def __eq__(self, other): class ColumnDefinition: - def __init__(self, col_name: str, - col_type: ParserColumnDataType, col_dim: List[int], + def __init__(self, col_name: str, col_type: ColumnType, + col_array_type: NdArrayType, col_dim: List[int], cci: ColConstraintInfo = ColConstraintInfo()): self._name = col_name self._type = col_type + self._array_type = col_array_type self._dimension = col_dim self._cci = cci @@ -55,6 +55,10 @@ def name(self): def type(self): return self._type + @property + def array_type(self): + return self._array_type + @property def dimension(self): return self._dimension @@ -64,7 +68,8 @@ def cci(self): return self._cci def __str__(self): - return '{} {} {}'.format(self._name, self._type, self._dimension) + return '{} {} {}'.format(self._name, self._type, self.array_type, + self._dimension) def __eq__(self, other): if not isinstance(other, ColumnDefinition): @@ -72,6 +77,7 @@ def __eq__(self, other): return (self.name == other.name and self.type == other.type + and self.array_type == other.array_type and self.dimension == other.dimension and self.cci == other.cci) diff --git a/src/parser/evaql/evaql_lexer.g4 b/src/parser/evaql/evaql_lexer.g4 index bb1660222e..be1ce04bf5 100644 --- a/src/parser/evaql/evaql_lexer.g4 +++ b/src/parser/evaql/evaql_lexer.g4 @@ -100,6 +100,18 @@ INTEGER: 'INTEGER'; FLOAT: 'FLOAT'; TEXT: 'TEXT'; NDARRAY: 'NDARRAY'; +INT8: 'INT8'; +UINT8: 'UINT8'; +INT16: 'INT16'; +INT32: 'INT32'; +INT64: 'INT64'; +UNICODE: 'UNICODE'; +BOOL: 'BOOL'; +FLOAT32: 'FLOAT32'; +FLOAT64: 'FLOAT64'; +DECIMAL: 'DECIMAL'; +STR: 'STR'; +DATETIME: 
'DATETIME'; // Group function Keywords diff --git a/src/parser/evaql/evaql_parser.g4 b/src/parser/evaql/evaql_parser.g4 index 5b9741d579..dcc202d0ac 100644 --- a/src/parser/evaql/evaql_parser.g4 +++ b/src/parser/evaql/evaql_parser.g4 @@ -377,12 +377,19 @@ constant // Data Types +arrayType + : INT8 | UINT8 | INT16 | INT32 | INT64 + | UNICODE | BOOL + | FLOAT32 | FLOAT64 | DECIMAL + | STR | DATETIME + ; + dataType : BOOLEAN #simpleDataType | TEXT lengthOneDimension? #dimensionDataType | INTEGER UNSIGNED? #integerDataType | FLOAT lengthTwoDimension? UNSIGNED? #dimensionDataType - | NDARRAY lengthDimensionList #dimensionDataType + | NDARRAY arrayType lengthDimensionList #arrayDataType ; lengthOneDimension diff --git a/src/parser/parser_visitor/_create_statements.py b/src/parser/parser_visitor/_create_statements.py index 5e90d65f3b..db79756cde 100644 --- a/src/parser/parser_visitor/_create_statements.py +++ b/src/parser/parser_visitor/_create_statements.py @@ -16,10 +16,11 @@ from src.parser.evaql.evaql_parserVisitor import evaql_parserVisitor from src.parser.create_statement import CreateTableStatement, ColumnDefinition from src.parser.evaql.evaql_parser import evaql_parser -from src.parser.types import ParserColumnDataType from src.parser.types import ColumnConstraintEnum from src.parser.create_statement import ColConstraintInfo +from src.catalog.column_type import ColumnType, NdArrayType + ################################################################## # CREATE STATEMENTS @@ -71,19 +72,19 @@ def visitCreateDefinitions( def visitColumnDeclaration( self, ctx: evaql_parser.ColumnDeclarationContext): - data_type, dimensions, column_constraint_information = self.visit( - ctx.columnDefinition()) + data_type, array_type, dimensions, column_constraint_information = \ + self.visit(ctx.columnDefinition()) column_name = self.visit(ctx.uid()) if column_name is not None: return ColumnDefinition( - column_name, data_type, dimensions, + column_name, data_type, array_type, 
dimensions, column_constraint_information) def visitColumnDefinition(self, ctx: evaql_parser.ColumnDefinitionContext): - data_type, dimensions = self.visit(ctx.dataType()) + data_type, array_type, dimensions = self.visit(ctx.dataType()) constraint_count = len(ctx.columnConstraint()) @@ -95,7 +96,7 @@ def visitColumnDefinition(self, ctx: evaql_parser.ColumnDefinitionContext): column_constraint_information.unique = True - return data_type, dimensions, column_constraint_information + return data_type, array_type, dimensions, column_constraint_information def visitUniqueKeyColumnConstraint( self, ctx: evaql_parser.UniqueKeyColumnConstraintContext): @@ -104,41 +105,78 @@ def visitUniqueKeyColumnConstraint( def visitSimpleDataType(self, ctx: evaql_parser.SimpleDataTypeContext): data_type = None + array_type = None dimensions = [] if ctx.BOOLEAN() is not None: - data_type = ParserColumnDataType.BOOLEAN + data_type = ColumnType.BOOLEAN - return data_type, dimensions + return data_type, array_type, dimensions def visitIntegerDataType(self, ctx: evaql_parser.IntegerDataTypeContext): data_type = None + array_type = None dimensions = [] if ctx.INTEGER() is not None: - data_type = ParserColumnDataType.INTEGER + data_type = ColumnType.INTEGER elif ctx.UNSIGNED() is not None: - data_type = ParserColumnDataType.INTEGER + data_type = ColumnType.INTEGER - return data_type, dimensions + return data_type, array_type, dimensions def visitDimensionDataType( self, ctx: evaql_parser.DimensionDataTypeContext): data_type = None + array_type = None dimensions = [] if ctx.FLOAT() is not None: - data_type = ParserColumnDataType.FLOAT + data_type = ColumnType.FLOAT dimensions = self.visit(ctx.lengthTwoDimension()) elif ctx.TEXT() is not None: - data_type = ParserColumnDataType.TEXT + data_type = ColumnType.TEXT dimensions = self.visit(ctx.lengthOneDimension()) - elif ctx.NDARRAY() is not None: - data_type = ParserColumnDataType.NDARRAY - dimensions = self.visit(ctx.lengthDimensionList()) - 
return data_type, dimensions + return data_type, array_type, dimensions + + def visitArrayDataType(self, ctx: evaql_parser.ArrayDataTypeContext): + data_type = ColumnType.NDARRAY + array_type = self.visit(ctx.arrayType()) + dimensions = self.visit(ctx.lengthDimensionList()) + return data_type, array_type, dimensions + + def visitArrayType(self, ctx: evaql_parser.ArrayTypeContext): + array_type = None + + if ctx.INT8() is not None: + array_type = NdArrayType.INT8 + elif ctx.UINT8() is not None: + array_type = NdArrayType.UINT8 + elif ctx.INT16() is not None: + array_type = NdArrayType.INT16 + elif ctx.INT32() is not None: + array_type = NdArrayType.INT32 + elif ctx.INT64() is not None: + array_type = NdArrayType.INT64 + elif ctx.UNICODE() is not None: + array_type = NdArrayType.UNICODE + elif ctx.BOOL() is not None: + array_type = NdArrayType.BOOL + elif ctx.FLOAT32() is not None: + array_type = NdArrayType.FLOAT32 + elif ctx.FLOAT64() is not None: + array_type = NdArrayType.FLOAT64 + elif ctx.DECIMAL() is not None: + array_type = NdArrayType.DECIMAL + elif ctx.STR() is not None: + array_type = NdArrayType.STR + elif ctx.DATETIME() is not None: + array_type = NdArrayType.DATETIME + + return array_type + def visitLengthOneDimension( self, ctx: evaql_parser.LengthOneDimensionContext): diff --git a/src/parser/types.py b/src/parser/types.py index a03ddcc471..3b6cff832a 100644 --- a/src/parser/types.py +++ b/src/parser/types.py @@ -35,18 +35,6 @@ class StatementType(IntEnum): # add other types -@unique -class ParserColumnDataType(IntEnum): - """ - Manages enums for all column data types - """ - BOOLEAN = 1 - INTEGER = 2 - FLOAT = 3 - TEXT = 4 - NDARRAY = 5 - - @unique class ParserOrderBySortType(IntEnum): """ From f0c26431fc043609d7763bc4f809584c4d42123d Mon Sep 17 00:00:00 2001 From: xzdandy Date: Mon, 1 Mar 2021 00:54:54 -0500 Subject: [PATCH 10/16] generic array support and type casting --- eva.yml | 2 +- src/expression/constant_value_expression.py | 6 ++-- 
src/parser/parser_visitor/_expressions.py | 8 ++--- src/storage/petastorm_storage_engine.py | 33 ++++++++++++++++-- test/integration_tests/test_pytorch.py | 4 +-- test/integration_tests/test_udf_executor.py | 37 ++++++++++++++++----- test/parser/test_parser_visitor.py | 17 ++++++++-- 7 files changed, 82 insertions(+), 25 deletions(-) diff --git a/eva.yml b/eva.yml index 13fd495025..1dc397d227 100644 --- a/eva.yml +++ b/eva.yml @@ -1,6 +1,6 @@ core: location: "eva_datasets" - sqlalchemy_database_uri: 'mysql+pymysql://root:root@localhost/eva_catalog' + sqlalchemy_database_uri: 'mysql+pymysql://root@localhost/eva_catalog' application: "eva" executor: diff --git a/src/expression/constant_value_expression.py b/src/expression/constant_value_expression.py index bc8c828c3a..5306a6b640 100644 --- a/src/expression/constant_value_expression.py +++ b/src/expression/constant_value_expression.py @@ -12,10 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import pandas as pd +import numpy as np + from src.expression.abstract_expression import AbstractExpression, \ ExpressionType from src.models.storage.batch import Batch -import pandas as pd class ConstantValueExpression(AbstractExpression): @@ -27,7 +29,7 @@ def __init__(self, value): super().__init__(ExpressionType.CONSTANT_VALUE) self._value = value - def evaluate(self, *args): + def evaluate(self, *args, **kwargs): return Batch(pd.DataFrame({0: [self._value]})) @property diff --git a/src/parser/parser_visitor/_expressions.py b/src/parser/parser_visitor/_expressions.py index 88ff9d7513..21d772ae00 100644 --- a/src/parser/parser_visitor/_expressions.py +++ b/src/parser/parser_visitor/_expressions.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import json +import ast import numpy as np from src.parser.evaql.evaql_parserVisitor import evaql_parserVisitor @@ -40,9 +39,7 @@ def visitStringLiteral(self, ctx: evaql_parser.StringLiteralContext): return self.visitChildren(ctx) def visitArrayLiteral(self, ctx: evaql_parser.ArrayLiteralContext): - # change the dtype when we add support for np.float - res = ConstantValueExpression(np.array(json.loads(ctx.getText()), - dtype=np.uint8)) + res = ConstantValueExpression(np.array(ast.literal_eval(ctx.getText()))) return res def visitConstant(self, ctx: evaql_parser.ConstantContext): @@ -51,7 +48,6 @@ def visitConstant(self, ctx: evaql_parser.ConstantContext): if ctx.decimalLiteral() is not None: return ConstantValueExpression(self.visit(ctx.decimalLiteral())) - print(ctx.getText()) return self.visitChildren(ctx) def visitLogicalExpression( diff --git a/src/storage/petastorm_storage_engine.py b/src/storage/petastorm_storage_engine.py index e79a7b2224..fc59acef24 100644 --- a/src/storage/petastorm_storage_engine.py +++ b/src/storage/petastorm_storage_engine.py @@ -12,6 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import numpy as np +import pandas as pd + +from typing import Iterator, List +from pathlib import Path from src.spark.session import Session from src.catalog.models.df_metadata import DataFrameMetadata @@ -20,10 +25,13 @@ from src.readers.petastorm_reader import PetastormReader from src.models.storage.batch import Batch +from petastorm.unischema import Unischema from petastorm.unischema import dict_to_spark_row +from petastorm.codecs import NdarrayCodec from petastorm.predicates import in_lambda -from typing import Iterator, List -from pathlib import Path + +from src.utils.logging_manager import LoggingLevel +from src.utils.logging_manager import LoggingManager class PetastormStorageEngine(AbstractStorageEngine): @@ -59,6 +67,24 @@ def create(self, table: DataFrameMetadata): .mode('overwrite') \ .parquet(self._spark_url(table)) + def _write_type_cast(self, schema: Unischema, df: pd.DataFrame) \ + -> np.ndarray: + """ + Try to cast the type if schema defined in UnischemeField for + Petastorm is not consistent with panda DataFrame provided. + """ + for unischema in schema.fields.values(): + if not isinstance(unischema.codec, NdarrayCodec): + continue + # We only care when the cell data is np.ndarray + col = unischema.name + dtype = unischema.numpy_dtype + try: + df[col] = df[col].apply(lambda x: x.astype(dtype)) + except Exception as e: + LoggingManager().exception('Failed to cast type for Petastorm') + return df + def write(self, table: DataFrameMetadata, rows: Batch): """ Write rows into the dataframe. 
@@ -73,11 +99,12 @@ def write(self, table: DataFrameMetadata, rows: Batch): # ToDo # Throw an error if the row schema doesn't match the table schema + records = self._write_type_cast(table.schema.petastorm_schema, + rows.frames) with materialize_dataset(self.spark_session, self._spark_url(table), table.schema.petastorm_schema): - records = rows.frames columns = records.keys() rows_rdd = self.spark_context.parallelize(records.values) \ .map(lambda x: dict(zip(columns, x))) \ diff --git a/test/integration_tests/test_pytorch.py b/test/integration_tests/test_pytorch.py index d6c7717443..91e77e1eda 100644 --- a/test/integration_tests/test_pytorch.py +++ b/test/integration_tests/test_pytorch.py @@ -29,7 +29,7 @@ def test_should_run_pytorch_and_fastrcnn(self): perform_query(query) create_udf_query = """CREATE UDF FastRCNNObjectDetector - INPUT (Frame_Array NDARRAY (3, 256, 256)) + INPUT (Frame_Array NDARRAY UINT8(3, 256, 256)) OUTPUT (label TEXT(10)) TYPE Classification IMPL 'src/udfs/fastrcnn_object_detector.py'; @@ -47,7 +47,7 @@ def test_should_run_pytorch_and_ssd(self): perform_query(query) create_udf_query = """CREATE UDF SSDObjectDetector - INPUT (Frame_Array NDARRAY (3, 256, 256)) + INPUT (Frame_Array NDARRAY UINT8(3, 256, 256)) OUTPUT (label TEXT(10)) TYPE Classification IMPL 'src/udfs/ssd_object_detector.py'; diff --git a/test/integration_tests/test_udf_executor.py b/test/integration_tests/test_udf_executor.py index 3e3fd7cbb6..41c3830e92 100644 --- a/test/integration_tests/test_udf_executor.py +++ b/test/integration_tests/test_udf_executor.py @@ -48,9 +48,9 @@ def test_should_load_and_select_using_udf_video_in_table(self): """ perform_query(create_udf_query) - select_query = "SELECT id,DummyObjectDetector(data) FROM MyVideo;" + select_query = "SELECT id,DummyObjectDetector(data) FROM MyVideo \ + ORDER BY id;" actual_batch = perform_query(select_query) - actual_batch.sort() labels = DummyObjectDetector().labels expected = [{'id': i, 'label': [labels[1 + i % 2]]} 
for i in range(NUM_FRAMES)] @@ -62,25 +62,44 @@ def test_should_load_and_select_using_udf_video(self): perform_query(load_query) create_udf_query = """CREATE UDF DummyObjectDetector - INPUT (Frame_Array NDARRAY (3, 256, 256)) + INPUT (Frame_Array NDARRAY UINT8(3, 256, 256)) OUTPUT (label TEXT(10)) TYPE Classification IMPL 'test/util.py'; """ perform_query(create_udf_query) + # Equality test select_query = "SELECT id,DummyObjectDetector(data) FROM MyVideo \
 WHERE DummyObjectDetector(data).label = ['person'];" + WHERE DummyObjectDetector(data).label = ['person'] ORDER BY id;" actual_batch = perform_query(select_query) - actual_batch.sort() - expected = [{'id': i * 2, 'label': 'person'} + expected = [{'id': i * 2, 'label': ['person']} + for i in range(NUM_FRAMES // 2)] + expected_batch = Batch(frames=pd.DataFrame(expected)) + self.assertEqual(actual_batch, expected_batch) + + # Contain test + select_query = "SELECT id,DummyObjectDetector(data) FROM MyVideo \
 WHERE DummyObjectDetector(data).label @> ['person'] ORDER BY id;" + actual_batch = perform_query(select_query) + self.assertEqual(actual_batch, expected_batch) + + + # Multi element contain test + + select_query = "SELECT id,DummyObjectDetector(data) FROM MyVideo \
 WHERE DummyObjectDetector(data).label <@ ['person', 'bicycle'] \
 ORDER BY id;" + actual_batch = perform_query(select_query) + expected = [{'id': i * 2, 'label': ['person']} for i in range(NUM_FRAMES // 2)] + expected += [{'id': i, 'label': ['bicycle']} + for i in range(NUM_FRAMES) + if i % 2 + 1 == 2] expected_batch = Batch(frames=pd.DataFrame(expected)) - print(expected_batch) - print(actual_batch) + expected_batch.sort() self.assertEqual(actual_batch, expected_batch) - return nested_select_query = """SELECT id, data FROM (SELECT id, data, DummyObjectDetector(data) FROM MyVideo WHERE id >= 2 diff --git a/test/parser/test_parser_visitor.py b/test/parser/test_parser_visitor.py index 08c0bf092d..f2d38c4052 100644 --- 
a/test/parser/test_parser_visitor.py +++ b/test/parser/test_parser_visitor.py @@ -204,8 +204,8 @@ def test_visit_constant(self): expected.evaluate(), Batch(pd.DataFrame([float(ctx.getText())]))) - def test_visit_array_literal(self): - ''' Testing when array literal + def test_visit_int_array_literal(self): + ''' Testing int array literal Function: visitArrayLiteral ''' ctx = MagicMock() @@ -217,6 +217,19 @@ def test_visit_array_literal(self): Batch(pd.DataFrame({0: [np.array([1,2,3,4])]})) ) + def test_visit_str_array_literal(self): + ''' Testing str array literal + Function: visitArrayLiteral + ''' + ctx = MagicMock() + visitor = ParserVisitor() + ctx.getText.return_value = "['person', 'car']" + expected = visitor.visitArrayLiteral(ctx) + self.assertEqual( + expected.evaluate(), + Batch(pd.DataFrame({0: [np.array(['person', 'car'])]})) + ) + def test_visit_query_specification_base_exception(self): ''' Testing Base Exception error handling Function: visitQuerySpecification From 6f345475e3c482b949cb1c676b2baa7fe61bee03 Mon Sep 17 00:00:00 2001 From: xzdandy Date: Mon, 1 Mar 2021 16:05:45 -0500 Subject: [PATCH 11/16] fix all testcases --- src/catalog/column_type.py | 4 +- src/catalog/schema_utils.py | 23 ++++++++++ src/executor/insert_executor.py | 4 ++ src/expression/comparison_expression.py | 8 ++-- src/expression/constant_value_expression.py | 1 - src/models/storage/batch.py | 20 ++++---- src/parser/create_statement.py | 5 +- .../parser_visitor/_create_statements.py | 1 - src/parser/parser_visitor/_expressions.py | 4 +- src/storage/petastorm_storage_engine.py | 29 +----------- test/catalog/models/test_models.py | 3 +- test/catalog/test_schema.py | 9 ++-- test/expression/test_comparison.py | 12 ++--- test/integration_tests/test_pytorch.py | 6 ++- test/integration_tests/test_udf_executor.py | 8 ++-- test/optimizer/test_optimizer_utils.py | 12 ++--- test/parser/test_parser.py | 22 ++++----- test/parser/test_parser_visitor.py | 2 +- test/parser/test_utils.py | 46 
------------------- test/storage/test_petastorm_storage_engine.py | 4 +- 20 files changed, 91 insertions(+), 132 deletions(-) delete mode 100644 test/parser/test_utils.py diff --git a/src/catalog/column_type.py b/src/catalog/column_type.py index 96322dd1f7..54ad96c818 100644 --- a/src/catalog/column_type.py +++ b/src/catalog/column_type.py @@ -14,6 +14,7 @@ # limitations under the License. from enum import Enum, auto + class ColumnType(Enum): BOOLEAN = 1 INTEGER = 2 @@ -21,6 +22,7 @@ class ColumnType(Enum): TEXT = 4 NDARRAY = 5 + class NdArrayType(Enum): INT8 = auto() UINT8 = auto() @@ -42,7 +44,7 @@ def to_numpy_type(cls, t): if t == cls.INT8: np_type = np.int8 - elif t is NdArrayType.UINT8: + elif t == cls.UINT8: np_type = np.uint8 elif t == cls.INT16: np_type = np.int16 diff --git a/src/catalog/schema_utils.py b/src/catalog/schema_utils.py index 8d6a467065..d24670deec 100644 --- a/src/catalog/schema_utils.py +++ b/src/catalog/schema_utils.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import numpy as np +import pandas as pd + from petastorm.codecs import NdarrayCodec from petastorm.codecs import ScalarCodec from petastorm.unischema import Unischema @@ -80,3 +82,24 @@ def get_petastorm_schema(name, column_list): petastorm_schema = Unischema(name, petastorm_column_list) return petastorm_schema + + @staticmethod + def petastorm_type_cast(schema: Unischema, df: pd.DataFrame) \
 -> pd.DataFrame: + """ + Try to cast the type if schema defined in UnischemaField for + Petastorm is not consistent with pandas DataFrame provided. 
+ """ + for unischema in schema.fields.values(): + if not isinstance(unischema.codec, NdarrayCodec): + continue + # We only care when the cell data is np.ndarray + col = unischema.name + dtype = unischema.numpy_dtype + try: + df[col] = df[col].apply(lambda x: x.astype(dtype, copy=False)) + except Exception: + LoggingManager().exception( + 'Failed to cast %s to %s for Petastorm' % (col, dtype) + ) + return df diff --git a/src/executor/insert_executor.py b/src/executor/insert_executor.py index 5503e08bef..53fcce153a 100644 --- a/src/executor/insert_executor.py +++ b/src/executor/insert_executor.py @@ -18,6 +18,7 @@ from src.executor.abstract_executor import AbstractExecutor from src.storage.storage_engine import StorageEngine from src.models.storage.batch import Batch +from src.catalog.schema_utils import SchemaUtils class InsertExecutor(AbstractExecutor): @@ -44,4 +45,7 @@ def exec(self): batch = Batch.merge_column_wise(data_tuple) metadata = CatalogManager().get_metadata(table_id) # verify value types are consistent + + batch.frames = SchemaUtils.petastorm_type_cast( + metadata.schema.petastorm_schema, batch.frames) StorageEngine.write(metadata, batch) diff --git a/src/expression/comparison_expression.py b/src/expression/comparison_expression.py index 0ea449a770..1314b0adc2 100644 --- a/src/expression/comparison_expression.py +++ b/src/expression/comparison_expression.py @@ -59,13 +59,13 @@ def evaluate(self, *args): elif self.etype == ExpressionType.COMPARE_NEQ: return Batch(pd.DataFrame(lvalues != rvalues)) elif self.etype == ExpressionType.COMPARE_CONTAINS: - res = [[all(x in p for x in q) \ - for p, q in zip(left, right)] \ + res = [[all(x in p for x in q) + for p, q in zip(left, right)] for left, right in zip(lvalues, rvalues)] return Batch(pd.DataFrame(res)) elif self.etype == ExpressionType.COMPARE_IS_CONTAINED: - res = [[all(x in q for x in p) \ - for p, q in zip(left, right)] \ + res = [[all(x in q for x in p) + for p, q in zip(left, right)] for left, 
right in zip(lvalues, rvalues)] return Batch(pd.DataFrame(res)) else: diff --git a/src/expression/constant_value_expression.py b/src/expression/constant_value_expression.py index 5306a6b640..c94b926c9f 100644 --- a/src/expression/constant_value_expression.py +++ b/src/expression/constant_value_expression.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import pandas as pd -import numpy as np from src.expression.abstract_expression import AbstractExpression, \ ExpressionType diff --git a/src/models/storage/batch.py b/src/models/storage/batch.py index 66929c13d9..4d0b35fefd 100644 --- a/src/models/storage/batch.py +++ b/src/models/storage/batch.py @@ -51,20 +51,24 @@ def __init__(self, identifier_column='id'): super().__init__() # store the batch with columns sorted - if isinstance(frames, DataFrame): - self._frames = frames[sorted(frames.columns)] - else: - LoggingManager().log('Batch constructor not properly called!', - LoggingLevel.DEBUG) - raise ValueError('Batch constructor not properly called. \ - Expected pandas.DataFrame') - self._batch_size = len(frames) + self.frames = frames self._identifier_column = identifier_column @property def frames(self): return self._frames + @frames.setter + def frames(self, values): + if isinstance(values, DataFrame): + self._frames = values[sorted(values.columns)] + else: + LoggingManager().log('Batch constructor not properly called!', + LoggingLevel.DEBUG) + raise ValueError('Batch constructor not properly called. 
\ + Expected pandas.DataFrame') + self._batch_size = len(values) + @property def batch_size(self): return self._batch_size diff --git a/src/parser/create_statement.py b/src/parser/create_statement.py index 236be09b2e..994738e657 100644 --- a/src/parser/create_statement.py +++ b/src/parser/create_statement.py @@ -20,6 +20,7 @@ from src.parser.table_ref import TableRef from src.catalog.column_type import ColumnType, NdArrayType + class ColConstraintInfo: def __init__(self, nullable=False, default_value=None, primary=False, unique=False): @@ -68,8 +69,8 @@ def cci(self): return self._cci def __str__(self): - return '{} {} {}'.format(self._name, self._type, self.array_type, - self._dimension) + return '{} {} {} {}'.format(self._name, self._type, self.array_type, + self._dimension) def __eq__(self, other): if not isinstance(other, ColumnDefinition): diff --git a/src/parser/parser_visitor/_create_statements.py b/src/parser/parser_visitor/_create_statements.py index db79756cde..5fe111e77d 100644 --- a/src/parser/parser_visitor/_create_statements.py +++ b/src/parser/parser_visitor/_create_statements.py @@ -177,7 +177,6 @@ def visitArrayType(self, ctx: evaql_parser.ArrayTypeContext): return array_type - def visitLengthOneDimension( self, ctx: evaql_parser.LengthOneDimensionContext): dimensions = [] diff --git a/src/parser/parser_visitor/_expressions.py b/src/parser/parser_visitor/_expressions.py index 21d772ae00..40b8d50290 100644 --- a/src/parser/parser_visitor/_expressions.py +++ b/src/parser/parser_visitor/_expressions.py @@ -39,7 +39,9 @@ def visitStringLiteral(self, ctx: evaql_parser.StringLiteralContext): return self.visitChildren(ctx) def visitArrayLiteral(self, ctx: evaql_parser.ArrayLiteralContext): - res = ConstantValueExpression(np.array(ast.literal_eval(ctx.getText()))) + res = ConstantValueExpression( + np.array(ast.literal_eval(ctx.getText())) + ) return res def visitConstant(self, ctx: evaql_parser.ConstantContext): diff --git 
a/src/storage/petastorm_storage_engine.py b/src/storage/petastorm_storage_engine.py index fc59acef24..b1fb271bae 100644 --- a/src/storage/petastorm_storage_engine.py +++ b/src/storage/petastorm_storage_engine.py @@ -12,9 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import numpy as np -import pandas as pd - from typing import Iterator, List from pathlib import Path @@ -25,14 +22,9 @@ from src.readers.petastorm_reader import PetastormReader from src.models.storage.batch import Batch -from petastorm.unischema import Unischema from petastorm.unischema import dict_to_spark_row -from petastorm.codecs import NdarrayCodec from petastorm.predicates import in_lambda -from src.utils.logging_manager import LoggingLevel -from src.utils.logging_manager import LoggingManager - class PetastormStorageEngine(AbstractStorageEngine): @@ -67,24 +59,6 @@ def create(self, table: DataFrameMetadata): .mode('overwrite') \ .parquet(self._spark_url(table)) - def _write_type_cast(self, schema: Unischema, df: pd.DataFrame) \ - -> np.ndarray: - """ - Try to cast the type if schema defined in UnischemeField for - Petastorm is not consistent with panda DataFrame provided. - """ - for unischema in schema.fields.values(): - if not isinstance(unischema.codec, NdarrayCodec): - continue - # We only care when the cell data is np.ndarray - col = unischema.name - dtype = unischema.numpy_dtype - try: - df[col] = df[col].apply(lambda x: x.astype(dtype)) - except Exception as e: - LoggingManager().exception('Failed to cast type for Petastorm') - return df - def write(self, table: DataFrameMetadata, rows: Batch): """ Write rows into the dataframe. 
@@ -99,12 +73,11 @@ def write(self, table: DataFrameMetadata, rows: Batch): # ToDo # Throw an error if the row schema doesn't match the table schema - records = self._write_type_cast(table.schema.petastorm_schema, - rows.frames) with materialize_dataset(self.spark_session, self._spark_url(table), table.schema.petastorm_schema): + records = rows.frames columns = records.keys() rows_rdd = self.spark_context.parallelize(records.values) \ .map(lambda x: dict(zip(columns, x))) \ diff --git a/test/catalog/models/test_models.py b/test/catalog/models/test_models.py index 84a3e58626..ba5c84cc2e 100644 --- a/test/catalog/models/test_models.py +++ b/test/catalog/models/test_models.py @@ -35,7 +35,8 @@ def test_df_column(self): self.assertEqual(df_col.type, ColumnType.TEXT) self.assertEqual(df_col.metadata_id, 1) self.assertEqual(df_col.id, None) - self.assertEqual(str(df_col), 'Column: (name, TEXT, False, [1, 2])') + self.assertEqual(str(df_col), + 'Column: (name, TEXT, False, None[1, 2])') def test_df_equality(self): df_col = DataFrameColumn('name', ColumnType.TEXT, is_nullable=False) diff --git a/test/catalog/test_schema.py b/test/catalog/test_schema.py index 9bc51815ed..31e90c3873 100644 --- a/test/catalog/test_schema.py +++ b/test/catalog/test_schema.py @@ -20,7 +20,7 @@ from pyspark.sql.types import IntegerType, FloatType, StringType from unittest.mock import MagicMock, call, patch -from src.catalog.column_type import ColumnType +from src.catalog.column_type import ColumnType, NdArrayType from src.catalog.df_schema import DataFrameSchema from src.catalog.models.df_column import DataFrameColumn from src.catalog.schema_utils import SchemaUtils @@ -50,7 +50,8 @@ def test_get_petastorm_column(self): StringType()), False) self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - col = DataFrameColumn(col_name, ColumnType.NDARRAY, True, [10, 10]) + col = DataFrameColumn(col_name, ColumnType.NDARRAY, True, + NdArrayType.UINT8, [10, 10]) petastorm_col = 
UnischemaField( col_name, np.uint8, [ 10, 10], NdarrayCodec(), True) @@ -77,7 +78,7 @@ def test_df_schema(self): schema_name = "foo" column_1 = DataFrameColumn("frame_id", ColumnType.INTEGER, False) column_2 = DataFrameColumn("frame_data", ColumnType.NDARRAY, False, - [28, 28]) + NdArrayType.UINT8, [28, 28]) column_3 = DataFrameColumn("frame_label", ColumnType.INTEGER, False) col_list = [column_1, column_2, column_3] schema = DataFrameSchema(schema_name, col_list) @@ -99,7 +100,7 @@ def test_schema_equality(self): schema_name = "foo" column_1 = DataFrameColumn("frame_id", ColumnType.INTEGER, False) column_2 = DataFrameColumn("frame_data", ColumnType.NDARRAY, False, - [28, 28]) + NdArrayType.UINT8, [28, 28]) column_3 = DataFrameColumn("frame_label", ColumnType.INTEGER, False) col_list = [column_1, column_2, column_3] schema1 = DataFrameSchema(schema_name, col_list) diff --git a/test/expression/test_comparison.py b/test/expression/test_comparison.py index 101ebfa174..310da958b7 100644 --- a/test/expression/test_comparison.py +++ b/test/expression/test_comparison.py @@ -113,9 +113,9 @@ def test_comparison_compare_neq(self): self.assertEqual([True], cmpr_exp.evaluate(None).frames[0].tolist()) def test_comparison_compare_contains(self): - const_exp1 = ConstantValueExpression([1,2]) - const_exp2 = ConstantValueExpression([1,5]) - const_exp3 = ConstantValueExpression([1,2,3,4]) + const_exp1 = ConstantValueExpression([1, 2]) + const_exp2 = ConstantValueExpression([1, 5]) + const_exp3 = ConstantValueExpression([1, 2, 3, 4]) cmpr_exp1 = ComparisonExpression( ExpressionType.COMPARE_CONTAINS, @@ -134,9 +134,9 @@ def test_comparison_compare_contains(self): self.assertEqual([False], cmpr_exp2.evaluate(None).frames[0].tolist()) def test_comparison_compare_is_contained(self): - const_exp1 = ConstantValueExpression([1,2]) - const_exp2 = ConstantValueExpression([1,5]) - const_exp3 = ConstantValueExpression([1,2,3,4]) + const_exp1 = ConstantValueExpression([1, 2]) + const_exp2 = 
ConstantValueExpression([1, 5]) + const_exp3 = ConstantValueExpression([1, 2, 3, 4]) cmpr_exp1 = ComparisonExpression( ExpressionType.COMPARE_IS_CONTAINED, diff --git a/test/integration_tests/test_pytorch.py b/test/integration_tests/test_pytorch.py index 91e77e1eda..4ba67d6c60 100644 --- a/test/integration_tests/test_pytorch.py +++ b/test/integration_tests/test_pytorch.py @@ -23,6 +23,7 @@ class PytorchTest(unittest.TestCase): def setUp(self): CatalogManager().reset() + @unittest.skip('Too slow when batch size is small.') def test_should_run_pytorch_and_fastrcnn(self): query = """LOAD DATA INFILE 'data/ua_detrac/ua_detrac.mp4' INTO MyVideo;""" @@ -30,7 +31,7 @@ def test_should_run_pytorch_and_fastrcnn(self): create_udf_query = """CREATE UDF FastRCNNObjectDetector INPUT (Frame_Array NDARRAY UINT8(3, 256, 256)) - OUTPUT (label TEXT(10)) + OUTPUT (label NDARRAY STR(10)) TYPE Classification IMPL 'src/udfs/fastrcnn_object_detector.py'; """ @@ -41,6 +42,7 @@ def test_should_run_pytorch_and_fastrcnn(self): actual_batch = perform_query(select_query) self.assertEqual(actual_batch.batch_size, 5) + @unittest.skip('Too slow when batch size is small.') def test_should_run_pytorch_and_ssd(self): query = """LOAD DATA INFILE 'data/ua_detrac/ua_detrac.mp4' INTO MyVideo;""" @@ -48,7 +50,7 @@ def test_should_run_pytorch_and_ssd(self): create_udf_query = """CREATE UDF SSDObjectDetector INPUT (Frame_Array NDARRAY UINT8(3, 256, 256)) - OUTPUT (label TEXT(10)) + OUTPUT (label NDARRAY STR(10)) TYPE Classification IMPL 'src/udfs/ssd_object_detector.py'; """ diff --git a/test/integration_tests/test_udf_executor.py b/test/integration_tests/test_udf_executor.py index 41c3830e92..7f6636e262 100644 --- a/test/integration_tests/test_udf_executor.py +++ b/test/integration_tests/test_udf_executor.py @@ -35,14 +35,13 @@ def tearDown(self): # integration test - @unittest.skip('Too slow when batch size is small.') def test_should_load_and_select_using_udf_video_in_table(self): load_query = """LOAD 
DATA INFILE 'dummy.avi' INTO MyVideo;""" perform_query(load_query) create_udf_query = """CREATE UDF DummyObjectDetector - INPUT (Frame_Array NDARRAY (3, 256, 256)) - OUTPUT (label TEXT(10)) + INPUT (Frame_Array NDARRAY UINT8(3, 256, 256)) + OUTPUT (label NDARRAY STR(10)) TYPE Classification IMPL 'test/util.py'; """ @@ -63,7 +62,7 @@ def test_should_load_and_select_using_udf_video(self): create_udf_query = """CREATE UDF DummyObjectDetector INPUT (Frame_Array NDARRAY UINT8(3, 256, 256)) - OUTPUT (label TEXT(10)) + OUTPUT (label NDARRAY STR(10)) TYPE Classification IMPL 'test/util.py'; """ @@ -84,7 +83,6 @@ def test_should_load_and_select_using_udf_video(self): actual_batch = perform_query(select_query) self.assertEqual(actual_batch, expected_batch) - # Mutli element contain test select_query = "SELECT id,DummyObjectDetector(data) FROM MyVideo \ diff --git a/test/optimizer/test_optimizer_utils.py b/test/optimizer/test_optimizer_utils.py index 21d3d6de0b..e7ee2e2041 100644 --- a/test/optimizer/test_optimizer_utils.py +++ b/test/optimizer/test_optimizer_utils.py @@ -25,7 +25,7 @@ bind_columns_expr, create_video_metadata) from src.parser.create_statement import ColumnDefinition -from src.parser.types import ParserColumnDataType +from src.catalog.column_type import ColumnType, NdArrayType class OptimizerUtilsTest(unittest.TestCase): @@ -61,19 +61,15 @@ def test_bind_function_value_expr(self, mock_str_path, mock_catalog): mock_str_path.return_value.return_value) @patch('src.optimizer.optimizer_utils.CatalogManager') - @patch('src.optimizer.optimizer_utils.\ -xform_parser_column_type_to_catalog_type') - def test_column_definition_to_udf_io(self, mock_func, mock): + def test_column_definition_to_udf_io(self, mock): mock.return_value.udf_io.return_value = 'udf_io' col = MagicMock(spec=ColumnDefinition) col.name = 'name' col.type = 'type' col.dimension = 'dimension' col_list = [col, col] - mock_func.return_value = col.type actual = column_definition_to_udf_io(col_list, True) 
for col in col_list: - mock_func.assert_called_with('type') mock.return_value.udf_io.assert_called_with( 'name', 'type', 'dimension', True) @@ -116,9 +112,9 @@ def test_create_video_metadata(self, m_gfp, m_ccm, m_cci, m_cd, m_cm): m_cm.return_value = catalog_ins catalog_ins.create_metadata.return_value = expected - calls = [call('id', ParserColumnDataType.INTEGER, [], + calls = [call('id', ColumnType.INTEGER, None, [], 'cci'), - call('data', ParserColumnDataType.NDARRAY, + call('data', ColumnType.NDARRAY, NdArrayType.UINT8, [None, None, None])] actual = create_video_metadata(name) diff --git a/test/parser/test_parser.py b/test/parser/test_parser.py index 8b4968b004..68545c45b5 100644 --- a/test/parser/test_parser.py +++ b/test/parser/test_parser.py @@ -21,7 +21,6 @@ from src.parser.statement import StatementType from src.parser.select_statement import SelectStatement -from src.parser.types import ParserColumnDataType from src.parser.create_statement import ColumnDefinition from src.parser.create_udf_statement import CreateUDFStatement from src.parser.load_statement import LoadDataStatement @@ -33,6 +32,7 @@ from src.parser.table_ref import TableRef, TableInfo from src.parser.types import ParserOrderBySortType +from src.catalog.column_type import ColumnType, NdArrayType from pathlib import Path @@ -50,7 +50,7 @@ def test_create_statement(self): Frame_ID INTEGER UNIQUE, Frame_Data TEXT(10), Frame_Value FLOAT(1000, 201), - Frame_Array NDARRAY (5, 100, 2432, 4324, 100) + Frame_Array NDARRAY UINT8(5, 100, 2432, 4324, 100) );""") for query in single_queries: @@ -298,8 +298,8 @@ def test_insert_statement(self): def test_create_udf_statement(self): parser = Parser() create_udf_query = """CREATE UDF FastRCNN - INPUT (Frame_Array NDARRAY (3, 256, 256)) - OUTPUT (Labels NDARRAY (10), Bbox NDARRAY (10, 4)) + INPUT (Frame_Array NDARRAY UINT8(3, 256, 256)) + OUTPUT (Labels NDARRAY STR(10), Bbox NDARRAY UINT8(10, 4)) TYPE Classification IMPL 'data/fastrcnn.py'; """ @@ -307,12 
+307,12 @@ def test_create_udf_statement(self): expected_stmt = CreateUDFStatement( 'FastRCNN', False, [ ColumnDefinition( - 'Frame_Array', ParserColumnDataType.NDARRAY, [ - 3, 256, 256])], [ + 'Frame_Array', ColumnType.NDARRAY, NdArrayType.UINT8, + [3, 256, 256])], [ ColumnDefinition( - 'Labels', ParserColumnDataType.NDARRAY, [10]), + 'Labels', ColumnType.NDARRAY, NdArrayType.STR, [10]), ColumnDefinition( - 'Bbox', ParserColumnDataType.NDARRAY, [10, 4])], + 'Bbox', ColumnType.NDARRAY, NdArrayType.UINT8, [10, 4])], Path('data/fastrcnn.py'), 'Classification') eva_statement_list = parser.parse(create_udf_query) self.assertIsInstance(eva_statement_list, list) @@ -377,10 +377,10 @@ def test_should_return_false_for_unequal_expression(self): create_udf = CreateUDFStatement( 'udf', False, [ ColumnDefinition( - 'frame', ParserColumnDataType.NDARRAY, [ - 3, 256, 256])], [ + 'frame', ColumnType.NDARRAY, NdArrayType.UINT8, + [3, 256, 256])], [ ColumnDefinition( - 'labels', ParserColumnDataType.NDARRAY, [10])], + 'labels', ColumnType.NDARRAY, NdArrayType.STR, [10])], Path('data/fastrcnn.py'), 'Classification') select_stmt = SelectStatement() self.assertNotEqual(load_stmt, insert_stmt) diff --git a/test/parser/test_parser_visitor.py b/test/parser/test_parser_visitor.py index f2d38c4052..16606776da 100644 --- a/test/parser/test_parser_visitor.py +++ b/test/parser/test_parser_visitor.py @@ -214,7 +214,7 @@ def test_visit_int_array_literal(self): expected = visitor.visitArrayLiteral(ctx) self.assertEqual( expected.evaluate(), - Batch(pd.DataFrame({0: [np.array([1,2,3,4])]})) + Batch(pd.DataFrame({0: [np.array([1, 2, 3, 4])]})) ) def test_visit_str_array_literal(self): diff --git a/test/parser/test_utils.py b/test/parser/test_utils.py deleted file mode 100644 index 2b2f0d9a48..0000000000 --- a/test/parser/test_utils.py +++ /dev/null @@ -1,46 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2020 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use 
this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -from src.parser.types import ParserColumnDataType -from src.catalog.column_type import ColumnType -from src.parser.utils import xform_parser_column_type_to_catalog_type - - -class ParserTests(unittest.TestCase): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def test_xform_parser_column_type_to_catalog_type(self): - col_type = ParserColumnDataType.BOOLEAN - self.assertEqual( - xform_parser_column_type_to_catalog_type(col_type), - ColumnType.BOOLEAN) - col_type = ParserColumnDataType.FLOAT - self.assertEqual( - xform_parser_column_type_to_catalog_type(col_type), - ColumnType.FLOAT) - col_type = ParserColumnDataType.INTEGER - self.assertEqual( - xform_parser_column_type_to_catalog_type(col_type), - ColumnType.INTEGER) - col_type = ParserColumnDataType.TEXT - self.assertEqual( - xform_parser_column_type_to_catalog_type(col_type), - ColumnType.TEXT) - col_type = ParserColumnDataType.NDARRAY - self.assertEqual( - xform_parser_column_type_to_catalog_type(col_type), - ColumnType.NDARRAY) diff --git a/test/storage/test_petastorm_storage_engine.py b/test/storage/test_petastorm_storage_engine.py index 9e98025448..1f3ea1d109 100644 --- a/test/storage/test_petastorm_storage_engine.py +++ b/test/storage/test_petastorm_storage_engine.py @@ -18,7 +18,7 @@ from src.catalog.models.df_metadata import DataFrameMetadata from src.storage.petastorm_storage_engine import PetastormStorageEngine from src.catalog.models.df_column import DataFrameColumn -from src.catalog.column_type 
import ColumnType +from src.catalog.column_type import ColumnType, NdArrayType from test.util import create_dummy_batches from test.util import NUM_FRAMES @@ -34,7 +34,7 @@ def create_sample_table(self): table_info = DataFrameMetadata("dataset", 'dataset') column_1 = DataFrameColumn("id", ColumnType.INTEGER, False) column_2 = DataFrameColumn( - "data", ColumnType.NDARRAY, False, [ + "data", ColumnType.NDARRAY, False, NdArrayType.UINT8, [ 2, 2, 3]) table_info.schema = [column_1, column_2] return table_info From 32acacb51175b28653dfa41a5896ddc1db80fb7d Mon Sep 17 00:00:00 2001 From: xzdandy Date: Mon, 1 Mar 2021 16:35:11 -0500 Subject: [PATCH 12/16] ndarray type test --- test/catalog/test_column_type.py | 34 ++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 test/catalog/test_column_type.py diff --git a/test/catalog/test_column_type.py b/test/catalog/test_column_type.py new file mode 100644 index 0000000000..4841c9af6d --- /dev/null +++ b/test/catalog/test_column_type.py @@ -0,0 +1,34 @@ +# coding=utf-8 +# Copyright 2018-2020 EVA +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import unittest +import numpy as np + +from decimal import Decimal +from src.catalog.column_type import ColumnType, NdArrayType + + +class ColumnTypeTests(unittest.TestCase): + + def test_ndarray_type_to_numpy_type(self): + expected_type = [np.int8, np.uint8, np.int16, np.int32, np.int64, + np.unicode_, np.bool_, np.float32, np.float64, + Decimal, np.str_, np.datetime64] + + for ndarray_type, np_type in zip(NdArrayType, expected_type): + self.assertEqual(NdArrayType.to_numpy_type(ndarray_type), np_type) + + def test_raise_exception_uknown_ndarray_type(self): + self.assertRaises(ValueError, NdArrayType.to_numpy_type, + ColumnType.TEXT) From 1a0a13673b28ef1351c259f4272ffcf1a913b72f Mon Sep 17 00:00:00 2001 From: xzdandy Date: Mon, 1 Mar 2021 17:08:33 -0500 Subject: [PATCH 13/16] add testcases --- test/catalog/test_schema.py | 27 +++++++++++++++++++------- test/integration_tests/test_pytorch.py | 2 -- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/test/catalog/test_schema.py b/test/catalog/test_schema.py index 31e90c3873..b1e7187576 100644 --- a/test/catalog/test_schema.py +++ b/test/catalog/test_schema.py @@ -19,6 +19,7 @@ from petastorm.unischema import UnischemaField from pyspark.sql.types import IntegerType, FloatType, StringType +from decimal import Decimal from unittest.mock import MagicMock, call, patch from src.catalog.column_type import ColumnType, NdArrayType from src.catalog.df_schema import DataFrameSchema @@ -50,16 +51,28 @@ def test_get_petastorm_column(self): StringType()), False) self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - col = DataFrameColumn(col_name, ColumnType.NDARRAY, True, - NdArrayType.UINT8, [10, 10]) - petastorm_col = UnischemaField( - col_name, np.uint8, [ - 10, 10], NdarrayCodec(), True) - self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - col = DataFrameColumn(col_name, None, True, [10, 10]) self.assertEqual(SchemaUtils.get_petastorm_column(col), None) + def 
test_get_petastorm_column_ndarray(self): + expected_type = [np.int8, np.uint8, np.int16, np.int32, np.int64, + np.unicode_, np.bool_, np.float32, np.float64, + Decimal, np.str_, np.datetime64] + col_name = 'frame_id' + for array_type, np_type in zip(NdArrayType, expected_type): + col = DataFrameColumn(col_name, ColumnType.NDARRAY, True, + array_type, [10, 10]) + petastorm_col = UnischemaField(col_name, np_type, [10, 10], + NdarrayCodec(), True) + self.assertEqual(SchemaUtils.get_petastorm_column(col), + petastorm_col) + + def test_raise_exception_when_unkown_array_type(self): + col_name = 'frame_id' + col = DataFrameColumn(col_name, ColumnType.NDARRAY, True, + ColumnType.TEXT, [10, 10]) + self.assertRaises(ValueError, SchemaUtils.get_petastorm_column, col) + @patch('src.catalog.schema_utils.Unischema') @patch('src.catalog.schema_utils.SchemaUtils.get_petastorm_column') def test_get_petastorm_schema(self, mock_get_pc, mock_uni): diff --git a/test/integration_tests/test_pytorch.py b/test/integration_tests/test_pytorch.py index 4ba67d6c60..af8b96b305 100644 --- a/test/integration_tests/test_pytorch.py +++ b/test/integration_tests/test_pytorch.py @@ -23,7 +23,6 @@ class PytorchTest(unittest.TestCase): def setUp(self): CatalogManager().reset() - @unittest.skip('Too slow when batch size is small.') def test_should_run_pytorch_and_fastrcnn(self): query = """LOAD DATA INFILE 'data/ua_detrac/ua_detrac.mp4' INTO MyVideo;""" @@ -42,7 +41,6 @@ def test_should_run_pytorch_and_fastrcnn(self): actual_batch = perform_query(select_query) self.assertEqual(actual_batch.batch_size, 5) - @unittest.skip('Too slow when batch size is small.') def test_should_run_pytorch_and_ssd(self): query = """LOAD DATA INFILE 'data/ua_detrac/ua_detrac.mp4' INTO MyVideo;""" From 2c584c87d3738502d4455b95eeeda908d1da416a Mon Sep 17 00:00:00 2001 From: xzdandy Date: Fri, 5 Mar 2021 11:24:44 -0500 Subject: [PATCH 14/16] fix merge errors --- src/expression/comparison_expression.py | 4 ++-- 
src/expression/constant_value_expression.py | 1 - src/expression/logical_expression.py | 3 +-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/expression/comparison_expression.py b/src/expression/comparison_expression.py index 82ee184c0c..914d6751a2 100644 --- a/src/expression/comparison_expression.py +++ b/src/expression/comparison_expression.py @@ -34,8 +34,8 @@ def __init__(self, exp_type: ExpressionType, left: AbstractExpression, def evaluate(self, *args, **kwargs): # cast in to numpy array - lvalues = self.get_child(0).evaluate(*args).frames.values - rvalues = self.get_child(1).evaluate(*args).frames.values + lvalues = self.get_child(0).evaluate(*args, **kwargs).frames.values + rvalues = self.get_child(1).evaluate(*args, **kwargs).frames.values if len(lvalues) != len(rvalues): if len(lvalues) == 1: diff --git a/src/expression/constant_value_expression.py b/src/expression/constant_value_expression.py index a1a7aa0521..c94b926c9f 100644 --- a/src/expression/constant_value_expression.py +++ b/src/expression/constant_value_expression.py @@ -31,7 +31,6 @@ def __init__(self, value): def evaluate(self, *args, **kwargs): return Batch(pd.DataFrame({0: [self._value]})) - @property def value(self): return self._value diff --git a/src/expression/logical_expression.py b/src/expression/logical_expression.py index e25835bd0c..61740f2b9b 100644 --- a/src/expression/logical_expression.py +++ b/src/expression/logical_expression.py @@ -41,8 +41,7 @@ def evaluate(self, *args, **kwargs): if left_values.all().bool(): # check if all are true return Batch(left_values) kwargs["mask"] = left_values[~left_values[0]].index.tolist() - right_values = self.get_child( - 1).evaluate(*args, **kwargs).frames + right_values = self.get_child(1).evaluate(*args, **kwargs).frames left_values.iloc[kwargs["mask"]] = right_values return Batch(pd.DataFrame(left_values)) else: From 0853244eed2598981566436106e8bf00f58fd9bf Mon Sep 17 00:00:00 2001 From: xzdandy Date: Thu, 11 Mar 2021 
16:11:56 -0500 Subject: [PATCH 15/16] array type support for udf_io --- src/catalog/catalog_manager.py | 17 ------------- src/catalog/models/df_column.py | 4 +-- src/catalog/models/udf_io.py | 20 ++++++++++----- src/optimizer/optimizer_utils.py | 7 ++++-- test/catalog/models/test_models.py | 25 +++++++++++-------- test/catalog/test_catalog_manager.py | 11 --------- test/optimizer/test_optimizer_utils.py | 34 ++++++++++++++------------ 7 files changed, 55 insertions(+), 63 deletions(-) diff --git a/src/catalog/catalog_manager.py b/src/catalog/catalog_manager.py index a26bc5cba5..bcb2577840 100644 --- a/src/catalog/catalog_manager.py +++ b/src/catalog/catalog_manager.py @@ -234,23 +234,6 @@ def get_dataset_metadata(self, database_name: str, dataset_name: str) -> \ metadata.schema = df_columns return metadata - def udf_io( - self, io_name: str, data_type: ColumnType, - dimensions: List[int], is_input: bool): - """Constructs an in memory udf_io object with given info. - This function won't commit this object in the catalog database. 
- If you want to commit it into catalog call create_udf with - corresponding udf_id and io list - - Arguments: - name(str): io name to be created - data_type(ColumnType): type of io created - dimensions(List[int]):dimensions of the io created - is_input(bool): whether a input or output, if true it is an input - """ - return UdfIO(io_name, data_type, - array_dimensions=dimensions, is_input=is_input) - def create_udf(self, name: str, impl_file_path: str, type: str, udf_io_list: List[UdfIO]) -> UdfMetadata: """Creates an udf metadata object and udf_io objects and persists them diff --git a/src/catalog/models/df_column.py b/src/catalog/models/df_column.py index e1134fa42b..58a967fb6f 100644 --- a/src/catalog/models/df_column.py +++ b/src/catalog/models/df_column.py @@ -52,7 +52,7 @@ def __init__(self, self._type = type self._is_nullable = is_nullable self._array_type = array_type - self._array_dimensions = str(array_dimensions) + self.array_dimensions = array_dimensions self._metadata_id = metadata_id @property @@ -80,7 +80,7 @@ def array_dimensions(self): return literal_eval(self._array_dimensions) @array_dimensions.setter - def array_dimensions(self, value): + def array_dimensions(self, value: List[int]): self._array_dimensions = str(value) @property diff --git a/src/catalog/models/udf_io.py b/src/catalog/models/udf_io.py index 997c3b8461..94457eb6dd 100644 --- a/src/catalog/models/udf_io.py +++ b/src/catalog/models/udf_io.py @@ -12,15 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import json from typing import List from sqlalchemy import Column, String, Integer, Boolean, UniqueConstraint, \ ForeignKey from sqlalchemy.orm import relationship from sqlalchemy.types import Enum +from ast import literal_eval -from src.catalog.column_type import ColumnType +from src.catalog.column_type import ColumnType, NdArrayType from src.catalog.models.base_model import BaseModel @@ -30,6 +30,7 @@ class UdfIO(BaseModel): _name = Column('name', String(100)) _type = Column('type', Enum(ColumnType), default=Enum) _is_nullable = Column('is_nullable', Boolean, default=False) + _array_type = Column('array_type', Enum(NdArrayType), nullable=True) _array_dimensions = Column('array_dimensions', String(100)) _is_input = Column('is_input', Boolean, default=True) _udf_id = Column('udf_id', Integer, @@ -44,13 +45,15 @@ def __init__(self, name: str, type: ColumnType, is_nullable: bool = False, + array_type: NdArrayType = None, array_dimensions: List[int] = [], is_input: bool = True, udf_id: int = None): self._name = name self._type = type self._is_nullable = is_nullable - self._array_dimensions = str(array_dimensions) + self._array_type = array_type + self.array_dimensions = array_dimensions self._is_input = is_input self._udf_id = udf_id @@ -70,12 +73,16 @@ def type(self): def is_nullable(self): return self._is_nullable + @property + def array_type(self): + return self._array_type + @property def array_dimensions(self): - return json.loads(self._array_dimensions) + return literal_eval(self._array_dimensions) @array_dimensions.setter - def array_dimensions(self, value): + def array_dimensions(self, value: List[int]): self._array_dimensions = str(value) @property @@ -96,7 +103,7 @@ def __str__(self): self._is_nullable, self._is_input) - column_str += "[" + column_str += "%s[" % self.array_type column_str += ', '.join(['%d'] * len(self.array_dimensions)) \ % tuple(self.array_dimensions) column_str += "] " @@ -108,6 +115,7 @@ def __eq__(self, other): return self.id == 
other.id and \ self.is_input == other.is_input and \ self.is_nullable == other.is_nullable and \ + self.array_type == other.array_type and \ self.array_dimensions == other.array_dimensions and \ self.name == other.name and \ self.udf_id == other.udf_id and \ diff --git a/src/optimizer/optimizer_utils.py b/src/optimizer/optimizer_utils.py index fdbb9c126c..60c2ccb625 100644 --- a/src/optimizer/optimizer_utils.py +++ b/src/optimizer/optimizer_utils.py @@ -19,6 +19,7 @@ from src.parser.table_ref import TableInfo from src.catalog.catalog_manager import CatalogManager from src.catalog.column_type import ColumnType, NdArrayType +from src.catalog.models.udf_io import UdfIO from src.expression.abstract_expression import AbstractExpression from src.expression.tuple_value_expression import ExpressionType, \ @@ -179,8 +180,10 @@ def column_definition_to_udf_io( LoggingLevel.ERROR) result_list.append(col) result_list.append( - CatalogManager().udf_io(col.name, col.type, - col.dimension, is_input) + UdfIO(col.name, col.type, + array_type=col.array_type, + array_dimensions=col.dimension, + is_input=is_input) ) return result_list diff --git a/test/catalog/models/test_models.py b/test/catalog/models/test_models.py index ba5c84cc2e..55bc44d4c6 100644 --- a/test/catalog/models/test_models.py +++ b/test/catalog/models/test_models.py @@ -15,7 +15,7 @@ import unittest -from src.catalog.column_type import ColumnType +from src.catalog.column_type import ColumnType, NdArrayType from src.catalog.df_schema import DataFrameSchema from src.catalog.models.df_column import DataFrameColumn from src.catalog.models.df_metadata import DataFrameMetadata @@ -29,6 +29,7 @@ def test_df_column(self): df_col = DataFrameColumn('name', ColumnType.TEXT, is_nullable=False) df_col.array_dimensions = [1, 2] df_col.metadata_id = 1 + self.assertEqual(df_col.array_type, None) self.assertEqual(df_col.array_dimensions, [1, 2]) self.assertEqual(df_col.is_nullable, False) self.assertEqual(df_col.name, 'name') @@ 
-122,27 +123,31 @@ def test_udf_equality(self): self.assertNotEqual(udf, udf4) def test_udf_io(self): - udf_io = UdfIO('name', ColumnType.FLOAT, True, [2, 3], True, 1) + udf_io = UdfIO('name', ColumnType.NDARRAY, True, NdArrayType.UINT8, + [2, 3], True, 1) self.assertEqual(udf_io.id, None) self.assertEqual(udf_io.udf_id, 1) self.assertEqual(udf_io.is_input, True) self.assertEqual(udf_io.is_nullable, True) + self.assertEqual(udf_io.array_type, NdArrayType.UINT8) self.assertEqual(udf_io.array_dimensions, [2, 3]) self.assertEqual(udf_io.name, 'name') - self.assertEqual(udf_io.type, ColumnType.FLOAT) + self.assertEqual(udf_io.type, ColumnType.NDARRAY) def test_udf_io_equality(self): - udf_io = UdfIO('name', ColumnType.FLOAT, True, [2, 3], True, 1) + udf_io = UdfIO('name', ColumnType.FLOAT, True, None, [2, 3], True, 1) self.assertEqual(udf_io, udf_io) - udf_io2 = UdfIO('name2', ColumnType.FLOAT, True, [2, 3], True, 1) + udf_io2 = UdfIO('name2', ColumnType.FLOAT, True, None, [2, 3], True, 1) self.assertNotEqual(udf_io, udf_io2) - udf_io2 = UdfIO('name', ColumnType.INTEGER, True, [2, 3], True, 1) + udf_io2 = UdfIO('name', ColumnType.INTEGER, True, None, [2, 3], True, + 1) self.assertNotEqual(udf_io, udf_io2) - udf_io2 = UdfIO('name', ColumnType.FLOAT, False, [2, 3], True, 1) + udf_io2 = UdfIO('name', ColumnType.FLOAT, False, None, [2, 3], True, 1) self.assertNotEqual(udf_io, udf_io2) - udf_io2 = UdfIO('name', ColumnType.FLOAT, True, [2, 3, 4], True, 1) + udf_io2 = UdfIO('name', ColumnType.FLOAT, True, None, [2, 3, 4], True, + 1) self.assertNotEqual(udf_io, udf_io2) - udf_io2 = UdfIO('name', ColumnType.FLOAT, True, [2, 3], False, 1) + udf_io2 = UdfIO('name', ColumnType.FLOAT, True, None, [2, 3], False, 1) self.assertNotEqual(udf_io, udf_io2) - udf_io2 = UdfIO('name', ColumnType.FLOAT, True, [2, 3], True, 2) + udf_io2 = UdfIO('name', ColumnType.FLOAT, True, None, [2, 3], True, 2) self.assertNotEqual(udf_io, udf_io2) diff --git a/test/catalog/test_catalog_manager.py 
b/test/catalog/test_catalog_manager.py index cc0dcf79f6..e3720b40b1 100644 --- a/test/catalog/test_catalog_manager.py +++ b/test/catalog/test_catalog_manager.py @@ -175,17 +175,6 @@ def test_get_dataset_metadata_when_table_doesnot_exists(self, dcs_mock.return_value.columns_by_id_and_dataset_id.assert_not_called() self.assertEqual(actual, metadata_obj) - @mock.patch('src.catalog.catalog_manager.UdfIO') - def test_create_udf_io_object(self, udfio_mock): - catalog = CatalogManager() - actual = catalog.udf_io('name', ColumnType.TEXT, [100], True) - udfio_mock.assert_called_with( - 'name', - ColumnType.TEXT, - array_dimensions=[100], - is_input=True) - self.assertEqual(actual, udfio_mock.return_value) - @mock.patch('src.catalog.catalog_manager.UdfService') @mock.patch('src.catalog.catalog_manager.UdfIOService') def test_create_udf(self, udfio_mock, udf_mock): diff --git a/test/optimizer/test_optimizer_utils.py b/test/optimizer/test_optimizer_utils.py index e7ee2e2041..5ce04f224d 100644 --- a/test/optimizer/test_optimizer_utils.py +++ b/test/optimizer/test_optimizer_utils.py @@ -60,26 +60,30 @@ def test_bind_function_value_expr(self, mock_str_path, mock_catalog): self.assertEqual(func_expr.function, mock_str_path.return_value.return_value) - @patch('src.optimizer.optimizer_utils.CatalogManager') - def test_column_definition_to_udf_io(self, mock): - mock.return_value.udf_io.return_value = 'udf_io' - col = MagicMock(spec=ColumnDefinition) - col.name = 'name' - col.type = 'type' - col.dimension = 'dimension' + def test_column_definition_to_udf_io(self): + col = ColumnDefinition('data', ColumnType.NDARRAY, NdArrayType.UINT8, + [None, None, None]) col_list = [col, col] actual = column_definition_to_udf_io(col_list, True) - for col in col_list: - mock.return_value.udf_io.assert_called_with( - 'name', 'type', 'dimension', True) - - self.assertEqual(actual, ['udf_io', 'udf_io']) + for io in actual: + self.assertEqual(io.name, 'data') + self.assertEqual(io.type, 
ColumnType.NDARRAY) + self.assertEqual(io.is_nullable, False) + self.assertEqual(io.array_type, NdArrayType.UINT8) + self.assertEqual(io.array_dimensions, [None, None, None]) + self.assertEqual(io.is_input, True) + self.assertEqual(io.udf_id, None) # input not list actual2 = column_definition_to_udf_io(col, True) - mock.return_value.udf_io.assert_called_with( - 'name', 'type', 'dimension', True) - self.assertEqual(actual2, ['udf_io']) + for io in actual2: + self.assertEqual(io.name, 'data') + self.assertEqual(io.type, ColumnType.NDARRAY) + self.assertEqual(io.is_nullable, False) + self.assertEqual(io.array_type, NdArrayType.UINT8) + self.assertEqual(io.array_dimensions, [None, None, None]) + self.assertEqual(io.is_input, True) + self.assertEqual(io.udf_id, None) @patch('src.optimizer.optimizer_utils.bind_function_expr') def test_bind_predicate_calls_bind_func_expr_if_type_functional(self, From 50812db907d47fee28caf9ef152974127553aa9b Mon Sep 17 00:00:00 2001 From: xzdandy Date: Thu, 11 Mar 2021 20:13:44 -0500 Subject: [PATCH 16/16] revert catalog udf_io --- src/catalog/catalog_manager.py | 18 ++++++++++++++++++ src/optimizer/optimizer_utils.py | 9 ++++----- test/catalog/test_catalog_manager.py | 15 ++++++++++++++- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/catalog/catalog_manager.py b/src/catalog/catalog_manager.py index bcb2577840..40e3eb5927 100644 --- a/src/catalog/catalog_manager.py +++ b/src/catalog/catalog_manager.py @@ -234,6 +234,24 @@ def get_dataset_metadata(self, database_name: str, dataset_name: str) -> \ metadata.schema = df_columns return metadata + def udf_io( + self, io_name: str, data_type: ColumnType, array_type: NdArrayType, + dimensions: List[int], is_input: bool): + """Constructs an in memory udf_io object with given info. + This function won't commit this object in the catalog database. 
+ If you want to commit it into catalog call create_udf with + corresponding udf_id and io list + + Arguments: + name(str): io name to be created + data_type(ColumnType): type of io created + array_type(NdArrayType): type of array content + dimensions(List[int]):dimensions of the io created + is_input(bool): whether a input or output, if true it is an input + """ + return UdfIO(io_name, data_type, array_type=array_type, + array_dimensions=dimensions, is_input=is_input) + def create_udf(self, name: str, impl_file_path: str, type: str, udf_io_list: List[UdfIO]) -> UdfMetadata: """Creates an udf metadata object and udf_io objects and persists them diff --git a/src/optimizer/optimizer_utils.py b/src/optimizer/optimizer_utils.py index 60c2ccb625..0f71d77bfa 100644 --- a/src/optimizer/optimizer_utils.py +++ b/src/optimizer/optimizer_utils.py @@ -19,7 +19,6 @@ from src.parser.table_ref import TableInfo from src.catalog.catalog_manager import CatalogManager from src.catalog.column_type import ColumnType, NdArrayType -from src.catalog.models.udf_io import UdfIO from src.expression.abstract_expression import AbstractExpression from src.expression.tuple_value_expression import ExpressionType, \ @@ -180,10 +179,10 @@ def column_definition_to_udf_io( LoggingLevel.ERROR) result_list.append(col) result_list.append( - UdfIO(col.name, col.type, - array_type=col.array_type, - array_dimensions=col.dimension, - is_input=is_input) + CatalogManager().udf_io(col.name, col.type, + array_type=col.array_type, + dimensions=col.dimension, + is_input=is_input) ) return result_list diff --git a/test/catalog/test_catalog_manager.py b/test/catalog/test_catalog_manager.py index e3720b40b1..f62a6f2a77 100644 --- a/test/catalog/test_catalog_manager.py +++ b/test/catalog/test_catalog_manager.py @@ -17,7 +17,7 @@ import mock from mock import MagicMock from src.catalog.catalog_manager import CatalogManager -from src.catalog.column_type import ColumnType +from src.catalog.column_type import ColumnType, 
NdArrayType from src.catalog.models.df_column import DataFrameColumn @@ -175,6 +175,19 @@ def test_get_dataset_metadata_when_table_doesnot_exists(self, dcs_mock.return_value.columns_by_id_and_dataset_id.assert_not_called() self.assertEqual(actual, metadata_obj) + @mock.patch('src.catalog.catalog_manager.UdfIO') + def test_create_udf_io_object(self, udfio_mock): + catalog = CatalogManager() + actual = catalog.udf_io('name', ColumnType.NDARRAY, NdArrayType.UINT8, + [2, 3, 4], True) + udfio_mock.assert_called_with( + 'name', + ColumnType.NDARRAY, + array_type=NdArrayType.UINT8, + array_dimensions=[2, 3, 4], + is_input=True) + self.assertEqual(actual, udfio_mock.return_value) + @mock.patch('src.catalog.catalog_manager.UdfService') @mock.patch('src.catalog.catalog_manager.UdfIOService') def test_create_udf(self, udfio_mock, udf_mock):