Skip to content

Commit

Permalink
Throw ValueError when table doesn't exist in PudlSQLiteIOManager.package
Browse files Browse the repository at this point in the history
  • Loading branch information
bendnorman committed Mar 30, 2023
1 parent 7c3d16d commit 775be55
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 62 deletions.
55 changes: 44 additions & 11 deletions src/pudl/io_managers.py
Expand Up @@ -170,11 +170,11 @@ def _get_sqlalchemy_table(self, table_name: str) -> sa.Table:
table: Corresponding SQL Alchemy Table in SQLiteIOManager metadata.
Raises:
RuntimeError: if table_name does not exist in the SQLiteIOManager metadata.
ValueError: if table_name does not exist in the SQLiteIOManager metadata.
"""
sa_table = self.md.tables.get(table_name, None)
if sa_table is None:
raise RuntimeError(
raise ValueError(
f"{sa_table} not found in database metadata. Either add the table to "
"the metadata or use a different IO Manager."
)
Expand Down Expand Up @@ -278,7 +278,7 @@ def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame):

column_difference = set(sa_table.columns.keys()) - set(df.columns)
if column_difference:
raise RuntimeError(
raise ValueError(
f"{table_name} dataframe is missing columns: {column_difference}"
)

Expand Down Expand Up @@ -396,6 +396,15 @@ def __init__(
to create sqlalchemy metadata and check datatypes of dataframes. If not
specified, defaults to a Package with all metadata stored in the
:mod:`pudl.metadata.resources` subpackage.
Every table that appears in `self.md` is specified in `self.package`
as a :class:`pudl.metadata.classes.Resource`. However, not every
:class:`pudl.metadata.classes.Resource` in `self.package` is included
in `self.md` as a table. This is because `self.package` is used to ensure
datatypes of dataframes loaded from database views are correct. However,
the metadata for views in `self.package` should not be used to create
table schemas in the database because views are just stored sql statements
and do not require a schema.
timeout: How many seconds the connection should wait before raising an
exception, if the database is locked by another connection. If another
connection opens a transaction to modify the database, it will be locked
Expand All @@ -407,13 +416,23 @@ def __init__(
md = self.package.to_sql()
super().__init__(base_dir, db_name, md, timeout)

def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame):
"""Enforce PUDL DB schema and write dataframe to SQLite."""
def _handle_str_output(self, context: OutputContext, query: str):
"""Execute a sql query on the database.
This is used for creating output views in the database.
Args:
context: dagster keyword that provides access to output information like asset
name.
query: sql query to execute in the database.
"""
engine = self.engine
table_name = self._get_table_name(context)
# If table_name doesn't show up in the self.md object, this will raise an error
sa_table = self._get_sqlalchemy_table(table_name)

# Check if there is a Resource in self.package for table_name.
# We don't want folks creating views without adding package metadata.
try:
res = self.package.get_resource(table_name)
_ = self.package.get_resource(table_name)
except ValueError:
raise ValueError(
f"{table_name} does not appear in pudl.metadata.resources. "
Expand All @@ -423,6 +442,19 @@ def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame):
"it's a work in progress or is distributed in Apache Parquet format."
)

with engine.connect() as con:
# Drop the existing view if it exists and create the new view.
# TODO (bendnorman): parameterize this safely.
con.execute(f"DROP VIEW IF EXISTS {table_name}")
con.execute(query)

def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame):
"""Enforce PUDL DB schema and write dataframe to SQLite."""
table_name = self._get_table_name(context)
# If table_name doesn't show up in the self.md object, this will raise an error
sa_table = self._get_sqlalchemy_table(table_name)
res = self.package.get_resource(table_name)

df = res.enforce_schema(df)

with self.engine.connect() as con:
Expand All @@ -446,9 +478,8 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
name.
"""
table_name = self._get_table_name(context)
# Check if the table_name exists in the self.md object
_ = self._get_sqlalchemy_table(table_name)

# Check if there is a Resource in self.package for table_name
try:
res = self.package.get_resource(table_name)
except ValueError:
Expand All @@ -465,7 +496,9 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
df = pd.concat(
[
res.enforce_schema(chunk_df)
for chunk_df in pd.read_sql(table_name, con, chunksize=100_000)
for chunk_df in pd.read_sql_table(
table_name, con, chunksize=100_000
)
]
)
except ValueError:
Expand Down
127 changes: 76 additions & 51 deletions test/unit/io_managers_test.py
Expand Up @@ -2,7 +2,6 @@
import pandas as pd
import pytest
from dagster import AssetKey, build_input_context, build_output_context
from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table
from sqlalchemy.exc import IntegrityError, OperationalError

from pudl.io_managers import (
Expand All @@ -11,26 +10,50 @@
PudlSQLiteIOManager,
SQLiteIOManager,
)
from pudl.metadata.classes import Package, Resource


