
Custom InputFile / OutputFile providers for Spark #107

Closed
mccheah opened this issue Nov 16, 2018 · 4 comments

mccheah (Contributor) commented Nov 16, 2018

It would be useful to allow the custom metadata described in #106 to be consumed by the Spark data source. For example, it would be helpful to encrypt files as they are written. However, different users of the data source will want to use that custom metadata in different ways to inform how data is read and written.

We therefore propose supporting a data source option that service-loads an instance of a plugin interface in the data source reader and writer layers.

Below is an API sketch for such a plugin:

class OutputFileWithMetadata {
  OutputFile file;
  Map<String, String> customMetadata;
}

interface IcebergSparkIO {

  /**
   * Opens a data file for reading.
   */
  InputFile open(DataFile file);

  /**
   * Opens a data file for reading, only specifying the location and custom metadata
   * associated with that file.
   *
   * (Location can be either URI or String, prefer URI for stricter semantics)
   */
  InputFile open(URI location, Map<String, String> customMetadata);

  /**
   * Create a new output file, returning a handle to write bytes to it as well as
   * custom metadata that should be included alongside the file when adding the
   * appropriate DataFile to the table.
   *
   * (Location can be either URI or String, prefer URI for stricter semantics)
   */
  OutputFileWithMetadata createNewFile(URI location);

  /**
   * Deletes the file at the given location. Called when jobs have to abort tasks
   * and clean up previously written files.
   *
   * (Location can be either URI or String, prefer URI for stricter semantics)
   */
  void delete(URI location);
}

It is difficult, however, to make IcebergSparkIO Serializable. If we're not careful, we would therefore have to service-load the implementation on every executor, possibly multiple times. We propose instead to service-load a provider class that can be passed the data source options, so that the plugin only has to be service-loaded once and the provider can be serialized and distributed to the executor nodes. We therefore also require the interface below:

interface IcebergSparkIOFactory extends Serializable {
  IcebergSparkIO create(DataSourceOptions options);
}
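
To make the single-load requirement concrete, here is a rough sketch of how the data source could resolve the factory (the resolver class and the choice to take the first registered implementation are illustrative assumptions, not part of the proposal itself):

// Illustrative only: resolve the factory once on the driver via ServiceLoader.
// Because IcebergSparkIOFactory is Serializable, the resolved instance can be
// captured by the DataSourceReader/DataSourceWriter and shipped to executors,
// where each task calls factory.create(options) to obtain an IcebergSparkIO.
import java.util.ServiceLoader;

class IcebergSparkIOFactories {
  static IcebergSparkIOFactory load() {
    for (IcebergSparkIOFactory factory : ServiceLoader.load(IcebergSparkIOFactory.class)) {
      // Assumption: take the first registered implementation; a data source
      // option could be used to select among several registered factories.
      return factory;
    }
    throw new IllegalStateException("No IcebergSparkIOFactory implementation registered");
  }
}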

rdblue (Contributor) commented Nov 20, 2018

InputFile and OutputFile instances are supplied by a table's TableOperations. That provides a way to supply your own implementations by overriding the data source used by Spark.

We use MetacatTables to connect to our metastore instead of HadoopTables or HiveTables. The default Iceberg source for DSv2 uses HadoopTables (and will be moving to HiveTables). To override that, we have a subclass of the IcebergSource called IcebergMetacatSource that overrides findTable(DataSourceOptions). The tables returned by that method are loaded by MetacatTables and use MetacatTableOperations, which controls how metadata is committed and how InputFile and OutputFile instances are created.

I'd recommend using the same approach for your integration. The only thing you need to change is to supply a different ServiceLoader config with your source instead of the default.
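
A rough sketch of that approach, assuming the com.netflix.iceberg package layout (only IcebergSource#findTable(DataSourceOptions) comes from the description above; the Custom* names and the "path" option key are illustrative, and the exact findTable visibility may differ):

// Illustrative subclass: the returned Table is loaded through a custom Tables
// implementation whose TableOperations controls how InputFile and OutputFile
// instances are created.
import com.netflix.iceberg.Table;
import com.netflix.iceberg.spark.source.IcebergSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.sources.v2.DataSourceOptions;

public class CustomIcebergSource extends IcebergSource {
  @Override
  protected Table findTable(DataSourceOptions options) {
    String table = options.get("path").orElseThrow(
        () -> new IllegalArgumentException("path is required"));
    // CustomTables is hypothetical: analogous to HadoopTables or MetacatTables,
    // it returns Tables backed by a TableOperations with custom file I/O.
    return new CustomTables(new Configuration()).load(table);
  }
}

The subclass is then registered through its own ServiceLoader config entry in place of the default one.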

Also, this will eventually be cleaner when Spark adds catalog support. You'll point your Spark configuration at a catalog implementation directly and that catalog will allow you to instantiate tables with the right TableOperations.

mccheah (Contributor, Author) commented Nov 20, 2018

> InputFile and OutputFile instances are supplied by a table's TableOperations. That provides a way to supply your own implementations by overriding the data source used by Spark.

The current DSv2 reader and writer implementations don't do that, though - they primarily use HadoopOutputFile#fromPath / HadoopInputFile#fromPath. The above proposal was to replace those calls, but it sounds like they can be replaced through other APIs instead. (Additionally, to do this we would have to make TableOperations serializable, because we would open the InputFile and OutputFile instances on the executors - that sounds difficult.)

Additionally, the TableOperations API doesn't clarify whether newInputFile(path) is opening a metadata file or a table data file - the TableOperations implementation might need to configure the returned InputFile differently depending on whether it's reading metadata or physical data. It would be beneficial to have TableOperations#readMetadataFile(path) and TableOperations#readTableDataFile(DataFile) to make the distinction explicit. It seems like a core Iceberg concept that the metastore location can be distinct from the physical data location.
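
A minimal sketch of that split (these methods don't exist in TableOperations today; only the subset relevant to reading is shown):

interface TableOperations {
  // Existing, undifferentiated entry point.
  InputFile newInputFile(String path);

  // Proposed: open a metadata file (table metadata, manifest list, manifest).
  InputFile readMetadataFile(String path);

  // Proposed: open a data file tracked by the table, so the implementation can
  // use the DataFile's custom metadata (e.g. encryption keys) when opening it.
  InputFile readTableDataFile(DataFile file);
}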

> The default Iceberg source for DSv2 uses HadoopTables (and will be moving to HiveTables). To override that, we have a subclass of the IcebergSource called IcebergMetacatSource that overrides findTable(DataSourceOptions). The tables returned by that method are loaded by MetacatTables and use MetacatTableOperations, which controls how metadata is committed and how InputFile and OutputFile instances are created.

This would require rewriting much of the Hadoop table operations logic, right? Are we perhaps saying that's the right layer of abstraction to work with? We wanted to treat Iceberg as a bit of a black box (see #92 (comment)) in that Iceberg is just a storage / organizational layer and it takes care of the conventions of file paths in the metastore and the backing store; all we want to do is change how the bytes are written to some location that Iceberg has selected.

(Comment edited because the thought didn't flow smoothly on the first iteration)

mccheah (Contributor, Author) commented Nov 20, 2018

Finally, I don't see a TableOperations#newDataFile for creating a new table data file, so we would have to add that API as well. We can close this ticket if we'd like and create another one that outlines all of the APIs missing from TableOperations, and also discusses how to make the right components Serializable.
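
A write-side counterpart could look like the following (again just a sketch of the proposal; neither the method nor the OutputFileWithMetadata wrapper from the first comment exists in TableOperations today):

interface TableOperations {
  // Proposed: create a new table data file, returning a handle to write bytes
  // to plus any custom metadata to record on the resulting DataFile when it is
  // added to the table.
  OutputFileWithMetadata newDataFile(String location);
}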

rdblue (Contributor) commented Dec 7, 2018

I'm closing this because discussion has moved to the Apache repo: apache/iceberg#12
