
MaprFS support #346

Closed
btbbass opened this issue Mar 5, 2020 · 6 comments

Comments

@btbbass

btbbass commented Mar 5, 2020

I tried to use the Delta Lake library with Spark 2.4.4 in an environment that can already read and write the MapR-FS filesystem.

That is, spark.write.format("parquet").save("maprfs:///<path>") succeeds, but if I instead run spark.write.format("org.apache.spark.sql.delta.sources.DeltaDataSource").save("maprfs:///<path>")

I get this exception:

20/03/05 15:28:15 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 10.225.8.62:43673 in memory (size: 36.8 KB, free: 970.2 MB)

20/03/05 15:28:15 WARN DeltaLog: Failed to parse maprfs:/_delta_log/_last_checkpoint. This may happen if there was an error during read operation, or a file appears to be partial. Sleeping and trying again.
org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.maprfs.impl=null: No AbstractFileSystem configured for scheme: maprfs
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:169)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:258)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:328)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:328)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:454)
at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:53)
at org.apache.spark.sql.delta.storage.HDFSLogStore.read(HDFSLogStore.scala:57)
at org.apache.spark.sql.delta.Checkpoints$class.loadMetadataFromFile(Checkpoints.scala:139)
at org.apache.spark.sql.delta.Checkpoints$class.lastCheckpoint(Checkpoints.scala:133)
at org.apache.spark.sql.delta.DeltaLog.lastCheckpoint(DeltaLog.scala:58)
at org.apache.spark.sql.delta.DeltaLog.&lt;init&gt;(DeltaLog.scala:139)
at org.apache.spark.sql.delta.DeltaLog$$anon$3$$anonfun$call$1$$anonfun$apply$10.apply(DeltaLog.scala:744)
at org.apache.spark.sql.delta.DeltaLog$$anon$3$$anonfun$call$1$$anonfun$apply$10.apply(DeltaLog.scala:744)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
at org.apache.spark.sql.delta.DeltaLog$$anon$3$$anonfun$call$1.apply(DeltaLog.scala:743)
at org.apache.spark.sql.delta.DeltaLog$$anon$3$$anonfun$call$1.apply(DeltaLog.scala:743)
at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:77)
at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:671)
at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:671)
at org.apache.spark.sql.delta.DeltaLog$$anon$3.call(DeltaLog.scala:742)
at org.apache.spark.sql.delta.DeltaLog$$anon$3.call(DeltaLog.scala:740)
at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
...

Is this expected? Shouldn't it be supported by / work with the HDFS LogStore?

@tdas
Contributor

tdas commented Mar 5, 2020

HDFSLogStore uses the FileContext API instead of the FileSystem API to write files: FileSystem.rename() is not atomic, while FileContext.rename() is, and that atomicity is what ensures Delta's ACID guarantees. You are getting this failure because no FileContext/AbstractFileSystem implementation is registered for maprfs paths.

You can read more about the properties we need from the file system here - https://docs.delta.io/latest/delta-storage.html
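To make the requirement on that page concrete, here is a minimal, hedged illustration in plain Python (not Delta's actual code): the "mutual exclusion" property a LogStore needs means a commit file must be created atomically, failing if it already exists, so that exactly one writer wins a given version. On a local POSIX filesystem, `O_CREAT | O_EXCL` gives exactly that semantics; `put_if_absent` and the file name below are hypothetical names for illustration only.

```python
import os
import tempfile


def put_if_absent(path: str, data: bytes) -> bool:
    """Atomically create `path` with `data`; return False if it already exists.

    This mimics the mutual-exclusion property a Delta LogStore needs when
    committing a new _delta_log entry: exactly one writer may create a given
    version file. (Illustration only, not Delta's implementation.)
    """
    try:
        # O_EXCL makes the creation atomic: open fails if the file exists,
        # with no window in which two writers can both "succeed".
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
    except FileExistsError:
        return False
    with os.fdopen(fd, "wb") as f:
        f.write(data)
    return True


if __name__ == "__main__":
    d = tempfile.mkdtemp()
    commit = os.path.join(d, "00000000000000000001.json")
    print(put_if_absent(commit, b'{"commitInfo": {}}'))  # first writer wins
    print(put_if_absent(commit, b'{"commitInfo": {}}'))  # second writer loses
```

A filesystem (or its rename operation) that cannot provide this fail-if-exists behaviour atomically is exactly what breaks Delta's commit protocol.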

I am not very familiar with the MapR file system and what guarantees it provides. If the MapR FileSystem implementation provides the guarantees mentioned there, then someone could build a MapRLogStore on top of its FileSystem implementation.

@btbbass
Author

btbbass commented Mar 6, 2020

Hi,
I did a bit of research, and since MapR-FS is heavily based on HDFS, I succeeded in adjusting the Hadoop configuration so that Delta can write to MapR-FS.

These are the two missing configuration entries I added (maybe only the second one is needed; still to be clarified):

spark.sparkContext.hadoopConfiguration.set("fs.maprfs.impl", "com.mapr.fs.MapRFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.AbstractFileSystem.maprfs.impl", "com.mapr.fs.MFS")

With that in place, writes work correctly.
Of course, this has not been tested extensively for performance or bugs, but again, the behaviour should be the same as with the HDFS protocol.
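For reference, the same two properties can also be supplied outside application code, e.g. in spark-defaults.conf or via spark-submit --conf, using Spark's standard spark.hadoop.* prefix for Hadoop properties. This is just a sketch of the equivalent configuration; the MapR class names are the ones reported in the comment above, not independently verified:

```
# spark-defaults.conf (Hadoop properties get the spark.hadoop. prefix)
spark.hadoop.fs.maprfs.impl                     com.mapr.fs.MapRFileSystem
spark.hadoop.fs.AbstractFileSystem.maprfs.impl  com.mapr.fs.MFS
```

Putting the properties here instead of in code applies them to every Spark application on the cluster, which may be preferable to patching each job.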

I think we can close the issue.

@tdas
Contributor

tdas commented Mar 6, 2020

Yes, this configuration will work. But is there any documentation on whether the com.mapr.fs.MFS implementation provides atomic renames?

@btbbass
Author

btbbass commented Mar 10, 2020

Good question; unfortunately, I cannot find clear documentation about it. I will try to come back with a definite answer.
I have found some pointers toward a "yes" (across volumes, MapR falls back to a two-step copy/delete process), but nothing that definitively confirms the atomicity of in-volume renames.

https://mapr.com/community/s/question/0D50L00006BIsjYSAT/cannot-rename-across-volumes-falling-back-on-copydelete-semantics

https://mapr.com/docs/60/MapROverview/c_maprfs.html

@btbbass
Author

btbbass commented Mar 10, 2020

From Wikipedia:
https://en.wikipedia.org/wiki/MapR_FS

One major difference between AFS and MapR FS is that the latter uses a strong consistency model while AFS provides only weak consistency.

And again:

Other features of the filesystem include
[...]
Consistent multi-threaded update. Files can be updated or read by very many threads of control simultaneously without requiring global locking structures.

This seems to point in the right direction.

Can anyone with more experience with MapR-FS confirm this?

@tdas
Contributor

tdas commented Oct 7, 2021

Closing this issue due to inactivity. Please feel free to reopen this issue if there is renewed interest in this feature.

@tdas tdas closed this as completed Oct 7, 2021
tdas pushed a commit to tdas/delta that referenced this issue May 31, 2023