diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py index 7ddc49ce36..fb7cca39ac 100644 --- a/src/pudl/io_managers.py +++ b/src/pudl/io_managers.py @@ -170,11 +170,11 @@ def _get_sqlalchemy_table(self, table_name: str) -> sa.Table: table: Corresponding SQL Alchemy Table in SQLiteIOManager metadata. Raises: - RuntimeError: if table_name does not exist in the SQLiteIOManager metadata. + ValueError: if table_name does not exist in the SQLiteIOManager metadata. """ sa_table = self.md.tables.get(table_name, None) if sa_table is None: - raise RuntimeError( + raise ValueError( f"{sa_table} not found in database metadata. Either add the table to " "the metadata or use a different IO Manager." ) @@ -278,7 +278,7 @@ def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame): column_difference = set(sa_table.columns.keys()) - set(df.columns) if column_difference: - raise RuntimeError( + raise ValueError( f"{table_name} dataframe is missing columns: {column_difference}" ) @@ -396,6 +396,15 @@ def __init__( to create sqlalchemy metadata and check datatypes of dataframes. If not specified, defaults to a Package with all metadata stored in the :mod:`pudl.metadata.resources` subpackage. + + Every table that appears in `self.md` is sepcified in `self.package` + as a :class:`pudl.metadata.classes.Resources`. However, not every + :class:`pudl.metadata.classes.Resources` in `self.package` is included + in `self.md` as a table. This is because `self.package` is used to ensure + datatypes of dataframes loaded from database views are correct. However, + the metadata for views in `self.package` should not be used to create + table schemas in the database because views are just stored sql statements + and do not require a schema. timeout: How many seconds the connection should wait before raising an exception, if the database is locked by another connection. If another connection opens a transaction to modify the database, it will be locked @@ -407,13 +416,23 @@ def __init__( md = self.package.to_sql() super().__init__(base_dir, db_name, md, timeout) - def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame): - """Enforce PUDL DB schema and write dataframe to SQLite.""" + def _handle_str_output(self, context: OutputContext, query: str): + """Execute a sql query on the database. + + This is used for creating output views in the database. + + Args: + context: dagster keyword that provides access output information like asset + name. + query: sql query to execute in the database. + """ + engine = self.engine table_name = self._get_table_name(context) - # If table_name doesn't show up in the self.md object, this will raise an error - sa_table = self._get_sqlalchemy_table(table_name) + + # Check if there is a Resource in self.package for table_name. + # We don't want folks creating views without adding package metadata. try: - res = self.package.get_resource(table_name) + _ = self.package.get_resource(table_name) except ValueError: raise ValueError( f"{table_name} does not appear in pudl.metadata.resources. " @@ -423,6 +442,19 @@ def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame): "it's a work in progress or is distributed in Apache Parquet format." ) + with engine.connect() as con: + # Drop the existing view if it exists and create the new view. + # TODO (bendnorman): parameterize this safely. + con.execute(f"DROP VIEW IF EXISTS {table_name}") + con.execute(query) + + def _handle_pandas_output(self, context: OutputContext, df: pd.DataFrame): + """Enforce PUDL DB schema and write dataframe to SQLite.""" + table_name = self._get_table_name(context) + # If table_name doesn't show up in the self.md object, this will raise an error + sa_table = self._get_sqlalchemy_table(table_name) + res = self.package.get_resource(table_name) + df = res.enforce_schema(df) with self.engine.connect() as con: @@ -446,9 +478,8 @@ def load_input(self, context: InputContext) -> pd.DataFrame: name. """ table_name = self._get_table_name(context) - # Check if the table_name exists in the self.md object - _ = self._get_sqlalchemy_table(table_name) + # Check if there is a Resource in self.package for table_name try: res = self.package.get_resource(table_name) except ValueError: @@ -465,7 +496,9 @@ def load_input(self, context: InputContext) -> pd.DataFrame: df = pd.concat( [ res.enforce_schema(chunk_df) - for chunk_df in pd.read_sql(table_name, con, chunksize=100_000) + for chunk_df in pd.read_sql_table( + table_name, con, chunksize=100_000 + ) ] ) except ValueError: diff --git a/test/unit/io_managers_test.py b/test/unit/io_managers_test.py index a361fe544a..eae7f4c64f 100644 --- a/test/unit/io_managers_test.py +++ b/test/unit/io_managers_test.py @@ -2,7 +2,6 @@ import pandas as pd import pytest from dagster import AssetKey, build_input_context, build_output_context -from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table from sqlalchemy.exc import IntegrityError, OperationalError from pudl.io_managers import ( @@ -11,26 +10,50 @@ PudlSQLiteIOManager, SQLiteIOManager, ) +from pudl.metadata.classes import Package, Resource @pytest.fixture -def sqlite_io_manager_fixture(tmp_path): - """Create a SQLiteIOManager fixture with a simple database schema.""" - md = MetaData() - artist = Table( # noqa: F841 - "artist", - md, - Column("artistid", Integer, primary_key=True), - Column("artistname", String(16), nullable=False), +def test_pkg() -> Package: + """Create a test metadata package for the io manager tests.""" + fields = [ + {"name": "artistid", "type": "integer"}, + {"name": "artistname", "type": "string", "constraints": {"required": True}}, + ] + schema = {"fields": fields, "primary_key": ["artistid"]} + artist_resource = Resource(name="artist", schema=schema) + + fields = [ + {"name": "artistid", "type": "integer"}, + {"name": "artistname", "type": "string", "constraints": {"required": True}}, + ] + schema = {"fields": fields, "primary_key": ["artistid"]} + view_resource = Resource( + name="artist_view", schema=schema, include_in_database=False ) - track = Table( # noqa: F841 - "track", - md, - Column("trackid", Integer, primary_key=True), - Column("trackname", String(16), nullable=False), - Column("trackartist", Integer, ForeignKey("artist.artistid")), + + fields = [ + {"name": "trackid", "type": "integer"}, + {"name": "trackname", "type": "string", "constraints": {"required": True}}, + {"name": "trackartist", "type": "integer"}, + ] + fkeys = [ + { + "fields": ["trackartist"], + "reference": {"resource": "artist", "fields": ["artistid"]}, + } + ] + schema = {"fields": fields, "primary_key": ["trackid"], "foreign_keys": fkeys} + track_resource = Resource(name="track", schema=schema) + return Package( + name="music", resources=[track_resource, artist_resource, view_resource] ) + +@pytest.fixture +def sqlite_io_manager_fixture(tmp_path, test_pkg): + """Create a SQLiteIOManager fixture with a simple database schema.""" + md = test_pkg.to_sql() return SQLiteIOManager(base_dir=tmp_path, db_name="pudl", md=md) @@ -110,7 +133,7 @@ def test_missing_column_error(sqlite_io_manager_fixture): } ) output_context = build_output_context(asset_key=AssetKey(asset_key)) - with pytest.raises(RuntimeError): + with pytest.raises(ValueError): manager.handle_output(output_context, artist) @@ -163,51 +186,53 @@ def test_incorrect_type_error(sqlite_io_manager_fixture): def test_missing_schema_error(sqlite_io_manager_fixture): - """Test a RuntimeError is raised when a table without a schema is loaded.""" + """Test a ValueError is raised when a table without a schema is loaded.""" manager = sqlite_io_manager_fixture asset_key = "venues" venue = pd.DataFrame({"venueid": [1], "venuename": "Vans Dive Bar"}) output_context = build_output_context(asset_key=AssetKey(asset_key)) - with pytest.raises(RuntimeError): + with pytest.raises(ValueError): manager.handle_output(output_context, venue) @pytest.fixture -def pudl_sqlite_io_manager_fixture(tmp_path): +def pudl_sqlite_io_manager_fixture(tmp_path, test_pkg): """Create a SQLiteIOManager fixture with a PUDL database schema.""" - md = MetaData() - artist = Table( # noqa: F841 - "artist", - md, - Column("artistid", Integer, primary_key=True), - Column("artistname", String(16), nullable=False), - ) - track = Table( # noqa: F841 - "track", - md, - Column("trackid", Integer, primary_key=True), - Column("trackname", String(16), nullable=False), - Column("trackartist", Integer, ForeignKey("artist.artistid")), - ) - manager = PudlSQLiteIOManager(base_dir=tmp_path, db_name="pudl") - # Override the default PUDL metadata with this test metadata, so that they - # don't match, and we can check what happens. - manager.md = md - return manager - - -def test_missing_pudl_resource_error(pudl_sqlite_io_manager_fixture): - """Test that an error is raised when we try to write a non-existent table. - - This is a bit contrived, as we have to somehow end up with a table that *does* - appear in the metadata object, but does *not* appear in the PUDL database schema, - which should be impossible, since the schema is generated from the PUDL Package, - unless we pass in a different Package. Maybe that's what we should be doing here? - """ - manager = pudl_sqlite_io_manager_fixture + return PudlSQLiteIOManager(base_dir=tmp_path, db_name="pudl", package=test_pkg) + + +def test_error_when_handling_view_without_metadata(pudl_sqlite_io_manager_fixture): + """Make sure an error is thrown when a user creates a view without metadata.""" + asset_key = "track_view" + sql_stmt = "CREATE VIEW track_view AS SELECT * FROM track;" + output_context = build_output_context(asset_key=AssetKey(asset_key)) + with pytest.raises(ValueError): + pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt) + + +def test_handling_view_with_metadata(pudl_sqlite_io_manager_fixture): + """Make sure an users can create and load views when it has metadata.""" + # Create some sample data asset_key = "artist" - artist = pd.DataFrame({"artistid": [127], "artistname": ["Co-op Mop"]}) + artist = pd.DataFrame({"artistid": [1], "artistname": ["Co-op Mop"]}) + output_context = build_output_context(asset_key=AssetKey(asset_key)) + pudl_sqlite_io_manager_fixture.handle_output(output_context, artist) + + # create the view + asset_key = "artist_view" + sql_stmt = "CREATE VIEW artist_view AS SELECT * FROM artist;" output_context = build_output_context(asset_key=AssetKey(asset_key)) + pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt) + + # read the view data as a dataframe + input_context = build_input_context(asset_key=AssetKey(asset_key)) + pudl_sqlite_io_manager_fixture.load_input(input_context) + + +def test_error_when_reading_view_without_metadata(pudl_sqlite_io_manager_fixture): + """Make sure and error is thrown when a user loads a view without metadata.""" + asset_key = "track_view" + input_context = build_input_context(asset_key=AssetKey(asset_key)) with pytest.raises(ValueError): - manager.handle_output(output_context, artist) + pudl_sqlite_io_manager_fixture.load_input(input_context)