Kafka source type support #331

Closed
gatchan00 opened this issue Aug 30, 2019 · 6 comments

gatchan00 commented Aug 30, 2019

Storing the result to a Kafka topic does not work.

You can reproduce the issue with the GitHub project https://github.com/gatchan00/spline-test by launching BasicExampleKafka.

The process ends with no error, but the Web UI keeps loading and never finishes. This is caused by a failure when retrieving data (it tries to read the path field from rootOperation). The error is attached as ATTACHMENT1 at the end of this issue.

Adding the path field manually fixes the endless loading screen, but the UI still does not work: clicking on the element does not take you to any other screen. The corresponding error is attached as ATTACHMENT2.
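
To illustrate the first failure in isolation, here is a minimal sketch that uses neither Spline nor Salat. `Descriptor`, `DescriptorReader.fromDocument` and the `Map`-based document are hypothetical stand-ins, assumed only for illustration: a reader that requires a `path` key fails on a stored document that was written without one, which is the kind of failure the ATTACHMENT1 message reports for `PersistedDatasetDescriptor`.

```scala
// Hypothetical stand-in for a persisted descriptor; not Spline's actual model.
final case class Descriptor(datasetId: String, path: String)

object DescriptorReader {
  // Hypothetical reader: builds a Descriptor from a key/value document and
  // throws a RuntimeException when a required field is absent.
  def fromDocument(doc: Map[String, String]): Descriptor = {
    def required(key: String): String =
      doc.getOrElse(key, sys.error(s"Descriptor requires value for '$key'"))
    Descriptor(required("datasetId"), required("path"))
  }
}

object DescriptorDemo {
  def main(args: Array[String]): Unit = {
    val fileDoc  = Map("datasetId" -> "ds-1", "path" -> "file:/tmp/out.csv")
    // Assumption: a record stored for a Kafka write carries no 'path' field.
    val kafkaDoc = Map("datasetId" -> "ds-2")

    println(DescriptorReader.fromDocument(fileDoc))
    println(scala.util.Try(DescriptorReader.fromDocument(kafkaDoc)))
  }
}
```

In this sketch the first document reads back normally, while the second yields a failure instead of a descriptor. Whether the real fix belongs in the writer or in the reader is not something this sketch settles.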


ATTACHMENT1

13:32:33.947 [http-bio-8080-exec-5] ERROR za.co.absa.spline.web.logging.ErrorCode$ - ErrorCode(b7c64d21-875a-4b81-9dba-335d580656f9)
java.lang.RuntimeException: class za.co.absa.spline.model.PersistedDatasetDescriptor requires value for 'path'
        at scala.sys.package$.error(package.scala:27)
        at salat.DefaultArg.safeValue$lzycompute(Grater.scala:484)
        at salat.DefaultArg.safeValue(Grater.scala:480)
        at salat.ConcreteGrater.safeDefault(Grater.scala:435)
        at salat.ConcreteGrater$$anonfun$7$$anonfun$apply$3.apply(Grater.scala:322)
        at salat.ConcreteGrater$$anonfun$7$$anonfun$apply$3.apply(Grater.scala:322)
        at scala.Option.orElse(Option.scala:289)
        at salat.ConcreteGrater$$anonfun$7.apply(Grater.scala:322)
        at salat.ConcreteGrater$$anonfun$7.apply(Grater.scala:307)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:296)
        at salat.ConcreteGrater.asObject(Grater.scala:307)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$findDatasets$1$$anonfun$apply$5.apply(MongoDataLineageReader.scala:80)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$findDatasets$1$$anonfun$apply$5.apply(MongoDataLineageReader.scala:79)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
        at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
        at za.co.absa.spline.web.json.StringJSONConverters$CloseableIterableToJson$$anonfun$asJsonArrayInto$2.apply(StringJSONConverters.scala:89)
        at za.co.absa.spline.web.json.StringJSONConverters$CloseableIterableToJson$$anonfun$asJsonArrayInto$2.apply(StringJSONConverters.scala:89)
        at za.co.absa.spline.common.ARM$.using(ARM.scala:31)
        at za.co.absa.spline.web.json.StringJSONConverters$CloseableIterableToJson.asJsonArrayInto(StringJSONConverters.scala:89)
        at za.co.absa.spline.web.rest.controller.LineageController$$anonfun$datasetDescriptors$1.apply(LineageController.scala:63)
        at za.co.absa.spline.web.rest.controller.LineageController$$anonfun$datasetDescriptors$1.apply(LineageController.scala:63)
        at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
        at scala.util.Try$.apply(Try.scala:192)
        at scala.util.Success.map(Try.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
ago 30, 2019 1:32:34 PM org.springframework.web.servlet.mvc.method.annotation.ExceptionHandlerExceptionResolver doResolveHandlerMethodException
ADVERTENCIA: Failed to invoke @ExceptionHandler method: public org.springframework.http.ResponseEntity<java.lang.String> za.co.absa.spline.web.rest.controller.RESTErrorControllerAdvice.handle_500(java.lang.Throwable)
java.lang.IllegalStateException: getWriter() has already been called for this response
        at org.apache.catalina.connector.Response.getOutputStream(Response.java:605)
        at org.apache.catalina.connector.ResponseFacade.getOutputStream(ResponseFacade.java:197)
        at org.springframework.http.server.ServletServerHttpResponse.getBody(ServletServerHttpResponse.java:84)
        at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:103)
        at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:43)
        at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:230)
        at org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodProcessor.writeWithMessageConverters(AbstractMessageConverterMethodProcessor.java:271)
        at org.springframework.web.servlet.mvc.method.annotation.HttpEntityMethodProcessor.handleReturnValue(HttpEntityMethodProcessor.java:218)
        at org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite.handleReturnValue(HandlerMethodReturnValueHandlerComposite.java:82)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:119)
        at org.springframework.web.servlet.mvc.method.annotation.ExceptionHandlerExceptionResolver.doResolveHandlerMethodException(ExceptionHandlerExceptionResolver.java:403)
        at org.springframework.web.servlet.handler.AbstractHandlerMethodExceptionResolver.doResolveException(AbstractHandlerMethodExceptionResolver.java:61)
        at org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.resolveException(AbstractHandlerExceptionResolver.java:140)
        at org.springframework.web.servlet.handler.HandlerExceptionResolverComposite.resolveException(HandlerExceptionResolverComposite.java:78)
        at org.springframework.web.servlet.DispatcherServlet.processHandlerException(DispatcherServlet.java:1255)
        at org.springframework.web.servlet.DispatcherServlet.processDispatchResult(DispatcherServlet.java:1062)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1008)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:978)
        at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:870)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:621)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:855)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:728)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:305)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210)
        at org.apache.catalina.core.ApplicationDispatcher.invoke(ApplicationDispatcher.java:749)
        at org.apache.catalina.core.ApplicationDispatcher.doDispatch(ApplicationDispatcher.java:660)
        at org.apache.catalina.core.ApplicationDispatcher.dispatch(ApplicationDispatcher.java:626)
        at org.apache.catalina.core.AsyncContextImpl$1.run(AsyncContextImpl.java:211)
        at org.apache.catalina.core.AsyncContextImpl.doInternalDispatch(AsyncContextImpl.java:350)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:217)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:123)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:472)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:171)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:99)
        at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:936)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:118)
        at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:298)
        at org.apache.coyote.http11.AbstractHttp11Processor.asyncDispatch(AbstractHttp11Processor.java:1562)
        at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:583)
        at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:312)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

ago 30, 2019 1:32:34 PM org.apache.catalina.core.ApplicationDispatcher invoke
GRAVE: Servlet.service() for servlet rest_dispatcher threw exception
java.lang.RuntimeException: class za.co.absa.spline.model.PersistedDatasetDescriptor requires value for 'path'
        at scala.sys.package$.error(package.scala:27)
        at salat.DefaultArg.safeValue$lzycompute(Grater.scala:484)
        at salat.DefaultArg.safeValue(Grater.scala:480)
        at salat.ConcreteGrater.safeDefault(Grater.scala:435)
        at salat.ConcreteGrater$$anonfun$7$$anonfun$apply$3.apply(Grater.scala:322)
        at salat.ConcreteGrater$$anonfun$7$$anonfun$apply$3.apply(Grater.scala:322)
        at scala.Option.orElse(Option.scala:289)
        at salat.ConcreteGrater$$anonfun$7.apply(Grater.scala:322)
        at salat.ConcreteGrater$$anonfun$7.apply(Grater.scala:307)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:296)
        at salat.ConcreteGrater.asObject(Grater.scala:307)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$findDatasets$1$$anonfun$apply$5.apply(MongoDataLineageReader.scala:80)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$findDatasets$1$$anonfun$apply$5.apply(MongoDataLineageReader.scala:79)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
        at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
        at za.co.absa.spline.web.json.StringJSONConverters$CloseableIterableToJson$$anonfun$asJsonArrayInto$2.apply(StringJSONConverters.scala:89)
        at za.co.absa.spline.web.json.StringJSONConverters$CloseableIterableToJson$$anonfun$asJsonArrayInto$2.apply(StringJSONConverters.scala:89)
        at za.co.absa.spline.common.ARM$.using(ARM.scala:31)
        at za.co.absa.spline.web.json.StringJSONConverters$CloseableIterableToJson.asJsonArrayInto(StringJSONConverters.scala:89)
        at za.co.absa.spline.web.rest.controller.LineageController$$anonfun$datasetDescriptors$1.apply(LineageController.scala:63)
        at za.co.absa.spline.web.rest.controller.LineageController$$anonfun$datasetDescriptors$1.apply(LineageController.scala:63)
        at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
        at scala.util.Try$.apply(Try.scala:192)
        at scala.util.Success.map(Try.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

ago 30, 2019 1:32:34 PM org.apache.catalina.core.StandardWrapperValve invoke
GRAVE: Servlet.service() for servlet [rest_dispatcher] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: class za.co.absa.spline.model.PersistedDatasetDescriptor requires value for 'path'] with root cause
java.lang.RuntimeException: class za.co.absa.spline.model.PersistedDatasetDescriptor requires value for 'path'
        at scala.sys.package$.error(package.scala:27)
        at salat.DefaultArg.safeValue$lzycompute(Grater.scala:484)
        at salat.DefaultArg.safeValue(Grater.scala:480)
        at salat.ConcreteGrater.safeDefault(Grater.scala:435)
        at salat.ConcreteGrater$$anonfun$7$$anonfun$apply$3.apply(Grater.scala:322)
        at salat.ConcreteGrater$$anonfun$7$$anonfun$apply$3.apply(Grater.scala:322)
        at scala.Option.orElse(Option.scala:289)
        at salat.ConcreteGrater$$anonfun$7.apply(Grater.scala:322)
        at salat.ConcreteGrater$$anonfun$7.apply(Grater.scala:307)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:296)
        at salat.ConcreteGrater.asObject(Grater.scala:307)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$findDatasets$1$$anonfun$apply$5.apply(MongoDataLineageReader.scala:80)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$findDatasets$1$$anonfun$apply$5.apply(MongoDataLineageReader.scala:79)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
        at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
        at za.co.absa.spline.web.json.StringJSONConverters$CloseableIterableToJson$$anonfun$asJsonArrayInto$2.apply(StringJSONConverters.scala:89)
        at za.co.absa.spline.web.json.StringJSONConverters$CloseableIterableToJson$$anonfun$asJsonArrayInto$2.apply(StringJSONConverters.scala:89)
        at za.co.absa.spline.common.ARM$.using(ARM.scala:31)
        at za.co.absa.spline.web.json.StringJSONConverters$CloseableIterableToJson.asJsonArrayInto(StringJSONConverters.scala:89)
        at za.co.absa.spline.web.rest.controller.LineageController$$anonfun$datasetDescriptors$1.apply(LineageController.scala:63)
        at za.co.absa.spline.web.rest.controller.LineageController$$anonfun$datasetDescriptors$1.apply(LineageController.scala:63)
        at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
        at scala.util.Try$.apply(Try.scala:192)
        at scala.util.Success.map(Try.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


ATTACHMENT2

13:43:52.296 [http-bio-8080-exec-9] ERROR za.co.absa.spline.web.logging.ErrorCode$ - ErrorCode(91c18bf2-529e-4ea0-9845-5c74c27cfbfb)
java.util.concurrent.ExecutionException: Boxed Error
        at scala.concurrent.impl.Promise$.resolver(Promise.scala:59)
        at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:51)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at scala.concurrent.Promise$class.complete(Promise.scala:55)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: salat.util.ToObjectGlitch:

  null

  $anon$3(class za.co.absa.spline.model.DataLineage @ za.co.absa.spline.persistence.mongo.serialization.BSONSalatContext$$anon$1@4b57cd40) toObject failed on:
  SYM: za.co.absa.spline.model.DataLineage
  CONSTRUCTOR
public za.co.absa.spline.model.DataLineage(java.lang.String,java.lang.String,long,java.lang.String,scala.collection.Seq<za.co.absa.spline.model.op.Operation>,scala.collection.Seq<za.co.absa.spline.model.MetaDataset>,scala.collection.Seq<za.co.absa.spline.model.Attribute>,scala.collection.Seq<za.co.absa.spline.model.dt.DataType>,boolean)

---------- CONSTRUCTOR EXPECTS FOR PARAM [0] --------------
NAME:         appId
TYPE:         java.lang.String
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.String
VALUE:
local-1567078483050
------------------------------------------------------------


---------- CONSTRUCTOR EXPECTS FOR PARAM [1] --------------
NAME:         appName
TYPE:         java.lang.String
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.String
VALUE:
DataLineageWrapperKafka
------------------------------------------------------------


---------- CONSTRUCTOR EXPECTS FOR PARAM [2] --------------
NAME:         timestamp
TYPE:         long
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.Long
VALUE:
1567078490761
------------------------------------------------------------


---------- CONSTRUCTOR EXPECTS FOR PARAM [3] --------------
NAME:         sparkVer
TYPE:         java.lang.String
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.String
VALUE:
2.3.3
------------------------------------------------------------


---------- CONSTRUCTOR EXPECTS FOR PARAM [4] --------------
NAME:         operations
TYPE:         scala.collection.Seq<za.co.absa.spline.model.op.Operation>
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: scala.collection.immutable.Nil$[scala.runtime.Nothing$]
VALUE:
List()
------------------------------------------------------------


---------- CONSTRUCTOR EXPECTS FOR PARAM [5] --------------
NAME:         datasets
TYPE:         scala.collection.Seq<za.co.absa.spline.model.MetaDataset>
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: scala.collection.immutable.$colon$colon[B]
VALUE:
List(MetaDataset(6756c825-8999-46df-b28a-f467078476a9,Schema(List())))
------------------------------------------------------------


---------- CONSTRUCTOR EXPECTS FOR PARAM [6] --------------
NAME:         attributes
TYPE:         scala.collection.Seq<za.co.absa.spline.model.Attribute>
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: scala.collection.immutable.Nil$[scala.runtime.Nothing$]
VALUE:
List()
------------------------------------------------------------


---------- CONSTRUCTOR EXPECTS FOR PARAM [7] --------------
NAME:         dataTypes
TYPE:         scala.collection.Seq<za.co.absa.spline.model.dt.DataType>
DEFAULT ARG   [Missing, but unnecessary because input value was supplied]
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: scala.collection.immutable.Nil$[scala.runtime.Nothing$]
VALUE:
List()
------------------------------------------------------------


---------- CONSTRUCTOR EXPECTS FOR PARAM [8] --------------
NAME:         writeIgnored
TYPE:         boolean
DEFAULT ARG   false
@Ignore       false
---------- CONSTRUCTOR INPUT ------------------------
TYPE: java.lang.Boolean
VALUE:
false
------------------------------------------------------------



        at salat.ConcreteGrater.feedArgsToConstructor(Grater.scala:359)
        at salat.ConcreteGrater.asObject(Grater.scala:326)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$loadByDatasetId$1$$anonfun$apply$1.apply(MongoDataLineageReader.scala:41)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$loadByDatasetId$1$$anonfun$apply$1.apply(MongoDataLineageReader.scala:41)
        at scala.Option.map(Option.scala:146)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$loadByDatasetId$1.apply(MongoDataLineageReader.scala:41)
        at za.co.absa.spline.persistence.mongo.MongoDataLineageReader$$anonfun$loadByDatasetId$1.apply(MongoDataLineageReader.scala:41)
        at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
        at scala.util.Try$.apply(Try.scala:192)
        at scala.util.Success.map(Try.scala:237)
        ... 8 common frames omitted
Caused by: java.lang.reflect.InvocationTargetException: null
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at salat.ConcreteGrater.feedArgsToConstructor(Grater.scala:350)
        ... 17 common frames omitted
Caused by: java.lang.IllegalArgumentException: requirement failed: list of operations cannot be empty
        at scala.Predef$.require(Predef.scala:224)
        at za.co.absa.spline.model.DataLineage.<init>(DataLineage.scala:48)
        ... 22 common frames omitted
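
The innermost cause in ATTACHMENT2 is a constructor precondition. As a minimal sketch (the `Lineage` class below is a hypothetical simplification, not Spline's `DataLineage`), Scala's `require` throws an `IllegalArgumentException` whose message starts with "requirement failed: " when an instance is built with an empty operations list, as happens with the stored record shown above (`operations` is `List()`):

```scala
// Hypothetical, simplified model; Spline's real DataLineage has more fields.
final case class Lineage(appName: String, operations: Seq[String]) {
  // Same style of precondition as the one reported in the trace.
  require(operations.nonEmpty, "list of operations cannot be empty")
}

object LineageDemo {
  def main(args: Array[String]): Unit = {
    // Building the object from a record with no operations fails in the constructor.
    val result = scala.util.Try(Lineage("DataLineageWrapperKafka", Nil))
    result.failed.foreach(e => println(e.getMessage))
  }
}
```

Because the check runs inside the constructor, any deserializer that instantiates the class reflectively surfaces it wrapped in an `InvocationTargetException`, which matches the nesting of causes in the trace.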
@wajda wajda added the bug label Aug 30, 2019

wajda commented Sep 24, 2019

Structured Streaming is not supported by Spline yet, but it is becoming a priority, so we will start working on it soon.

wajda commented Sep 24, 2019

Please follow #16 for further updates.

@wajda wajda closed this as completed Sep 24, 2019
gatchan00 (author) commented

This is not streaming; it's a batch process writing to a Kafka topic.

gatchan00 (author) commented

As you can see, this is a normal batch process (you can check the source code at https://github.com/gatchan00/spline-test)

    // Read both CSV inputs as ordinary batch DataFrames.
    val lectura1: DataFrame = spark.read
      .format("csv")
      .option("header", "true")
      .load("algo.csv")

    val lectura2: DataFrame = spark.read
      .format("csv")
      .option("header", "true")
      .load("compras.csv")

    //val lecturaMetadatada = lectura.addMetaData(spark)

    // Join the two inputs on the customer id.
    val mezcla = lectura1.join(lectura2, lectura1("id") === lectura2("customerid"))

    // Derive a new column and write it to Kafka as a batch (save(), not writeStream).
    // 'age relies on import spark.implicits._ in the enclosing scope.
    val mezcla2 = mezcla.withColumn("Nueva", 'age * 3)
    mezcla2.selectExpr("CAST (Nueva as STRING) as value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "topic_spline")
      .save()

  }


wajda commented Sep 24, 2019

Ouch, sorry, I overlooked it. Thanks for the heads-up.

@wajda wajda reopened this Sep 24, 2019

wajda commented Sep 24, 2019

OK, I confirm the issue. The fix will be incorporated into the 0.4 release.

@wajda wajda removed the duplicate label Sep 24, 2019
@wajda wajda changed the title from "Spark batch not working when storing on Kafka" to "Kafka source type support" Sep 24, 2019
@wajda wajda added this to the 0.4.next milestone Sep 24, 2019
@wajda wajda self-assigned this Sep 24, 2019
wajda added a commit that referenced this issue Sep 25, 2019
wajda added a commit that referenced this issue Sep 26, 2019
@wajda wajda closed this as completed Nov 3, 2019