
Writing Delta in Spark: when Spline parses the Spark lineage, an error is reported #609

Closed
jinmu0410 opened this issue Feb 21, 2023 · 24 comments
Labels
bug Something isn't working

Comments

@jinmu0410

jinmu0410 commented Feb 21, 2023

[screenshot of the exception]

Submit command:

./spark-submit --class com.hs.test \
--master spark://jinmudeMacBook-Pro.local:7077 \
--name test_015 \
--jars "/Users/jinmu/Downloads/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar" \
--conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener \
--conf spark.spline.lineageDispatcher=console \
/Users/jinmu/Downloads/hs/lakehouse/target/hs-lakehouse-1.0-SNAPSHOT.jar

Application code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .appName("test1")
  .master("local[*]")
  .getOrCreate()

val path = "/Users/jinmu/Downloads/111.csv"

val readerOptions = Map(
  "delimiter" -> ",",
  "header" -> "true",
  "encoding" -> "utf-8",
  "escape" -> "\""
)
import spark.implicits._

spark.read.format("csv").options(readerOptions).load(path)
  .write.format("delta").mode("append")
  .save("/Users/jinmu/Downloads/delta/jinmu.db/test33")

Who can help me?

@jinmu0410 jinmu0410 added the bug Something isn't working label Feb 21, 2023
@cerveada
Contributor

Can you post the whole exception? Preferably as text.

@jinmu0410
Author

jinmu0410 commented Feb 21, 2023

ok

23/02/21 17:43:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/21 17:43:38 INFO SparkContext: Running Spark version 3.1.2
23/02/21 17:43:39 INFO ResourceUtils: ==============================================================
23/02/21 17:43:39 INFO ResourceUtils: No custom resources configured for spark.driver.
23/02/21 17:43:39 INFO ResourceUtils: ==============================================================
23/02/21 17:43:39 INFO SparkContext: Submitted application: test1
23/02/21 17:43:39 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/02/21 17:43:39 INFO ResourceProfile: Limiting resource is cpu
23/02/21 17:43:39 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/02/21 17:43:39 INFO SecurityManager: Changing view acls to: jinmu
23/02/21 17:43:39 INFO SecurityManager: Changing modify acls to: jinmu
23/02/21 17:43:39 INFO SecurityManager: Changing view acls groups to: 
23/02/21 17:43:39 INFO SecurityManager: Changing modify acls groups to: 
23/02/21 17:43:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(jinmu); groups with view permissions: Set(); users  with modify permissions: Set(jinmu); groups with modify permissions: Set()
23/02/21 17:43:39 INFO Utils: Successfully started service 'sparkDriver' on port 56109.
23/02/21 17:43:39 INFO SparkEnv: Registering MapOutputTracker
23/02/21 17:43:39 INFO SparkEnv: Registering BlockManagerMaster
23/02/21 17:43:39 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/02/21 17:43:39 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/02/21 17:43:39 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/02/21 17:43:39 INFO DiskBlockManager: Created local directory at /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/blockmgr-315a09b6-cd7a-4b15-b807-99473f820d62
23/02/21 17:43:39 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
23/02/21 17:43:39 INFO SparkEnv: Registering OutputCommitCoordinator
23/02/21 17:43:39 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/02/21 17:43:39 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.218.8:4040
23/02/21 17:43:39 INFO SparkContext: Added JAR file:///Users/jinmu/Downloads/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar at spark://192.168.218.8:56109/jars/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar with timestamp 1676972618954
23/02/21 17:43:39 INFO SparkContext: Added JAR file:/Users/jinmu/Downloads/hs/lakehouse/target/hs-lakehouse-1.0-SNAPSHOT.jar at spark://192.168.218.8:56109/jars/hs-lakehouse-1.0-SNAPSHOT.jar with timestamp 1676972618954
23/02/21 17:43:40 INFO Executor: Starting executor ID driver on host 192.168.218.8
23/02/21 17:43:40 INFO Executor: Fetching spark://192.168.218.8:56109/jars/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar with timestamp 1676972618954
23/02/21 17:43:40 INFO TransportClientFactory: Successfully created connection to /192.168.218.8:56109 after 27 ms (0 ms spent in bootstraps)
23/02/21 17:43:40 INFO Utils: Fetching spark://192.168.218.8:56109/jars/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar to /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44/userFiles-6b25b9b2-0b90-4e58-86e7-1a527c6b54ba/fetchFileTemp2786412298596410544.tmp
23/02/21 17:43:40 INFO Executor: Adding file:/private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44/userFiles-6b25b9b2-0b90-4e58-86e7-1a527c6b54ba/spark-3.1-spline-agent-bundle_2.12-1.0.4.jar to class loader
23/02/21 17:43:40 INFO Executor: Fetching spark://192.168.218.8:56109/jars/hs-lakehouse-1.0-SNAPSHOT.jar with timestamp 1676972618954
23/02/21 17:43:40 INFO Utils: Fetching spark://192.168.218.8:56109/jars/hs-lakehouse-1.0-SNAPSHOT.jar to /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44/userFiles-6b25b9b2-0b90-4e58-86e7-1a527c6b54ba/fetchFileTemp8602971486436238988.tmp
23/02/21 17:43:40 INFO Executor: Adding file:/private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44/userFiles-6b25b9b2-0b90-4e58-86e7-1a527c6b54ba/hs-lakehouse-1.0-SNAPSHOT.jar to class loader
23/02/21 17:43:40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56111.
23/02/21 17:43:40 INFO NettyBlockTransferService: Server created on 192.168.218.8:56111
23/02/21 17:43:40 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/02/21 17:43:40 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.218.8, 56111, None)
23/02/21 17:43:40 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.218.8:56111 with 366.3 MiB RAM, BlockManagerId(driver, 192.168.218.8, 56111, None)
23/02/21 17:43:40 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.218.8, 56111, None)
23/02/21 17:43:40 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.218.8, 56111, None)
23/02/21 17:43:40 INFO SingleEventLogFileWriter: Logging events to file:/Users/jinmu/Downloads/directory/local-1676972619872.inprogress
23/02/21 17:43:40 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/jinmu/Downloads/soft/spark-3.0.3-bin-hadoop2.7/bin/spark-warehouse').
23/02/21 17:43:40 INFO SharedState: Warehouse path is 'file:/Users/jinmu/Downloads/soft/spark-3.0.3-bin-hadoop2.7/bin/spark-warehouse'.
23/02/21 17:43:41 INFO SparkUI: Stopped Spark web UI at http://192.168.218.8:4040
23/02/21 17:43:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/02/21 17:43:41 INFO MemoryStore: MemoryStore cleared
23/02/21 17:43:41 INFO BlockManager: BlockManager stopped
23/02/21 17:43:41 INFO BlockManagerMaster: BlockManagerMaster stopped
23/02/21 17:43:41 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/02/21 17:43:41 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at io.delta.sql.parser.DeltaSqlParser.<init>(DeltaSqlParser.scala:71)
	at io.delta.sql.DeltaSparkSessionExtension.$anonfun$apply$1(DeltaSparkSessionExtension.scala:79)
	at org.apache.spark.sql.SparkSessionExtensions.$anonfun$buildParser$1(SparkSessionExtensions.scala:239)
	at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
	at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
	at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
	at org.apache.spark.sql.SparkSessionExtensions.buildParser(SparkSessionExtensions.scala:238)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.sqlParser$lzycompute(BaseSessionStateBuilder.scala:124)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.sqlParser(BaseSessionStateBuilder.scala:123)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:341)
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1145)
	at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:159)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:155)
	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:152)
	at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:997)
	at org.apache.spark.sql.SparkSession.read(SparkSession.scala:658)
	at com.hs.test$.main(test.scala:36)
	at com.hs.test.main(test.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/02/21 17:43:41 INFO ShutdownHookManager: Shutdown hook called
23/02/21 17:43:41 INFO ShutdownHookManager: Deleting directory /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-22f428e3-b4f2-41d8-8ff4-684d30e6517f
23/02/21 17:43:41 INFO ShutdownHookManager: Deleting directory /private/var/folders/g6/3j6jyxt964x1pk4qk7_b49rw0000gn/T/spark-15280169-319f-4d7b-8d11-e3261d824b44

@cerveada cerveada transferred this issue from AbsaOSS/spline Feb 21, 2023
@cerveada
Contributor

From the log it seems this is caused by loading some class in io.delta.sql.parser.DeltaSqlParser, so it is not an issue caused by the Spline Agent.

Can you try to run the same thing without the Spline Agent?

I would also check that the proper Delta version is in use: https://docs.delta.io/latest/releases.html
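An IncompatibleClassChangeError thrown while DeltaSqlParser is being loaded usually points to a Delta/Spark binary mismatch; per the Delta releases page above, Spark 3.1.x pairs with the delta-core 1.0.x line. For the "without Spline Agent" check, the simplest option is to drop the spark.sql.queryExecutionListeners conf from spark-submit. As a rough sketch only (the spark.spline.mode property and its DISABLED value are assumptions based on the "Init Mode" line the agent logs at startup), the agent can also be switched off in place:

import org.apache.spark.sql.SparkSession

// Same session setup as in the first post, with the Spline agent disabled so the
// failure can be reproduced (or not) without it. spark.spline.mode is an assumed key.
val sparkWithoutSpline = SparkSession.builder()
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .config("spark.spline.mode", "DISABLED")
  .appName("test1_without_spline")
  .master("local[*]")
  .getOrCreate()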

@jinmu0410
Author

yes, thanks

@jinmu0410
Author

@cerveada My Spark task failed, but the lineage still comes out. Is this normal?

@jinmu0410
Author

jinmu0410 commented Feb 22, 2023

23/02/22 16:06:53 INFO FileSourceStrategy: Output Data Schema: struct<value: string>
23/02/22 16:06:53 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 339.9 KiB, free 365.6 MiB)
23/02/22 16:06:53 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 30.8 KiB, free 365.6 MiB)
23/02/22 16:06:53 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.218.12:51952 (size: 30.8 KiB, free: 366.2 MiB)
23/02/22 16:06:53 INFO SparkContext: Created broadcast 2 from load at test.scala:50
23/02/22 16:06:53 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/02/22 16:06:54 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/02/22 16:06:54 INFO SparkUI: Stopped Spark web UI at http://192.168.218.12:4040
ExecutionPlan (apiVersion: 1.2):
{"id":"4427d12c-d567-5ccd-b89a-a9d95f858ef2","name":"test123","operations":{"write":{"outputSource":"jdbc:mysql://192.168.110.40:3306:test.test_csv","append":true,"id":"op-0","name":"SaveIntoDataSourceCommand","childIds":["op-1"],"extra":{"destinationType":"jdbc"}},"reads":[{"inputSources":["hdfs://lake-node3:8020/jinmu/test/111.csv"],"id":"op-3","name":"LogicalRelation","output":["attr-0","attr-1","attr-2","attr-3","attr-4","attr-5","attr-6","attr-7","attr-8","attr-9","attr-10","attr-11","attr-12","attr-13","attr-14","attr-15","attr-16","attr-17","attr-18","attr-19","attr-20","attr-21","attr-22","attr-23","attr-24","attr-25","attr-26","attr-27","attr-28","attr-29","attr-30","attr-31","attr-32","attr-33","attr-34","attr-35","attr-36","attr-37","attr-38","attr-39","attr-40"],"params":{"path":"/jinmu/test/111.csv","encoding":"utf-8","escape":"\"","header":"true","delimiter":","},"extra":{"sourceType":"csv"}}],"other":[{"id":"op-2","name":"SubqueryAlias","childIds":["op-3"],"output":["attr-0","attr-1","attr-2","attr-3","attr-4","attr-5","attr-6","attr-7","attr-8","attr-9","attr-10","attr-11","attr-12","attr-13","attr-14","attr-15","attr-16","attr-17","attr-18","attr-19","attr-20","attr-21","attr-22","attr-23","attr-24","attr-25","attr-26","attr-27","attr-28","attr-29","attr-30","attr-31","attr-32","attr-33","attr-34","attr-35","attr-36","attr-37","attr-38","attr-39","attr-40"],"params":{"identifier":"test_scv_tmp"}},{"id":"op-1","name":"Project","childIds":["op-2"],"output":["attr-1","attr-4"],"params":{"projectList":[{"__attrId":"attr-1"},{"__attrId":"attr-4"}]}}]},"attributes":[{"id":"attr-0","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sbit"},{"id":"attr-1","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sbool"},{"id":"attr-2","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sbox"},{"id":"attr-3","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sbytea"},{"id":"attr-4","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"schar"},{"id":"attr-5","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"scidr"},{"id":"attr-6","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"scircle"},{"id":"attr-7","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sdate"},{"id":"attr-8","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sdecimal"},{"id":"attr-9","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sfloat4"},{"id":"attr-10","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sfloat8"},{"id":"attr-11","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sinet"},{"id":"attr-12","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sint2"},{"id":"attr-13","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sint4"},{"id":"attr-14","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sint8"},{"id":"attr-15","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sinterval"},{"id":"attr-16","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sjson"},{"id":"attr-17","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sjsonb"},{"id":"attr-18","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sline"},{"id":"attr-19","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"slseg"},{"id":"attr-20","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"smacaddr"},{"id":"attr-21","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"smonkey"},{"id":"attr-22","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"snumeric"},{"id":"attr-23","dataType":"e63adadc-648a-56a0-9424-32
89858cf0bb","name":"spath"},{"id":"attr-24","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"spoint"},{"id":"attr-25","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"spolygon"},{"id":"attr-26","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sserial2"},{"id":"attr-27","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sserial4"},{"id":"attr-28","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sserial8"},{"id":"attr-29","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stext"},{"id":"attr-30","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stime"},{"id":"attr-31","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stimestamp"},{"id":"attr-32","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stimestamptz"},{"id":"attr-33","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stimetz"},{"id":"attr-34","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stsquery"},{"id":"attr-35","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stsvector"},{"id":"attr-36","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"stxid_snapshot"},{"id":"attr-37","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"svarbit"},{"id":"attr-38","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"suuid"},{"id":"attr-39","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"svarchar"},{"id":"attr-40","dataType":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"sxml"}],"expressions":{},"systemInfo":{"name":"spark","version":"3.1.2"},"agentInfo":{"name":"spline","version":"1.0.4"},"extraInfo":{"appName":"test123","dataTypes":[{"_typeHint":"dt.Simple","id":"e63adadc-648a-56a0-9424-3289858cf0bb","name":"string","nullable":true}]}}
ExecutionEvent (apiVersion: 1.2):
{"planId":"4427d12c-d567-5ccd-b89a-a9d95f858ef2","timestamp":1677053222119,"error":"java.sql.SQLException: No suitable driver\n\tat java.sql.DriverManager.getDriver(DriverManager.java:315)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)\n\tat org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)\n\tat org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)\n\tat org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)\n\tat org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)\n\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)\n\tat com.hs.test$.main(test.scala:55)\n\tat com.hs.test.main(test.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\n\tat org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)\n\tat org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\n\tat org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\n\tat org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\n\tat 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)\n\tat org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)\n\tat org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)\n","extra":{"appId":"local-1677053200739","user":"jinmu","readMetrics":{},"writeMetrics":{}}}
23/02/22 16:07:02 INFO AsyncEventQueue: Process of event SparkListenerSQLExecutionEnd(2,1677053214309) by listener ExecutionListenerBus took 8.060134333s.
23/02/22 16:07:02 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/02/22 16:07:02 INFO MemoryStore: MemoryStore cleared
23/02/22 16:07:02 INFO BlockManager: BlockManager stopped
23/02/22 16:07:02 INFO BlockManagerMaster: BlockManagerMaster stopped
23/02/22 16:07:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/02/22 16:07:02 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.sql.SQLException: No suitable driver
	at java.sql.DriverManager.getDriver(DriverManager.java:315)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at com.hs.test$.main(test.scala:55)
	at com.hs.test.main(test.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

@cerveada
Contributor

Yes, Spline is able to capture lineage of both successful and failed jobs.

@jinmu0410
Author

jinmu0410 commented Feb 22, 2023

@cerveada So, I have another question, about lineage sent to Kafka:

23/02/22 17:19:48 INFO SharedState: Warehouse path is 'file:/Users/jinmu/Downloads/soft/spark-3.1.2-bin-hadoop3.2/bin/spark-warehouse'.
23/02/22 17:19:49 INFO SparkLineageInitializer: Initializing Spline Agent...
23/02/22 17:19:49 INFO SparkLineageInitializer: Spline Version: 1.0.4 (rev. 4266fca)
23/02/22 17:19:49 INFO SparkLineageInitializer: Init Type: AUTO (codeless)
23/02/22 17:19:49 INFO SparkLineageInitializer: Init Mode: ENABLED
23/02/22 17:19:49 INFO SplineRecordSenderFactory: Kafka topic: spline_test
23/02/22 17:19:49 ERROR SparkLineageInitializer: Spline initialization failed! Spark Lineage tracking is DISABLED.
org.apache.commons.configuration.ConversionException: 'max.in.flight.requests.per.connection' doesn't map to a List object: 1, a java.lang.Integer
	at org.apache.commons.configuration.AbstractConfiguration.getList(AbstractConfiguration.java:1144)
	at org.apache.commons.configuration.AbstractConfiguration.getList(AbstractConfiguration.java:1109)
	at org.apache.commons.configuration.ConfigurationConverter.getProperties(ConfigurationConverter.java:116)
	at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher$$anonfun$$lessinit$greater$1.$anonfun$new$1(KafkaLineageDispatcher.scala:47)
	at za.co.absa.spline.harvester.dispatcher.kafkadispatcher.SplineRecordSenderFactory.<init>(SplineRecordSenderFactory.scala:34)
	at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher$$anonfun$$lessinit$greater$1.apply(KafkaLineageDispatcher.scala:46)
	at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher$$anonfun$$lessinit$greater$1.apply(KafkaLineageDispatcher.scala:43)
	at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher.<init>(KafkaLineageDispatcher.scala:57)
	at za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher.<init>(KafkaLineageDispatcher.scala:43)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at za.co.absa.commons.HierarchicalObjectFactory$$anonfun$instantiate$3.applyOrElse(HierarchicalObjectFactory.scala:47)
	at za.co.absa.commons.HierarchicalObjectFactory$$anonfun$instantiate$3.applyOrElse(HierarchicalObjectFactory.scala:44)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at scala.util.Failure.recover(Try.scala:234)
	at za.co.absa.commons.HierarchicalObjectFactory.instantiate(HierarchicalObjectFactory.scala:44)
	at za.co.absa.spline.agent.AgentBOM$$anon$1.obtain(AgentBOM.scala:119)
	at za.co.absa.spline.agent.AgentBOM$$anon$1.$anonfun$lineageDispatcher$2(AgentBOM.scala:100)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at za.co.absa.spline.agent.AgentBOM$$anon$1.lineageDispatcher$lzycompute(AgentBOM.scala:100)
	at za.co.absa.spline.agent.AgentBOM$$anon$1.lineageDispatcher(AgentBOM.scala:91)
	at za.co.absa.spline.harvester.SparkLineageInitializer.$anonfun$createListener$8(SparkLineageInitializer.scala:146)
	at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
	at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)

@cerveada
Contributor

There could be some issue with the new YAML config. I will check that.

As a workaround, you can set the same property using --conf options for spark-submit:

--conf spark.spline.lineageDispatcher=kafka \
--conf spark.spline.lineageDispatcher.kafka.producer.max.in.flight.requests.per.connection=1 \

That should fix the issue for you.
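For reference, the agent reads spline properties from the Spark configuration, so the same workaround can also be expressed on the SparkSession builder. A minimal sketch (the topic and bootstrap-server keys are assumptions based on the agent README and the "Kafka topic: spline_test" log line above; the broker address is a placeholder):

import org.apache.spark.sql.SparkSession

// Hypothetical Kafka-dispatcher setup; only the two keys from the workaround above
// come from this thread, the rest are assumed and should be checked against the README.
val spark = SparkSession.builder()
  .appName("test_kafka_lineage")
  .config("spark.spline.lineageDispatcher", "kafka")
  .config("spark.spline.lineageDispatcher.kafka.topic", "spline_test")
  .config("spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers", "localhost:9092") // placeholder broker
  .config("spark.spline.lineageDispatcher.kafka.producer.max.in.flight.requests.per.connection", "1")
  .getOrCreate()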

@jinmu0410
Author

thanks

@jinmu0410
Author

[Screenshot 2023-02-23 16 36 48]

In producer.model, is there any documentation for the classes inside? There are no comments, and I don't understand what they mean.

@jinmu0410
Author

@cerveada

@cerveada
Contributor

All classes in this package represent the lineage that the agent captured. This is the agent's internal model. It is later transformed into a similar "output" model defined by the producer API. As the last step, the "output" model is converted to JSON and sent to the server.

@jinmu0410
Author

jinmu0410 commented Feb 23, 2023

Like this JSON: can you help me find out what the "expressions" model means? By the way, can spline-spark-agent send the parsed lineage to a third-party system, like DataHub?

"expressions":{
        "constants":[
            {
                "id":"expr-0",
                "dataType":"75fe27b9-9a00-5c7d-966f-33ba32333133",
                "extra":{
                    "simpleClassName":"Literal",
                    "_typeHint":"expr.Literal"
                },
                "value":1
            },
            {
                "id":"expr-1",
                "dataType":"75fe27b9-9a00-5c7d-966f-33ba32333133",
                "extra":{
                    "simpleClassName":"Literal",
                    "_typeHint":"expr.Literal"
                },
                "value":1
            },
            {
                "id":"expr-2",
                "dataType":"75fe27b9-9a00-5c7d-966f-33ba32333133",
                "extra":{
                    "simpleClassName":"Literal",
                    "_typeHint":"expr.Literal"
                },
                "value":1
            },
            {
                "id":"expr-3",
                "dataType":"75fe27b9-9a00-5c7d-966f-33ba32333133",
                "extra":{
                    "simpleClassName":"Literal",
                    "_typeHint":"expr.Literal"
                },
                "value":1
            }
        ]
    }

@jinmu0410
Author

https://firststr.com/2021/04/26/spark-compute-lineage-to-datahub/
I found an article, but do we have any examples?

@cerveada
Contributor

Like this JSON: can you help me find out what the "expressions" model means?

Maybe this discussion will help #563

Can the parsed lineage be sent to a third-party system, like DataHub?

Yes, check the documentation: https://github.com/AbsaOSS/spline-spark-agent#dispatchers

https://firststr.com/2021/04/26/spark-compute-lineage-to-datahub/
I found an article, but do we have any examples?

We don't provide support for external servers other than Spline, but the dispatchers should allow you to send the data wherever you want. You can always create a custom dispatcher if the ones provided are not enough; see the sketch below.
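A rough sketch of what a custom dispatcher could look like follows. The trait package comes from the KafkaLineageDispatcher seen in the stack trace above; the two send overloads and the producer-model imports are assumptions based on the README, so verify the exact members against the bundled dispatchers in your agent version before relying on this.

// Hypothetical custom dispatcher; trait members and model imports are assumed,
// check them against your spline-spark-agent version.
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}

class DataHubLineageDispatcher extends LineageDispatcher {
  // Receives the execution plan captured for a run.
  override def send(plan: ExecutionPlan): Unit = {
    // convert the plan to the target format and push it to DataHub (or any other system)
  }

  // Receives the execution event; for failed runs it carries the error info shown earlier.
  override def send(event: ExecutionEvent): Unit = {
    // forward the event, e.g. to mark the run as succeeded or failed
  }
}

The class would then be registered the same way as the built-in ones, e.g. spark.spline.lineageDispatcher=dataHub plus spark.spline.lineageDispatcher.dataHub.className set to the fully qualified class name (again, an assumed config pattern; see the dispatchers documentation linked above).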

@jinmu0410
Author

Thank you very much for your reply

@jinmu0410
Author

In Kafka, each lineage run produces two messages. Is there any concurrency or ordering problem when dealing with the executionPlan and the executionEvent? I mean, I parse the "executionEvent", but can I only confirm from the "executionPlan" whether this Spark task was successful?

@jinmu0410
Author

I was wrong. It is necessary to judge whether the task was successful by checking whether there is error information in the "executionEvent".

@wajda
Contributor

wajda commented Feb 24, 2023

it is necessary to judge whether the task was successful by checking whether there is error information in the "executionEvent"

That's right. Any non-null value in the "error" property means there was an error.
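For anyone consuming these events from Kafka, that check can be as simple as the following sketch (using json4s, which ships with Spark; the "error" field name is taken from the ExecutionEvent JSON shown earlier in this thread):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Returns true when the ExecutionEvent JSON carries a non-null "error" value,
// i.e. the corresponding Spark job failed.
def isFailedExecutionEvent(eventJson: String): Boolean = {
  val error = parse(eventJson) \ "error"
  error != JNothing && error != JNull
}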

@jinmu0410
Author

@cerveada Hi, I have another question:
[Screenshot 2023-03-07 21 16 30]

@jinmu0410
Author

Spark on YARN

@wajda
Contributor

wajda commented Mar 7, 2023

What Spark and YARN versions are you using?
We have never been able to reproduce this error, but it does happen to other people sometimes. Basically, the Spark application shutdown event fires before all the listeners have been executed. This seems to be abnormal Spark driver behavior, but more investigation is needed.

I'm closing this issue now, as it has turned into a thread where several different kinds of issues are discussed.
Please feel free to open a new issue with more details about how to reliably reproduce this, and we will take a look.

@wajda wajda closed this as completed Mar 7, 2023
@jinmu0410
Author

@wajda Spark 3.1.2 and YARN 3.1
