
implicit SeqSchema for anything with a WireFormat #236

Closed
wants to merge 1 commit

Conversation

@ellchow ellchow commented Apr 15, 2013

I added an implicit SeqSchema for anything that has a WireFormat. It was a bit of a hassle to persist anything as a sequence file that wasn't one of the standard types. Also, I wanted to use it for checkpointing, and writing the Avro schema for each checkpoint is a bit prohibitive; of course, this may change when/if the plugin is updated for Scala 2.10.

Is there a better way to support this?

ellchow commented Apr 20, 2013

I'm not sure what changed, but this seems to have stopped working since the API for sequence IO was modified.
For this code:
import com.nicta.scoobi.Scoobi._

object HelloWorld extends ScoobiApp{
  def run() = {
    val x = DList((1L, 2L), (2L, 3L),(3L, 4L)).toSequenceFile("checkpoint").checkpoint
    val y = x.map(_._2)
    persist(y.toTextFile("second"))
  }
}

[WARN] LocalJobRunner - job_local_0001 <java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to java.lang.Long>java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(Unknown Source)
at scala.Tuple2._2$mcJ$sp(Tuple2.scala:19)
at HelloWorld$$anonfun$2.apply(HelloWorld.scala:37)
at HelloWorld$$anonfun$2.apply(HelloWorld.scala:37)
at com.nicta.scoobi.core.DList$$anonfun$map$1.apply(DList.scala:105)
at com.nicta.scoobi.core.DList$$anonfun$map$1.apply(DList.scala:105)
at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:79)
at com.nicta.scoobi.core.DoFn$class.process(EnvDoFn.scala:55)
at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:77)
at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:77)
at com.nicta.scoobi.core.EnvDoFn$class.processFunction(EnvDoFn.scala:37)
at com.nicta.scoobi.core.BasicDoFn$$anon$1.processFunction(EnvDoFn.scala:77)
at com.nicta.scoobi.impl.plan.comp.ParallelDo.map(ProcessNode.scala:94)
at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter$$anonfun$map$1.apply(VectorEmitterWriter.scala:36)
at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter$$anonfun$map$1.apply(VectorEmitterWriter.scala:36)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter.map(VectorEmitterWriter.scala:36)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.computeMapper$1(InputChannel.scala:194)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$$anonfun$computeNext$1$1.apply(InputChannel.scala:182)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$$anonfun$computeNext$1$1.apply(InputChannel.scala:181)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.computeNext$1(InputChannel.scala:181)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.map(InputChannel.scala:178)
at com.nicta.scoobi.impl.plan.mscr.FloatingInputChannel.map(InputChannel.scala:250)
at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.nicta.scoobi.impl.mapreducer.MscrMapper.map(MscrMapper.scala:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

Also, this fails:

import com.nicta.scoobi.Scoobi._

import java.io._
import collection.mutable
import com.nicta.scoobi.core.WireFormat
import org.apache.hadoop.io.BytesWritable

object HelloWorld extends ScoobiApp{

  implicit def anyWFSeqSchema[A : WireFormat]: SeqSchema[A] = new SeqSchema[A] {
    type SeqType = BytesWritable

    val b = mutable.ArrayBuffer[Byte]().mapResult(_.toArray)

    def toWritable(a: A) = {
      val bs = new ByteArrayOutputStream
      implicitly[WireFormat[A]].toWire(a, new DataOutputStream(bs))
      new BytesWritable(bs.toByteArray)
    }
    def fromWritable(xs: BytesWritable): A = {
      b.clear()
      xs.getBytes.take(xs.getLength).foreach { x => b += x }
      val bArr = b.result()

      val bais = new ByteArrayInputStream(bArr)
      implicitly[WireFormat[A]].fromWire(new DataInputStream(bais))
    }
    val mf: Manifest[SeqType] = implicitly
  }

  def run() = {
    case class Foo(val value: Int)
    implicit val FooFmt = mkCaseWireFormat(Foo, Foo unapply _)
    val x = DList(Foo(1), Foo(2)).valueToSequenceFile("checkpoint").checkpoint
    val y = x.map(e => Foo(e.value + 1))
    persist(y.toTextFile("plusone"))
  }
}

[WARN] LocalJobRunner - job_local_0001 <java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable>java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable
at HelloWorld$$anon$1.fromWritable(HelloWorld.scala:10)
at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.fromKeyValue(SequenceInput.scala:111)
at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.fromKeyValue(SequenceInput.scala:110)
at com.nicta.scoobi.core.InputConverter$class.asValue(DataSource.scala:51)
at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.asValue(SequenceInput.scala:110)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.map(InputChannel.scala:177)
at com.nicta.scoobi.impl.plan.mscr.FloatingInputChannel.map(InputChannel.scala:250)
at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.nicta.scoobi.impl.mapreducer.MscrMapper.map(MscrMapper.scala:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
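Setting the ClassCastException aside, the byte round-trip that anyWFSeqSchema performs can be checked in isolation. A minimal plain-Scala sketch with no Hadoop or Scoobi dependency (writeFoo/readFoo are hypothetical stand-ins for toWire/fromWire):

```scala
import java.io._

// Hypothetical stand-ins for WireFormat's toWire/fromWire:
// serialize a value to bytes and read it back, as the
// SeqSchema above does before wrapping the bytes in a BytesWritable.
case class Foo(value: Int)

def writeFoo(f: Foo): Array[Byte] = {
  val bs = new ByteArrayOutputStream
  val out = new DataOutputStream(bs)
  out.writeInt(f.value) // a real WireFormat would write all fields
  out.flush()
  bs.toByteArray
}

def readFoo(bytes: Array[Byte]): Foo =
  Foo(new DataInputStream(new ByteArrayInputStream(bytes)).readInt())
```

If the round-trip holds (readFoo(writeFoo(Foo(1))) == Foo(1)), then the cast failure above points at the key/value Writable types the framework hands to fromWritable (a LongWritable key instead of the BytesWritable value), not at the serialization itself.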

@etorreborre (Collaborator) commented:
All of these examples are now working on 0.7.0-SNAPSHOT so I'm closing the pull request.

Note, however, that the syntax has changed a bit; checkpointing is now done like this:

val x = DList(Foo(1), Foo(2)).valueToSequenceFile("checkpoint", checkpoint = true)
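Presumably the second failing example above would then read, in full, something like this (an untested sketch against 0.7.0-SNAPSHOT, which I cannot run here without the Scoobi jar; it assumes the rest of the API is unchanged):

```scala
import com.nicta.scoobi.Scoobi._

object HelloWorld extends ScoobiApp {
  def run() = {
    case class Foo(value: Int)
    implicit val FooFmt = mkCaseWireFormat(Foo, Foo unapply _)
    // checkpointing is now requested via a parameter rather than a
    // separate .checkpoint call
    val x = DList(Foo(1), Foo(2)).valueToSequenceFile("checkpoint", checkpoint = true)
    val y = x.map(e => Foo(e.value + 1))
    persist(y.toTextFile("plusone"))
  }
}
```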
