Spark: cannot read rows inserted by flink-sql in spark-sql #2447

@ddsr-ops

Description

Environment:

hadoop version: Hadoop 3.0.0-cdh6.2.1
hive version: Hive 2.3.7
spark version: Spark 3.0.1
flink version: Flink 1.11.3
iceberg version: Iceberg 0.11.0

Steps:

  1. Create an Iceberg table in spark-sql

./spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:0.11.0 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.hadoop_catalog.type=hadoop --conf spark.sql.catalog.hadoop_catalog.warehouse=hdfs://xxx:8020/hdp_data/hive/warehouse --master yarn

spark-sql> create table isolation(id int, msg string) using iceberg;
Time taken: 0.15 seconds
21/04/09 17:56:06 INFO SparkSQLCLIDriver: Time taken: 0.15 seconds
spark-sql> desc formatted isolation;
21/04/09 17:56:18 INFO CodeGenerator: Code generated in 18.984 ms
21/04/09 17:56:18 INFO CodeGenerator: Code generated in 7.674588 ms
id	int	
msg	string	
		
Partitioning		
Not partitioned		
		
Detailed Table Information		
Name	hadoop_catalog.logging.isolation	
Provider	iceberg	
Owner	hive	
Table Properties	[current-snapshot-id=none,format=iceberg/parquet]	
Time taken: 0.113 seconds, Fetched 11 row(s)
21/04/09 17:56:18 INFO SparkSQLCLIDriver: Time taken: 0.113 seconds, Fetched 11 row(s)

spark-sql> select count(*) from isolation;

0
Time taken: 1.536 seconds, Fetched 1 row(s)
21/04/09 18:00:22 INFO SparkSQLCLIDriver: Time taken: 1.536 seconds, Fetched 1 row(s)
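For readability, the long chain of `--conf` flags in the launch command above could presumably also be placed in `spark-defaults.conf` — a sketch of the equivalent settings, not verified on this CDH cluster:

```properties
# Equivalent of the --conf flags passed to spark-sql (warehouse path as in the thread)
spark.sql.extensions                        org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.hadoop_catalog            org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_catalog.type       hadoop
spark.sql.catalog.hadoop_catalog.warehouse  hdfs://xxx:8020/hdp_data/hive/warehouse
```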
  2. Insert a row into the isolation table created above in flink-sql
./bin/sql-client.sh embedded -j ext_lib/iceberg-flink-runtime-0.11.0.jar -j ext_lib/flink-sql-connector-hive-2.3.6_2.11-1.11.3.jar
Flink SQL> CREATE CATALOG hadoop_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='hadoop',
>   'warehouse'='hdfs://hadoop189:8020/hdp_data/hive/warehouse',
>   'property-version'='1'
> );
[INFO] Catalog has been created.

Flink SQL> use catalog hadoop_catalog;

Flink SQL> use logging;

Flink SQL> show tables;
isolation


Flink SQL> select count(*) from isolation;   -- returns 0
[INFO] Result retrieval cancelled.

Flink SQL> insert into isolation values(1, 'flink');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: c095a1649350e007a4670710061b5536


Flink SQL> select * from isolation;
[INFO] Result retrieval cancelled.
                        id                       msg
                         1                     flink
  3. Query the isolation table in spark-sql; no rows are returned.
spark-sql> select * from isolation;
21/04/09 18:08:22 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 629.5 KiB, free 363.7 MiB)
21/04/09 18:08:22 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 53.9 KiB, free 363.6 MiB)
21/04/09 18:08:22 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on hadoop189:37478 (size: 53.9 KiB, free: 366.1 MiB)
21/04/09 18:08:22 INFO SparkContext: Created broadcast 7 from broadcast at SparkScanBuilder.java:171
21/04/09 18:08:22 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 40.0 B, free 363.6 MiB)
21/04/09 18:08:22 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 116.0 B, free 363.6 MiB)
21/04/09 18:08:22 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on hadoop189:37478 (size: 116.0 B, free: 366.1 MiB)
21/04/09 18:08:22 INFO SparkContext: Created broadcast 8 from broadcast at SparkScanBuilder.java:172
21/04/09 18:08:22 INFO V2ScanRelationPushDown: 
Pushing operators to hadoop_catalog.logging.isolation
Pushed Filters: 
Post-Scan Filters: 
Output: id#165, msg#166
         
21/04/09 18:08:22 INFO BaseTableScan: Scanning empty table hadoop_catalog.logging.isolation
21/04/09 18:08:22 INFO SparkContext: Starting job: main at NativeMethodAccessorImpl.java:0
21/04/09 18:08:22 INFO DAGScheduler: Job 3 finished: main at NativeMethodAccessorImpl.java:0, took 0.000113 s
Time taken: 0.059 seconds
21/04/09 18:08:22 INFO SparkSQLCLIDriver: Time taken: 0.059 seconds
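The telling line in the log above is `BaseTableScan: Scanning empty table hadoop_catalog.logging.isolation`: Spark is planning the scan from table metadata loaded before the Flink commit. Iceberg's Spark catalog caches loaded tables by default, so a snapshot committed by another engine may not be visible until the cached table is refreshed. A hedged diagnostic sketch — the `snapshots` metadata table and `REFRESH TABLE` are standard, though whether `REFRESH TABLE` invalidates Iceberg's own catalog cache in Spark 3.0.1 / Iceberg 0.11.0 is not confirmed here:

```sql
-- Check which snapshots actually exist in the table's metadata on HDFS:
SELECT snapshot_id, committed_at, operation
FROM hadoop_catalog.logging.isolation.snapshots;

-- Try dropping Spark's cached copy of the table, then re-query:
REFRESH TABLE hadoop_catalog.logging.isolation;
SELECT * FROM hadoop_catalog.logging.isolation;
```

If the Flink snapshot appears in the `snapshots` metadata table but the plain `SELECT` still returns nothing, that would point at stale table caching on the Spark side rather than a missing commit.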
  4. Insert a row into the isolation table in spark-sql and query again; this time both rows are returned.
spark-sql> insert into isolation values(1, 'spark');
21/04/09 18:12:28 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 629.5 KiB, free 363.0 MiB)
21/04/09 18:12:28 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 53.9 KiB, free 362.9 MiB)
21/04/09 18:12:28 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on hadoop189:37478 (size: 53.9 KiB, free: 366.0 MiB)
21/04/09 18:12:28 INFO SparkContext: Created broadcast 9 from broadcast at SparkWriteBuilder.java:131
21/04/09 18:12:28 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 40.0 B, free 362.9 MiB)
21/04/09 18:12:28 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 116.0 B, free 362.9 MiB)
21/04/09 18:12:28 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on hadoop189:37478 (size: 116.0 B, free: 366.0 MiB)
21/04/09 18:12:28 INFO SparkContext: Created broadcast 10 from broadcast at SparkWriteBuilder.java:132
21/04/09 18:12:28 INFO CodeGenerator: Code generated in 6.157067 ms
21/04/09 18:12:28 INFO AppendDataExec: Start processing data source write support: IcebergBatchWrite(table=hadoop_catalog.logging.isolation, format=PARQUET). The input RDD has 1 partitions.
21/04/09 18:12:28 INFO SparkContext: Starting job: main at NativeMethodAccessorImpl.java:0
21/04/09 18:12:28 INFO DAGScheduler: Got job 4 (main at NativeMethodAccessorImpl.java:0) with 1 output partitions
21/04/09 18:12:28 INFO DAGScheduler: Final stage: ResultStage 2 (main at NativeMethodAccessorImpl.java:0)
21/04/09 18:12:28 INFO DAGScheduler: Parents of final stage: List()
21/04/09 18:12:28 INFO DAGScheduler: Missing parents: List()
21/04/09 18:12:28 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[20] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
21/04/09 18:12:28 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 7.2 KiB, free 362.9 MiB)
21/04/09 18:12:28 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 4.0 KiB, free 362.9 MiB)
21/04/09 18:12:28 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on hadoop189:37478 (size: 4.0 KiB, free: 366.0 MiB)
21/04/09 18:12:28 INFO SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:1223
21/04/09 18:12:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[20] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
21/04/09 18:12:28 INFO YarnScheduler: Adding task set 2.0 with 1 tasks
21/04/09 18:12:28 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 1, hadoop189, executor 2, partition 0, PROCESS_LOCAL, 7552 bytes)
21/04/09 18:12:28 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on hadoop189:44388 (size: 4.0 KiB, free: 366.3 MiB)
21/04/09 18:12:28 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on hadoop189:44388 (size: 53.9 KiB, free: 366.2 MiB)
21/04/09 18:12:29 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on hadoop189:44388 (size: 116.0 B, free: 366.2 MiB)
21/04/09 18:12:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 1) in 1793 ms on hadoop189 (executor 2) (1/1)
21/04/09 18:12:30 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 
21/04/09 18:12:30 INFO DAGScheduler: ResultStage 2 (main at NativeMethodAccessorImpl.java:0) finished in 1.802 s
21/04/09 18:12:30 INFO DAGScheduler: Job 4 is finished. Cancelling potential speculative or zombie tasks for this job
21/04/09 18:12:30 INFO YarnScheduler: Killing all running tasks in stage 2: Stage finished
21/04/09 18:12:30 INFO DAGScheduler: Job 4 finished: main at NativeMethodAccessorImpl.java:0, took 1.809582 s
21/04/09 18:12:30 INFO AppendDataExec: Data source write support IcebergBatchWrite(table=hadoop_catalog.logging.isolation, format=PARQUET) is committing.
21/04/09 18:12:30 INFO SparkWrite: Committing append with 1 new data files to table hadoop_catalog.logging.isolation
21/04/09 18:12:30 INFO SnapshotProducer: Committed snapshot 778572246949378127 (MergeAppend)
21/04/09 18:12:30 INFO SparkWrite: Committed in 206 ms
21/04/09 18:12:30 INFO AppendDataExec: Data source write support IcebergBatchWrite(table=hadoop_catalog.logging.isolation, format=PARQUET) committed.
Time taken: 2.215 seconds
21/04/09 18:12:30 INFO SparkSQLCLIDriver: Time taken: 2.215 seconds
spark-sql> 21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_3_piece0 on hadoop189:37478 in memory (size: 116.0 B, free: 366.0 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_6_piece0 on hadoop189:37478 in memory (size: 116.0 B, free: 366.0 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_11_piece0 on hadoop189:37478 in memory (size: 4.0 KiB, free: 366.0 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_11_piece0 on hadoop189:44388 in memory (size: 4.0 KiB, free: 366.2 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_1_piece0 on hadoop189:37478 in memory (size: 116.0 B, free: 366.0 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_4_piece0 on hadoop189:37478 in memory (size: 5.0 KiB, free: 366.0 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_4_piece0 on hadoop189:44388 in memory (size: 5.0 KiB, free: 366.2 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_0_piece0 on hadoop189:37478 in memory (size: 53.9 KiB, free: 366.1 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_9_piece0 on hadoop189:37478 in memory (size: 53.9 KiB, free: 366.1 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_9_piece0 on hadoop189:44388 in memory (size: 53.9 KiB, free: 366.3 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_8_piece0 on hadoop189:37478 in memory (size: 116.0 B, free: 366.1 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_2_piece0 on hadoop189:37478 in memory (size: 53.9 KiB, free: 366.2 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_10_piece0 on hadoop189:37478 in memory (size: 116.0 B, free: 366.2 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_10_piece0 on hadoop189:44388 in memory (size: 116.0 B, free: 366.3 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_5_piece0 on hadoop189:37478 in memory (size: 53.9 KiB, free: 366.2 MiB)
21/04/09 18:12:30 INFO BlockManagerInfo: Removed broadcast_7_piece0 on hadoop189:37478 in memory (size: 53.9 KiB, free: 366.3 MiB)

         > select * from isolation;
21/04/09 18:12:44 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 629.5 KiB, free 365.7 MiB)
21/04/09 18:12:44 INFO MemoryStore: Block broadcast_12_piece0 stored as bytes in memory (estimated size 53.9 KiB, free 365.6 MiB)
21/04/09 18:12:44 INFO BlockManagerInfo: Added broadcast_12_piece0 in memory on hadoop189:37478 (size: 53.9 KiB, free: 366.2 MiB)
21/04/09 18:12:44 INFO SparkContext: Created broadcast 12 from broadcast at SparkScanBuilder.java:171
21/04/09 18:12:44 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 40.0 B, free 365.6 MiB)
21/04/09 18:12:44 INFO MemoryStore: Block broadcast_13_piece0 stored as bytes in memory (estimated size 116.0 B, free 365.6 MiB)
21/04/09 18:12:44 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on hadoop189:37478 (size: 116.0 B, free: 366.2 MiB)
21/04/09 18:12:44 INFO SparkContext: Created broadcast 13 from broadcast at SparkScanBuilder.java:172
21/04/09 18:12:44 INFO V2ScanRelationPushDown: 
Pushing operators to hadoop_catalog.logging.isolation
Pushed Filters: 
Post-Scan Filters: 
Output: id#177, msg#178
         
21/04/09 18:12:45 INFO BaseTableScan: Scanning table hadoop_catalog.logging.isolation snapshot 778572246949378127 created at 2021-04-09 18:12:30.771 with filter true
21/04/09 18:12:45 INFO SparkContext: Starting job: main at NativeMethodAccessorImpl.java:0
21/04/09 18:12:45 INFO DAGScheduler: Got job 5 (main at NativeMethodAccessorImpl.java:0) with 1 output partitions
21/04/09 18:12:45 INFO DAGScheduler: Final stage: ResultStage 3 (main at NativeMethodAccessorImpl.java:0)
21/04/09 18:12:45 INFO DAGScheduler: Parents of final stage: List()
21/04/09 18:12:45 INFO DAGScheduler: Missing parents: List()
21/04/09 18:12:45 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[24] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
21/04/09 18:12:45 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 7.4 KiB, free 365.6 MiB)
21/04/09 18:12:45 INFO MemoryStore: Block broadcast_14_piece0 stored as bytes in memory (estimated size 3.6 KiB, free 365.6 MiB)
21/04/09 18:12:45 INFO BlockManagerInfo: Added broadcast_14_piece0 in memory on hadoop189:37478 (size: 3.6 KiB, free: 366.2 MiB)
21/04/09 18:12:45 INFO SparkContext: Created broadcast 14 from broadcast at DAGScheduler.scala:1223
21/04/09 18:12:45 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[24] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
21/04/09 18:12:45 INFO YarnScheduler: Adding task set 3.0 with 1 tasks
21/04/09 18:12:45 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 2, hadoop189, executor 1, partition 0, NODE_LOCAL, 11942 bytes)
21/04/09 18:12:45 INFO BlockManagerInfo: Added broadcast_14_piece0 in memory on hadoop189:40069 (size: 3.6 KiB, free: 366.3 MiB)
21/04/09 18:12:46 INFO BlockManagerInfo: Added broadcast_12_piece0 in memory on hadoop189:40069 (size: 53.9 KiB, free: 366.2 MiB)
21/04/09 18:12:46 INFO BlockManagerInfo: Added broadcast_13_piece0 in memory on hadoop189:40069 (size: 116.0 B, free: 366.2 MiB)
21/04/09 18:12:47 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 2644 ms on hadoop189 (executor 1) (1/1)
21/04/09 18:12:47 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool 
21/04/09 18:12:47 INFO DAGScheduler: ResultStage 3 (main at NativeMethodAccessorImpl.java:0) finished in 2.683 s
21/04/09 18:12:47 INFO DAGScheduler: Job 5 is finished. Cancelling potential speculative or zombie tasks for this job
21/04/09 18:12:47 INFO YarnScheduler: Killing all running tasks in stage 3: Stage finished
21/04/09 18:12:47 INFO DAGScheduler: Job 5 finished: main at NativeMethodAccessorImpl.java:0, took 2.687406 s
1	spark
1	flink
Time taken: 2.826 seconds, Fetched 2 row(s)
21/04/09 18:12:47 INFO SparkSQLCLIDriver: Time taken: 2.826 seconds, Fetched 2 row(s)
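This last result is consistent with a caching explanation: the Spark-side write commits snapshot 778572246949378127 and refreshes Spark's in-memory copy of the table, so the following scan sees the latest metadata, which includes the data file Flink appended earlier. As a possible workaround for cross-engine visibility, Iceberg's Spark catalog accepts a `cache-enabled` property; setting it to `false` should make every query load the current table metadata, at the cost of re-reading metadata per query. A sketch of the extra flag (unverified on this exact setup):

```shell
# Added to the spark-sql launch command from step 1:
--conf spark.sql.catalog.hadoop_catalog.cache-enabled=false
```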

I would like to know why this happens. Any help would be appreciated, thanks.
