Migrate table definitions from Hive to Mimir Metadata #321

Closed
6 of 7 tasks
okennedy opened this issue Jun 16, 2019 · 10 comments

Comments

okennedy (Member) commented Jun 16, 2019

Presently, Mimir's Spark backend uses Hive to store table definitions. This means that:

  • Tables need to be explicitly loaded into Spark via HDFS/S3. They can't be used in situ on the local file-system. Apart from the usability concerns, this additional step slows down a user's access to data (and makes dynamic loading strategies à la NoDB harder).
  • Hive manages schema metadata, requiring type translations and splitting metadata between Mimir's internal store and Hive.
  • Mimir dumps a dozen directories and files into the working directory whenever tables are created.

It would be useful to create an infrastructure within Mimir for manually managing table definitions / table schemas, etc... (with the option of reading from Hive as well if configured to do so). Specifically:

  • CREATE TABLE should no longer invoke Spark (purely meta-data based)
  • Table mutators (UPDATE, INSERT, etc...) should no longer invoke Spark
  • LOAD should no longer trigger a write to Spark (though the schema detector pipeline will still read from Spark). Notably, LOAD should store the source URL, source format, and any other information needed to create a Spark DataFrame (a possible metadata record is sketched after this list)
  • CREATE MATERIALIZED VIEW and ALTER VIEW MATERIALIZE should materialize data to a configurable location (e.g., local filesystem / hive / s3) in a configurable format (e.g., scala-native binary / csv / json / etc...)
  • Classes in the compilation pipeline (e.g., mimir.compiler.Compiler) should dynamically create the appropriate type of Spark DataFrame as needed.
  • Add a caching layer?
  • Add a facility to connect external data sources with their own schema definitions (e.g., Hive, JDBC, etc...). Perhaps use the same schema field used by Adaptive Schemas to hide these imports behind a distinct namespace.
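As a concrete illustration of the LOAD bullet above, here is a minimal sketch of the metadata a link-only LOAD might record; the LoadedTableDefinition name and its fields are hypothetical, not existing Mimir classes.

// Hypothetical record of what LOAD would persist instead of writing to Spark:
// just enough to lazily reconstruct a Spark DataFrame on demand.
case class LoadedTableDefinition(
  name: String,                       // table name within Mimir
  url: String,                        // source URL (local file, S3, Google Sheets, ...)
  format: String,                     // e.g., "csv", "json", "parquet"
  sparkOptions: Map[String, String]   // parser options to hand to spark.read when the DataFrame is needed
)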
okennedy (Member, Author) commented:

A good question for discussion is how to implement table mutators in Mimir. Clearly, updates to tables should be registered from Mimir's side. However, it's worth asking whether such updates should be propagated back to the source. Tagging @mrb24 and @lordpretzel for feedback.

There's really only one major argument for propagating: it keeps the backend's view of the data in sync with Mimir's view of the data.

Conversely...

  • Spark/Hive isn't really update-friendly. We can make updates far faster.
  • Handling updates in Mimir gives us the ability to track and query the provenance of updates, à la GProM, eventually.
  • Handling updates in Mimir gives us the ability to add MAYBE { UPDATE | INSERT | DELETE } operations in the future (i.e., a non-deterministic update/insert/delete).
  • We don't have any active use cases that would benefit from keeping the backend in-sync with the frontend. Even those cases that might would probably benefit more from an UNLOAD operation.

Which leaves the question of how to implement this simulation of updates. My own proposal would be to adopt a GProM-like versioning scheme, where each version is defined as a view over the previous version of the table. This means that we'd need to keep a log of updates to the table in the metadata backend. This could become a performance bottleneck in the longer term, but we could both keep a log and cache the most recent version of the table.
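As a rough sketch of what the log-plus-cache approach might look like (the TableVersion type and the SQL strings below are purely illustrative, not Mimir or GProM code):

// Each version of a table is a view over the previous version; the metadata backend
// keeps the log, and a periodic compaction step can materialize the newest version.
case class TableVersion(table: String, version: Long, definingQuery: String)

val log = Seq(
  TableVersion("R", 1, "SELECT * FROM R_base"),
  TableVersion("R", 2, "SELECT A, CASE WHEN A > 5 THEN 0 ELSE B END AS B FROM R_v1"),
  TableVersion("R", 3, "SELECT * FROM R_v2 WHERE B IS NOT NULL")
)

// The current state of the table is the highest-numbered version.
def currentVersion(table: String): Option[TableVersion] =
  log.filter(_.table == table).sortBy(_.version).lastOption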

lordpretzel (Collaborator) commented Jun 18, 2019

By virtualizing the updates, we can batch them before we apply them. We can also combine this with other lossless versioning at the storage level; e.g., we can make the storage versioned by adding a version column. Updates are kept in the log forever, but every X updates (e.g., X = 1000) we append the new row versions to the table stored in HDFS. This would work like a DB with time travel plus an audit log, which is what GProM uses in Oracle to replay history with or without provenance tracking.

The question is of course what the performance overhead is of reenacting up to X updates with every query. In C, the compilation of 1000 updates is not a bottleneck. Also, if most or all of the updates come from the spreadsheet, this simplifies the compilation problem: only the last update to a cell has to be applied, updates typically affect a single cell, and they do not have complex SET expressions.
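To make the storage-level idea concrete, here is a hedged Spark sketch of reading such a versioned table: every row carries a version column, appended batches accumulate in HDFS, and queries keep only the newest version of each row. The path, key column, and column names are assumptions, not existing Mimir code.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, row_number}
import org.apache.spark.sql.expressions.Window

// Read the versioned table and keep only the latest version of each row.
def latestVersions(spark: SparkSession, path: String, keyCol: String): DataFrame = {
  val versioned = spark.read.parquet(path)                // rows carry a "version" column
  val newestFirst = Window.partitionBy(keyCol).orderBy(col("version").desc)
  versioned
    .withColumn("_rank", row_number().over(newestFirst))  // rank each row's versions, newest first
    .filter(col("_rank") === 1)                           // keep only the newest version
    .drop("_rank")
}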

okennedy (Member, Author) commented:

After exploring the code a bit, it seems like this should be doable easily if broken up into a read-only step and a read-write step. For the first read-only step, we would support only LOAD. CREATE TABLE, UPDATE, INSERT, and DELETE would error.

At present, mimir.backend.SparkBackend serves multiple roles:

  1. Managing Spark-related context (which, due to Spark limitations, has to be global).
  2. Providing a wrapper around mimir.algebra.spark.OperatorTransforms, including several utility methods like zipWithIndex and an entry point to the compiler logic.
  3. Implementing a data interface (readDataSource, materializeView).

I propose the following changes:

  1. Create a new object (e.g., mimir.exec.spark.MimirSpark) to take on managing global Spark-related state. If we need to have a single global spark context, we may as well have it in one place rather than having this strange hybrid global/local object SparkBackend.
  2. Integrate compilation-related methods into mimir.exec.Compiler. At this point, we're tightly enough coupled with Spark that it doesn't make sense to abstract these behind a layer of indirection.
  3. Define a new interface: SchemaProvider, replacing QueryBackend, as a wrapper around components that can define sets of tables. SchemaProviders would be registered with Database at startup (i.e., in Mimir or MimirVizier) or possibly at runtime.

Examples of SchemaProviders include:

  • HiveSchemaProvider: Tables already defined in the Spark runtime.
  • LoadedTableSchemaProvider: LOADed tables.
  • ViewSchemaProvider: Views defined through Mimir's ViewManager.
  • AdaptiveSchema: Tables defined by Adaptive Schemas.

A schema provider would be registered with:

  • A name (e.g., for a provider named foo providing barTable, we'd write SELECT * FROM foo.barTable).
  • Whether or not the provider should be searched for tables queried without a schema (i.e., SELECT * FROM barTable).
sealed trait TableImplementation;
case class DataFrameTable(df: DataFrame) extends TableImplementation
case class ViewTable(query: Operator) extends TableImplementation
case class IteratorTable(iter: mimir.exec.result.ResultIterator) extends TableImplementation

trait SchemaProvider {
  // Implementations define these...
  def listTables: Iterable[ID]
  def tableSchema(table: ID): Option[Seq[(ID, Type)]]
  def implementation(table: ID): Option[TableImplementation]

  // These come for free but may be overridden
  def listTablesQuery: Operator =
    HardTable(
      Seq(ID("TABLE"), TString()), 
      listTables.map { Seq(_) }.toSeq
    )
  def listAttributesQuery: Operator = ??? 
}

For step two we can define traits that indicate that the provider can accept bulk writes / view materialization, or updates/inserts/deletes.
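One possible shape for those step-two traits, sketched with placeholder names (nothing below exists in Mimir yet; the thread later settles on BulkStorageProvider for the bulk-write case):

// Bulk writes / view materialization.
trait BulkWriteProvider extends SchemaProvider {
  def createStoredTableAs(query: Operator, name: ID): Unit
  def dropStoredTable(name: ID): Unit
}

// Fine-grained mutators backing UPDATE / INSERT / DELETE.
trait MutableProvider extends SchemaProvider {
  def insertRows(table: ID, rows: Seq[Seq[PrimitiveValue]]): Unit
  def updateRows(table: ID, set: Map[ID, Expression], condition: Expression): Unit
  def deleteRows(table: ID, condition: Expression): Unit
}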

okennedy self-assigned this Jul 4, 2019
okennedy (Member, Author) commented Jul 4, 2019

A quick chat with @mrb24 led to a revision of the SchemaProvider API. The limiting factor is that, depending on the situation, different interfaces might be appropriate: in some cases we may want the full query provenance (i.e., we want a view), while in others we want just the DataFrame. The idea would be to expose each access path explicitly:

trait SchemaProvider {
  // Implementations define these...
  def listTables: Iterable[ID]
  def tableSchema(table: ID): Option[Seq[(ID, Type)]]
  def dataframe(table: ID): Option[DataFrame]
  def view(table: ID): Option[Operator]
  def iterator(table: ID): Option[ResultIterator]

  // These come for free but may be overridden
  def listTablesQuery: Operator =
    HardTable(
      Seq(ID("TABLE"), TString()), 
      listTables.map { Seq(_) }.toSeq
    )
  def listAttributesQuery: Operator = ??? 
}

For any caller that just wants one particular format, we can provide translation utilities.
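For example, a hedged sketch of such a utility for callers that only want a DataFrame; compileView and iteratorToDF stand in for whatever translation hooks the compiler ends up exposing and are not existing Mimir functions:

// Fall back from the native dataframe path to compiling the view, then to
// materializing the iterator, whichever the provider supports.
def dataframeFor(
  provider: SchemaProvider,
  table: ID,
  compileView: Operator => DataFrame,
  iteratorToDF: ResultIterator => DataFrame
): Option[DataFrame] =
  provider.dataframe(table)
    .orElse { provider.view(table).map(compileView) }
    .orElse { provider.iterator(table).map(iteratorToDF) }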

okennedy (Member, Author) commented Jul 7, 2019

I would like to decouple loading data in Mimir from the process of data staging. My proposal is as follows:

First, the LOAD command (and related mechanisms within Mimir) does nothing more than create links. You give the command a URI, and the corresponding dataset becomes visible within Mimir (analogous to Spark's load() command).

As I see it, the primary use case for staging is Vizier. For uploaded files, at least, it makes a lot more sense to have Vizier handle staging directly (i.e., files get streamed directly into S3/HDFS rather than going through Mimir). This avoids the redundant copy that would otherwise pass through the local filesystem.

For URLs and other network resources like Google Sheets, Spark already seems to do some caching internally. If necessary, we could materialize one of the views used in data loading, transparently to the user.

What do you think @mrb24 ?

okennedy (Member, Author) commented Jul 7, 2019

As a point of curiosity: With Mimir handling table definitions, is it safe to drop the alias field from Table and related Operator subclasses?

okennedy added a commit that referenced this issue Jul 8, 2019
- Work towards getting `SchemaProvider`s integrated into the system as per issue #321
- SparkBackend code has been split between:
   - `mimir.data.SparkSchemaProvider`: Access to spark's Derby/Hive backend data repository
   - `mimir.exec.spark.*` and `mimir.exec.Compiler`: Code related to Spark compilation and execution management.
- Database no longer takes a backend parameter (just a metadata backend).  Spark is now the only query processing infrastructure used.
- Code throughout has been re-factored to manage a global spark context (TODO: Further refactor to make the global spark context a feature of Database)
- OperatorTranslation has been renamed to `RAToSpark` for consistency with the other translation classes.
okennedy (Member, Author) commented Jul 11, 2019

Most of the immutable data access functionality is alive and kicking. In particular, see the commit notes for:

  • 369536f : Merging mimir.util.Load* into LoadedTables and generalizing VizierTableNames
  • df9c537 : Phasing out QueryBackend and integrating Spark translation into Mimir's Compiler workflow
  • 937a0b8 : SystemCatalog works
  • 0c6448d : LoadedTables, ViewManager, and AdaptiveSchemaManager integrated into the system catalog

I'm moving on to implementing mutability. In particular:

  • Where do materialized views live?
  • Where does bulk model-related metadata live?
  • (How) Do we allow write access to S3/Hive?
  • Support for CREATE/INSERT/UPDATE/DELETE

Where do materialized views live?

I see several places where materialized views can live.

  • Hive (the current behaviour in master).
  • The local filesystem (preferred behaviour for the command-line).
  • The metadata backend?

Hive makes sense when Mimir is connected to a remote Spark instance (e.g., when it's used in Vizier), but carries many of the same limitations that this ticket is trying to address when Mimir is run locally. The metadata backend is a second potential option, but materialized views are bulk data rather than simple key-value associations. This will hurt if, for example, we ever want to use Git or Ground for metadata. The local filesystem has the advantage that it also makes Mimir's views accessible from outside of Mimir once they're materialized (the same goes for Hive).

In other words, we want the materialized view target to be configurable (either LoadedTables, Hive, or eventually HDFS). Moreover, it should just be a slightly more powerful form of a schema provider, so I plan to make LoadedTablesSchemaProvider and HiveSchemaProvider instances of the following trait:

trait BulkStorageProvider extends SchemaProvider
{
  def createStoredTableAs(query: Operator, name: ID)
  def dropStoredTable(name: ID)
}

The database (view manager?) would then be configured at start-up to use one of these providers.

Part of the configuration would also involve deciding on a format for these files. Hive already defaults to Parquet, and I think I'm going to do the same with LoadedTables.
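A sketch of what that start-up configuration might look like (the case class and its fields are illustrative only, not actual Mimir classes):

// The database is handed one bulk storage provider and a default file format at start-up.
case class MaterializationConfig(
  provider: BulkStorageProvider,   // e.g., LoadedTables (local Parquet files) or a Hive-backed provider
  format: String = "parquet"       // both Hive and LoadedTables default to Parquet
)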

Where does bulk model-related metadata live?

A related problem arises with some lenses and adaptive schemas. For example, the (now defunct) DiscalaAbadi adaptive schema needed to materialize several tables of derived state (the FD graph). I see the solution as being similar: the database has a pre-configured bulk storage target, and we just use that.

(How) Do we allow write-through access to Hive/HDFS/S3?

Since we're no longer staging, a reasonable question is whether we want to allow users to dump data back into Hive or similar data stores (i.e., SELECT INTO, CREATE TABLE AS, some staged variant of LOAD, or Vizier-specific functionality). This could, in principle, work exactly like view materialization... the BulkStorageProvider trait defined above would be enough to support it.

Support for Updates

Punting on this for now, but when it gets implemented, it will likely take the form of an Updatable trait along the lines of my discussion with @lordpretzel above.

okennedy (Member, Author) commented Jul 14, 2019

The final question is how to implement staging: locally caching remote resources. Specifically, we'd add an option to the LOAD command:

LOAD 'url' WITH STAGING

When WITH STAGING is specified, we create a local copy of the specified URL and treat that as the primary URL. We then back up the original URL somewhere, allowing it to be re-loaded, e.g., by adding a new command:

RELOAD table

There seem to be two ways to accomplish this:

  • Create (in effect) a materialized view over the data frame obtained by loading the original URL
    • Pro: Easy to implement / natively supported by BulkStorageProvider
    • Pro: Universally supported: Every format can be used to create a DataFrame
    • Con: The original data file can't be re-parsed.
    • Con: DataFrames don't track provenance, so we'd need to implement support infrastructure to keep track of where it came from (e.g., CREATE TABLE WITH PROVENANCE #319 )
  • Download the specified file and redirect the URL to point locally
    • Con: Needs additional API support (added to BulkSchemaProvider?)
    • Con: Hard (Parquet) or impossible (Google Sheets) to implement for all providers.
    • Con: Getting this to work with Hive/HDFS will be messy
    • Pro: The original file can be re-parsed if we need to change load options.
    • Pro: Implementing RELOAD is easy.

I'm honestly not sure how the latter approach would be implemented for SparkSchemaProvider, since Spark kind of assumes that everything is a Dataset or RDD. We'd have to go around Spark to whatever datastore (Hive, Derby, etc...) it's using on the backend -- and that assumes that the backend even supports raw file access.

It might be more appropriate to define something like:

trait StagingProvider { 
  def stage(url: URL, nameHint: Option[String]): URL 
  def stage(input: InputStream, fileExtension: String, nameHint: Option[String]): URL
  def drop(local: URL) 
}

stage would download a remote URL to a local staging area (e.g., LocalFS or S3) and return a URL to the local copy. During database initialization, we pick a suitable StagingProvider and rely on that for staging.
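A minimal sketch of a local-filesystem implementation of this trait, assuming staging just means copying the resource into a designated directory; the class name and directory layout are illustrative, not part of Mimir:

import java.io.InputStream
import java.net.URL
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util.UUID

class LocalFSStagingProvider(basePath: String) extends StagingProvider {
  private def target(ext: String, nameHint: Option[String]): Path =
    Paths.get(basePath, nameHint.getOrElse(UUID.randomUUID.toString) + "." + ext)

  // Download a remote URL into the staging directory and return the local URL.
  def stage(url: URL, nameHint: Option[String]): URL = {
    val segments = url.getPath.split('.')
    val ext = if (segments.length > 1) segments.last else "dat"
    stage(url.openStream(), ext, nameHint)
  }

  // Stream arbitrary content into the staging directory.
  def stage(input: InputStream, fileExtension: String, nameHint: Option[String]): URL = {
    val local = target(fileExtension, nameHint)
    Files.copy(input, local, StandardCopyOption.REPLACE_EXISTING)
    local.toUri.toURL
  }

  def drop(local: URL): Unit =
    Files.deleteIfExists(Paths.get(local.toURI))
}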

I'm a little worried that this overlaps in purpose with BulkStorageProvider. For example, we could add the following method:

  def stage(input: DataFrame, nameHint: Option[String]): URL

However, I can't quite square the two. There's a major distinction (and maybe this should be reflected in the names) in that StagingProvider is for raw-file storage (staged files are accessed by URL), while BulkStorageProvider is specifically for storing tables (stored tables are immediately visible within Mimir).
I'm going to rename them:

  • BulkStorageProvider -> MaterializedTableProvider
  • StagingProvider -> RawFileProvider

Action items:

  • Rename providers
  • Implement a LocalFS RawFileProvider (S3 and HDFS can come later)
  • Modify the CREATE TABLE AS workflow to use RawFileProvider
  • Add WITH STAGING to the LOAD command
  • LOAD WITH STAGING called with a raw-staging-friendly format ('csv', 'json', 'excel', 'xml', or 'text') should download the specified URL using the Database's preferred RawFileProvider.
  • Add and implement RELOAD table

Of course, we can't just use raw staging. Parquet files don't download easily (they're split across multiple files), and some formats (e.g., Google Sheets) simply don't download locally. For these, the only practical approach is to materialize the corresponding DataFrame. Doing this through ViewManager is tempting. However, ViewManager also tries to materialize supporting metadata (e.g., taint columns) that will not exist here. Worse, if metadata columns are requested that do not exist in the materialized view, ViewManager falls back to running the source query. This is not behavior that we want. We need a stand-alone mechanism for materializing the DataFrame.

The natural approach would be to use the database's preferred MaterializedTableProvider. This has the advantage of being simpler to implement, but creates a behavioral fork based on the table format. Some staged tables go down one path, while other staged tables go down another. It also places some loaded tables in the IMPORTED schema, and others in whatever happens to be the currently live MaterializedTableProvider.

A more stable way to implement this would be to use the CREATE TABLE AS workflow instead of the CREATE VIEW workflow. That is, we instantiate the DataFrame as normal, but dump it out to a raw file the way LoadedTables implements MaterializedTableProvider. File formats like Parquet can do this losslessly. Then, we point at the newly materialized Parquet file and set the format to Parquet rather than the original. We would need a new method on RawFileProvider:

  def stage(input: DataFrame, format: String, nameHint: Option[String]): URL
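Continuing the illustrative local-filesystem provider from above, DataFrame staging could simply write the frame out in the requested format and hand back the resulting URL (again, a sketch, not Mimir API):

import org.apache.spark.sql.DataFrame

// Added to the illustrative LocalFSStagingProvider: dump the DataFrame under the staging
// directory in the requested format (e.g., "parquet") and return the staged location.
def stage(input: DataFrame, format: String, nameHint: Option[String]): URL = {
  val local = Paths.get(basePath, nameHint.getOrElse(UUID.randomUUID.toString))
  input.write.format(format).save(local.toString)
  local.toUri.toURL
}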

Action Items:

  • Implement DataFrame staging
  • LOAD WITH STAGING called with a non-raw-friendly format (everything else) should instantiate a DataFrame using the specified URL, feed it to the RawFileProvider, and continue the workflow as before. Save the original format along with the URL in the MimirMetadata.

okennedy (Member, Author) commented:

Modulo making sure test cases pass, this branch seems to be feature complete.

okennedy (Member, Author) commented:

Closed with the merge of #338.
