Skip to content

Commit

Permalink
add RDD.lookup(key)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Aug 22, 2014
1 parent 050f8d0 commit eb1305d
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1777,10 +1777,26 @@ def _defaultReducePartitions(self):
else:
return self.getNumPartitions()

# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
def lookup(self, key):
    """
    Return the list of values in the RDD for key `key`.

    If this RDD carries a `_partitionFunc` attribute, only the single
    partition that function selects for `key` is searched; otherwise the
    whole RDD is filtered and the matches are collected.

    >>> l = range(1000)
    >>> rdd = sc.parallelize(zip(l, l), 10)
    >>> rdd.lookup(42)  # slow
    [42]
    >>> sorted_rdd = rdd.sortByKey()
    >>> sorted_rdd.lookup(42)  # fast
    [42]
    """
    # Index-based predicate instead of `lambda (k, v): ...`: tuple parameter
    # unpacking was removed in Python 3 (PEP 3113); this form works on both.
    values = self.filter(lambda kv: kv[0] == key).values()

    # getattr (rather than hasattr) also treats an explicit None as
    # "no partitioner" instead of trying to call it.
    partition_func = getattr(self, "_partitionFunc", None)
    if partition_func is not None:
        # Run the job on the *filtered* values, not on `self`; running it on
        # `self` would return every (key, value) pair stored in the target
        # partition instead of just the values matching `key`.
        # NOTE(review): this assumes `_partitionFunc(key)` yields a valid
        # partition index for this RDD -- confirm against the code that
        # sets `_partitionFunc`.
        return self.ctx.runJob(values, lambda x: x, [partition_func(key)], False)

    return values.collect()


class PipelinedRDD(RDD):
Expand Down

0 comments on commit eb1305d

Please sign in to comment.