
fix: Fixes types comparison #353

Merged
jiangmichaellll merged 3 commits into main from jiangmichael-b-199225459 on Jan 27, 2022
Conversation

jiangmichaellll
Contributor

Fixes #261

@jiangmichaellll jiangmichaellll requested a review from a team as a code owner January 26, 2022 20:00
@product-auto-label product-auto-label bot added the api: pubsublite Issues related to the googleapis/java-pubsublite-spark API. label Jan 26, 2022
@jiangmichaellll
Contributor Author

== and != compare references, while sameType compares the actual type values.
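The distinction can be sketched with a minimal Python stand-in (the `FieldType` class below is hypothetical, not the actual Spark `DataType`): two structurally identical type objects can be distinct instances, so a reference comparison reports "different" while a value comparison agrees.

```python
class FieldType:
    """Hypothetical stand-in for a Spark DataType; not the real class."""

    def __init__(self, name):
        self.name = name

    def same_type(self, other):
        # Value comparison, analogous in spirit to DataType.sameType.
        return self.name == other.name


a = FieldType("binary")
b = FieldType("binary")

print(a is b)          # False: two distinct instances (reference comparison)
print(a.same_type(b))  # True: the underlying type values match
```

This is why comparing nested types such as array or map fields by reference can fail even when the schemas match.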

@anguillanneuf
Collaborator

anguillanneuf commented Jan 27, 2022

I created a streaming dataframe of the following schema:

>> sdf.printSchema()

 |-- event_timestamp: timestamp (nullable = true)
 |-- key: binary (nullable = false)
 |-- data: binary (nullable = true)
 |-- attributes: map (nullable = false)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = false)
 |    |    |-- element: binary (containsNull = false)

But writing it to Pub/Sub Lite still failed with java.lang.NoClassDefFoundError: scala/Function2$class; the scala-library dependency has provided scope:

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
  <scope>provided</scope>
</dependency>
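One possible fix (a sketch, not necessarily the exact change that was merged) is to drop the provided scope so the shade plugin bundles scala-library into the uber jar instead of expecting the runtime environment to supply it:

```xml
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
  <!-- default (compile) scope, so the shaded jar includes the classes -->
</dependency>
```

The later comment confirms the error went away once the scala library was included in the shaded jar.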

2022-01-27 01:08:25 ERROR MicroBatchExecution:91 - Query [id = 281f3108-63f9-488e-a2b3-14d6c68cc773, runId = eb1e277f-a444-41d5-af05-e9d9539738d4] terminated with error
org.apache.spark.SparkException: Writing job aborted.
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:136)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:160)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:157)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:252)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:301)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3388)
        at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3369)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:551)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:547)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:546)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, tz-lite-w-1.c.tz-playground-bigdata.internal, executor 2): java.lang.NoClassDefFoundError: scala/Function2$class
        at scala.compat.java8.functionConverterImpls.FromJavaBiConsumer.<init>(FunctionConverters.scala:12)
        at com.google.cloud.pubsublite.spark.PslSparkUtils.lambda$toPubSubMessage$7(PslSparkUtils.java:149)
        at com.google.cloud.pubsublite.spark.PslSparkUtils.extractVal(PslSparkUtils.java:116)
        at com.google.cloud.pubsublite.spark.PslSparkUtils.toPubSubMessage(PslSparkUtils.java:141)
        at com.google.cloud.pubsublite.spark.PslDataWriter.write(PslDataWriter.java:72)
        at com.google.cloud.pubsublite.spark.PslDataWriter.write(PslDataWriter.java:37)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:414)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

@jiangmichaellll jiangmichaellll requested a review from a team as a code owner January 27, 2022 01:33
@anguillanneuf
Collaborator

anguillanneuf commented Jan 27, 2022

@jiangmichaellll I tried a Python example after the scala library was included in the shaded jar, and it worked.
I'm surprised that it's not provided by Dataproc. It does increase the size of the shaded jar from 48.3 MiB to 53.8 MiB.

@jiangmichaellll
Contributor Author

Thanks for uncovering that dependency issue! I am going to check this in and create another PR tomorrow to fix it.

@jiangmichaellll jiangmichaellll merged commit 451901a into main Jan 27, 2022
@jiangmichaellll jiangmichaellll deleted the jiangmichael-b-199225459 branch January 27, 2022 02:11
Successfully merging this pull request may close these issues.

Issue using PySpark writeStream to write to Pub/Sub Lite with an attributes field