Skip to content

Commit

Permalink
fix(flink): add test cases for recreate table
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenzhongxu authored and gforsyth committed Dec 6, 2023
1 parent 0c9791f commit 1413de9
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 27 deletions.
6 changes: 5 additions & 1 deletion ibis/backends/flink/__init__.py
Expand Up @@ -419,7 +419,7 @@ def drop_table(
def create_view(
self,
name: str,
obj: ir.Table,
obj: pd.DataFrame | ir.Table | None = None,
*,
database: str | None = None,
catalog: str | None = None,
Expand Down Expand Up @@ -449,6 +449,10 @@ def create_view(
if obj is None:
raise exc.IbisError("The obj parameter is required")

if isinstance(obj, ir.Table):
# TODO(chloeh13q): implement CREATE VIEW for expressions
raise NotImplementedError

if overwrite and self.list_views(name):
self.drop_view(name=name, catalog=catalog, database=database, force=True)

Expand Down
193 changes: 167 additions & 26 deletions ibis/backends/flink/tests/test_ddl.py
Expand Up @@ -17,6 +17,34 @@
except ImportError:
Py4JJavaError = None

_awards_players_schema = sch.Schema(
{
"playerID": dt.string,
"awardID": dt.string,
"yearID": dt.int32,
"lgID": dt.string,
"tie": dt.string,
"notes": dt.string,
}
)

_functiona_alltypes_schema = sch.Schema(
{
"id": dt.int32,
"bool_col": dt.bool,
"smallint_col": dt.int16,
"int_col": dt.int32,
"bigint_col": dt.int64,
"float_col": dt.float32,
"double_col": dt.float64,
"date_string_col": dt.string,
"string_col": dt.string,
"timestamp_col": dt.timestamp(scale=3),
"year": dt.int32,
"month": dt.int32,
}
)


@pytest.fixture(autouse=True)
def reset_con(con):
Expand All @@ -28,36 +56,12 @@ def reset_con(con):

@pytest.fixture
def awards_players_schema():
return sch.Schema(
{
"playerID": dt.string,
"awardID": dt.string,
"yearID": dt.int32,
"lgID": dt.string,
"tie": dt.string,
"notes": dt.string,
}
)
return _awards_players_schema


@pytest.fixture
def functiona_alltypes_schema():
return sch.Schema(
{
"id": dt.int32,
"bool_col": dt.bool,
"smallint_col": dt.int16,
"int_col": dt.int32,
"bigint_col": dt.int64,
"float_col": dt.float32,
"double_col": dt.float64,
"date_string_col": dt.string,
"string_col": dt.string,
"timestamp_col": dt.timestamp(scale=3),
"year": dt.int32,
"month": dt.int32,
}
)
return _functiona_alltypes_schema


@pytest.fixture
Expand Down Expand Up @@ -117,6 +121,143 @@ def test_create_table(con, awards_players_schema, temp_table, csv_source_configs
assert temp_table not in con.list_tables()


def test_recreate_table_from_schema(
con, awards_players_schema, temp_table, csv_source_configs
):
# create table once
new_table = con.create_table(
temp_table,
schema=awards_players_schema,
tbl_properties=csv_source_configs("awards_players"),
)
assert temp_table in con.list_tables()
assert new_table.schema() == awards_players_schema

# create the same table a second time should fail
with pytest.raises(
Py4JJavaError,
match="org.apache.flink.table.catalog.exceptions.TableAlreadyExistException",
):
new_table = con.create_table(
temp_table,
schema=awards_players_schema,
tbl_properties=csv_source_configs("awards_players"),
overwrite=False,
)


def test_force_recreate_table_from_schema(
con, awards_players_schema, temp_table, csv_source_configs
):
# create table once
new_table = con.create_table(
temp_table,
schema=awards_players_schema,
tbl_properties=csv_source_configs("awards_players"),
)
assert temp_table in con.list_tables()
assert new_table.schema() == awards_players_schema

# force creating the same twice a second time
new_table = con.create_table(
temp_table,
schema=awards_players_schema,
tbl_properties=csv_source_configs("awards_players"),
overwrite=True,
)
assert temp_table in con.list_tables()
assert new_table.schema() == awards_players_schema


@pytest.mark.parametrize(
"employee_df",
[
pd.DataFrame(
[("fred flintstone", "award", 2002, "lg_id", "tie", "this is a note")]
)
],
)
@pytest.mark.parametrize(
"schema_props", [(None, None), (_awards_players_schema, "awards_players")]
)
def test_recreate_in_mem_table(
con, employee_df, schema_props, temp_table, csv_source_configs
):
# create table once
schema = schema_props[0]
if schema_props[1] is not None:
tbl_properties = csv_source_configs(schema_props[1])
else:
tbl_properties = None

new_table = con.create_table(
name=temp_table,
obj=employee_df,
schema=schema,
tbl_properties=tbl_properties,
)
assert temp_table in con.list_tables()
if schema is not None:
assert new_table.schema() == schema

# create the same table a second time should fail
with pytest.raises(
Py4JJavaError,
match="An error occurred while calling o8.createTemporaryView",
):
new_table = con.create_table(
name=temp_table,
obj=employee_df,
schema=schema,
tbl_properties=tbl_properties,
overwrite=False,
)


@pytest.mark.parametrize(
"employee_df",
[
pd.DataFrame(
[("fred flintstone", "award", 2002, "lg_id", "tie", "this is a note")]
)
],
)
@pytest.mark.parametrize(
"schema_props", [(None, None), (_awards_players_schema, "awards_players")]
)
def test_force_recreate_in_mem_table(
con, employee_df, schema_props, temp_table, csv_source_configs
):
# create table once
schema = schema_props[0]
if schema_props[1] is not None:
tbl_properties = csv_source_configs(schema_props[1])
else:
tbl_properties = None

new_table = con.create_table(
name=temp_table,
obj=employee_df,
schema=schema,
tbl_properties=tbl_properties,
)
assert temp_table in con.list_tables()
if schema is not None:
assert new_table.schema() == schema

# force recreate the same table a second time should succeed
new_table = con.create_table(
name=temp_table,
obj=employee_df,
schema=schema,
tbl_properties=tbl_properties,
overwrite=True,
)
assert temp_table in con.list_tables()
if schema is not None:
assert new_table.schema() == schema


def test_create_source_table_with_watermark(
con, functiona_alltypes_schema, temp_table, csv_source_configs
):
Expand Down

0 comments on commit 1413de9

Please sign in to comment.