Skip to content

Commit

Permalink
msyql integration added (#965)
Browse files Browse the repository at this point in the history
- Integrated mysql data source
  • Loading branch information
rohithmulumudy committed Aug 31, 2023
1 parent 9f1653f commit 8aa807b
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 9 deletions.
9 changes: 0 additions & 9 deletions evadb/catalog/catalog_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,12 @@
)
from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.configuration.configuration_manager import ConfigurationManager
from evadb.executor.executor_utils import ExecutorError
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
from evadb.parser.create_statement import ColConstraintInfo, ColumnDefinition
from evadb.utils.generic_utils import get_str_hash, remove_directory_contents


def generate_sqlalchemy_conn_str(engine: str, params: Dict[str, str]):
if engine == "postgres":
conn_str = f"""postgresql://{params["user"]}:{params["password"]}@{params["host"]}:{params["port"]}/{params["database"]}"""
else:
raise ExecutorError(f"Native engine: {engine} is not currently supported")
return conn_str


def is_video_table(table: TableCatalogEntry):
return table.table_type == TableType.VIDEO_DATA

Expand Down
2 changes: 2 additions & 0 deletions evadb/third_party/databases/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def get_database_handler(engine: str, **kwargs):

if engine == "postgres":
return mod.PostgresHandler(engine, **kwargs)
elif engine == "mysql":
return mod.MysqlHandler(engine, **kwargs)
else:
raise NotImplementedError(f"Engine {engine} is not supported")

