# 创建int RDD数据

In [5]:
# Build an RDD from a local list of ints, then collect it back to check its contents.
intRDD =sc.parallelize([3,1,2,5,5])
intRDD.collect()

[3, 1, 2, 5, 5]

In [57]:
help(sc)

Help on SparkContext in module pyspark.context object:

class SparkContext(__builtin__.object)
 |  Main entry point for Spark functionality. A SparkContext represents the
 |  connection to a Spark cluster, and can be used to create L{RDD} and
 |  broadcast variables on that cluster.
 |  
 |  Methods defined here:
 |  
 |  __enter__(self)
 |      Enable 'with SparkContext(...) as sc: app(sc)' syntax.
 |  
 |  __exit__(self, type, value, trace)
 |      Enable 'with SparkContext(...) as sc: app' syntax.
 |      
 |      Specifically stop the context on exit of the with block.
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
 |      Create a new SparkContext. At least the master and app name should be set,
 |      either through the named parameters here or through C{conf}.
 |   

In [58]:
help(sc.parallelize)

Help on method parallelize in module pyspark.context:

parallelize(self, c, numSlices=None) method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using xrange
    is recommended if the input represents a range for performance.
    
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]



In [60]:
help(intRDD.collect)

Help on method collect in module pyspark.rdd:

collect(self) method of pyspark.rdd.RDD instance
    Return a list that contains all of the elements in this RDD.
    
    .. note:: This method should only be used if the resulting array is expected
        to be small, as all the data is loaded into the driver's memory.



# 创建string RDD数据

In [7]:
# Build an RDD of strings, then collect it back to check its contents.
# NOTE(review): "Aplle" looks like a misspelling of "Apple"; as written the two are
# different strings, so distinct() keeps both — confirm whether that is intended.
stringRDD=sc.parallelize(["Aplle","Orange","Banana","Grape","Apple"])
stringRDD.collect()

['Aplle', 'Orange', 'Banana', 'Grape', 'Apple']

In [8]:
def addOne(x):
    """Return ``x`` incremented by one."""
    incremented = x + 1
    return incremented
intRDD.map(addOne).collect()

[4, 2, 3, 6, 6]

In [11]:
intRDD.map(lambda x:x+1).collect()

[4, 2, 3, 6, 6]

In [12]:
stringRDD.map(lambda x:"fruit:"+x).collect()

['fruit:Aplle', 'fruit:Orange', 'fruit:Banana', 'fruit:Grape', 'fruit:Apple']

In [14]:
intRDD.filter(lambda x:x<3).collect()

[1, 2]

In [15]:
stringRDD.filter(lambda x:"ge" in x).collect()

['Orange']

# 删除重复元素

In [16]:
intRDD.distinct().collect()

[1, 5, 2, 3]

In [17]:
stringRDD.distinct().collect()

['Orange', 'Grape', 'Aplle', 'Banana', 'Apple']

# 随机按比例分割

In [30]:
# Randomly split intRDD into two RDDs using the weights 0.4 and 0.6.
# No seed is passed, so the split is not expected to be the same on every run.
sRDD = intRDD.randomSplit([0.4, 0.6])
first_part, second_part = sRDD
print('%s,%s' % (first_part.collect(), second_part.collect()))

[1, 2],[3, 5, 5]


In [51]:
help(intRDD.randomSplit)

Help on method randomSplit in module pyspark.rdd:

randomSplit(self, weights, seed=None) method of pyspark.rdd.RDD instance
    Randomly splits this RDD with the provided weights.
    
    :param weights: weights for splits, will be normalized if they don't sum to 1
    :param seed: random seed
    :return: split RDDs in a list
    
    >>> rdd = sc.parallelize(range(500), 1)
    >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
    >>> len(rdd1.collect() + rdd2.collect())
    500
    >>> 150 < rdd1.count() < 250
    True
    >>> 250 < rdd2.count() < 350
    True



# 分组排序

