From 374a5b020be413a0ee9aca4d1bac747f18b4516a Mon Sep 17 00:00:00 2001 From: Andy Xu Date: Mon, 2 Oct 2023 02:45:15 -0400 Subject: [PATCH] GitHub Data Source Integration (#1233) - [x] GitHub Data Source Integration - [x] Batching support for native storage engine. We cannot do batching in storage engine, which does not work with limit. Revert the change. - [x] Full NamedUser table support - [x] Enable circle ci local PR cache for testmondata - [x] Native storage engine `read` refactor - [x] Testcases - [x] Github data source documentation --- .circleci/config.yml | 20 ++++--- docs/_toc.yml | 3 + evadb/binder/statement_binder_context.py | 5 +- evadb/catalog/catalog_manager.py | 4 ++ evadb/storage/native_storage_engine.py | 60 +++++++++++-------- .../databases/github/requirements.txt | 1 + evadb/third_party/databases/interface.py | 5 ++ evadb/third_party/databases/types.py | 6 ++ evadb/version.py | 2 +- 9 files changed, 72 insertions(+), 34 deletions(-) create mode 100644 evadb/third_party/databases/github/requirements.txt diff --git a/.circleci/config.yml b/.circleci/config.yml index 9a7136757..544470a6a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -213,10 +213,11 @@ jobs: keys: - v1-model_cache-{{ checksum "setup.py" }} - # Always restore testmondata from staging, python3.10, ray disabled. + # First try restoring the testmondata from PR, then staging. 
- restore_cache: keys: - - v1-testmon_cache-staging-python3.10-rayDISABLED-{{ checksum "setup.py" }} + - v1-testmon_cache-{{ .Branch }}-python<< parameters.v >>-ray<< parameters.ray >>-{{ checksum "setup.py" }}- + - v1-testmon_cache-staging-python3.10-rayDISABLED-{{ checksum "setup.py" }}- - run: name: Install EvaDB package from GitHub repo with all dependencies @@ -271,16 +272,21 @@ jobs: # Collect the testmondata only for long intergration tests - when: condition: - and: - - equal: [ LONG INTEGRATION, << parameters.mode >> ] - - equal: [ staging, << pipeline.git.branch >> ] - - equal: [ "3.10", << parameters.v >> ] - - equal: [ DISABLED, << parameters.ray >>] + or: + - equal: [ LONG INTEGRATION CACHE, << parameters.mode >> ] + - and: + - equal: [ LONG INTEGRATION, << parameters.mode >> ] + - equal: [ staging, << pipeline.git.branch >> ] + - equal: [ "3.10", << parameters.v >> ] + - equal: [ DISABLED, << parameters.ray >>] steps: - save_cache: key: v1-testmon_cache-{{ .Branch }}-python<< parameters.v >>-ray<< parameters.ray >>-{{ checksum "setup.py" }}-{{ epoch }} paths: - .testmondata + - .testmondata-shm + - .testmondata-wal + - save_cache: key: v1-pip-wheel_cache-python<< parameters.v >>-ray<< parameters.ray >>-{{ checksum "setup.py" }} diff --git a/docs/_toc.yml b/docs/_toc.yml index f35aed05a..1390ccf7c 100644 --- a/docs/_toc.yml +++ b/docs/_toc.yml @@ -128,6 +128,7 @@ parts: >>>>>>> eva-master - file: source/reference/databases/mariadb - file: source/reference/databases/github +<<<<<<< HEAD - file: source/reference/vector_stores/index title: Vector Stores @@ -153,6 +154,8 @@ parts: - file: source/reference/databases/mysql - file: source/reference/databases/mariadb >>>>>>> eva-master +======= +>>>>>>> 495ce7d7 (GitHub Data Source Integration (#1233)) - file: source/reference/ai/index title: AI Engines diff --git a/evadb/binder/statement_binder_context.py b/evadb/binder/statement_binder_context.py index 8d08d23fc..99edb88eb 100644 --- 
a/evadb/binder/statement_binder_context.py +++ b/evadb/binder/statement_binder_context.py @@ -93,7 +93,10 @@ def add_table_alias(self, alias: str, database_name: str, table_name: str): <<<<<<< HEAD ) as handler: # Assemble columns. - column_df = handler.get_columns(table_name).data + response = handler.get_columns(table_name) + if response.error is not None: + raise BinderError(response.error) + column_df = response.data table_obj = create_table_catalog_entry_for_data_source( table_name, database_name, column_df ) diff --git a/evadb/catalog/catalog_manager.py b/evadb/catalog/catalog_manager.py index 902925035..08b889bca 100644 --- a/evadb/catalog/catalog_manager.py +++ b/evadb/catalog/catalog_manager.py @@ -192,6 +192,7 @@ def check_native_table_exists(self, table_name: str, database_name: str): resp = handler.get_tables() if resp.error is not None: +<<<<<<< HEAD <<<<<<< HEAD raise Exception(resp.error) ======= @@ -201,6 +202,9 @@ def check_native_table_exists(self, table_name: str, database_name: str): raise Exception(resp.error) >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) >>>>>>> eva-master +======= + raise Exception(resp.error) +>>>>>>> 495ce7d7 (GitHub Data Source Integration (#1233)) # Check table existence. 
table_df = resp.data diff --git a/evadb/storage/native_storage_engine.py b/evadb/storage/native_storage_engine.py index 507e73b7e..5043160c2 100644 --- a/evadb/storage/native_storage_engine.py +++ b/evadb/storage/native_storage_engine.py @@ -158,6 +158,7 @@ def write(self, table: TableCatalogEntry, rows: Batch): logger.exception(err_msg) raise Exception(err_msg) +<<<<<<< HEAD <<<<<<< HEAD def read( self, table: TableCatalogEntry, batch_mem_size: int = 30000000 @@ -171,44 +172,51 @@ def read( ) -> Iterator[Batch]: >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) >>>>>>> eva-master +======= + def read( + self, table: TableCatalogEntry, batch_mem_size: int = 30000000 + ) -> Iterator[Batch]: +>>>>>>> 495ce7d7 (GitHub Data Source Integration (#1233)) try: db_catalog_entry = self._get_database_catalog_entry(table.database_name) with get_database_handler( db_catalog_entry.engine, **db_catalog_entry.params ) as handler: <<<<<<< HEAD +<<<<<<< HEAD ======= <<<<<<< HEAD uri = handler.get_sqlalchmey_uri() +======= + handler_response = handler.select(table.name) + # we prefer the generator/iterator when available + result = [] + if handler_response.data_generator: + result = handler_response.data_generator + elif handler_response.data: + result = handler_response.data +>>>>>>> 495ce7d7 (GitHub Data Source Integration (#1233)) - # Create a metadata object - engine = create_engine(uri) - metadata = MetaData() - - Session = sessionmaker(bind=engine) - session = Session() - # Retrieve the SQLAlchemy table object for the existing table - table_to_read = Table(table.name, metadata, autoload_with=engine) - result = session.execute(table_to_read.select()).fetchall() - data_batch = [] - - # Ensure that the order of columns in the select is same as in table.columns - # Also verify if the column names are consistent - if result: - cols = result[0]._fields - index_dict = { - element.lower(): index for index, element in enumerate(cols) - } - try: - ordered_columns = sorted( - table.columns, key=lambda 
x: index_dict[x.name.lower()] + if handler.is_sqlalchmey_compatible(): + # For sql data source, we can deserialize sql rows into numpy array + cols = result[0]._fields + index_dict = { + element.lower(): index for index, element in enumerate(cols) + } + try: + ordered_columns = sorted( + table.columns, key=lambda x: index_dict[x.name.lower()] + ) + except KeyError as e: + raise Exception(f"Column mismatch with error {e}") + result = ( + _deserialize_sql_row(row, ordered_columns) for row in result ) - except KeyError as e: - raise Exception(f"Column mismatch with error {e}") - for row in result: - data_batch.append(_deserialize_sql_row(row, ordered_columns)) + for data_batch in result: + yield Batch(pd.DataFrame([data_batch])) +<<<<<<< HEAD if data_batch: yield Batch(pd.DataFrame(data_batch)) @@ -246,6 +254,8 @@ def read( ======= >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) >>>>>>> eva-master +======= +>>>>>>> 495ce7d7 (GitHub Data Source Integration (#1233)) except Exception as e: err_msg = f"Failed to read the table {table.name} in data source {table.database_name} with exception {str(e)}" logger.exception(err_msg) diff --git a/evadb/third_party/databases/github/requirements.txt b/evadb/third_party/databases/github/requirements.txt new file mode 100644 index 000000000..945b116ad --- /dev/null +++ b/evadb/third_party/databases/github/requirements.txt @@ -0,0 +1 @@ +PyGithub diff --git a/evadb/third_party/databases/interface.py b/evadb/third_party/databases/interface.py index aa5ba2811..513efe03d 100644 --- a/evadb/third_party/databases/interface.py +++ b/evadb/third_party/databases/interface.py @@ -65,6 +65,7 @@ def _get_database_handler(engine: str, **kwargs): <<<<<<< HEAD elif engine == "mariadb": return mod.MariaDbHandler(engine, **kwargs) +<<<<<<< HEAD ======= >>>>>>> 8c5b63dc (release: merge staging into master (#1032)) ======= @@ -77,6 +78,10 @@ def _get_database_handler(engine: str, **kwargs): ======= >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) >>>>>>> eva-master +======= + 
elif engine == "github": + return mod.GithubHandler(engine, **kwargs) +>>>>>>> 495ce7d7 (GitHub Data Source Integration (#1233)) else: raise NotImplementedError(f"Engine {engine} is not supported") diff --git a/evadb/third_party/databases/types.py b/evadb/third_party/databases/types.py index 264aab17c..8f00aead1 100644 --- a/evadb/third_party/databases/types.py +++ b/evadb/third_party/databases/types.py @@ -87,11 +87,14 @@ def get_sqlalchmey_uri(self) -> str: """ raise NotImplementedError() +<<<<<<< HEAD <<<<<<< HEAD ======= <<<<<<< HEAD ======= >>>>>>> eva-master +======= +>>>>>>> 495ce7d7 (GitHub Data Source Integration (#1233)) def is_sqlalchmey_compatible(self) -> bool: """ Return whether the data source is sqlaclemy compatible @@ -106,10 +109,13 @@ def is_sqlalchmey_compatible(self) -> bool: else: return True +<<<<<<< HEAD <<<<<<< HEAD ======= >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) >>>>>>> eva-master +======= +>>>>>>> 495ce7d7 (GitHub Data Source Integration (#1233)) def check_connection(self) -> DBHandlerStatus: """ Checks the status of the database connection. diff --git a/evadb/version.py b/evadb/version.py index 9dfca36a8..e8efde770 100644 --- a/evadb/version.py +++ b/evadb/version.py @@ -23,4 +23,4 @@ >>>>>>> 567ab492 (Bump Version to v0.3.8+dev (#1241)) VERSION_SHORT = f"{_MAJOR}.{_MINOR}" -VERSION = f"{_MAJOR}.{_MINOR}.{_REVISION}" \ No newline at end of file +VERSION = f"{_MAJOR}.{_MINOR}.{_REVISION}"