Expand Down
15 changes: 15 additions & 0 deletions evadb/third_party/databases/mysql/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MySQL integrations"""
111 changes: 111 additions & 0 deletions evadb/third_party/databases/mysql/mysql_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mysql.connector
import pandas as pd

from evadb.third_party.databases.types import (
DBHandler,
DBHandlerResponse,
DBHandlerStatus,
)


class MysqlHandler(DBHandler):
def __init__(self, name: str, **kwargs):
super().__init__(name)
self.host = kwargs.get("host")
self.port = kwargs.get("port")
self.user = kwargs.get("user")
self.password = kwargs.get("password")
self.database = kwargs.get("database")

def connect(self):
try:
self.connection = mysql.connector.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
)
self.connection.autocommit = True
return DBHandlerStatus(status=True)
except mysql.connector.Error as e:
return DBHandlerStatus(status=False, error=str(e))

def disconnect(self):
if self.connection:
self.connection.close()

def check_connection(self) -> DBHandlerStatus:
if self.connection:
return DBHandlerStatus(status=True)
else:
return DBHandlerStatus(status=False, error="Not connected to the database.")

def get_tables(self) -> DBHandlerResponse:
if not self.connection:
return DBHandlerResponse(data=None, error="Not connected to the database.")

try:
query = f"SELECT table_name as 'table_name' FROM information_schema.tables WHERE table_schema='{self.database}'"
tables_df = pd.read_sql_query(query, self.connection)
return DBHandlerResponse(data=tables_df)
except mysql.connector.Error as e:
return DBHandlerResponse(data=None, error=str(e))

def get_columns(self, table_name: str) -> DBHandlerResponse:
if not self.connection:
return DBHandlerResponse(data=None, error="Not connected to the database.")

try:
query = f"SELECT column_name as 'column_name' FROM information_schema.columns WHERE table_name='{table_name}'"
columns_df = pd.read_sql_query(query, self.connection)
return DBHandlerResponse(data=columns_df)
except mysql.connector.Error as e:
return DBHandlerResponse(data=None, error=str(e))

def _fetch_results_as_df(self, cursor):
"""
This is currently the only clean solution that we have found so far.
Reference to MySQL API: https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlcursor-fetchall.html
In short, currently there is no very clean programming way to differentiate
CREATE, INSERT, SELECT. CREATE and INSERT do not return any result, so calling
fetchall() on those will yield a programming error. Cursor has an attribute
rowcount, but it indicates # of rows that are affected. In that case, for both
INSERT and SELECT rowcount is not 0, so we also cannot use this API to
differentiate INSERT and SELECT.
"""
try:
res = cursor.fetchall()
if not res:
return pd.DataFrame({"status": ["success"]})
res_df = pd.DataFrame(res, columns=[desc[0] for desc in cursor.description])
return res_df
except mysql.connector.ProgrammingError as e:
if str(e) == "no results to fetch":
return pd.DataFrame({"status": ["success"]})
raise e

def execute_native_query(self, query_string: str) -> DBHandlerResponse:
if not self.connection:
return DBHandlerResponse(data=None, error="Not connected to the database.")

try:
cursor = self.connection.cursor()
cursor.execute(query_string)
return DBHandlerResponse(data=self._fetch_results_as_df(cursor))
except mysql.connector.Error as e:
return DBHandlerResponse(data=None, error=str(e))
1 change: 1 addition & 0 deletions evadb/third_party/databases/mysql/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mysql-connector-python
22 changes: 22 additions & 0 deletions test/third_party_tests/test_native_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,28 @@ def test_should_run_query_in_postgres(self):
self._raise_error_on_multiple_creation()
self._raise_error_on_invalid_connection()

def test_should_run_query_in_mysql(self):
# Create database.
params = {
"user": "eva",
"password": "password",
"host": "localhost",
"port": "3306",
"database": "evadb",
}
query = f"""CREATE DATABASE test_data_source
WITH ENGINE = "mysql",
PARAMETERS = {params};"""
execute_query_fetch_all(self.evadb, query)

# Test executions.
self._execute_native_query()
self._execute_evadb_query()

# Test error.
self._raise_error_on_multiple_creation()
self._raise_error_on_invalid_connection()


if __name__ == "__main__":
unittest.main()
145 changes: 145 additions & 0 deletions test/unit_tests/storage/test_mysql_native_storage_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import unittest
from test.util import get_evadb_for_testing
from unittest.mock import MagicMock, patch

import pytest

from evadb.catalog.models.utils import DatabaseCatalogEntry
from evadb.server.command_handler import execute_query_fetch_all


class NativeQueryResponse:
def __init__(self):
self.error = None
self.data = None


@pytest.mark.notparallel
class MySQLNativeStorageEngineTest(unittest.TestCase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def get_mysql_params(self):
return {
"user": "eva",
"password": "password",
"host": "localhost",
"port": "3306",
"database": "evadb",
}

def setUp(self):
connection_params = self.get_mysql_params()
self.evadb = get_evadb_for_testing()

sys.modules["mysql"] = MagicMock()
sys.modules["mysql.connector"] = MagicMock()

# Create all class level patches
self.get_database_catalog_entry_patcher = patch(
"evadb.catalog.catalog_manager.CatalogManager.get_database_catalog_entry"
)
self.get_database_catalog_entry_mock = (
self.get_database_catalog_entry_patcher.start()
)

self.execute_native_query_patcher = patch(
"evadb.third_party.databases.mysql.mysql_handler.MysqlHandler.execute_native_query"
)
self.execute_native_query_mock = self.execute_native_query_patcher.start()

self.connect_patcher = patch(
"evadb.third_party.databases.mysql.mysql_handler.MysqlHandler.connect"
)
self.connect_mock = self.connect_patcher.start()

self.disconnect_patcher = patch(
"evadb.third_party.databases.mysql.mysql_handler.MysqlHandler.disconnect"
)
self.disconnect_mock = self.disconnect_patcher.start()

# set return values
self.execute_native_query_mock.return_value = NativeQueryResponse()
self.get_database_catalog_entry_mock.return_value = DatabaseCatalogEntry(
name="test_data_source", engine="mysql", params=connection_params, row_id=1
)

def tearDown(self):
self.get_database_catalog_entry_patcher.stop()
self.execute_native_query_patcher.stop()
self.connect_patcher.stop()
self.disconnect_patcher.stop()

def test_execute_mysql_select_query(self):
execute_query_fetch_all(
self.evadb,
"""USE test_data_source {
SELECT * FROM test_table
}""",
)

self.connect_mock.assert_called_once()
self.execute_native_query_mock.assert_called_once()
self.get_database_catalog_entry_mock.assert_called_once()
self.disconnect_mock.assert_called_once()

def test_execute_mysql_insert_query(self):
execute_query_fetch_all(
self.evadb,
"""USE test_data_source {
INSERT INTO test_table (
name, age, comment
) VALUES (
'val', 5, 'testing'
)
}""",
)
self.connect_mock.assert_called_once()
self.execute_native_query_mock.assert_called_once()
self.get_database_catalog_entry_mock.assert_called_once()
self.disconnect_mock.assert_called_once()

def test_execute_mysql_update_query(self):
execute_query_fetch_all(
self.evadb,
"""USE test_data_source {
UPDATE test_table
SET comment = 'update'
WHERE age > 5
}""",
)

self.connect_mock.assert_called_once()
self.execute_native_query_mock.assert_called_once()
self.get_database_catalog_entry_mock.assert_called_once()
self.disconnect_mock.assert_called_once()

def test_execute_mysql_delete_query(self):
execute_query_fetch_all(
self.evadb,
"""USE test_data_source {
DELETE FROM test_table
WHERE age < 5
}""",
)

self.connect_mock.assert_called_once()
self.execute_native_query_mock.assert_called_once()
self.get_database_catalog_entry_mock.assert_called_once()
self.disconnect_mock.assert_called_once()
Loading

0 comments on commit 8aa807b

Please sign in to comment.