[HUDI-4137] SnowflakeSyncTool MVP implementation to integrate with Snowflake #5659

Closed
vingov wants to merge 3 commits into apache:master from vingov:HUDI-4137_snowflake_integration

Conversation


@vingov vingov commented May 23, 2022

What is the purpose of the pull request

This pull request adds the Snowflake sync feature, which is required to read Hudi tables on the Snowflake warehouse.

Brief change log

  • Added SnowflakeSyncTool to sync Hudi dataset to Snowflake cloud data warehouse.

Verify this pull request

This change added tests and can be verified as follows:


  • Manually verified the change by running a job locally.
  1. Use the Docker demo steps to set up the stock_ticks_cow table on Google Cloud Storage.
  2. Create a snowflake_profile.properties file with the following configs:
URL = https://el36491.us-central1.gcp.snowflakecomputing.com:443
USER = hudidemo
PRIVATE_KEY_FILE = /Users/username/.ssh/rsa_key.p8
ROLE = ACCOUNTADMIN
WAREHOUSE = COMPUTE_WH
DB = hudi
SCHEMA = dwh
  3. Run the following command to sync the table to Snowflake:
java -cp guava-25.1-jre.jar:hadoop-auth-2.4.1.jar:commons-configuration-1.9.jar:commons-collections-3.2.1.jar:guava-r05.jar:woodstox-core-5.3.0.jar:stax2-api-4.2.jar:hadoop-common-2.7.3.jar:hudi-spark-bundle_2.12-0.12.0-SNAPSHOT.jar:hudi-snowflake-bundle-0.12.0-SNAPSHOT-jar-with-dependencies.jar:gcs-connector-hadoop2-latest.jar org.apache.hudi.snowflake.sync.SnowflakeSyncTool --properties-file ~/snowflake_profile.properties --base-path gs://hudi-demo/stock_ticks_cow --table-name stock_ticks_cow --storage-integration hudi_demo_int --partitioned-by "date" --partition-extract-expr "\"date\" date as to_date(substr(metadata\$filename, 22, 10), 'YYYY-MM-DD')"
0    [main] WARN  org.apache.hadoop.util.NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
735  [main] INFO  org.apache.hudi.common.table.HoodieTableMetaClient  - Loading HoodieTableMetaClient from gs://hudi-demo/stock_ticks_cow
3537 [main] INFO  org.apache.hudi.common.table.HoodieTableConfig  - Loading table properties from gs://hudi-demo/stock_ticks_cow/.hoodie/hoodie.properties
3853 [main] INFO  org.apache.hudi.common.table.HoodieTableMetaClient  - Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from gs://hudi-demo/stock_ticks_cow
3853 [main] INFO  org.apache.hudi.common.table.HoodieTableMetaClient  - Loading Active commit timeline for gs://hudi-demo/stock_ticks_cow
3977 [main] INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline  - Loaded instants upto : Option{val=[20220427145307547__commit__COMPLETED]}
[main] INFO com.snowflake.snowpark.Session - Closing stderr and redirecting to stdout
[main] INFO com.snowflake.snowpark.Session - Done closing stderr and redirecting to stdout
[main] INFO com.snowflake.snowpark.internal.ParameterUtils - set JDBC client memory limit to 10240
[main] INFO com.snowflake.snowpark.Session - Snowpark Session information: {
 "snowpark.version" : "1.4.0",
 "java.version" : "1.8.0_282",
 "scala.version" : "2.12.11",
 "jdbc.session.id" : "229321189576710",
 "os.name" : "Mac OS X",
 "jdbc.version" : "3.13.14",
 "snowpark.library" : "/Users/vinothg/oss/dbt-spark/docker/jars/hudi-snowflake-bundle-0.12.0-SNAPSHOT-jar-with-dependencies.jar",
 "scala.library" : "/Users/vinothg/oss/dbt-spark/docker/jars/hudi-snowflake-bundle-0.12.0-SNAPSHOT-jar-with-dependencies.jar",
 "jdbc.library" : "/Users/vinothg/oss/dbt-spark/docker/jars/hudi-snowflake-bundle-0.12.0-SNAPSHOT-jar-with-dependencies.jar"
}
7803 [main] INFO  org.apache.hudi.snowflake.sync.HoodieSnowflakeSyncClient  - Successfully established Snowflake connection.
7803 [main] INFO  org.apache.hudi.snowflake.sync.SnowflakeSyncTool  - Sync hoodie table stock_ticks_cow at base path gs://hudi-demo/stock_ticks_cow
7804 [main] INFO  org.apache.hudi.common.table.HoodieTableMetaClient  - Loading HoodieTableMetaClient from gs://hudi-demo/stock_ticks_cow
7959 [main] INFO  org.apache.hudi.common.table.HoodieTableConfig  - Loading table properties from gs://hudi-demo/stock_ticks_cow/.hoodie/hoodie.properties
8277 [main] INFO  org.apache.hudi.common.table.HoodieTableMetaClient  - Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from gs://hudi-demo/stock_ticks_cow
8277 [main] INFO  org.apache.hudi.common.table.HoodieTableMetaClient  - Loading Active commit timeline for gs://hudi-demo/stock_ticks_cow
8376 [main] INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline  - Loaded instants upto : Option{val=[20220427145307547__commit__COMPLETED]}
9358 [main] INFO  org.apache.hudi.sync.common.util.ManifestFileWriter  - Retrieve all partitions: 3
9375 [main] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Took 2 ms to read  0 instants, 0 replaced file groups
9375 [ForkJoinPool.commonPool-worker-2] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Took 2 ms to read  0 instants, 0 replaced file groups
9375 [ForkJoinPool.commonPool-worker-9] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Took 2 ms to read  0 instants, 0 replaced file groups
9488 [ForkJoinPool.commonPool-worker-2] INFO  org.apache.hudi.common.util.ClusteringUtils  - Found 0 files in pending clustering operations
9489 [ForkJoinPool.commonPool-worker-2] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Building file system view for partition (date=2020-08-31)
9538 [main] INFO  org.apache.hudi.common.util.ClusteringUtils  - Found 0 files in pending clustering operations
9539 [main] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Building file system view for partition (date=2019-08-31)
9549 [ForkJoinPool.commonPool-worker-9] INFO  org.apache.hudi.common.util.ClusteringUtils  - Found 0 files in pending clustering operations
9549 [ForkJoinPool.commonPool-worker-9] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Building file system view for partition (date=2018-08-31)
9594 [ForkJoinPool.commonPool-worker-2] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - addFilesToView: NumFiles=3, NumFileGroups=2, FileGroupsCreationTime=6, StoreTimeTaken=1
9634 [ForkJoinPool.commonPool-worker-9] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - addFilesToView: NumFiles=2, NumFileGroups=2, FileGroupsCreationTime=0, StoreTimeTaken=0
9723 [main] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - addFilesToView: NumFiles=2, NumFileGroups=2, FileGroupsCreationTime=1, StoreTimeTaken=0
9723 [main] INFO  org.apache.hudi.sync.common.util.ManifestFileWriter  - Writing base file names to manifest file: 3
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Actively querying parameter QUERY_TAG from server.
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: 01a47eb9-0000-8e15-0000-d09100066006] CREATE OR REPLACE STAGE stock_ticks_cow_stage url='gcs://hudi-demo/stock_ticks_cow' STORAGE_INTEGRATION = hudi_demo_int
------------------------------------------------------
|"status"                                            |
------------------------------------------------------
|Stage area STOCK_TICKS_COW_STAGE successfully c...  |
------------------------------------------------------

11467 [main] INFO  org.apache.hudi.snowflake.sync.HoodieSnowflakeSyncClient  - Manifest External table created.
11467 [main] INFO  org.apache.hudi.snowflake.sync.SnowflakeSyncTool  - External temporary stage creation complete for stock_ticks_cow_stage
[main] ERROR com.snowflake.snowpark.internal.ServerConnection - Failed to analyze schema of query:
 SELECT  *  FROM (stock_ticks_cow_manifest)
11662 [main] INFO  org.apache.hudi.snowflake.sync.HoodieSnowflakeSyncClient  - Table doesn't exist stock_ticks_cow_manifest
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: 01a47eb9-0000-8e15-0000-d0910006600e] CREATE OR REPLACE EXTERNAL TABLE stock_ticks_cow_manifest (    filename VARCHAR AS split_part(VALUE:c1, '/', -1)  )WITH LOCATION = @stock_ticks_cow_stage/.hoodie/manifest/  FILE_FORMAT = (TYPE = CSV)  AUTO_REFRESH = False
------------------------------------------------------
|"status"                                            |
------------------------------------------------------
|Table STOCK_TICKS_COW_MANIFEST successfully cre...  |
------------------------------------------------------

12521 [main] INFO  org.apache.hudi.snowflake.sync.HoodieSnowflakeSyncClient  - Manifest External table created.
12521 [main] INFO  org.apache.hudi.snowflake.sync.SnowflakeSyncTool  - Manifest table creation complete for stock_ticks_cow_manifest
[main] ERROR com.snowflake.snowpark.internal.ServerConnection - Failed to analyze schema of query:
 SELECT  *  FROM (stock_ticks_cow_versions)
12741 [main] INFO  org.apache.hudi.snowflake.sync.HoodieSnowflakeSyncClient  - Table doesn't exist stock_ticks_cow_versions
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: 01a47eb9-0000-8e17-0000-d0910006700a] CREATE OR REPLACE FILE FORMAT my_custom_file_format TYPE = 'PARQUET';
------------------------------------------------------
|"status"                                            |
------------------------------------------------------
|File format MY_CUSTOM_FILE_FORMAT successfully ...  |
------------------------------------------------------

[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: 01a47eb9-0000-8e02-0000-d0910006500a]  SELECT  *  FROM (SELECT  generate_column_description(array_agg(object_construct(*)), 'external_table') as columns FROM  table(    infer_schema(      location => '@stock_ticks_cow_stage',      file_format => 'my_custom_file_format'    ))) LIMIT 1
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: 01a47eb9-0000-8e02-0000-d0910006500e] CREATE OR REPLACE EXTERNAL TABLE stock_ticks_cow_versions("_hoodie_file_name" TEXT AS ($1:_hoodie_file_name::TEXT),
"_hoodie_commit_time" TEXT AS ($1:_hoodie_commit_time::TEXT),
"_hoodie_partition_path" TEXT AS ($1:_hoodie_partition_path::TEXT),
"high" REAL AS ($1:high::REAL),
"low" REAL AS ($1:low::REAL),
"close" REAL AS ($1:close::REAL),
"day" TEXT AS ($1:day::TEXT),
"_hoodie_record_key" TEXT AS ($1:_hoodie_record_key::TEXT),
"_hoodie_commit_seqno" TEXT AS ($1:_hoodie_commit_seqno::TEXT),
"volume" NUMBER(38,  0) AS ($1:volume::NUMBER(38,  0)),
"symbol" TEXT AS ($1:symbol::TEXT),
"year" NUMBER(38,  0) AS ($1:year::NUMBER(38,  0)),
"ts" TEXT AS ($1:ts::TEXT),
"key" TEXT AS ($1:key::TEXT),
"month" TEXT AS ($1:month::TEXT),
"dummyint" NUMBER(38,  0) AS ($1:dummyint::NUMBER(38,  0)),
"open" REAL AS ($1:open::REAL), "date" date as to_date(substr(metadata$filename, 22, 10), 'YYYY-MM-DD')) PARTITION BY ("date")  WITH LOCATION = @stock_ticks_cow_stage  FILE_FORMAT = (TYPE = PARQUET)  PATTERN = '.*[.]parquet'  AUTO_REFRESH = false
------------------------------------------------------
|"status"                                            |
------------------------------------------------------
|Table STOCK_TICKS_COW_VERSIONS successfully cre...  |
------------------------------------------------------

17170 [main] INFO  org.apache.hudi.snowflake.sync.HoodieSnowflakeSyncClient  - External versions table created.
17170 [main] INFO  org.apache.hudi.snowflake.sync.SnowflakeSyncTool  - Versions table creation complete for stock_ticks_cow_versions
17721 [main] INFO  org.apache.hudi.snowflake.sync.SnowflakeSyncTool  - Sync table complete for stock_ticks_cow
[main] INFO com.snowflake.snowpark.Session - Closing session: {
 "snowpark.version" : "1.4.0",
 "java.version" : "1.8.0_282",
 "scala.version" : "2.12.11",
 "jdbc.session.id" : "229321189576710",
 "os.name" : "Mac OS X",
 "jdbc.version" : "3.13.14",
 "snowpark.library" : "/Users/vinothg/oss/dbt-spark/docker/jars/hudi-snowflake-bundle-0.12.0-SNAPSHOT-jar-with-dependencies.jar",
 "scala.library" : "/Users/vinothg/oss/dbt-spark/docker/jars/hudi-snowflake-bundle-0.12.0-SNAPSHOT-jar-with-dependencies.jar",
 "jdbc.library" : "/Users/vinothg/oss/dbt-spark/docker/jars/hudi-snowflake-bundle-0.12.0-SNAPSHOT-jar-with-dependencies.jar"
}
[main] INFO com.snowflake.snowpark.Session - Canceling all running query
[main] INFO com.snowflake.snowpark.internal.ServerConnection - Execute query [queryID: 01a47eb9-0000-8e17-0000-d09100067012] select system$cancel_all_queries(229321189576710)

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@vingov vingov force-pushed the HUDI-4137_snowflake_integration branch from 223068c to 518decf Compare May 26, 2022 04:59
@nsivabalan nsivabalan added the priority:blocker Production down; release blocker label Jun 9, 2022
@nsivabalan (Contributor) left a comment:

Really good addition to Hudi. Keep up the good contributions :)

@nsivabalan nsivabalan self-assigned this Jun 9, 2022
@nsivabalan (Contributor) commented:

@vinothchandar : You may want to review this sometime.

@nsivabalan (Contributor) commented:

@prasannarajaperumal: Can you review this when you get a chance?


vingov commented Jun 16, 2022

@nsivabalan - all the requested changes have been done, please review it again, thanks!

@vingov vingov force-pushed the HUDI-4137_snowflake_integration branch from 699d684 to 5b8c6c5 Compare July 17, 2022 19:17
@xushiyan xushiyan added the component:catalog-sync Catalog-sync related label Jul 18, 2022
@apache apache deleted a comment from hudi-bot Jul 18, 2022
@hudi-bot (Collaborator) commented:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure — re-run the last Azure build

@nsivabalan nsivabalan removed their assignment Jul 19, 2022
@codope codope added priority:critical Production degraded; pipelines stalled and removed priority:blocker Production down; release blocker labels Jul 20, 2022
@prasannarajaperumal (Contributor) left a comment:
Left comments on some changes. Also, overall I think the naming of the methods should match the underlying Snowflake terminology.

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hudi</artifactId>
Reviewer comment (Contributor):

Why is this a top-level module? Shouldn't this be under hudi-sync?

<!-- Snowflake -->
<dependency>
<groupId>com.snowflake</groupId>
<artifactId>snowpark</artifactId>
Reviewer comment (Contributor):

I don't understand why we need Snowpark here.

<excludes>
<exclude>junit:junit</exclude>
<exclude>com.google.code.findbugs:*</exclude>
<exclude>org.apache.hbase:*</exclude>
Reviewer comment (Contributor):

why exclude these packages?

}
}

private void syncCoWTable(HoodieSnowflakeSyncClient snowSyncClient) {
Reviewer comment (Contributor):

Can you please add some Javadoc on the steps to sync a CoW table? It would make the flow easier to follow.
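The log output earlier in this PR shows the CoW sync proceeding in three steps: create the external stage, create the manifest table, and create the versions table. A skeletal sketch of how that flow might be documented — the class name, helper shape, and returned list are purely illustrative; only the step order is taken from the logs:

```java
import java.util.ArrayList;
import java.util.List;

public class CowSyncSteps {
    /**
     * Syncs a copy-on-write Hudi table to Snowflake:
     *  1. Create (or replace) an external stage pointing at the table's base path.
     *  2. Create an external "manifest" table listing the latest base file names.
     *  3. Create an external "versions" table over all data files, partitioned as configured.
     * Returns the ordered step names, for illustration only.
     */
    public static List<String> syncCoWTable() {
        List<String> steps = new ArrayList<>();
        steps.add("create stage");
        steps.add("create manifest table");
        steps.add("create versions table");
        return steps;
    }
}
```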

.noDefaultValue()
.withDocumentation("Name of the target table in Snowflake");

public static final ConfigProperty<String> SNOWFLAKE_SYNC_SYNC_BASE_PATH = ConfigProperty
Reviewer comment (Contributor):

Did you intend to have SYNC_SYNC in the name? Can it just be SNOWFLAKE_SYNC_BASE_PATH?

query += " WITH LOCATION = @" + stageName
+ " FILE_FORMAT = (TYPE = " + config.getString(SNOWFLAKE_SYNC_SYNC_BASE_FILE_FORMAT) + ")"
+ " PATTERN = '.*[.]" + config.getString(SNOWFLAKE_SYNC_SYNC_BASE_FILE_FORMAT).toLowerCase() + "'"
+ " AUTO_REFRESH = false";
Reviewer comment (Contributor):

Make auto-refresh option configurable?
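A minimal sketch of what a configurable auto-refresh could look like: the hard-coded `AUTO_REFRESH = false` from the quoted hunk becomes a boolean parameter that a config flag would feed. The helper name is illustrative, not part of this PR:

```java
public class ExternalTableDdl {
    // Builds the trailing clause of the CREATE EXTERNAL TABLE statement from the
    // quoted hunk, with AUTO_REFRESH driven by a flag instead of hard-coded to false.
    public static String locationClause(String stageName, String baseFileFormat, boolean autoRefresh) {
        return " WITH LOCATION = @" + stageName
            + " FILE_FORMAT = (TYPE = " + baseFileFormat + ")"
            + " PATTERN = '.*[.]" + baseFileFormat.toLowerCase() + "'"
            + " AUTO_REFRESH = " + autoRefresh;
    }
}
```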

}
}

public void createVersionsTable(String stageName, String tableName, String partitionFields, String partitionExtractExpr) {
Reviewer comment (Contributor):

I think we are imposing the naming convention of GCS onto Snowflake here? I don't understand why this external table is called a versions table.

@Override
public Option<String> getLastCommitTimeSynced(final String tableName) {
// snowflake doesn't support tblproperties, so do nothing.
throw new UnsupportedOperationException("Not support getLastCommitTimeSynced yet.");
Reviewer comment (Contributor):

Can we try storing this as tags on the external table we create?
https://docs.snowflake.com/en/sql-reference/sql/create-tag.html
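A hedged sketch of how the last synced commit time could round-trip through a Snowflake object tag, per the suggestion above. `SET TAG` and `SYSTEM$GET_TAG` are documented Snowflake SQL; the tag name `hudi_last_commit_time_synced` and the builder class are illustrative (and the SQL is only built here, not executed against Snowflake):

```java
public class CommitTimeTagSql {
    // Illustrative tag name; not part of this PR.
    static final String TAG = "hudi_last_commit_time_synced";

    // SQL to record the last synced commit instant as a tag on the external table.
    public static String setTagSql(String tableName, String commitTime) {
        return "ALTER TABLE " + tableName + " SET TAG " + TAG + " = '" + commitTime + "'";
    }

    // SQL to read it back; SYSTEM$GET_TAG returns NULL when the tag is unset.
    public static String getTagSql(String tableName) {
        return "SELECT SYSTEM$GET_TAG('" + TAG + "', '" + tableName + "', 'table')";
    }
}
```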


public void createManifestTable(String stageName, String tableName) {
try {
String query = "CREATE OR REPLACE EXTERNAL TABLE " + tableName + " ("
Reviewer comment (Contributor):

It is better for the list of files to be an internal Snowflake table (not an external table). This will unlock compiler optimizations in Snowflake for the sub-query on the snapshot view. Plus, this should be fairly simple to do using a COPY INTO statement in Snowflake.
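A sketch of that alternative: an internal table holding the manifest's file names, loaded via a COPY INTO with a transformation sub-query instead of the external-table DDL from the quoted hunk. Table and stage names follow the log output in this PR; the exact DDL is an assumption and is only built as a string here, not run against Snowflake:

```java
public class ManifestCopySql {
    // Internal (not external) table to hold the manifest's base file names.
    public static String createTableSql(String tableName) {
        return "CREATE OR REPLACE TABLE " + tableName + " (filename VARCHAR)";
    }

    // Load the manifest file(s) from the stage, keeping only the file name
    // component, mirroring the split_part expression in the external-table DDL.
    public static String copyIntoSql(String tableName, String stageName) {
        return "COPY INTO " + tableName
            + " FROM (SELECT split_part($1, '/', -1) FROM @" + stageName + "/.hoodie/manifest/)"
            + " FILE_FORMAT = (TYPE = CSV)";
    }
}
```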

+ " file_format => '" + fileFormatName + "'"
+ " )"
+ ")";
Optional<Row> row = snowflakeSession.sql(query).first();
Reviewer comment (Contributor):

Use JDBC to get the value. Including Snowpark for this seems excessive.
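A plain-JDBC sketch of fetching a single scalar, as suggested, using only `java.sql` plus the Snowflake JDBC driver instead of Snowpark. The URL builder and helper names are illustrative, and no live connection is assumed:

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class JdbcScalarQuery {
    // Builds a Snowflake JDBC URL from the account identifier (illustrative helper).
    public static String buildJdbcUrl(String account) {
        return "jdbc:snowflake://" + account + ".snowflakecomputing.com/";
    }

    // Returns the first column of the first row, or null if the result set is empty.
    public static String firstScalar(Connection conn, String query) throws SQLException {
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(query)) {
            return rs.next() ? rs.getString(1) : null;
        }
    }
}
```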

@nsivabalan (Contributor) commented:

@vingov: May I know when you are planning to address the comments?

@xushiyan xushiyan added priority:high Significant impact; potential bugs and removed priority:critical Production degraded; pipelines stalled labels Nov 13, 2022
@zhangyue19921010 (Contributor) commented:

Hey everyone, how is this amazing work going?

We are planning to migrate all our historical data into a Lakehouse using Hudi. At the same time, we are also investigating whether our clients can access the Hudi data through Snowflake.

Looking forward to your reply. Thanks!

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Feb 26, 2024

yihua commented Sep 11, 2024

Apache XTable™ enables existing Hudi tables to seamlessly work with Snowflake. Closing this one.

@yihua yihua closed this Sep 11, 2024

Labels

component:catalog-sync (Catalog-sync related), priority:high (Significant impact; potential bugs), size:XL (PR with lines of changes > 1000)

Projects

Status: ✅ Done


9 participants