[Test] Unstable test SparkStreamingTest read: log table #3129

@wuchong

Description

### Search before asking

- [x] I searched in the issues and found nothing similar.

### Fluss version

0.9.0 (latest release)

### Please describe the bug 🐞

https://github.com/apache/fluss/actions/runs/24599047572/job/71934470469?pr=2976

SparkStreamingTest:
- write: write to log table
- write: write to primary key table
- read: log table *** FAILED ***
  Error while stopping stream: 
  query.exception() is not empty after clean stop: org.apache.spark.sql.streaming.StreamingQueryException: Failed to update metadata
  === Streaming Query ===
  Identifier: [id = 4a10ba53-3612-4f36-8c2b-edc03172db04, runId = 888a7078-87a2-45b1-94e8-9f4b8d63b915]
  Current Committed Offsets: {org.apache.fluss.spark.read.FlussAppendMicroBatchStream@249ff311: {"version":1,"table_id":2,"bucket_offsets":[3]}}
  Current Available Offsets: {org.apache.fluss.spark.read.FlussAppendMicroBatchStream@249ff311: {"version":1,"table_id":2,"bucket_offsets":[3]}}
  
  Current State: TERMINATED
  Thread State: RUNNABLE
  
  Logical Plan:
  WriteToMicroBatchDataSource MemorySink, 4a10ba53-3612-4f36-8c2b-edc03172db04, Append
  +- SubqueryAlias fluss_catalog.fluss.t
     +- StreamingDataSourceV2Relation [id#2277, data#2278], fluss_catalog.fluss.t, FlussAppendScan(fluss.t,TableInfo{tablePath=fluss.t, tableId=2, schemaId=1, schema=Schema{columns=[id INT, data STRING], primaryKey=null, autoIncrementColumnNames=[], highestFieldId=1}, physicalPrimaryKeys=[], bucketKeys=[], partitionKeys=[], numBuckets=1, properties={table.replication.factor=1}, customProperties={owner=fluss}, remoteDataDir=null, comment='null', createdTime=1776497158880, modifiedTime=1776497158880},None,org.apache.spark.sql.util.CaseInsensitiveStringMap@4d5141e1,{bootstrap.servers=127.0.0.1:44085}), org.apache.fluss.spark.read.FlussAppendMicroBatchStream@249ff311
  
  
  == Progress ==
     AssertOnQuery(<condition>, )
     CheckLastBatch: 
  => StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@65520982,Map(),null)
     AddFlussData(t,StructType(StructField(id,IntegerType,true),StructField(data,StringType,true)),List([4,data4], [5,data5]))
     AssertOnQuery(<condition>, )
     CheckAnswer: [4,data4],[5,data5]
  
  == Stream ==
  Output Mode: Append
  Stream state: {org.apache.fluss.spark.read.FlussAppendMicroBatchStream@249ff311: {"version":1,"table_id":2,"bucket_offsets":[3]}}
  Thread state: dead
  
  
  
  == Sink ==
  0: 
  
  
  == Plan ==
  == Parsed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 4a10ba53-3612-4f36-8c2b-edc03172db04, Append, 0
  +- SubqueryAlias fluss_catalog.fluss.t
     +- StreamingDataSourceV2Relation [id#2277, data#2278], fluss_catalog.fluss.t, FlussAppendScan(fluss.t,TableInfo{tablePath=fluss.t, tableId=2, schemaId=1, schema=Schema{columns=[id INT, data STRING], primaryKey=null, autoIncrementColumnNames=[], highestFieldId=1}, physicalPrimaryKeys=[], bucketKeys=[], partitionKeys=[], numBuckets=1, properties={table.replication.factor=1}, customProperties={owner=fluss}, remoteDataDir=null, comment='null', createdTime=1776497158880, modifiedTime=1776497158880},None,org.apache.spark.sql.util.CaseInsensitiveStringMap@4d5141e1,{bootstrap.servers=127.0.0.1:44085}), org.apache.fluss.spark.read.FlussAppendMicroBatchStream@249ff311, {"version":1,"table_id":2,"bucket_offsets":[3]}, {"version":1,"table_id":2,"bucket_offsets":[3]}
  
  == Analyzed Logical Plan ==
  WriteToMicroBatchDataSource MemorySink, 4a10ba53-3612-4f36-8c2b-edc03172db04, Append, 0
  +- SubqueryAlias fluss_catalog.fluss.t
     +- StreamingDataSourceV2Relation [id#2277, data#2278], fluss_catalog.fluss.t, FlussAppendScan(fluss.t,TableInfo{tablePath=fluss.t, tableId=2, schemaId=1, schema=Schema{columns=[id INT, data STRING], primaryKey=null, autoIncrementColumnNames=[], highestFieldId=1}, physicalPrimaryKeys=[], bucketKeys=[], partitionKeys=[], numBuckets=1, properties={table.replication.factor=1}, customProperties={owner=fluss}, remoteDataDir=null, comment='null', createdTime=1776497158880, modifiedTime=1776497158880},None,org.apache.spark.sql.util.CaseInsensitiveStringMap@4d5141e1,{bootstrap.servers=127.0.0.1:44085}), org.apache.fluss.spark.read.FlussAppendMicroBatchStream@249ff311, {"version":1,"table_id":2,"bucket_offsets":[3]}, {"version":1,"table_id":2,"bucket_offsets":[3]}
  
  == Optimized Logical Plan ==
  WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@69150bfc]
  +- StreamingDataSourceV2Relation [id#2277, data#2278], fluss_catalog.fluss.t, FlussAppendScan(fluss.t,TableInfo{tablePath=fluss.t, tableId=2, schemaId=1, schema=Schema{columns=[id INT, data STRING], primaryKey=null, autoIncrementColumnNames=[], highestFieldId=1}, physicalPrimaryKeys=[], bucketKeys=[], partitionKeys=[], numBuckets=1, properties={table.replication.factor=1}, customProperties={owner=fluss}, remoteDataDir=null, comment='null', createdTime=1776497158880, modifiedTime=1776497158880},None,org.apache.spark.sql.util.CaseInsensitiveStringMap@4d5141e1,{bootstrap.servers=127.0.0.1:44085}), org.apache.fluss.spark.read.FlussAppendMicroBatchStream@249ff311, {"version":1,"table_id":2,"bucket_offsets":[3]}, {"version":1,"table_id":2,"bucket_offsets":[3]}
  
  == Physical Plan ==
  WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@69150bfc], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$16044/0x0000000804c11440@60737ee1
  +- *(1) Project [id#2277, data#2278]
     +- MicroBatchScan[id#2277, data#2278] class org.apache.fluss.spark.read.FlussAppendScan
  
           
           
  org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  	org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  	org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1562)
  	org.scalatest.Assertions.fail(Assertions.scala:933)
  	org.scalatest.Assertions.fail$(Assertions.scala:929)
  	org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1562)
  	org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:462)
  	org.apache.spark.sql.streaming.StreamTest.verify$1(StreamTest.scala:433)
  	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$26(StreamTest.scala:606)
  	scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  
  
- read: log partition table
- read: primary key table
- read: primary key partition table
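The failure pattern ("query.exception() is not empty after clean stop" with the `Failed to update metadata` error) looks like a race between the test's `StopStream` action and a background failure in the micro-batch thread. The sketch below is purely illustrative — none of these names come from the Fluss or Spark source — and only demonstrates, under that assumption, how a stop that returns "cleanly" can still leave an exception recorded afterwards:

```scala
// Hypothetical sketch of the suspected flake: stop() returns before a
// background thread finishes, and that thread records a failure afterwards.
// FakeQuery, stop, exception, await are all invented for illustration.
import java.util.concurrent.atomic.AtomicReference

class FakeQuery {
  @volatile private var stopped = false
  private val failure = new AtomicReference[Option[Throwable]](None)

  // Stand-in for the micro-batch thread that periodically refreshes metadata.
  private val worker = new Thread(() => {
    while (!stopped) Thread.sleep(1)
    // The thread sees the stop flag, but its last in-flight refresh can still
    // fail and record an exception *after* stop() has already returned.
    failure.set(Some(new RuntimeException("Failed to update metadata")))
  })
  worker.start()

  def stop(): Unit = { stopped = true } // returns without joining the worker
  def exception(): Option[Throwable] = failure.get()
  def await(): Unit = worker.join()
}

object FlakeDemo {
  def main(args: Array[String]): Unit = {
    val q = new FakeQuery
    q.stop()  // the "clean stop" from the test's point of view
    q.await() // the worker only records its failure now
    // Mirrors the StreamTest check that fails intermittently in CI:
    assert(q.exception().isDefined)
    println(q.exception().get.getMessage)
  }
}
```

If this is the actual shape of the flake, the fix would be for the stream's stop path to either wait for (or suppress) failures from the final metadata update before the test's exception check runs.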

### Solution

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