In [53]:
# Group the elements of intRDD by parity; each group is a (key, values) pair.
gRDD = intRDD.groupBy(
    lambda x: "even" if x % 2 == 0 else "odd"
).collect()
# Print every group sorted by key instead of indexing gRDD[0] / gRDD[1]:
# this does not rely on the order in which collect() happens to return the
# groups, and it works for any number of groups.
for group_key, group_values in sorted(gRDD, key=lambda pair: pair[0]):
    print('%s %s' % (group_key, sorted(group_values)))

even [2]
odd [1, 3, 5, 5]


In [41]:
help(intRDD.groupBy)

Help on method groupBy in module pyspark.rdd:

groupBy(self, f, numPartitions=None, partitionFunc=<function portable_hash>) method of pyspark.rdd.RDD instance
    Return an RDD of grouped items.
    
    >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
    >>> result = rdd.groupBy(lambda x: x % 2).collect()
    >>> sorted([(x, sorted(y)) for (x, y) in result])
    [(0, [2, 8]), (1, [1, 1, 3, 5])]



In [50]:
help(sorted)

Help on built-in function sorted in module __builtin__:

sorted(...)
    sorted(iterable, cmp=None, key=None, reverse=False) --> new sorted list



# 多个RDD转换运算

In [59]:
intRDD1=sc.parallelize([3,1,2,5,5])
intRDD2=sc.parallelize([5,6])
intRDD3=sc.parallelize([2,7])

In [61]:
# union concatenates the RDDs and keeps duplicates.
# NOTE(review): intRDD2 is unioned twice and intRDD3 is not used in any cell shown
# here; the second union may have been meant to be intRDD3 — confirm.
intRDD1.union(intRDD2).union(intRDD2).collect()

[3, 1, 2, 5, 5, 5, 6, 5, 6]

In [63]:
help(intRDD1.union)

Help on method union in module pyspark.rdd:

union(self, other) method of pyspark.rdd.RDD instance
    Return the union of this RDD and another one.
    
    >>> rdd = sc.parallelize([1, 1, 2, 3])
    >>> rdd.union(rdd).collect()
    [1, 1, 2, 3, 1, 1, 2, 3]



In [64]:
# NOTE(review): this intersects intRDD1 with itself, which only removes duplicates;
# another RDD (e.g. intRDD2) may have been intended — confirm.
intRDD1.intersection(intRDD1).collect()

[1, 2, 3, 5]

In [65]:
help(intRDD1.intersection)

Help on method intersection in module pyspark.rdd:

intersection(self, other) method of pyspark.rdd.RDD instance
    Return the intersection of this RDD and another one. The output will
    not contain any duplicate elements, even if the input RDDs did.
    
    .. note:: This method performs a shuffle internally.
    
    >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
    >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
    >>> rdd1.intersection(rdd2).collect()
    [1, 2, 3]



In [66]:
intRDD1.subtract(intRDD2).collect()

[1, 2, 3]

In [67]:
help(intRDD1.subtract)

Help on method subtract in module pyspark.rdd:

subtract(self, other, numPartitions=None) method of pyspark.rdd.RDD instance
    Return each value in C{self} that is not contained in C{other}.
    
    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtract(y).collect())
    [('a', 1), ('b', 4), ('b', 5)]



In [70]:
#笛卡尔乘积运算

In [68]:
intRDD1.cartesian(intRDD2).collect()

[(3, 5),
 (3, 6),
 (1, 5),
 (1, 6),
 (2, 5),
 (2, 6),
 (5, 5),
 (5, 5),
 (5, 6),
 (5, 6)]

In [69]:
help(intRDD1.cartesian)

Help on method cartesian in module pyspark.rdd:

cartesian(self, other) method of pyspark.rdd.RDD instance
    Return the Cartesian product of this RDD and another one, that is, the
    RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
    C{b} is in C{other}.
    
    >>> rdd = sc.parallelize([1, 2])
    >>> sorted(rdd.cartesian(rdd).collect())
    [(1, 1), (1, 2), (2, 1), (2, 2)]



# 基本动作运算