@pytest.fixture
def sqlite_io_manager_fixture(tmp_path):
"""Create a SQLiteIOManager fixture with a simple database schema."""
md = MetaData()
artist = Table( # noqa: F841
"artist",
md,
Column("artistid", Integer, primary_key=True),
Column("artistname", String(16), nullable=False),
def test_pkg() -> Package:
"""Create a test metadata package for the io manager tests."""
fields = [
{"name": "artistid", "type": "integer"},
{"name": "artistname", "type": "string", "constraints": {"required": True}},
]
schema = {"fields": fields, "primary_key": ["artistid"]}
artist_resource = Resource(name="artist", schema=schema)

fields = [
{"name": "artistid", "type": "integer"},
{"name": "artistname", "type": "string", "constraints": {"required": True}},
]
schema = {"fields": fields, "primary_key": ["artistid"]}
view_resource = Resource(
name="artist_view", schema=schema, include_in_database=False
)
track = Table( # noqa: F841
"track",
md,
Column("trackid", Integer, primary_key=True),
Column("trackname", String(16), nullable=False),
Column("trackartist", Integer, ForeignKey("artist.artistid")),

fields = [
{"name": "trackid", "type": "integer"},
{"name": "trackname", "type": "string", "constraints": {"required": True}},
{"name": "trackartist", "type": "integer"},
]
fkeys = [
{
"fields": ["trackartist"],
"reference": {"resource": "artist", "fields": ["artistid"]},
}
]
schema = {"fields": fields, "primary_key": ["trackid"], "foreign_keys": fkeys}
track_resource = Resource(name="track", schema=schema)
return Package(
name="music", resources=[track_resource, artist_resource, view_resource]
)


@pytest.fixture
def sqlite_io_manager_fixture(tmp_path, test_pkg):
"""Create a SQLiteIOManager fixture with a simple database schema."""
md = test_pkg.to_sql()
return SQLiteIOManager(base_dir=tmp_path, db_name="pudl", md=md)


Expand Down Expand Up @@ -110,7 +133,7 @@ def test_missing_column_error(sqlite_io_manager_fixture):
}
)
output_context = build_output_context(asset_key=AssetKey(asset_key))
with pytest.raises(RuntimeError):
with pytest.raises(ValueError):
manager.handle_output(output_context, artist)


Expand Down Expand Up @@ -163,51 +186,53 @@ def test_incorrect_type_error(sqlite_io_manager_fixture):


def test_missing_schema_error(sqlite_io_manager_fixture):
"""Test a RuntimeError is raised when a table without a schema is loaded."""
"""Test a ValueError is raised when a table without a schema is loaded."""
manager = sqlite_io_manager_fixture

asset_key = "venues"
venue = pd.DataFrame({"venueid": [1], "venuename": "Vans Dive Bar"})
output_context = build_output_context(asset_key=AssetKey(asset_key))
with pytest.raises(RuntimeError):
with pytest.raises(ValueError):
manager.handle_output(output_context, venue)


@pytest.fixture
def pudl_sqlite_io_manager_fixture(tmp_path):
def pudl_sqlite_io_manager_fixture(tmp_path, test_pkg):
"""Create a PudlSQLiteIOManager fixture backed by the test metadata package."""
md = MetaData()
artist = Table( # noqa: F841
"artist",
md,
Column("artistid", Integer, primary_key=True),
Column("artistname", String(16), nullable=False),
)
track = Table( # noqa: F841
"track",
md,
Column("trackid", Integer, primary_key=True),
Column("trackname", String(16), nullable=False),
Column("trackartist", Integer, ForeignKey("artist.artistid")),
)
manager = PudlSQLiteIOManager(base_dir=tmp_path, db_name="pudl")
# Override the default PUDL metadata with this test metadata, so that they
# don't match, and we can check what happens.
manager.md = md
return manager


def test_missing_pudl_resource_error(pudl_sqlite_io_manager_fixture):
"""Test that an error is raised when we try to write a non-existent table.
This is a bit contrived, as we have to somehow end up with a table that *does*
appear in the metadata object, but does *not* appear in the PUDL database schema,
which should be impossible, since the schema is generated from the PUDL Package,
unless we pass in a different Package. Maybe that's what we should be doing here?
"""
manager = pudl_sqlite_io_manager_fixture
return PudlSQLiteIOManager(base_dir=tmp_path, db_name="pudl", package=test_pkg)


def test_error_when_handling_view_without_metadata(pudl_sqlite_io_manager_fixture):
"""Make sure an error is thrown when a user creates a view without metadata."""
asset_key = "track_view"
sql_stmt = "CREATE VIEW track_view AS SELECT * FROM track;"
output_context = build_output_context(asset_key=AssetKey(asset_key))
with pytest.raises(ValueError):
pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt)


def test_handling_view_with_metadata(pudl_sqlite_io_manager_fixture):
"""Make sure users can create and load a view when it has metadata."""
# Create some sample data
asset_key = "artist"
artist = pd.DataFrame({"artistid": [127], "artistname": ["Co-op Mop"]})
artist = pd.DataFrame({"artistid": [1], "artistname": ["Co-op Mop"]})
output_context = build_output_context(asset_key=AssetKey(asset_key))
pudl_sqlite_io_manager_fixture.handle_output(output_context, artist)

# create the view
asset_key = "artist_view"
sql_stmt = "CREATE VIEW artist_view AS SELECT * FROM artist;"
output_context = build_output_context(asset_key=AssetKey(asset_key))
pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt)

# read the view data as a dataframe
input_context = build_input_context(asset_key=AssetKey(asset_key))
pudl_sqlite_io_manager_fixture.load_input(input_context)


def test_error_when_reading_view_without_metadata(pudl_sqlite_io_manager_fixture):
"""Make sure an error is thrown when a user loads a view without metadata."""
asset_key = "track_view"
input_context = build_input_context(asset_key=AssetKey(asset_key))
with pytest.raises(ValueError):
manager.handle_output(output_context, artist)
pudl_sqlite_io_manager_fixture.load_input(input_context)

0 comments on commit 775be55

Please sign in to comment.