Merge pull request #146 from georgia-tech-db/contain
Contain operator and generic ndarray syntax support in EVA
gaurav274 committed Mar 26, 2021
2 parents 83f7a4d + d62e7f8 commit e238bb3
Showing 33 changed files with 633 additions and 322 deletions.
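Only 6 of the 33 changed files are reproduced below; the contain operator itself presumably lives in the parser and expression files that are not shown. For orientation, a minimal sketch of what the diff below adds at the catalog layer (the EVA SQL spelling in the comment is an assumption, not taken from this diff):

from src.catalog.column_type import ColumnType, NdArrayType

# A typed ndarray column, e.g. one declared in EVA SQL as something
# like "data NDARRAY UINT8(3, 256, 256)" (spelling assumed), is now
# described by a (ColumnType, NdArrayType, dimensions) triple.
col_type = ColumnType.NDARRAY
elem_type = NdArrayType.UINT8   # element dtype of the array cells
dims = [3, 256, 256]            # e.g. channels, height, width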
15 changes: 9 additions & 6 deletions src/catalog/catalog_manager.py
@@ -15,7 +15,7 @@

from typing import List, Tuple

from src.catalog.column_type import ColumnType
from src.catalog.column_type import ColumnType, NdArrayType
from src.catalog.models.base_model import init_db, drop_db
from src.catalog.models.df_column import DataFrameColumn
from src.catalog.models.df_metadata import DataFrameMetadata
@@ -195,8 +195,9 @@ def get_column_ids(self, table_metadata_id: int) -> List[int]:
return col_ids

def create_column_metadata(
self, column_name: str, data_type: ColumnType,
dimensions: List[int]):
self, column_name: str, data_type: ColumnType, array_type: NdArrayType,
dimensions: List[int]
) -> DataFrameColumn:
"""Create a dataframe column object this column.
This function won't commit this object in the catalog database.
If you want to commit it into catalog table call create_metadata with
@@ -205,9 +206,10 @@
Arguments:
column_name {str} -- column name to be created
data_type {ColumnType} -- type of column created
array_type {NdArrayType} -- type of ndarray
dimensions {List[int]} -- dimensions of the column created
"""
return DataFrameColumn(column_name, data_type,
return DataFrameColumn(column_name, data_type, array_type=array_type,
array_dimensions=dimensions)
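A minimal usage sketch of the updated signature (assuming an initialized catalog; per the docstring, the returned DataFrameColumn is in-memory only until create_metadata commits it):

from src.catalog.catalog_manager import CatalogManager
from src.catalog.column_type import ColumnType, NdArrayType

catalog = CatalogManager()
frame_col = catalog.create_column_metadata(
    'data', ColumnType.NDARRAY, NdArrayType.UINT8, [3, 256, 256])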

def get_dataset_metadata(self, database_name: str, dataset_name: str) -> \
@@ -233,7 +235,7 @@ def get_dataset_metadata(self, database_name: str, dataset_name: str) -> \
return metadata

def udf_io(
self, io_name: str, data_type: ColumnType,
self, io_name: str, data_type: ColumnType, array_type: NdArrayType,
dimensions: List[int], is_input: bool):
"""Constructs an in memory udf_io object with given info.
This function won't commit this object in the catalog database.
@@ -243,10 +245,11 @@ def udf_io(
Arguments:
name(str): io name to be created
data_type(ColumnType): type of io created
array_type(NdArrayType): type of array content
dimensions(List[int]): dimensions of the io created
is_input(bool): whether an input or output; if true, it is an input
"""
return UdfIO(io_name, data_type,
return UdfIO(io_name, data_type, array_type=array_type,
array_dimensions=dimensions, is_input=is_input)

def create_udf(self, name: str, impl_file_path: str,
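The analogous sketch for the updated udf_io, declaring a UDF input that accepts float32 frames (names and dimensions are illustrative):

from src.catalog.catalog_manager import CatalogManager
from src.catalog.column_type import ColumnType, NdArrayType

frame_input = CatalogManager().udf_io(
    'frame', ColumnType.NDARRAY, NdArrayType.FLOAT32,
    [3, 224, 224], is_input=True)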
51 changes: 50 additions & 1 deletion src/catalog/column_type.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from enum import Enum, auto


class ColumnType(Enum):
@@ -21,3 +21,52 @@ class ColumnType(Enum):
FLOAT = 3
TEXT = 4
NDARRAY = 5


class NdArrayType(Enum):
INT8 = auto()
UINT8 = auto()
INT16 = auto()
INT32 = auto()
INT64 = auto()
UNICODE = auto()
BOOL = auto()
FLOAT32 = auto()
FLOAT64 = auto()
DECIMAL = auto()
STR = auto()
DATETIME = auto()

@classmethod
def to_numpy_type(cls, t):
import numpy as np
from decimal import Decimal

if t == cls.INT8:
np_type = np.int8
elif t == cls.UINT8:
np_type = np.uint8
elif t == cls.INT16:
np_type = np.int16
elif t == cls.INT32:
np_type = np.int32
elif t == cls.INT64:
np_type = np.int64
elif t == cls.UNICODE:
np_type = np.unicode_
elif t == cls.BOOL:
np_type = np.bool_
elif t == cls.FLOAT32:
np_type = np.float32
elif t == cls.FLOAT64:
np_type = np.float64
elif t == cls.DECIMAL:
np_type = Decimal
elif t == cls.STR:
np_type = np.str_
elif t == cls.DATETIME:
np_type = np.datetime64
else:
raise ValueError('Can not auto convert %s to numpy type' % t)

return np_type
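Since to_numpy_type returns the numpy type objects themselves, the mapping can be checked by identity (a sketch; a dict-based lookup would be an equivalent, more compact design than the if/elif ladder, though it is not what the commit does):

import numpy as np
from src.catalog.column_type import NdArrayType

assert NdArrayType.to_numpy_type(NdArrayType.FLOAT32) is np.float32
assert NdArrayType.to_numpy_type(NdArrayType.BOOL) is np.bool_
assert NdArrayType.to_numpy_type(NdArrayType.DATETIME) is np.datetime64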
16 changes: 12 additions & 4 deletions src/catalog/models/df_column.py
@@ -20,7 +20,7 @@
from sqlalchemy.types import Enum
from ast import literal_eval

from src.catalog.column_type import ColumnType
from src.catalog.column_type import ColumnType, NdArrayType
from src.catalog.models.base_model import BaseModel


@@ -30,6 +30,7 @@ class DataFrameColumn(BaseModel):
_name = Column('name', String(100))
_type = Column('type', Enum(ColumnType), default=Enum)
_is_nullable = Column('is_nullable', Boolean, default=False)
_array_type = Column('array_type', Enum(NdArrayType), nullable=True)
_array_dimensions = Column('array_dimensions', String(100))
_metadata_id = Column('metadata_id', Integer,
ForeignKey('df_metadata.id'))
@@ -44,12 +45,14 @@ def __init__(self,
name: str,
type: ColumnType,
is_nullable: bool = False,
array_type: NdArrayType = None,
array_dimensions: List[int] = [],
metadata_id: int = None):
self._name = name
self._type = type
self._is_nullable = is_nullable
self._array_dimensions = str(array_dimensions)
self._array_type = array_type
self.array_dimensions = array_dimensions
self._metadata_id = metadata_id

@property
@@ -68,12 +71,16 @@ def type(self):
def is_nullable(self):
return self._is_nullable

@property
def array_type(self):
return self._array_type

@property
def array_dimensions(self):
return literal_eval(self._array_dimensions)

@array_dimensions.setter
def array_dimensions(self, value):
def array_dimensions(self, value: List[int]):
self._array_dimensions = str(value)

@property
@@ -89,7 +96,7 @@ def __str__(self):
self._type.name,
self._is_nullable)

column_str += "["
column_str += "%s[" % self.array_type
column_str += ', '.join(['%d'] * len(self.array_dimensions)) \
% tuple(self.array_dimensions)
column_str += "])"
@@ -100,6 +107,7 @@ def __eq__(self, other):
return self.id == other.id and \
self.metadata_id == other.metadata_id and \
self.is_nullable == other.is_nullable and \
self.array_type == other.array_type and \
self.array_dimensions == other.array_dimensions and \
self.name == other.name and \
self.type == other.type
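A construction sketch against the updated model (in-memory only; instantiating a SQLAlchemy declarative model needs no DB session):

from src.catalog.column_type import ColumnType, NdArrayType
from src.catalog.models.df_column import DataFrameColumn

col = DataFrameColumn('data', ColumnType.NDARRAY,
                      array_type=NdArrayType.UINT8,
                      array_dimensions=[3, 256, 256])
assert col.array_type == NdArrayType.UINT8
# Dimensions round-trip through the str()-backed column:
assert col.array_dimensions == [3, 256, 256]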
20 changes: 14 additions & 6 deletions src/catalog/models/udf_io.py
@@ -12,15 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import List

from sqlalchemy import Column, String, Integer, Boolean, UniqueConstraint, \
ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.types import Enum
from ast import literal_eval

from src.catalog.column_type import ColumnType
from src.catalog.column_type import ColumnType, NdArrayType
from src.catalog.models.base_model import BaseModel


@@ -30,6 +30,7 @@ class UdfIO(BaseModel):
_name = Column('name', String(100))
_type = Column('type', Enum(ColumnType), default=Enum)
_is_nullable = Column('is_nullable', Boolean, default=False)
_array_type = Column('array_type', Enum(NdArrayType), nullable=True)
_array_dimensions = Column('array_dimensions', String(100))
_is_input = Column('is_input', Boolean, default=True)
_udf_id = Column('udf_id', Integer,
@@ -44,13 +45,15 @@ def __init__(self,
name: str,
type: ColumnType,
is_nullable: bool = False,
array_type: NdArrayType = None,
array_dimensions: List[int] = [],
is_input: bool = True,
udf_id: int = None):
self._name = name
self._type = type
self._is_nullable = is_nullable
self._array_dimensions = str(array_dimensions)
self._array_type = array_type
self.array_dimensions = array_dimensions
self._is_input = is_input
self._udf_id = udf_id

@@ -70,12 +73,16 @@ def type(self):
def is_nullable(self):
return self._is_nullable

@property
def array_type(self):
return self._array_type

@property
def array_dimensions(self):
return json.loads(self._array_dimensions)
return literal_eval(self._array_dimensions)

@array_dimensions.setter
def array_dimensions(self, value):
def array_dimensions(self, value: List[int]):
self._array_dimensions = str(value)

@property
@@ -96,7 +103,7 @@ def __str__(self):
self._is_nullable,
self._is_input)

column_str += "["
column_str += "%s[" % self.array_type
column_str += ', '.join(['%d'] * len(self.array_dimensions)) \
% tuple(self.array_dimensions)
column_str += "] "
@@ -108,6 +115,7 @@ def __eq__(self, other):
return self.id == other.id and \
self.is_input == other.is_input and \
self.is_nullable == other.is_nullable and \
self.array_type == other.array_type and \
self.array_dimensions == other.array_dimensions and \
self.name == other.name and \
self.udf_id == other.udf_id and \
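Note the json.loads to literal_eval switch for array_dimensions: the setter stores dimensions via str(value), and literal_eval is the exact inverse of that repr for Python literals, while json.loads only happens to work when the repr is also valid JSON. A sketch of the difference (the None dimension is a hypothetical value):

import json
from ast import literal_eval

stored = str([3, 256, 256])                 # '[3, 256, 256]'
assert literal_eval(stored) == [3, 256, 256]
assert json.loads(stored) == [3, 256, 256]  # ints happen to be valid JSON

stored = str([None, 256, 256])              # '[None, 256, 256]'
assert literal_eval(stored) == [None, 256, 256]
# json.loads(stored) raises here: Python's None is spelled null in JSON.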
29 changes: 27 additions & 2 deletions src/catalog/schema_utils.py
@@ -13,13 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pandas as pd

from petastorm.codecs import NdarrayCodec
from petastorm.codecs import ScalarCodec
from petastorm.unischema import Unischema
from petastorm.unischema import UnischemaField
from pyspark.sql.types import IntegerType, FloatType, StringType

from src.catalog.column_type import ColumnType
from src.catalog.column_type import ColumnType, NdArrayType
from src.utils.logging_manager import LoggingLevel
from src.utils.logging_manager import LoggingManager

@@ -32,6 +34,7 @@ def get_petastorm_column(df_column):
column_type = df_column.type
column_name = df_column.name
column_is_nullable = df_column.is_nullable
column_array_type = df_column.array_type
column_array_dimensions = df_column.array_dimensions

# Reference:
@@ -58,8 +61,9 @@
ScalarCodec(StringType()),
column_is_nullable)
elif column_type == ColumnType.NDARRAY:
np_type = NdArrayType.to_numpy_type(column_array_type)
petastorm_column = UnischemaField(column_name,
np.uint8,
np_type,
column_array_dimensions,
NdarrayCodec(),
column_is_nullable)
@@ -78,3 +82,24 @@ def get_petastorm_schema(name, column_list):

petastorm_schema = Unischema(name, petastorm_column_list)
return petastorm_schema

@staticmethod
def petastorm_type_cast(schema: Unischema, df: pd.DataFrame) \
-> pd.DataFrame:
"""
Try to cast a column's type when the schema defined in the
UnischemaField for Petastorm is not consistent with the pandas
DataFrame provided.
"""
for unischema in schema.fields.values():
if not isinstance(unischema.codec, NdarrayCodec):
continue
# We only care when the cell data is np.ndarray
col = unischema.name
dtype = unischema.numpy_dtype
try:
df[col] = df[col].apply(lambda x: x.astype(dtype, copy=False))
except Exception:
LoggingManager().exception(
'Failed to cast %s to %s for Petastorm' % (col, dtype)
)
return df
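A usage sketch for the new cast helper (SchemaUtils is the enclosing class of these staticmethods, as the call in insert_executor.py below shows; the fold hides the class definition):

import numpy as np
import pandas as pd
from petastorm.codecs import NdarrayCodec
from petastorm.unischema import Unischema, UnischemaField

from src.catalog.schema_utils import SchemaUtils

schema = Unischema('video', [
    UnischemaField('data', np.uint8, (3, 4, 4), NdarrayCodec(), False)])
# Cells arrive as float64 ndarrays; the helper coerces them to uint8.
df = pd.DataFrame({'data': [np.zeros((3, 4, 4), dtype=np.float64)]})
df = SchemaUtils.petastorm_type_cast(schema, df)
assert df['data'][0].dtype == np.uint8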
43 changes: 11 additions & 32 deletions src/executor/insert_executor.py
@@ -13,15 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from numpy import ndarray

from src.catalog.catalog_manager import CatalogManager
from src.catalog.column_type import ColumnType
from src.planner.insert_plan import InsertPlan
from src.executor.abstract_executor import AbstractExecutor
from src.storage.storage_engine import StorageEngine
from src.utils.logging_manager import LoggingLevel
from src.utils.logging_manager import LoggingManager
from src.models.storage.batch import Batch
from src.catalog.schema_utils import SchemaUtils


class InsertExecutor(AbstractExecutor):
@@ -39,34 +36,16 @@ def exec(self):
Right now we assume there are no missing values
"""
table_id = self.node.video_id
col_id_to_val = {}
data_tuple = []
for col, val in zip(self.node.column_list, self.node.value_list):
col_id_to_val[col.col_metadata_id] = val.evaluate()
val = val.evaluate()
val.frames.columns = [col.col_name]
data_tuple.append(val)

batch = Batch.merge_column_wise(data_tuple)
metadata = CatalogManager().get_metadata(table_id)
# verify value types are consistent

column_list = metadata.schema.column_list

data_tuple = []
for column in column_list:
col_id, col_type = column.id, column.type
if col_id in col_id_to_val.keys():
val = col_id_to_val[col_id]
try:
if col_type == ColumnType.INTEGER:
data_tuple.append(int(val))
elif col_type == ColumnType.FLOAT:
data_tuple.append(float(val))
elif col_type == ColumnType.BOOLEAN:
data_tuple.append(bool(val))
elif col_type == ColumnType.TEXT:
data_tuple.append(str(val))
elif col_type == ColumnType.NDARRAY:
data_tuple.append(ndarray(val))
except Exception as e:
LoggingManager().log(
f'Insert Executor failed bcz of invalid value {e}',
LoggingLevel.ERROR)
return

StorageEngine.write_row(metadata, [data_tuple])
batch.frames = SchemaUtils.petastorm_type_cast(
metadata.schema.petastorm_schema, batch.frames)
StorageEngine.write(metadata, batch)
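To make the new write path concrete, a sketch of the merged-batch shape handed to the storage engine (the Batch constructor signature is an assumption; this diff only shows merge_column_wise and the frames attribute):

import pandas as pd
from src.models.storage.batch import Batch

# One single-column frame per INSERT target, labeled, then merged.
ids = Batch(pd.DataFrame({'id': [1]}))             # constructor assumed
data = Batch(pd.DataFrame({'data': [[0, 1]]}))
batch = Batch.merge_column_wise([ids, data])
assert list(batch.frames.columns) == ['id', 'data']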