In [88]:
intRDD.first() # return the first element

3

In [83]:
help(intRDD.first)

Help on method first in module pyspark.rdd:

first(self) method of pyspark.rdd.RDD instance
    Return the first element in this RDD.
    
    >>> sc.parallelize([2, 3, 4]).first()
    2
    >>> sc.parallelize([]).first()
    Traceback (most recent call last):
        ...
    ValueError: RDD is empty



In [94]:
intRDD.take(2)  # take the first n elements; if n exceeds the number of elements, all existing elements are returned

[3, 1]

In [93]:
help(intRDD.take)  

Help on method take in module pyspark.rdd:

take(self, num) method of pyspark.rdd.RDD instance
    Take the first num elements of the RDD.
    
    It works by first scanning one partition, and use the results from
    that partition to estimate the number of additional partitions needed
    to satisfy the limit.
    
    Translated from the Scala implementation in RDD#take().
    
    .. note:: this method should only be used if the resulting array is expected
        to be small, as all the data is loaded into the driver's memory.
    
    >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
    [2, 3]
    >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
    [2, 3, 4, 5, 6]
    >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
    [91, 92, 93]



In [95]:
intRDD.takeOrdered(3) # the 3 smallest elements, in ascending order

[1, 2, 3]

In [106]:
intRDD.takeOrdered(5,lambda x:0 if x==3 else 1) # custom sort key (not a sort algorithm): 3 maps to 0 and is ordered first, every other element maps to 1

[3, 1, 2, 5, 5]

In [97]:
help(intRDD.takeOrdered)

Help on method takeOrdered in module pyspark.rdd:

takeOrdered(self, num, key=None) method of pyspark.rdd.RDD instance
    Get the N elements from an RDD ordered in ascending order or as
    specified by the optional key function.
    
    .. note:: this method should only be used if the resulting array is expected
        to be small, as all the data is loaded into the driver's memory.
    
    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
    [1, 2, 3, 4, 5, 6]
    >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
    [10, 9, 7, 6, 5, 4]



In [111]:
# Summary statistics of intRDD from a single call.
print(intRDD.stats())
# Individual statistics: count, mean, standard deviation, max, min, sum.
summary_values = (
    intRDD.count(), intRDD.mean(), intRDD.stdev(),
    intRDD.max(), intRDD.min(), intRDD.sum(),
)
print(' '.join(str(value) for value in summary_values))


(count: 5, mean: 3.2, stdev: 1.6, max: 5, min: 1)
5 3.2 1.6 5 1 16


In [112]:
help(intRDD.stats)

Help on method stats in module pyspark.rdd:

stats(self) method of pyspark.rdd.RDD instance
    Return a L{StatCounter} object that captures the mean, variance
    and count of the RDD's elements in one operation.



# RDD Key-Value基本转换运算

In [1]:
kvRDD1=sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD1.collect()

[(3, 4), (3, 6), (5, 6), (1, 2)]

In [2]:
help(kvRDD1)

Help on RDD in module pyspark.rdd object:

class RDD(__builtin__.object)
 |  A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
 |  Represents an immutable, partitioned collection of elements that can be
 |  operated on in parallel.
 |  
 |  Methods defined here:
 |  
 |  __add__(self, other)
 |      Return the union of this RDD and another one.
 |      
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> (rdd + rdd).collect()
 |      [1, 1, 2, 3, 1, 1, 2, 3]
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))
 |  
 |  __repr__(self)
 |  
 |  aggregate(self, zeroValue, seqOp, combOp)
 |      Aggregate the elements of each partition, and then the results for all
 |      the partitions, using a given combine functions and a neutral "zero
 |      value."
 |      
 |      The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
 |      as its result value to avoid object allocat

In [5]:
kvRDD1.keys().collect()

[3, 3, 5, 1]

In [6]:
help(kvRDD1.keys())

Help on PipelinedRDD in module pyspark.rdd object:

class PipelinedRDD(RDD)
 |  Pipelined maps:
 |  
 |  >>> rdd = sc.parallelize([1, 2, 3, 4])
 |  >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
 |  [4, 8, 12, 16]
 |  >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
 |  [4, 8, 12, 16]
 |  
 |  Pipelined reduces:
 |  >>> from operator import add
 |  >>> rdd.map(lambda x: 2 * x).reduce(add)
 |  20
 |  >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
 |  20
 |  
 |  Method resolution order:
 |      PipelinedRDD
 |      RDD
 |      __builtin__.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, prev, func, preservesPartitioning=False)
 |  
 |  getNumPartitions(self)
 |  
 |  id(self)
 |  
 |  ----------------------------------------------------------------------
 |  Methods inherited from RDD:
 |  
 |  __add__(self, other)
 |      Return the union of this RDD and another one.
 |      
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> (rdd + r

In [7]:
kvRDD1.values().collect()

[4, 6, 6, 2]

In [8]:
kvRDD1.filter(lambda x:x[0]<5).collect() #x[0]:key x[1]:value

[(3, 4), (3, 6), (1, 2)]

In [10]:
kvRDD1.filter(lambda x:x[1]<5).collect()

[(3, 4), (1, 2)]

In [12]:
kvRDD1.mapValues(lambda x:x*x).collect()

[(3, 16), (3, 36), (5, 36), (1, 4)]

In [15]:
help(kvRDD1.mapValues)

Help on method mapValues in module pyspark.rdd:

mapValues(self, f) method of pyspark.rdd.RDD instance
    Pass each value in the key-value pair RDD through a map function
    without changing the keys; this also retains the original RDD's
    partitioning.
    
    >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
    >>> def f(x): return len(x)
    >>> x.mapValues(f).collect()
    [('a', 3), ('b', 1)]



In [22]:
# Sort the pairs by key: explicitly ascending, explicitly descending, then with the default.
print kvRDD1.sortByKey(ascending=True).collect()
print kvRDD1.sortByKey(ascending=False).collect()
print kvRDD1.sortByKey().collect()  # ascending defaults to True, so this matches the first line

[(1, 2), (3, 4), (3, 6), (5, 6)]
[(5, 6), (3, 4), (3, 6), (1, 2)]
[(1, 2), (3, 4), (3, 6), (5, 6)]


In [17]:
help(kvRDD1.sortByKey)

Help on method sortByKey in module pyspark.rdd:

sortByKey(self, ascending=True, numPartitions=None, keyfunc=<function <lambda>>) method of pyspark.rdd.RDD instance
    Sorts this RDD, which is assumed to consist of (key, value) pairs.
    # noqa
    
    >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    >>> sc.parallelize(tmp).sortByKey().first()
    ('1', 3)
    >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
    >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
    >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
    [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]



In [23]:
kvRDD1.reduceByKey(lambda x,y:max(x,y)).collect()

[(1, 2), (5, 6), (3, 6)]

# 多个RDD Key-Value "转换"运算

In [26]:
# Rebuild the two key-value RDDs used by the join examples and show their contents.
kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
kvRDD2 = sc.parallelize([(3, 4)])
for pair_rdd in (kvRDD1, kvRDD2):
    print(pair_rdd.collect())

[(3, 4), (3, 6), (5, 6), (1, 2)]
[(3, 4)]


In [32]:
kvRDD1.join(kvRDD2).collect()  # inner join on key: only keys present in both RDDs are output, each as (key, (value1, value2))

[(3, (4, 4)), (3, (6, 4))]

In [33]:
help(kvRDD1.join)

Help on method join in module pyspark.rdd:

join(self, other, numPartitions=None) method of pyspark.rdd.RDD instance
    Return an RDD containing all pairs of elements with matching keys in
    C{self} and C{other}.
    
    Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
    (k, v1) is in C{self} and (k, v2) is in C{other}.
    
    Performs a hash join across the cluster.
    
    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y).collect())
    [('a', (1, 2)), ('a', (1, 3))]



In [29]:
kvRDD1.leftOuterJoin(kvRDD2).collect()  # left outer join: every key of the left RDD is output; the right-hand value is None when the key has no match

