
How to process deltas in delta lake? #28

Closed
CyborgDroid opened this issue May 6, 2019 · 7 comments


CyborgDroid commented May 6, 2019

In Databricks I can use MERGE. That doesn't seem to be supported in the open-source version.

MERGE INTO deltatest 
USING tbl_delta 
ON deltatest.PartitionKey IN ('category1', 'category2') 
AND deltatest.uniqueID = tbl_delta.uniqueID 
WHEN MATCHED THEN UPDATE SET * 
WHEN NOT MATCHED THEN INSERT *

error:

Py4JJavaError: An error occurred while calling o25.sql.
: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'MERGE' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)
tdas (Contributor) commented May 6, 2019

We are working towards making DML commands like UPDATE, DELETE, and MERGE available in this OSS Delta Lake.

CyborgDroid (Author) commented:

Hi @tdas. Is there another way to update only some rows (deltas) and then still be able to time travel with timestampAsOf? If not, do you have an ETA for this?

I know this was just open-sourced, but it is not a delta lake if we can't process deltas with it.

Thanks for your help.

CyborgDroid changed the title from "How to update records in delta lake?" to "How to process deltas in delta lake?" on May 7, 2019
mukulmurthy added the "enhancement" label on May 7, 2019
tdas (Contributor) commented May 11, 2019

@CyborgDroid MERGE and UPDATE are the most elegant way of doing exactly that, updating a few rows. Until that is available, you can use the already-supported Spark DataFrame APIs to read and rewrite entire partitions (with the modified rows) of a partitioned Delta table. And then you can use time travel on the Delta table.
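
A minimal sketch of that partition-rewrite workaround in PySpark, assuming a table partitioned by PartitionKey with a uniqueID key and a build that supports the replaceWhere write option; the paths and column names are illustrative assumptions taken from the question above, not a prescribed API:

# Illustrative sketch only: paths, column names, and partition value are assumptions.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("partition-rewrite").getOrCreate()

table_path = "/data/deltatest"      # hypothetical location of the target Delta table
updates_path = "/data/tbl_delta"    # hypothetical location of the change records
part = "category1"                  # the partition being rewritten

current = spark.read.format("delta").load(table_path).where(F.col("PartitionKey") == part)
updates = spark.read.format("delta").load(updates_path).where(F.col("PartitionKey") == part)

# Keep current rows that have no update, then add the updated rows.
merged = current.join(updates, "uniqueID", "left_anti").unionByName(updates)

# Overwrite only this partition; each overwrite becomes a new table version,
# so timestampAsOf time travel still works afterwards.
(merged.write.format("delta")
       .mode("overwrite")
       .option("replaceWhere", "PartitionKey = '{}'".format(part))
       .save(table_path))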

CyborgDroid (Author) commented May 11, 2019

Adding a timestamp field and appending records to a regular Parquet or CSV file would do the same thing. That is not the same as tracking changes (deltas).

Let's say I have a thousand rows and only 30 are randomly updated every second. In a real delta table I would end up with 2,593,000 records per day (1,000 + 30 x 60 x 60 x 24), whereas with the current functionality of overwriting all partitions I would end up with 86,400,000 (1,000 x 60 x 60 x 24). The updates are random, so very few partitions would be skipped in an overwrite unless I make one partition per unique ID (1,000 partitions), which hurts speed from what I understand. I'll try partitioning per unique ID, though, and see if it works.
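
For reference, a tiny sketch of the arithmetic behind those numbers (all figures are the assumptions of this example):

# Back-of-the-envelope record counts per day for the example above.
rows = 1000                    # rows in the table
updates_per_sec = 30           # rows randomly updated each second
secs_per_day = 60 * 60 * 24    # 86,400 seconds

append_only = rows + updates_per_sec * secs_per_day   # 2,593,000: store only the deltas
full_rewrite = rows * secs_per_day                    # 86,400,000: rewrite every row each second
print(append_only, full_rewrite)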

Please correct my understanding of this if I am wrong.

mukulmurthy (Collaborator) commented:
Closing this issue as a duplicate of #42. Feel free to reopen if you have further questions.

mukulmurthy removed the "enhancement" label on Jun 5, 2019
harishchanderramesh commented:

spark.sql("OPTIMIZE endpoints_delta_table ZORDER BY (hour)")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 767, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input 'OPTIMIZE' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)\n\n== SQL ==\nOPTIMIZE endpoints_delta_table ZORDER BY (hour)\n^^^\n"
type(endpoints_delta_table)

How do I optimize Delta tables using the PySpark API?

I create the Delta table using the following:

endpoints_delta_table = DeltaTable.forPath(spark, HDFS_DIR)

HDFS_DIR is the HDFS location that my streaming PySpark application is merging data into. It holds the Parquet files of the Delta table.

tdas (Contributor) commented Apr 3, 2020

The OPTIMIZE SQL command is not available in Delta Lake OSS. Instead you can do manual compaction: https://docs.delta.io/latest/best-practices.html#compact-files
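
A minimal compaction sketch along the lines of that recipe (the path and target file count are assumptions for illustration):

# Manual compaction: rewrite the table into fewer, larger files.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compact-delta-table").getOrCreate()

path = "/path/to/endpoints_delta_table"   # hypothetical HDFS_DIR from the question above
num_files = 16                            # target number of files after compaction

(spark.read.format("delta").load(path)
      .repartition(num_files)
      .write
      .option("dataChange", "false")      # marks the rewrite as compaction only; data is unchanged
      .format("delta")
      .mode("overwrite")
      .save(path))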

Also, please note that this has nothing to do with the original issue. Please open new issues for new questions.

LantaoJin pushed a commit to LantaoJin/delta that referenced this issue May 27, 2020
[CARMEL-2481] Repartition before writing should use sortWithinPartitions instead of sort/orderby
LantaoJin added a commit to LantaoJin/delta that referenced this issue Mar 12, 2021