
UnsupportedOperationException: dataType #28

Closed
arijit999 opened this issue Apr 2, 2020 · 5 comments

20/04/02 20:58:29 INFO FileFormatWriter: Finished processing stats for write job 55b3e202-a211-4af3-a5ad-4b8dbbe0a945.
20/04/02 20:58:29 INFO SparkLineageInitializer$: Spline v0.4.2 is initializing...
20/04/02 20:58:29 INFO SparkLineageInitializer$: Spline successfully initialized. Spark Lineage tracking is ENABLED.
20/04/02 20:58:30 WARN ExecutionListenerManager: Error executing query execution listener
java.lang.UnsupportedOperationException: dataType
	at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.dataType(windowExpressions.scala:54)
	at za.co.absa.spline.harvester.converter.ExpressionConverter.getDataType(ExpressionConverter.scala:77)
	at za.co.absa.spline.harvester.converter.ExpressionConverter.convert(ExpressionConverter.scala:71)
	at za.co.absa.spline.harvester.converter.ExpressionConverter$$anonfun$convert$4.apply(ExpressionConverter.scala:72)
	at za.co.absa.spline.harvester.converter.ExpressionConverter$$anonfun$convert$4.apply(ExpressionConverter.scala:72)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at za.co.absa.spline.harvester.converter.ExpressionConverter.convert(ExpressionConverter.scala:72)
	at za.co.absa.spline.harvester.converter.ExpressionConverter.convert(ExpressionConverter.scala:41)
	at za.co.absa.spline.harvester.converter.OperationParamsConverter$$anonfun$1$$anonfun$apply$1.applyOrElse(OperationParamsConverter.scala:42)
	at za.co.absa.spline.harvester.converter.OperationParamsConverter$$anonfun$1$$anonfun$apply$1.applyOrElse(OperationParamsConverter.scala:35)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at za.co.absa.spline.harvester.converter.ValueDecomposer$$anonfun$za$co$absa$spline$harvester$converter$ValueDecomposer$$recursion$1.apply(ValueDecomposer.scala:39)
	at za.co.absa.spline.harvester.converter.ValueDecomposer$$anonfun$za$co$absa$spline$harvester$converter$ValueDecomposer$$recursion$1.apply(ValueDecomposer.scala:39)
	at za.co.absa.spline.harvester.converter.ValueDecomposer$$anonfun$2$$anonfun$apply$1$$anonfun$applyOrElse$4.apply(ValueDecomposer.scala:51)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at za.co.absa.spline.harvester.converter.ValueDecomposer$$anonfun$2$$anonfun$apply$1.applyOrElse(ValueDecomposer.scala:51)
	at za.co.absa.spline.harvester.converter.ValueDecomposer$$anonfun$2$$anonfun$apply$1.applyOrElse(ValueDecomposer.scala:44)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at za.co.absa.spline.harvester.converter.OperationParamsConverter$$anonfun$1$$anonfun$apply$1.applyOrElse(OperationParamsConverter.scala:35)
	at za.co.absa.spline.harvester.converter.OperationParamsConverter$$anonfun$1$$anonfun$apply$1.applyOrElse(OperationParamsConverter.scala:35)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at za.co.absa.spline.harvester.converter.ValueDecomposer$$anonfun$za$co$absa$spline$harvester$converter$ValueDecomposer$$recursion$1.apply(ValueDecomposer.scala:39)
	at za.co.absa.spline.harvester.converter.ValueDecomposer$$anonfun$za$co$absa$spline$harvester$converter$ValueDecomposer$$recursion$1.apply(ValueDecomposer.scala:39)
	at za.co.absa.spline.harvester.converter.OperationParamsConverter$$anonfun$convert$4.apply(OperationParamsConverter.scala:58)
	at za.co.absa.spline.harvester.converter.OperationParamsConverter$$anonfun$convert$4.apply(OperationParamsConverter.scala:54)
	at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
	at scala.collection.immutable.Map$Map4.foreach(Map.scala:188)
	at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
	at za.co.absa.spline.harvester.converter.OperationParamsConverter.convert(OperationParamsConverter.scala:54)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.build(GenericNodeBuilder.scala:34)
	at za.co.absa.spline.harvester.builder.GenericNodeBuilder.build(GenericNodeBuilder.scala:24)
	at za.co.absa.spline.harvester.LineageHarvester$$anonfun$harvest$1$$anonfun$4.apply(LineageHarvester.scala:76)
	at za.co.absa.spline.harvester.LineageHarvester$$anonfun$harvest$1$$anonfun$4.apply(LineageHarvester.scala:76)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at za.co.absa.spline.harvester.LineageHarvester$$anonfun$harvest$1.apply(LineageHarvester.scala:76)
	at za.co.absa.spline.harvester.LineageHarvester$$anonfun$harvest$1.apply(LineageHarvester.scala:69)
	at scala.Option.flatMap(Option.scala:171)
	at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:69)
	at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:41)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$$anonfun$onSuccess$1.apply(SplineQueryExecutionListener.scala:37)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$$anonfun$onSuccess$1.apply(SplineQueryExecutionListener.scala:37)
	at scala.Option.foreach(Option.scala:257)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:37)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:124)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:123)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:145)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:143)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
	at org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:143)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:123)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
	at org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:156)
	at org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:122)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:678)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
20/04/02 20:58:30 INFO SparkContext: Invoking stop() from shutdown hook
wajda (Contributor) commented Apr 2, 2020

Could you please provide a Spark app snippet that reproduces the issue?

wajda (Contributor) commented Apr 2, 2020

What Spark version are you using?

arijit999 (Author) commented Apr 3, 2020

Spark version is 2.4.4.

PySpark code snippet:

# import statements (reconstructed from usage; dl, SparkJob, open_pgms
# and rm_batch_qr7_disc are project-internal and not shown here)
from datetime import date as Date, datetime

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as fx
from pyspark.sql.types import TimestampType
from pyspark.sql.window import Window

def rm_detl(
    cgm_dtl: DataFrame, op: DataFrame, rm_batch_qr7_disc: DataFrame, RPT_MNTH: Date
) -> DataFrame:

    cgm_dtl = cgm_dtl.select(
        fx.col("PGM_ID"),
        fx.col("UPDATED_ON"),
        fx.col("CREATED_ON"),
        fx.col("STAT_RSN_CODE"),
        fx.col("STAT_CODE"),
        fx.col("BEG_DATE"),
        fx.col("END_DATE"),
        fx.col("CREATED_BY"),
    ).alias("cgm_dtl")

    op = op.select(
        fx.col("PID"), fx.col("SERIAL_NUM_IDENTIF"), fx.col("CASE_NAME")
    ).alias("op")

    rm_batch_qr7_disc = rm_batch_qr7_disc.select(
        fx.col("PID"), fx.col("STAFF_ID")
    ).alias("rmbqr7")

    pdh_qr7_join = (
        rm_batch_qr7_disc.join(
            cgm_dtl, fx.col("rmbqr7.PID") == fx.col("cgm_dtl.PGM_ID")
        )
        .select(fx.col("STAFF_ID"))
        .alias("pdh_qr7_join")
    )

    qry_pgmdtl_op_1 = (
        (cgm_dtl.join(op, fx.col("cgm_dtl.PGM_ID") == fx.col("op.PID")))
        .where(
            (
                fx.to_date(fx.col("cgm_dtl.BEG_DATE"))
                <= fx.add_months(fx.to_date(fx.lit(RPT_MNTH)), -1)
            )
            & (
                fx.to_date(fx.col("cgm_dtl.END_DATE"))
                > fx.add_months(fx.to_date(fx.lit(RPT_MNTH)), -1)
            )
            & (
                fx.to_date(fx.col("cgm_dtl.CREATED_ON"))
                < fx.to_date(fx.lit(RPT_MNTH))
            )
        )
        .select(
            fx.col("cgm_dtl.PGM_ID"),
            fx.col("cgm_dtl.UPDATED_ON"),
            fx.col("cgm_dtl.CREATED_ON"),
            fx.col("cgm_dtl.STAT_RSN_CODE"),
            fx.col("cgm_dtl.STAT_CODE"),
            fx.col("cgm_dtl.BEG_DATE"),
            fx.col("cgm_dtl.END_DATE"),
            fx.col("cgm_dtl.CREATED_BY"),
            fx.col("op.PID"),
            fx.col("op.SERIAL_NUM_IDENTIF"),
            fx.col("op.CASE_NAME"),
        )
        .alias("qry_pgmdtl_op_1")
    )

    qry_pgmdtl_op_2 = (
        cgm_dtl.join(op, fx.col("cgm_dtl.PGM_ID") == fx.col("op.PID"))
        .join(
            pdh_qr7_join,
            fx.col("cgm_dtl.CREATED_BY") == fx.col("pdh_qr7_join.STAFF_ID"),
        )
        .where(
            (
                (fx.col("cgm_dtl.BEG_DATE") == fx.to_date(fx.lit(RPT_MNTH)))
                & (
                    fx.trunc(fx.col("cgm_dtl.CREATED_ON"), "mon")
                    == fx.trunc(fx.to_date(fx.lit(RPT_MNTH)), "mon")
                )
                & (fx.col("cgm_dtl.STAT_CODE") == fx.lit("DS"))
                & (
                    fx.col("cgm_dtl.STAT_RSN_CODE").isin(
                        ["01", "02", "03", "SC", "SD", "SB"]
                    )
                )
            )
        )
        .select(
            fx.col("cgm_dtl.PGM_ID"),
            fx.col("cgm_dtl.UPDATED_ON"),
            fx.col("cgm_dtl.CREATED_ON"),
            fx.col("cgm_dtl.STAT_RSN_CODE"),
            fx.col("cgm_dtl.STAT_CODE"),
            fx.col("cgm_dtl.BEG_DATE"),
            fx.col("cgm_dtl.END_DATE"),
            fx.col("cgm_dtl.CREATED_BY"),
            fx.col("op.PID"),
            fx.col("op.SERIAL_NUM_IDENTIF"),
            fx.col("op.CASE_NAME"),
        )
        .alias("qry_pgmdtl_op_2")
    )

    uni_pgmdtl_op_1_and_2 = qry_pgmdtl_op_1.union(qry_pgmdtl_op_2).alias(
        "uni_pgmdtl_op_1_and_2"
    )

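    # Editorial note: the window aggregates below are the likely source of the
    # WindowSpecDefinition expression that the Spline listener fails on.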
    w_mx_beg_dt_prtn_on_pid = Window.partitionBy(uni_pgmdtl_op_1_and_2.PID)
    a = (
        uni_pgmdtl_op_1_and_2.withColumn(
            "MX_BEG_DATE",
            fx.max(fx.col("uni_pgmdtl_op_1_and_2.BEG_DATE")).over(
                w_mx_beg_dt_prtn_on_pid
            ),
        )
        .distinct()
        .alias("a")
    )

    w_mx_updt_dt_prtn_on_pid = Window.partitionBy(a.PID)
    b = (
        a.where(fx.col("a.BEG_DATE") == fx.col("a.MX_BEG_DATE"))
        .withColumn(
            "MX_UPDATED", fx.max(fx.col("a.UPDATED_ON")).over(w_mx_updt_dt_prtn_on_pid)
        )
        .alias("b")
    )

    rm_detl_df = (
        b.where(fx.col("b.UPDATED_ON") == fx.col("b.MX_UPDATED")).select(
            fx.col("b.SERIAL_NUM_IDENTIF").alias("SERIAL_NUM_IDENTIF"),
            fx.col("b.CASE_NAME").alias("CASE_NAME"),
            fx.lit(None).cast(TimestampType()).alias("TS"),
            fx.col("b.PID").alias("PID"),
            fx.col("b.UPDATED_ON").alias("UPDATED_ON"),
            fx.date_trunc("day", fx.col("b.CREATED_ON")).alias("CREATED_ON"),
            fx.col("b.STAT_RSN_CODE").alias("STAT_RSN_CODE"),
            fx.col("b.STAT_CODE").alias("STAT_CODE"),
            fx.col("b.BEG_DATE").alias("BEG_DATE"),
            fx.col("b.END_DATE").alias("END_DATE"),
            fx.col("b.CREATED_BY").alias("CREATED_BY"),
            fx.col("b.MX_BEG_DATE").alias("MX_BEG_DATE"),
            fx.col("b.MX_UPDATED").alias("MX_UPDATED"),
        )
    ).alias("rm_detl_df")

    return rm_detl_df


class KmDtl(SparkJob):
    def __init__(
        self, Date: str, database=dl.ZZZ, bucket=dl.DEFAULT_BUCKET
    ):
        super().__init__(database=database, bucket=bucket, Date=Date)
        self.database = database
        self.bucket = bucket
        self.RPT_MNTH = datetime.strptime(Date, "%Y-%m-%d").date()

    def load_all_data(self):
        self.CASE = dl.load_from_datalake(self.database, dl.CASE, bucket=self.bucket)
        self.CGM = dl.load_from_datalake(self.database, dl.CGM, bucket=self.bucket)
        self.CGM_DTL = dl.load_from_datalake(self.database, dl.CGM_DTL, bucket=self.bucket)
        self.STAFF = dl.load_from_datalake(self.database, dl.STAFF, bucket=self.bucket)
        self.OP = open_pgms(
            CASE=self.CASE,
            CGM=self.CGM,
            CGM_DTL=self.CGM_DTL,
            RPT_MNTH=self.RPT_MNTH,
        )
        self.RM_BATCH_QR7_DISC = rm_batch_qr7_disc(
            STAFF=self.STAFF, CGM_DTL=self.CGM_DTL, RPT_MNTH=self.RPT_MNTH
        )

    def run(self):
        spark = SparkSession.builder.appName("KmDtl").getOrCreate()

        self.load_all_data()
        rm_detl_df = rm_detl(
            cgm_dtl=self.CGM_DTL,
            op=self.OP,
            rm_batch_qr7_disc=self.RM_BATCH_QR7_DISC,
            RPT_MNTH=self.RPT_MNTH,
        )

        dl.save_to_datalake(rm_detl_df, dl.RR2255CW, dl.RM_DETL, bucket=self.bucket)


cerveada (Contributor) commented:

  • WindowSpecDefinition.dataType always throws.

  • This expression is most probably created when using the column.over(...) function (a minimal sketch follows this list).

  • Since WindowSpecDefinition extends Unevaluable, which isn't supposed to live past analysis or optimization, the solution may be to ignore it.
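
A minimal sketch of the suspected trigger, assuming the diagnosis above (illustrative only, not from the original report; the app name and output path are placeholders): any window aggregate should place a WindowSpecDefinition in the analyzed plan, so a simple windowed write with the Spline 0.4.x listener registered should reproduce the exception.

from pyspark.sql import SparkSession
from pyspark.sql import functions as fx
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("spline-window-repro").getOrCreate()

df = spark.createDataFrame([(1, 10), (1, 20), (2, 30)], ["pid", "beg_date"])

# max(...).over(...) yields a WindowExpression whose children include a
# WindowSpecDefinition; the agent's ExpressionConverter calls dataType on
# every child expression, and WindowSpecDefinition.dataType throws.
w = Window.partitionBy("pid")
out = df.withColumn("mx_beg_date", fx.max("beg_date").over(w))

# The write fires the QueryExecutionListener, where the error surfaces;
# note in the log above that the write itself finishes before the WARN.
out.write.mode("overwrite").parquet("/tmp/spline_window_repro")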

@cerveada cerveada self-assigned this Apr 30, 2020
cerveada (Contributor) commented May 5, 2020

We will release a fix for 0.5.x that will allow the agent to gather the data properly.

The attribute lineage feature will not take into account the information from windowExpressions; this will be fixed in Spline 0.6 as AbsaOSS/spline#668.

@cerveada cerveada closed this as completed May 5, 2020
@wajda wajda transferred this issue from AbsaOSS/spline May 5, 2020
@wajda wajda added the bug Something isn't working label May 5, 2020
@wajda wajda added this to the 0.5.1 milestone May 5, 2020