Merge pull request #66 from foursquare/scalding2
Scalding2
rahulpratapm committed Jun 18, 2015
2 parents 1535925 + 312077d commit 1f10c6f
Showing 65 changed files with 4,381 additions and 31 deletions.
6 changes: 6 additions & 0 deletions conf/core-site.xml
@@ -0,0 +1,6 @@
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
6 changes: 6 additions & 0 deletions conf/hdfs-site.xml
@@ -0,0 +1,6 @@
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
6 changes: 6 additions & 0 deletions conf/mapred-site.xml
@@ -0,0 +1,6 @@
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
</configuration>
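
Taken together, these three files describe a single-node, pseudo-distributed Hadoop setup: HDFS at hdfs://localhost:9000 with a replication factor of 1, and a JobTracker at localhost:9001. A minimal sketch of how code would pick these settings up, assuming the conf/ directory is on the classpath (the object name and the printed checks are illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsSanityCheck {
  def main(args: Array[String]): Unit = {
    // new Configuration() loads core-site.xml and hdfs-site.xml from the classpath,
    // so the default filesystem resolves to hdfs://localhost:9000 as configured above.
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    println(s"Default FS: ${fs.getUri}")
    println(s"Root exists: ${fs.exists(new Path("/"))}")
  }
}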
3 changes: 0 additions & 3 deletions indexer/src/main/scala/GeocodeRecord.scala
@@ -60,13 +60,10 @@ case class GeocodeRecord(
hasPoly: Boolean = false,
var attributes: Option[Array[Byte]] = None,
extraRelations: List[Long] = Nil,
var loc: GeoJsonPoint = GeoJsonPoint.NilPoint,
polyId: ObjectId = GeocodeRecord.dummyOid,
ids: List[Long] = Nil,
polygonSource: Option[String] = None
) extends Ordered[GeocodeRecord] {
// gross that we overwrite this
loc = GeoJsonPoint(lat, lng)

val factory = new TCompactProtocol.Factory()
val serializer = new TSerializer(factory)
10 changes: 8 additions & 2 deletions indexer/src/main/scala/S2CoveringAkkaWorkers.scala
@@ -91,6 +91,8 @@ class S2CoveringWorker extends Actor with DurationUtils with RevGeoConstants wit
}

logDuration("coverClippingForRevGeoIndex", "clipped and outputted cover for %d cells (%s) for revgeo index".format(cells.size, polyId)) {
val recordShape = geom.buffer(0)
val preparedRecordShape = PreparedGeometryFactory.prepare(recordShape)
val records = cells.map((cellid: S2CellId) => {
if (geom.isInstanceOf[JTSPoint]) {
RevGeoIndex(
@@ -99,11 +101,15 @@ class S2CoveringWorker extends Actor with DurationUtils with RevGeoConstants wit
geom = Some(wkbWriter.write(geom))
)
} else {
val recordShape = geom.buffer(0)
val preparedRecordShape = PreparedGeometryFactory.prepare(recordShape)
val s2shape = ShapefileS2Util.fullGeometryForCell(cellid)
if (preparedRecordShape.contains(s2shape)) {
RevGeoIndex(cellid.id(), polyId, full = true, geom = None)
} else if (preparedRecordShape.within(s2shape)) {
RevGeoIndex(
cellid.id(), polyId,
full = false,
geom = Some(wkbWriter.write(geom))
)
} else {
val intersection = s2shape.intersection(recordShape)
val geomToIndex = if (intersection.getGeometryType == "GeometryCollection") {
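
The change above moves the buffer(0) cleanup and the PreparedGeometry construction out of the per-cell loop, so a single prepared shape is reused for every S2 cell in the cover instead of being rebuilt for each cell. A rough sketch of that pattern with the JTS prepared-geometry API (the method name and classification labels are illustrative):

import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.geom.prep.PreparedGeometryFactory

def classifyCells(geom: Geometry, cellShapes: Seq[Geometry]): Seq[String] = {
  // Prepare once, outside the loop; preparing inside the map would rebuild the
  // internal index for every cell.
  val recordShape = geom.buffer(0)
  val prepared = PreparedGeometryFactory.prepare(recordShape)
  cellShapes.map { cell =>
    if (prepared.contains(cell)) "cell fully inside the polygon"
    else if (prepared.within(cell)) "polygon fully inside the cell"
    else "partial overlap: clip and index the intersection"
  }
}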
83 changes: 83 additions & 0 deletions indexer/src/main/scala/ScalaReflection.scala
@@ -0,0 +1,83 @@
// Copyright 2012 Foursquare Labs Inc. All Rights Reserved.

package com.foursquare.common.reflection

import java.lang.reflect.{Field, Method}

/** Utilities for performing reflection on Scala classes.
*/
object ScalaReflection {
/** Given a name, return the instance of the singleton object for
* that name.
*/
def objectFromName(name: String, classLoaderOpt: Option[ClassLoader] = None): AnyRef = {
val clazz = classLoaderOpt match {
case None => Class.forName(name + "$")
case Some(loader) => Class.forName(name + "$", true, loader)
}
val moduleGetter = clazz.getDeclaredField("MODULE$")
    moduleGetter.get(null) // MODULE$ is a static field, so no instance is required
}

/** Get all the superclasses of a given Class
*/
def getAncestors(clazz: Class[_]): Seq[Class[_]] = clazz match {
case null => Vector.empty
case c => c +: getAncestors(c.getSuperclass)
}

/** Get all the methods for a given Class
*/
def getAllMethods(clazz: Class[_]): Seq[Method] = {
getAncestors(clazz).flatMap(_.getDeclaredMethods)
}

def getAllFields(clazz: Class[_]): Seq[Field] = {
getAncestors(clazz).flatMap(_.getDeclaredFields)
}

/** Call a private method on any instance with the given arguments
*/
def privateMethodCall(instance: AnyRef)(methodName: String)(_args: Any*): Any = {
val args = _args.map(_.asInstanceOf[AnyRef])
val clazz = instance.getClass
val methods = getAllMethods(clazz)
val fields = getAllFields(clazz)
val methodOpt = methods.find(_.getName == methodName)
val fieldOpt = fields.find(_.getName == methodName)
val hiddenFieldOpt = fields.find(_.getName.endsWith("$$" + methodName))

methodOpt.orElse(fieldOpt).orElse(hiddenFieldOpt) match {
case Some(method: Method) =>
method.setAccessible(true)
method.invoke(instance, args: _*)
case Some(field: Field) if args.isEmpty =>
field.setAccessible(true)
field.get(instance)
      // Covers both the no-match case and a field match called with unexpected arguments.
      case _ =>
        val errorMsg = "No method or field named %s accepting the arguments %s was found"
        throw new IllegalArgumentException(errorMsg.format(methodName, args.mkString("(", ", ", ")")))
}
}

/** Check to see if A is instance of B by checking classOf[A] against classOf[B]. Example:
*
* trait Foo
* abstract class Bar extends Foo
* case class Baz(i: Int) extends Bar
* isClassInstanceOf[Bar](classOf[Foo]) == false
* isClassInstanceOf[Foo](classOf[Bar]) == true
*/
def isClassInstanceOf[T: Manifest](clazz: Class[_]): Boolean = {
val m = manifest[T]
val instanceOfClass = m.runtimeClass
if (clazz == instanceOfClass) true
else if (clazz.getInterfaces.contains(instanceOfClass)) true
else {
val superClass = clazz.getSuperclass
if (superClass == null) false
else isClassInstanceOf[T](superClass)
}
}

}
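
A short usage sketch of these helpers; the Greeter object, the Foo/Bar hierarchy, and the example values are made up for illustration:

// Hypothetical definitions used only by this example.
object Greeter {
  private def greet(name: String): String = s"Hello, $name"
}
trait Foo
class Bar extends Foo

object ScalaReflectionExample {
  import com.foursquare.common.reflection.ScalaReflection._

  def main(args: Array[String]): Unit = {
    // objectFromName appends "$" internally to locate the singleton's class.
    val greeter = objectFromName("Greeter")

    // Reflectively invoke a private method with a single String argument.
    println(privateMethodCall(greeter)("greet")("world")) // Hello, world

    // Bar extends Foo, so Bar's class counts as an instance of Foo.
    println(isClassInstanceOf[Foo](classOf[Bar])) // true
  }
}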
192 changes: 192 additions & 0 deletions indexer/src/main/scala/SpindleSequenceFile.scala
@@ -0,0 +1,192 @@
// Copyright 2014 Foursquare Labs Inc. All Rights Reserved.

package com.foursquare.hadoop.cascading.io

import cascading.flow.FlowProcess
import cascading.scheme.{Scheme, SinkCall, SourceCall}
import cascading.tap.Tap
import cascading.tuple.Fields
import com.foursquare.common.thrift.ThriftConverter
import org.apache.hadoop.io.{BytesWritable, NullWritable, Writable}
import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader, SequenceFileInputFormat,
SequenceFileOutputFormat}
import scala.reflect.runtime.universe._

/**
* Basic implementation of [[cascading.scheme.Scheme]] that can read and write Hadoop sequence files containing
* Spindle records. The implementation merges aspects of [[cascading.scheme.hadoop.SequenceFile]] and
* [[cascading.scheme.hadoop.WritableSequenceFile]].
*
* Limitations:
* - The Thrift record is stored as a single field in the [[cascading.tuple.Tuple]] by the deserializer. The
* serializer expects a single field in the outgoing [[cascading.tuple.Tuple]] of the expected record type.
*/
class SpindleSequenceFile[K <: Writable: Manifest, T <: ThriftConverter.TType](
val classOfK: Class[K],
val classOfT: Class[T])
extends Scheme[
JobConf,
RecordReader[K, BytesWritable],
OutputCollector[K, BytesWritable],
SpindleSequenceFile.Context[T],
SpindleSequenceFile.Context[T]
](
new Fields(0, 1),
new Fields(0, 1)
) {

private def makeContext(): SpindleSequenceFile.Context[T] = {
SpindleSequenceFile.Context(new ThriftConverter(classOfT))
}

//
// Input Methods (read from a sequence file)
//

@throws[java.io.IOException]
override def sourceConfInit(
flowProcess: FlowProcess[JobConf],
tap: Tap[JobConf, RecordReader[K, BytesWritable], OutputCollector[K, BytesWritable]],
conf: JobConf): Unit = {
conf.setInputFormat(classOf[SequenceFileInputFormat[K, BytesWritable]])
}

override def sourcePrepare(
flowProcess: FlowProcess[JobConf],
sourceCall: SourceCall[SpindleSequenceFile.Context[T], RecordReader[K, BytesWritable]]): Unit = {
sourceCall.setContext(makeContext())
}

@throws[java.io.IOException]
override def source(
flowProcess: FlowProcess[JobConf],
sourceCall: SourceCall[SpindleSequenceFile.Context[T], RecordReader[K, BytesWritable]]): Boolean = {

val key = manifest[K].runtimeClass.newInstance.asInstanceOf[K]
val valueAsBytes = new BytesWritable()

val result = sourceCall.getInput.next(key, valueAsBytes)

if (result) {
val context = sourceCall.getContext
val record = context.thriftConverter.deserialize(valueAsBytes.getBytes)

val tupleEntry = sourceCall.getIncomingEntry
tupleEntry.setObject(0, key)
tupleEntry.setObject(1, record)

true
} else {
false
}
}

override def sourceCleanup(
flowProcess: FlowProcess[JobConf],
sourceCall: SourceCall[SpindleSequenceFile.Context[T], RecordReader[K, BytesWritable]]): Unit = {
sourceCall.setContext(null)
}


//
// Output Methods (write to a sequence file)
//

@throws[java.io.IOException]
override def sinkConfInit(
flowProcess: FlowProcess[JobConf],
tap: Tap[JobConf, RecordReader[K, BytesWritable], OutputCollector[K, BytesWritable]],
conf: JobConf): Unit = {
conf.setOutputKeyClass(classOfK)
conf.setOutputValueClass(classOf[BytesWritable])
conf.setOutputFormat(classOf[SequenceFileOutputFormat[K, BytesWritable]])
}

override def sinkPrepare(
flowProcess: FlowProcess[JobConf],
sinkCall: SinkCall[SpindleSequenceFile.Context[T], OutputCollector[K, BytesWritable]]): Unit = {
sinkCall.setContext(makeContext())
}

@throws[java.io.IOException]
override def sink(
flowProcess: FlowProcess[JobConf],
sinkCall: SinkCall[SpindleSequenceFile.Context[T], OutputCollector[K, BytesWritable]]): Unit = {
val tupleEntry = sinkCall.getOutgoingEntry

    // If NullWritable is the key type, use the NullWritable singleton rather than reading a key from the tuple.
val isNullKey = classOfK == classOf[NullWritable]
val keyValue = if (isNullKey) {
NullWritable.get.asInstanceOf[K]
} else {
tupleEntry.getObject(0).asInstanceOf[K]
}

val valueAsSpindle = {
if (isNullKey && tupleEntry.size() != 1) {
throw new IllegalArgumentException("Tuples to sequence files must contain only the record to output if NullWritable is the keytype.")
}

//println("SIZE: " + tupleEntry.size)
//println("ENTRY: " + tupleEntry)
if (!isNullKey && tupleEntry.size() != 2) {
throw new IllegalArgumentException("Tuples to sequence files must contain a key and a record.")
}

val valueAsAnyRef: AnyRef = if (isNullKey) {
tupleEntry.getObject(0)
} else {
tupleEntry.getObject(1)
}
if (classOfT.isAssignableFrom(valueAsAnyRef.getClass)) {
valueAsAnyRef.asInstanceOf[T]
} else {
throw new IllegalArgumentException(s"Value of type ${valueAsAnyRef.getClass.getName} does not conform to ${classOfT.getName}.")
}
}

val valueAsBytes = {
val context = sinkCall.getContext
context.thriftConverter.serialize(valueAsSpindle)
}

sinkCall.getOutput.collect(keyValue, new BytesWritable(valueAsBytes))
}

override def sinkCleanup(
flowProcess: FlowProcess[JobConf],
sinkCall: SinkCall[SpindleSequenceFile.Context[T], OutputCollector[K, BytesWritable]]): Unit = {
sinkCall.setContext(null)
}

//
// Scheme requires a well-formed .equals() and .hashCode().
//

override def equals(that: Any): Boolean = that match {
case null => false
    case that: SpindleSequenceFile[_, _] => (this eq that) || (
this.classOfT.isAssignableFrom(that.classOfT) &&
this.classOfK.isAssignableFrom(that.classOfK))
case _ => false
}

override def hashCode(): Int = {
val hash = super.hashCode()
31 * hash + classOfT.hashCode() + classOfK.hashCode()
}
}

object SpindleSequenceFile {
/**
* Context used to hold state during calls to the source and sink methods.
*
* The ThriftConverter[T] is stored in this context to ensure there are no race conditions since it is not
* thread-safe.
*/
case class Context[T <: ThriftConverter.TType](thriftConverter: ThriftConverter[T])

def apply[K <: Writable: Manifest, T <: ThriftConverter.TType](clazzK: Class[K], clazz: Class[T]): SpindleSequenceFile[K, T] = {
new SpindleSequenceFile[K, T](clazzK, clazz)
}
}
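
The Context wrapper exists because ThriftConverter is not thread-safe, so sourcePrepare and sinkPrepare hand each call its own converter via makeContext(). Outside of Cascading, the converter can be used directly for a round trip; MyRecord below stands in for a Spindle-generated record type and is not part of this commit:

import com.foursquare.common.thrift.ThriftConverter

// One converter per thread or task; do not share instances across threads.
val converter = new ThriftConverter(classOf[MyRecord])
val bytes: Array[Byte] = converter.serialize(record) // record: MyRecord (hypothetical)
val roundTripped = converter.deserialize(bytes)      // back to a MyRecord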
31 changes: 31 additions & 0 deletions indexer/src/main/scala/SpindleSequenceFileSource.scala
@@ -0,0 +1,31 @@
// Copyright 2014 Foursquare Labs Inc. All Rights Reserved.

package com.foursquare.hadoop.scalding

import cascading.scheme.Scheme
import com.foursquare.common.thrift.ThriftConverter
import com.foursquare.hadoop.cascading.io.SpindleSequenceFile
import com.twitter.scalding._
import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader}
import org.apache.hadoop.io.{BytesWritable, NullWritable, Writable}

case class SpindleSequenceFileSource[K <: Writable: Manifest, T <: ThriftConverter.TType](paths: Seq[String])(implicit mf: Manifest[T], conv: TupleConverter[(K, T)], tset: TupleSetter[(K, T)])
extends FixedPathSource(paths: _*) with Mappable[(K, T)] with TypedSink[(K, T)] {

// TODO(blackmad): Now that we're passing manifests, I don't think we really need these anymore
val classOfT: Class[T] = manifest[T].runtimeClass.asInstanceOf[Class[T]]
val classOfK: Class[K] = manifest[K].runtimeClass.asInstanceOf[Class[K]]

override def hdfsScheme = (SpindleSequenceFile[K, T](classOfK, classOfT)).asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]

// Apparently Cascading doesn't support sequence files in local mode???
override def localScheme = ???

override def converter[U >: (K, T)]: TupleConverter[U] = TupleConverter.asSuperConverter[(K, T), U](conv)

override def setter[U <: (K, T)]: TupleSetter[U] = TupleSetter.asSubSetter[(K, T), U](tset)
}

object SpindleSequenceFileSource {
def apply[K <: Writable : Manifest, T <: ThriftConverter.TType: Manifest: TupleConverter](path: String) = new SpindleSequenceFileSource[K, T](Seq(path))
}
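
A sketch of how this source might be used from a Scalding job. MyRecord again stands in for a Spindle-generated record type, the job name and argument keys are made up, and the required Manifest/TupleConverter implicits are assumed to be in scope:

import com.twitter.scalding._
import org.apache.hadoop.io.LongWritable

class CopyRecordsJob(args: Args) extends Job(args) {
  val input = SpindleSequenceFileSource[LongWritable, MyRecord](args("input"))
  val output = SpindleSequenceFileSource[LongWritable, MyRecord](args("output"))

  // The same source type acts as both a Mappable (for reading) and a TypedSink
  // (for writing), so a pass-through copy is just a read followed by a write.
  TypedPipe.from(input)
    .write(output)
}

With NullWritable as the key type, the sink expects tuples that contain only the record (see the sink method above).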