[(1, (2, None)), (3, (4, 4)), (3, (6, 4)), (5, (6, None))]

In [30]:
kvRDD2.leftOuterJoin(kvRDD1).collect()

[(3, (4, 4)), (3, (4, 6))]

In [34]:
help(kvRDD2.leftOuterJoin)

Help on method leftOuterJoin in module pyspark.rdd:

leftOuterJoin(self, other, numPartitions=None) method of pyspark.rdd.RDD instance
    Perform a left outer join of C{self} and C{other}.
    
    For each element (k, v) in C{self}, the resulting RDD will either
    contain all pairs (k, (v, w)) for w in C{other}, or the pair
    (k, (v, None)) if no elements in C{other} have key k.
    
    Hash-partitions the resulting RDD into the given number of partitions.
    
    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(x.leftOuterJoin(y).collect())
    [('a', (1, 2)), ('b', (4, None))]



In [31]:
kvRDD2.rightOuterJoin(kvRDD1).collect()

[(1, (None, 2)), (3, (4, 4)), (3, (4, 6)), (5, (None, 6))]

In [37]:
# Pairs of one RDD whose key does not appear in the other, in both directions.
for minuend, subtrahend in ((kvRDD1, kvRDD2), (kvRDD2, kvRDD1)):
    print(minuend.subtractByKey(subtrahend).collect())

[(1, 2), (5, 6)]
[]


In [36]:
help(kvRDD1.subtractByKey)  # returns the (key, value) pairs of self whose key has no match in other

Help on method subtractByKey in module pyspark.rdd:

subtractByKey(self, other, numPartitions=None) method of pyspark.rdd.RDD instance
    Return each (key, value) pair in C{self} that has no pair with matching
    key in C{other}.
    
    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtractByKey(y).collect())
    [('b', 4), ('b', 5)]



# Key-Value "动作"运算

In [42]:
# Fetch the first (key, value) pair and print its key and its value.
kvFirst = kvRDD1.first()
print('%s %s' % (kvFirst[0], kvFirst[1]))

3 4


In [39]:
kvRDD1.take(2)

[(3, 4), (3, 6)]

In [52]:
# Total number of pairs, number of pairs per key, and number of occurrences of each distinct pair.
total_pairs = kvRDD1.count()
print(total_pairs)
pairs_per_key = kvRDD1.countByKey()
print(pairs_per_key)
occurrences_per_pair = kvRDD1.sortByKey().countByValue()
print(occurrences_per_pair)

4
defaultdict(<type 'int'>, {1: 1, 3: 2, 5: 1})
defaultdict(<type 'int'>, {(1, 2): 1, (5, 6): 1, (3, 4): 1, (3, 6): 1})


In [54]:
# Collect the pairs into a dict. A dict holds one value per key, so the two
# pairs with key 3 collapse into a single entry (the recorded output shows 3: 6).
KV=kvRDD1.collectAsMap()
KV

{1: 2, 3: 6, 5: 6}

In [56]:
type(KV)

dict

In [58]:
# Look up the value stored under key 1 in the collected dict.
print(KV[1])

2


In [59]:
kvRDD1.lookup(3)

[4, 6]

In [60]:
help(kvRDD1.lookup)

Help on method lookup in module pyspark.rdd:

lookup(self, key) method of pyspark.rdd.RDD instance
    Return the list of values in the RDD for key `key`. This operation
    is done efficiently if the RDD has a known partitioner by only
    searching the partition that the key maps to.
    
    >>> l = range(1000)
    >>> rdd = sc.parallelize(zip(l, l), 10)
    >>> rdd.lookup(42)  # slow
    [42]
    >>> sorted = rdd.sortByKey()
    >>> sorted.lookup(42)  # fast
    [42]
    >>> sorted.lookup(1024)
    []
    >>> rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey()
    >>> list(rdd2.lookup(('a', 'b'))[0])
    ['c']



# Broadcast 广播变量