DateType() conversion from PySpark fails #476

Closed · daStrauss opened this issue Feb 14, 2017 · 5 comments

@daStrauss

I'm encountering problems returning a DataFrame with DateType() columns from an SMV module, specifically while running within py-smv.

I get the following stack trace:

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost task 10.3 in stage 1.0 (TID 17, <hostname>): java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
        at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
        at org.tresamigos.smv.DateTypeFormat.valToStr(SmvSchema.scala:132)
        at org.tresamigos.smv.SmvSchema$$anonfun$rowToCsvString$1.apply(SmvSchema.scala:383)
        at org.tresamigos.smv.SmvSchema$$anonfun$rowToCsvString$1.apply(SmvSchema.scala:380)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.tresamigos.smv.SmvSchema.rowToCsvString(SmvSchema.scala:380)
        at org.tresamigos.smv.FileIOHandler$$anonfun$11.apply(FileIOHandler.scala:134)
        at org.tresamigos.smv.FileIOHandler$$anonfun$11.apply(FileIOHandler.scala:134)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:989)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:897)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:896)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1430)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1409)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1409)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
        at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1409)
        at org.tresamigos.smv.FileIOHandler.saveAsCsvWithSchema(FileIOHandler.scala:144)
        at org.tresamigos.smv.SmvUtil$.persist(SmvUtil.scala:53)
        at org.tresamigos.smv.SmvDataSet.persist(SmvDataSet.scala:192)
        at org.tresamigos.smv.SmvDataSet$$anonfun$computeRDD$1.applyOrElse(SmvDataSet.scala:208)
        at org.tresamigos.smv.SmvDataSet$$anonfun$computeRDD$1.applyOrElse(SmvDataSet.scala:206)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at scala.util.Failure.recoverWith(Try.scala:172)
        at org.tresamigos.smv.SmvDataSet.computeRDD(SmvDataSet.scala:206)
        at org.tresamigos.smv.SmvDataSet.rdd(SmvDataSet.scala:139)
        at org.tresamigos.smv.SmvApp.resolveRDD(SmvApp.scala:188)
        at org.tresamigos.smv.SmvApp.runModule(SmvApp.scala:417)
        at org.tresamigos.smv.SmvApp$$anonfun$generateOutputModules$1.apply(SmvApp.scala:383)
        at org.tresamigos.smv.SmvApp$$anonfun$generateOutputModules$1.apply(SmvApp.scala:382)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.tresamigos.smv.SmvApp.generateOutputModules(SmvApp.scala:382)
        at org.tresamigos.smv.SmvApp.run(SmvApp.scala:609)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
        at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
        at org.tresamigos.smv.DateTypeFormat.valToStr(SmvSchema.scala:132)
        at org.tresamigos.smv.SmvSchema$$anonfun$rowToCsvString$1.apply(SmvSchema.scala:383)
        at org.tresamigos.smv.SmvSchema$$anonfun$rowToCsvString$1.apply(SmvSchema.scala:380)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.tresamigos.smv.SmvSchema.rowToCsvString(SmvSchema.scala:380)
        at org.tresamigos.smv.FileIOHandler$$anonfun$11.apply(FileIOHandler.scala:134)
        at org.tresamigos.smv.FileIOHandler$$anonfun$11.apply(FileIOHandler.scala:134)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        ... 1 more

This is triggered by running a module that looks like this:

import datetime

import smv
from pyspark.sql import functions as F
from pyspark.sql.types import DateType

import inputdata  # project-local module that defines the RawInput dataset


# Parse an MMDDYYYY string into a value for a DateType() column.
back_to_date = F.udf(
    lambda eff_date: datetime.datetime.strptime(eff_date, "%m%d%Y"),
    DateType())


class WithDateTimes(smv.SmvPyModule, smv.SmvPyOutput):

    def requiresDS(self):
        return [inputdata.RawInput]

    def run(self, i):
        df = i[inputdata.RawInput]

        add_date = df\
            .withColumn('true_start_date',
                back_to_date(F.col('effective_start_date').cast('string')))\
            .withColumn('true_stop_date',
                back_to_date(F.col('effective_stop_date').cast('string')))\
            .drop('effective_start_date')\
            .drop('effective_stop_date')
        return add_date

When I simply cast the true_start_date and true_stop_date columns to strings, there are no more errors.
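
For reference, a minimal sketch of that workaround, reusing the add_date DataFrame and column names from the module above (no_dates is just an illustrative local name):

from pyspark.sql import functions as F

# Cast the DateType columns to strings before returning them, so the
# persisted output contains no DateType columns for SMV to serialize.
no_dates = add_date\
    .withColumn('true_start_date', F.col('true_start_date').cast('string'))\
    .withColumn('true_stop_date', F.col('true_stop_date').cast('string'))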

@AliTajeldin

@laneb: please take a look at this issue and see if we can reproduce it.

@AliTajeldin

From @ninjapapa:
DateType is not supported yet; the workaround is to use TimestampType. It should be fixable pretty easily, though.
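
A minimal sketch of that TimestampType workaround, assuming the same parsing logic as the original module (back_to_ts is a hypothetical name):

import datetime

from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

# Declare the UDF as TimestampType instead of DateType; the lambda already
# returns a datetime.datetime, which serializes fine as a timestamp.
back_to_ts = F.udf(
    lambda eff_date: datetime.datetime.strptime(eff_date, "%m%d%Y"),
    TimestampType())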

ninjapapa self-assigned this Feb 25, 2017
@ninjapapa commented Feb 25, 2017

Components to update for DateType support:

  • SmvSchema
  • Edd
  • ColumnHelper
  • SchemaDiscovery

ninjapapa added a commit that referenced this issue Feb 25, 2017
@ninjapapa

Actually, DateType was already added to SmvSchema a year ago. However, no test cases were created for the new feature, so when Spark changed its internal representation of the Date type, we fixed the string -> value part but forgot to fix the value -> string part, which caused this issue.
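
The unboxToInt in the trace shows what broke: the value -> string path still expected Spark's internal date representation, a count of days since the Unix epoch, while the rows now carry java.sql.Date objects. A rough Python sketch of the two representations (the example value is made up):

import datetime

# Old internal form: days since the Unix epoch, which the original
# valToStr tried to unbox as an integer.
days_since_epoch = 17212  # made-up example value
as_date = datetime.date(1970, 1, 1) + datetime.timedelta(days=days_since_epoch)

# New form reaching the CSV writer: an actual date object (java.sql.Date
# on the JVM side), so the value -> string step must format the date
# instead of unboxing an integer.
print(as_date.isoformat())  # 2017-02-15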

@ninjapapa

Done
