Skip to content

Commit

Permalink
Merge 1845fbc into 4bdd36f
Browse files Browse the repository at this point in the history
  • Loading branch information
saiprashanth173 committed Apr 27, 2020
2 parents 4bdd36f + 1845fbc commit b4d169a
Show file tree
Hide file tree
Showing 39 changed files with 603 additions and 297 deletions.
19 changes: 17 additions & 2 deletions src/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def bootstrap_catalog(self):
init_db()

def create_metadata(self, name: str, file_url: str,
column_list: List[DataFrameColumn]) -> \
column_list: List[DataFrameColumn],
identifier_column='id') -> \
DataFrameMetadata:
"""Creates metadata object when called by create executor.
Expand All @@ -71,12 +72,14 @@ def create_metadata(self, name: str, file_url: str,
name: name of the dataset/video to which this metadata corresponds
file_url: #todo
column_list: list of columns
identifier_column (str): A unique identifier column for each row
Returns:
The persisted DataFrameMetadata object with the id field populated.
"""

metadata = self._dataset_service.create_dataset(name, file_url)
metadata = self._dataset_service.create_dataset(name, file_url,
identifier_id=identifier_column)
for column in column_list:
column.metadata_id = metadata.id
column_list = self._column_service.create_column(column_list)
Expand Down Expand Up @@ -246,3 +249,15 @@ def create_udf(self, name: str, impl_file_path: str,
udf_io.udf_id = metadata.id
self._udf_io_service.add_udf_io(udf_io_list)
return metadata

def get_udf_by_name(self, name: str) -> UdfMetadata:
    """Look up the catalog entry for a user-defined function.

    Arguments:
        name (str): name of the UDF to fetch

    Returns:
        UdfMetadata: the persisted catalog record for the named UDF
    """
    udf_metadata = self._udf_service.udf_by_name(name)
    return udf_metadata
8 changes: 7 additions & 1 deletion src/catalog/models/df_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ class DataFrameMetadata(BaseModel):

_name = Column('name', String(100), unique=True)
_file_url = Column('file_url', String(100))
_unique_identifier_column = Column('identifier_column', String(100))

_columns = relationship('DataFrameColumn',
back_populates="_dataset")

def __init__(self, name: str, file_url: str):
def __init__(self, name: str, file_url: str, identifier_id='id'):
self._name = name
self._file_url = file_url
self._schema = None
self._unique_identifier_column = identifier_id

@property
def schema(self):
Expand All @@ -56,3 +58,7 @@ def file_url(self):
@property
def columns(self):
return self._columns

@property
def identifier_column(self):
    """Name of the column that uniquely identifies each row of the dataset."""
    return self._unique_identifier_column
4 changes: 2 additions & 2 deletions src/catalog/services/df_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class DatasetService(BaseService):
def __init__(self):
super().__init__(DataFrameMetadata)

def create_dataset(self, name, file_url) -> DataFrameMetadata:
def create_dataset(self, name, file_url, identifier_id='id') -> DataFrameMetadata:
"""
Create a new dataset entry for given name and file URL.
Arguments:
Expand All @@ -35,7 +35,7 @@ def create_dataset(self, name, file_url) -> DataFrameMetadata:
Returns:
DataFrameMetadata object
"""
metadata = self.model(name=name, file_url=file_url)
metadata = self.model(name=name, file_url=file_url, identifier_id=identifier_id)
metadata = metadata.save()
return metadata

Expand Down
4 changes: 1 addition & 3 deletions src/executor/create_udf_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
# limitations under the License.

from src.catalog.catalog_manager import CatalogManager
from src.planner.create_udf_plan import CreateUDFPlan
from src.executor.abstract_executor import AbstractExecutor
import tempfile
import os.path
from src.planner.create_udf_plan import CreateUDFPlan


class CreateUDFExecutor(AbstractExecutor):
Expand Down
4 changes: 2 additions & 2 deletions src/executor/disk_based_storage_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing import Iterator

from src.loaders import Loader
from src.models.storage.batch import FrameBatch
from src.models.storage.batch import Batch
from src.executor.abstract_storage_executor import \
AbstractStorageExecutor
from src.planner.storage_plan import StoragePlan
Expand Down Expand Up @@ -43,6 +43,6 @@ def __init__(self, node: StoragePlan):
def validate(self):
pass

def exec(self) -> Iterator[FrameBatch]:
def exec(self) -> Iterator[Batch]:
for batch in self.storage.load():
yield batch
7 changes: 5 additions & 2 deletions src/executor/plan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd

from src.executor.abstract_executor import AbstractExecutor
from src.executor.seq_scan_executor import SequentialScanExecutor
from src.models.storage.batch import Batch
from src.planner.abstract_plan import AbstractPlan
from src.planner.types import PlanNodeType
from src.executor.disk_based_storage_executor import DiskStorageExecutor
Expand Down Expand Up @@ -88,7 +91,7 @@ def execute_plan(self):
# a stitched output
execution_tree = self._build_execution_tree(self._plan)

output_batches = []
output_batches = Batch(pd.DataFrame())

# ToDo generalize this logic
_INSERT_CREATE_ = (
Expand All @@ -99,7 +102,7 @@ def execute_plan(self):
execution_tree.exec()
else:
for batch in execution_tree.exec():
output_batches.append(batch)
output_batches += batch

self._clean_execution_tree(execution_tree)
return output_batches
4 changes: 2 additions & 2 deletions src/executor/pp_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
from typing import Iterator

from src.models.storage.batch import FrameBatch
from src.models.storage.batch import Batch
from src.executor.abstract_executor import AbstractExecutor
from src.planner.pp_plan import PPScanPlan

Expand All @@ -37,7 +37,7 @@ def __init__(self, node: PPScanPlan):
def validate(self):
pass

def exec(self) -> Iterator[FrameBatch]:
def exec(self) -> Iterator[Batch]:
child_executor = self.children[0]
for batch in child_executor.exec():
outcomes = self.predicate.evaluate(batch)
Expand Down
4 changes: 2 additions & 2 deletions src/executor/seq_scan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
from typing import Iterator

from src.models.storage.batch import FrameBatch
from src.models.storage.batch import Batch
from src.executor.abstract_executor import AbstractExecutor
from src.planner.seq_scan_plan import SeqScanPlan

Expand All @@ -34,7 +34,7 @@ def __init__(self, node: SeqScanPlan):
def validate(self):
pass

def exec(self) -> Iterator[FrameBatch]:
def exec(self) -> Iterator[Batch]:

child_executor = self.children[0]
for batch in child_executor.exec():
Expand Down
8 changes: 4 additions & 4 deletions src/expression/arithmetic_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ def evaluate(self, *args):
vl = self.get_child(0).evaluate(*args)
vr = self.get_child(1).evaluate(*args)

if (self.etype == ExpressionType.ARITHMETIC_ADD):
if self.etype == ExpressionType.ARITHMETIC_ADD:
return vl + vr
elif(self.etype == ExpressionType.ARITHMETIC_SUBTRACT):
elif self.etype == ExpressionType.ARITHMETIC_SUBTRACT:
return vl - vr
elif(self.etype == ExpressionType.ARITHMETIC_MULTIPLY):
elif self.etype == ExpressionType.ARITHMETIC_MULTIPLY:
return vl * vr
elif(self.etype == ExpressionType.ARITHMETIC_DIVIDE):
elif self.etype == ExpressionType.ARITHMETIC_DIVIDE:
return vl / vr
5 changes: 2 additions & 3 deletions src/expression/function_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from src.expression.abstract_expression import AbstractExpression, \
ExpressionType
from src.models.storage.batch import FrameBatch
from src.models.storage.batch import Batch


@unique
Expand Down Expand Up @@ -58,7 +58,7 @@ def __init__(self, func: Callable,
self.function = func
self.is_temp = is_temp

def evaluate(self, batch: FrameBatch):
def evaluate(self, batch: Batch):
args = []
if self.get_children_count() > 0:
child = self.get_child(0)
Expand All @@ -70,5 +70,4 @@ def evaluate(self, batch: FrameBatch):

if self.mode == ExecutionMode.EXEC:
batch.set_outcomes(self.name, outcome, is_temp=self.is_temp)

return outcome
7 changes: 4 additions & 3 deletions src/expression/tuple_value_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from src.catalog.models.df_column import DataFrameColumn
from src.models.storage.batch import Batch
from .abstract_expression import AbstractExpression, ExpressionType, \
ExpressionReturnType

Expand Down Expand Up @@ -66,9 +67,9 @@ def col_object(self, value: DataFrameColumn):
self._col_object = value

# remove this once done with tuple class
def evaluate(self, *args):
def evaluate(self, batch: Batch, *args):
if args is None:
# error Handling
pass
given_tuple = args[0]
return given_tuple[(self._col_idx)]

return batch.column_as_numpy_array(self.col_name)
36 changes: 22 additions & 14 deletions src/loaders/abstract_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
from typing import Iterator
from typing import Iterator, Dict

import pandas as pd

from src.catalog.models.df_metadata import DataFrameMetadata
from src.models.storage.batch import FrameBatch
from src.models.storage.frame import Frame
from src.models.storage.batch import Batch


class AbstractVideoLoader(metaclass=ABCMeta):
Expand Down Expand Up @@ -48,35 +49,42 @@ def __init__(self, video_metadata: DataFrameMetadata, batch_size=1,
self.limit = limit
self.curr_shard = curr_shard
self.total_shards = total_shards
self.identifier_column = video_metadata.identifier_column if video_metadata.identifier_column else 'id'

def load(self) -> Iterator[FrameBatch]:
def load(self) -> Iterator[Batch]:
"""
This is a generator for loading the frames of a video.
Uses the video metadata and other class arguments
Yields:
:obj: `eva.models.FrameBatch`: An object containing a batch of frames
and frame specific metadata
:obj: `Batch`: An object containing a batch of frames
and record specific metadata
"""

frames = []
for frame in self._load_frames():
if self.skip_frames > 0 and frame.index % self.skip_frames != 0:
for record in self._load_frames():
if self.skip_frames > 0 and record.get(self.identifier_column,
0) % self.skip_frames != 0:
continue
if self.limit and frame.index >= self.limit:
return FrameBatch(frames, frame.info)
frames.append(frame)
if self.limit and record.get(self.identifier_column,
0) >= self.limit:
return Batch(pd.DataFrame(frames),
identifier_column=self.identifier_column)
frames.append(record)
if len(frames) % self.batch_size == 0:
yield FrameBatch(frames, frame.info)
yield Batch(pd.DataFrame(frames),
identifier_column=self.identifier_column)
frames = []
if frames:
return FrameBatch(frames, frames[0].info)
return Batch(pd.DataFrame(frames),
identifier_column=self.identifier_column)

@abstractmethod
def _load_frames(self) -> Iterator[Frame]:
def _load_frames(self) -> Iterator[Dict]:
"""
Loads video frames from storage and yields them one record at a time.
Yields:
    Dict: a mapping of column names to values for a single frame.
"""
pass
10 changes: 1 addition & 9 deletions src/loaders/petastorm_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from petastorm import make_reader

from src.loaders.abstract_loader import AbstractVideoLoader
from src.models.catalog.frame_info import FrameInfo
from src.models.catalog.properties import ColorSpace
from src.models.storage.frame import Frame


Expand All @@ -35,15 +33,9 @@ def __init__(self, *args, **kwargs):
self.total_shards = None

def _load_frames(self) -> Iterator[Frame]:
info = None
with make_reader(self.video_metadata.file_url,
shard_count=self.total_shards,
cur_shard=self.curr_shard) \
as reader:
for frame_ind, row in enumerate(reader):
if info is None:
(height, width, num_channels) = row.frame_data.shape
info = FrameInfo(height, width, num_channels,
ColorSpace.BGR)

yield Frame(row.frame_id, row.frame_data, info)
yield row._asdict()
9 changes: 1 addition & 8 deletions src/loaders/video_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import cv2

from src.loaders.abstract_loader import AbstractVideoLoader
from src.models.catalog.frame_info import FrameInfo
from src.models.catalog.properties import ColorSpace
from src.models.storage.frame import Frame
from src.utils.logging_manager import LoggingLevel
from src.utils.logging_manager import LoggingManager
Expand All @@ -42,12 +40,7 @@ def _load_frames(self) -> Iterator[Frame]:
_, frame = video.read()
frame_ind = video_start - 1

info = None
if frame is not None:
(height, width, num_channels) = frame.shape
info = FrameInfo(height, width, num_channels, ColorSpace.BGR)

while frame is not None:
frame_ind += 1
yield Frame(frame_ind, frame, info)
yield {'id': frame_ind, 'frame_data': frame}
_, frame = video.read()
Loading

0 comments on commit b4d169a

Please sign in to comment.