Skip to content

Commit

Permalink
Skip flaky snowflake+pandas tests (#8081)
Browse files Browse the repository at this point in the history
Summary:
This test that I added that runs the snowflake IO manager is failing with errors like:

```
[2022-05-25T22:04:30Z] df =    foo  quux
[2022-05-25T22:04:30Z] 0  bar     1
[2022-05-25T22:04:30Z] 1  baz     2
[2022-05-25T22:04:30Z] 2  bar     1
[2022-05-25T22:04:30Z] 3  baz     2
[2022-05-25T22:04:30Z]
[2022-05-25T22:04:30Z]     @op
[2022-05-25T22:04:30Z]     def read_pandas_df(df: pandas.DataFrame):
[2022-05-25T22:04:30Z]         assert set(df.columns) == {"foo", "quux"}
[2022-05-25T22:04:30Z] >       assert len(df.index) == 2
[2022-05-25T22:04:30Z] E       assert 4 == 2
[2022-05-25T22:04:30Z] E         +4
[2022-05-25T22:04:30Z] E         -2
```

This is presumably because multiple tests are running the same job in parallel, so are writing to the snowflake table simultaneously.

disable it to make master green again, but i'm curious - does this indicate a real problem with the IO manager? generally I would expect that I would be able to run multiple jobs with the same IO manager in parallel (but I am specifying a specific table here, so I see how that is tricky here)
  • Loading branch information
gibsondan committed May 26, 2022
1 parent 284e912 commit 0e0bf21
Showing 1 changed file with 68 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logging
import os
import uuid
from contextlib import contextmanager
from typing import Iterator
from unittest.mock import patch

import pandas
import pytest
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake.resources import SnowflakeConnection
from dagster_snowflake.snowflake_io_manager import TableSlice
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from pandas import DataFrame
Expand All @@ -30,6 +35,27 @@
IS_BUILDKITE = os.getenv("BUILDKITE") is not None


SHARED_BUILDKITE_SNOWFLAKE_CONF = {
"account": os.getenv("SNOWFLAKE_ACCOUNT", ""),
"user": "BUILDKITE",
"password": os.getenv("SNOWFLAKE_BUILDKITE_PASSWORD", ""),
}


@contextmanager
def temporary_snowflake_table(schema_name: str, db_name: str) -> Iterator[str]:
snowflake_config = dict(database=db_name, **SHARED_BUILDKITE_SNOWFLAKE_CONF)
table_name = "test_io_manager_" + str(uuid.uuid4()).replace("-", "_")
with SnowflakeConnection(
snowflake_config, logging.getLogger("temporary_snowflake_table")
).get_connection() as conn:
conn.cursor().execute(f"create table {schema_name}.{table_name} (foo string, quux integer)")
try:
yield table_name
finally:
conn.cursor().execute(f"drop table {schema_name}.{table_name}")


def test_handle_output():
with patch("dagster_snowflake_pandas.snowflake_pandas_type_handler._connect_snowflake"):
handler = SnowflakePandasTypeHandler()
Expand Down Expand Up @@ -78,42 +104,47 @@ def test_load_input():
assert df.equals(DataFrame([{"col1": "a", "col2": 1}]))


@op(out=Out(io_manager_key="snowflake", metadata={"schema": "snowflake_io_manager_schema"}))
def emit_pandas_df(_):
return pandas.DataFrame({"foo": ["bar", "baz"], "quux": [1, 2]})


@op
def read_pandas_df(df: pandas.DataFrame):
assert set(df.columns) == {"foo", "quux"}
assert len(df.index) == 2


snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])


@job(
resource_defs={"snowflake": snowflake_io_manager},
config={
"resources": {
"snowflake": {
"config": {
"account": {"env": "SNOWFLAKE_ACCOUNT"},
"user": "BUILDKITE",
"password": {
"env": "SNOWFLAKE_BUILDKITE_PASSWORD",
},
"database": "TEST_SNOWFLAKE_IO_MANAGER",
}
}
}
},
)
def io_manager_test_pipeline():
read_pandas_df(emit_pandas_df())


@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
def test_io_manager_with_snowflake_pandas():
res = io_manager_test_pipeline.execute_in_process()
assert res.success
with temporary_snowflake_table(
schema_name="SNOWFLAKE_IO_MANAGER_SCHEMA", db_name="TEST_SNOWFLAKE_IO_MANAGER"
) as table_name:

# Create a job with the temporary table name as an output, so that it will write to that table
# and not interfere with other runs of this test

@op(
out={
table_name: Out(
io_manager_key="snowflake", metadata={"schema": "SNOWFLAKE_IO_MANAGER_SCHEMA"}
)
}
)
def emit_pandas_df(_):
return pandas.DataFrame({"foo": ["bar", "baz"], "quux": [1, 2]})

@op
def read_pandas_df(df: pandas.DataFrame):
assert set(df.columns) == {"foo", "quux"}
assert len(df.index) == 2

snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])

@job(
resource_defs={"snowflake": snowflake_io_manager},
config={
"resources": {
"snowflake": {
"config": {
**SHARED_BUILDKITE_SNOWFLAKE_CONF,
"database": "TEST_SNOWFLAKE_IO_MANAGER",
}
}
}
},
)
def io_manager_test_pipeline():
read_pandas_df(emit_pandas_df())

res = io_manager_test_pipeline.execute_in_process()
assert res.success

0 comments on commit 0e0bf21

Please sign in to comment.