In [15]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import os
import sys

spark_path = 'D:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7'

In [16]:
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path

In [17]:
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src")
               

In [18]:
from pyspark import SparkContext

In [19]:
from pyspark import SparkConf

In [20]:
sc = SparkContext("local",'test123')

In [21]:
sc

<pyspark.context.SparkContext at 0x8d093b0>

In [None]:
###################### PySpark Core Classes################

In [8]:
#1- pyspark.SparkContext-- Main entry point for Spark functionality.

#2- pyspark.RDD-- A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.

#3- pyspark.streaming.StreamingContext-- Main entry point for Spark Streaming functionality.

#4- pyspark.streaming.DStream-- A Discretized Stream (DStream), the basic abstraction in Spark Streaming.

#5- pyspark.sql.SQLContext-- Main entry point for DataFrame and SQL functionality.

#6- pyspark.sql.DataFrame-- A distributed collection of data grouped into named columns.

In [29]:
#Currently directories are only supported for Hadoop-supported filesystems
from pyspark import SparkFiles
path = os.path.join("C:\Users\Dell\Documents", "titanic_train.txt")
with open(path, "w") as testFile:
    _ = testFile.write("100")
sc.addFile(path)

In [30]:
def func(iterator):
    with open(SparkFiles.get("titanic_train.txt")) as testFile:
        fileVal = int(testFile.readline())
        return [x * fileVal for x in iterator]


In [31]:
sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()

[100, 200, 300, 400]

In [32]:
sc.applicationId

u'local-1500555868994'

In [33]:
#A unique identifier for the Spark application. Its format depends on the scheduler implementation

In [34]:
#in case of local spark app something local-1500555868994
#in case of YARN something like ‘application_1433865536131_34483

In [37]:
#Distribute a local Python collection to form an RDD. Using xrange is recommended if 
#the input represents a range for performance

In [35]:
sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()

[[0], [2], [3], [4], [6]]

In [36]:
sc.parallelize(xrange(0, 6, 2), 5).glom().collect()

[[], [0], [], [2], [4]]

In [38]:
sc.range(5).collect()

[0, 1, 2, 3, 4]

In [39]:
sc.range(2, 4).collect()

[2, 3]

In [40]:
sc.range(1, 7, 2).collect()

[1, 3, 5]

In [42]:
#runJob: Executes the given partitionFunc on the specified set of partitions, returning the result as an array of elements
#If ‘partitions’ is not specified, this will run over all partitions.

In [43]:
myRDD = sc.parallelize(range(6), 3)

In [44]:
sc.runJob(myRDD, lambda part: [x * x for x in part])

[0, 1, 4, 9, 16, 25]

In [45]:
myRDD = sc.parallelize(range(6), 3)

In [46]:
sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)

[0, 1, 16, 25]

In [50]:
path = os.path.join("C:\Users\Dell\Documents", "titanic_train.txt")
textFile = sc.textFile(path)
textFile.collect()

[u'PassengerId\tSurvived\tPclass\tName\tSex\tAge\tSibSp\tParch\tTicket\tFare\tCabin\tEmbarked',
 u'1\t0\t3\t"Braund, Mr. Owen Harris"\tmale\t22\t1\t0\tA/5 21171\t7.25\t\tS',
 u'2\t1\t1\t"Cumings, Mrs. John Bradley (Florence Briggs Thayer)"\tfemale\t38\t1\t0\tPC 17599\t71.2833\tC85\tC',
 u'3\t1\t3\t"Heikkinen, Miss. Laina"\tfemale\t26\t0\t0\tSTON/O2. 3101282\t7.925\t\tS',
 u'4\t1\t1\t"Futrelle, Mrs. Jacques Heath (Lily May Peel)"\tfemale\t35\t1\t0\t113803\t53.1\tC123\tS',
 u'5\t0\t3\t"Allen, Mr. William Henry"\tmale\t35\t0\t0\t373450\t8.05\t\tS',
 u'6\t0\t3\t"Moran, Mr. James"\tmale\t\t0\t0\t330877\t8.4583\t\tQ',
 u'7\t0\t1\t"McCarthy, Mr. Timothy J"\tmale\t54\t0\t0\t17463\t51.8625\tE46\tS',
 u'8\t0\t3\t"Palsson, Master. Gosta Leonard"\tmale\t2\t3\t1\t349909\t21.075\t\tS',
 u'9\t1\t3\t"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)"\tfemale\t27\t0\t2\t347742\t11.1333\t\tS',
 u'10\t1\t2\t"Nasser, Mrs. Nicholas (Adele Achem)"\tfemale\t14\t1\t0\t237736\t30.0708\t\tC',
 u'11\t1\t3\t"Sand

In [51]:
parallelized = sc.parallelize(["textFile"])

In [53]:
parallelized.collect()

['textFile']

In [52]:
sorted(sc.union([textFile, parallelized]).collect())

[u'1\t0\t3\t"Braund, Mr. Owen Harris"\tmale\t22\t1\t0\tA/5 21171\t7.25\t\tS',
 u'10\t1\t2\t"Nasser, Mrs. Nicholas (Adele Achem)"\tfemale\t14\t1\t0\t237736\t30.0708\t\tC',
 u'100\t0\t2\t"Kantor, Mr. Sinai"\tmale\t34\t1\t0\t244367\t26\t\tS',
 u'101\t0\t3\t"Petranec, Miss. Matilda"\tfemale\t28\t0\t0\t349245\t7.8958\t\tS',
 u'102\t0\t3\t"Petroff, Mr. Pastcho (""Pentcho"")"\tmale\t\t0\t0\t349215\t7.8958\t\tS',
 u'103\t0\t1\t"White, Mr. Richard Frasar"\tmale\t21\t0\t1\t35281\t77.2875\tD26\tS',
 u'104\t0\t3\t"Johansson, Mr. Gustaf Joel"\tmale\t33\t0\t0\t7540\t8.6542\t\tS',
 u'105\t0\t3\t"Gustafsson, Mr. Anders Vilhelm"\tmale\t37\t2\t0\t3101276\t7.925\t\tS',
 u'106\t0\t3\t"Mionoff, Mr. Stoytcho"\tmale\t28\t0\t0\t349207\t7.8958\t\tS',
 u'107\t1\t3\t"Salkjelsvik, Miss. Anna Kristine"\tfemale\t21\t0\t0\t343120\t7.65\t\tS',
 u'108\t1\t3\t"Moss, Mr. Albert Johan"\tmale\t\t0\t0\t312991\t7.775\t\tS',
 u'109\t0\t3\t"Rekic, Mr. Tido"\tmale\t38\t0\t0\t349249\t7.8958\t\tS',
 u'11\t1\t3\t"Sandstrom, Miss.

In [55]:
#coalesce: Return a new RDD that is reduced into numPartitions partitions.
sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()


[[1], [2, 3], [4, 5]]

In [56]:
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()

[[1, 2, 3, 4, 5]]

In [58]:
#cogroup: For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for 
#that key in self as well as other
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]

[('a', ([1], [2])), ('b', ([4], []))]

In [60]:
#collectAsMap(): Return the key-value pairs in this RDD to the master as a dictionary
#this method should only be used if the resulting data is expected to be small
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m[1], m[3]

(2, 4)

In [62]:
#combineByKey: Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.
#Users provide three functions:
    #createCombiner, which turns a V into a C (e.g., creates a one-element list)
    #mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
    #mergeCombiners, to combine two C’s into a single one.

In [65]:
x = sc.parallelize([("a", 1), ("b", 2), ("a", 5)])
def add(a, b): return a + str(b)
sorted(x.combineByKey(str, add, add).collect())

[('a', '15'), ('b', '2')]

In [66]:
sc.parallelize([2, 3, 4]).count()

3

In [70]:
#Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all 
#tasks have finished.
rdd = sc.parallelize(range(1000), 10)
rdd.countApprox(1000, .95) 

1000

In [71]:
#countApprox(timeout, confidence=0.95)

In [72]:
n = sc.parallelize(range(1000)).map(str).countApproxDistinct()

In [73]:
n

1014L

In [74]:
900 < n < 1100

True

In [75]:
n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()

In [76]:
n

19L

In [77]:
16 < n < 24

True

In [78]:
#Count the number of elements for each key, and return the result to the master as a dictionary
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

In [79]:
sorted(rdd.countByKey().items())

[('a', 2), ('b', 1)]

In [80]:
#Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.
sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())

[(1, 2), (2, 3)]

In [81]:
#Return a new RDD containing the distinct elements in this RDD.
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())

[1, 2, 3]

In [82]:
#Return a new RDD containing only the elements that satisfy a predicate.
rdd = sc.parallelize([1, 2, 3, 4, 5])

In [83]:
rdd.filter(lambda x: x % 2 == 0).collect()

[2, 4]

In [84]:
#Return the first element in this RDD
sc.parallelize([2, 3, 4]).first()

2

In [85]:
sc.parallelize([]).first()

ValueError: RDD is empty

In [86]:
#Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
rdd = sc.parallelize([2, 3, 4])

In [87]:
sorted(rdd.flatMap(lambda x: range(1, x)).collect())

[1, 1, 1, 2, 2, 3]

In [88]:
sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())

[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

In [89]:
#Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the 
#original RDD’s partitioning

In [90]:
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])

In [91]:
def f(x): return x

In [92]:
x.flatMapValues(f).collect()

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

In [93]:
#fold(zeroValue, op): Aggregate the elements of each partition, and then the results for all the partitions, using a given 
#associative function and a neutral “zero value.”

In [94]:
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)

15

In [95]:
#foldByKey(zeroValue, func, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc35dbc8e60>)
#Merge the values for each key using an associative function “func” and a neutral “zeroValue” which may be added to the result 
#an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication.).

In [96]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

In [97]:
from operator import add

In [98]:
sorted(rdd.foldByKey(0, add).collect())

[('a', 2), ('b', 1)]

In [None]:
#foreach(f): Applies a function to all elements of this RDD.


In [101]:
def f(x): print(x)

In [106]:
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

In [108]:
def f(iterator): 
    for x in iterator: 
        print(x)

In [109]:
sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)

In [110]:
#fullOuterJoin: Hash-partitions the resulting RDD into the given number of partitions
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])

sorted(x.fullOuterJoin(y).collect())

[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]

In [111]:
#Returns the number of partitions in RDD
rdd = sc.parallelize([1, 2, 3, 4], 2)
rdd.getNumPartitions()

2

In [112]:
#Get the RDD’s current storage level.
rdd1 = sc.parallelize([1,2])


In [113]:
rdd1.getStorageLevel()

StorageLevel(False, False, False, False, 1)

In [114]:
print(rdd1.getStorageLevel())

Serialized 1x Replicated


In [115]:
#Return an RDD created by coalescing all elements within each partition into a list
rdd = sc.parallelize([1, 2, 3, 4], 2)
sorted(rdd.glom().collect())

[[1, 2], [3, 4]]

In [116]:
#Return an RDD of grouped items.
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x, y) in result])

[(0, [2, 8]), (1, [1, 1, 3, 5])]

In [117]:
#Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions

#Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or 
#aggregateByKey will provide much better performance.

In [118]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

In [119]:
sorted(rdd.groupByKey().mapValues(len).collect())

[('a', 2), ('b', 1)]

In [120]:
sorted(rdd.groupByKey().mapValues(list).collect())

[('a', [1, 1]), ('b', [1])]

In [121]:
#groupWith(other, *others): Alias for cogroup but with support for multiple RDDs
w = sc.parallelize([("a", 5), ("b", 6)])
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
z = sc.parallelize([("b", 42)])
[(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]

[('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

In [123]:
#Compute a histogram using the provided buckets
#If buckets is a number, it will generate buckets which are evenly spaced between the minimum and maximum of the RDD

In [126]:
rdd = sc.parallelize(range(50))
rdd.histogram(2)

([0.0, 24.5, 49], [25, 25])

In [127]:
rdd.histogram([0, 5, 25, 50])

([0, 5, 25, 50], [5, 20, 25])

In [128]:
rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets

([0, 15, 30, 45, 60], [15, 15, 15, 5])

In [129]:
rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])

In [130]:
rdd.histogram(("a", "b", "c"))

(('a', 'b', 'c'), [2, 2])

In [131]:
#intersection: Return the intersection of this RDD and another one. The output will not contain any 
#duplicate elements, even if the input RDDs did.

In [132]:
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])

In [133]:
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])

In [134]:
rdd1.intersection(rdd2).collect()

[2, 1, 3]

In [135]:
#an RDD may be empty even when it has at least 1 partition.
sc.parallelize([]).isEmpty()

True

In [136]:
sc.parallelize([1]).isEmpty()

False

In [138]:
#join(other, numPartitions=None):Return an RDD containing all pairs of elements with matching keys in self and other
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())

[('a', (1, 2)), ('a', (1, 3))]

In [139]:
#keyBy(f): Creates tuples of the elements in this RDD by applying f
x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
y = sc.parallelize(zip(range(0,5), range(0,5)))
[(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]

[(0, [[0], [0]]),
 (1, [[1], [1]]),
 (2, [[], [2]]),
 (3, [[], [3]]),
 (4, [[2], [4]])]

In [140]:
#keys(): Return an RDD with the keys of each tuple.
m = sc.parallelize([(1, 2), (3, 4)]).keys()
m.collect()

[1, 3]

In [141]:
#leftOuterJoin(other, numPartitions=None): Perform a left outer join of self and other
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
sorted(x.leftOuterJoin(y).collect())

[('a', (1, 2)), ('b', (4, None))]

In [142]:
#lookup(key): Return the list of values in the RDD for key key. This operation is done efficiently if the RDD has a known 
#partitioner by only searching the partition that the key maps to.
l = range(1000)

rdd = sc.parallelize(zip(l, l), 10)

In [143]:
rdd.lookup(42)  # slow

[42]

In [144]:
sorted = rdd.sortByKey()
sorted.lookup(42)  # fast

[42]

In [145]:
sorted.lookup(1024)

[]

In [148]:
rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey()
list(rdd2.lookup(('a', 'b'))[0])

['c']

In [154]:
#mapPartitions(f, preservesPartitioning=False): Return a new RDD by applying a function to each partition of this RDD.

In [155]:
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator): yield sum(iterator)
rdd.mapPartitions(f).collect()


[3, 7]

In [157]:
#mapPartitionsWithIndex(f, preservesPartitioning=False): Return a new RDD by applying a function to each partition of this 
#RDD, while tracking the index of the original partition.

rdd = sc.parallelize([1, 2, 3, 4], 4)
def f(splitIndex, iterator): yield splitIndex
    

In [158]:
rdd.mapPartitionsWithIndex(f).sum()

6

In [159]:
rdd.collect()

[1, 2, 3, 4]

In [160]:
#mapValues(f):Pass each value in the key-value pair RDD through a map function without changing the keys; this also 
#retains the original RDD’s partitioning
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()

[('a', 3), ('b', 1)]

In [162]:
#reduce(f): Reduces the elements of this RDD using the specified commutative and associative binary operator
from operator import add


In [163]:
sc.parallelize([1, 2, 3, 4, 5]).reduce(add)

15

In [164]:
sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)

10

In [166]:
sc.parallelize([21,12]).reduce(add)

33

In [167]:
#reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc35dbc8e60>)
#Merge the values for each key using an associative and commutative reduce function
from operator import add

In [171]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(add).collect()

[('a', 2), ('b', 1)]

In [175]:
#rightOuterJoin(other, numPartitions=None): Perform a right outer join of self and other.
#For each element (k, w) in other, the resulting RDD will either contain all pairs (k, (v, w)) for v in this, or the pair 
#(k, (None, w)) if no elements in self have key k.

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
y.rightOuterJoin(x).collect()

[('a', (2, 1)), ('b', (None, 4))]

In [177]:
#sample(withReplacement, fraction, seed=None): Return a sampled subset of this RDD.
rdd = sc.parallelize(range(100), 4)
6 <= rdd.sample(False, 0.30, 81).count() <= 14


False

In [178]:
#sampleByKey(withReplacement, fractions, seed=None): Return a subset of this RDD sampled by key (via stratified sampling). 
#Create a sample of this RDD using variable sampling rates for different keys as specified by fractions, a key to sampling 
#rate map.
fractions = {"a": 0.2, "b": 0.1}
rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())


In [179]:
100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150

True

In [180]:
max(sample["a"]) <= 999 and min(sample["a"]) >= 0

True

In [181]:
max(sample["b"]) <= 999 and min(sample["b"]) >= 0

True

In [182]:
#sampleStdev():Compute the sample standard deviation of this RDD’s elements
sc.parallelize([11, 21, 32]).sampleStdev()

10.503967504392488

In [183]:
#sampleVariance():Compute the sample variance of this RDD’s elements
sc.parallelize([101, 232, 563]).sampleVariance()

56694.33333333334

In [184]:
#setName(name): Assign a name to this RDD.
rdd1 = sc.parallelize([1, 2])
rdd1.setName('RDD1').name()

u'RDD1'

In [186]:
#sortBy(keyfunc, ascending=True, numPartitions=None): Sorts this RDD by the given keyfunc
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()


[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [187]:
sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()

[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

In [188]:
#sortByKey(ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7fc35dbcf848>)
#Sorts this RDD, which is assumed to consist of (key, value) pairs.
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortByKey().first()

('1', 3)

In [189]:
sc.parallelize(tmp).sortByKey(True, 1).collect()

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [190]:
sc.parallelize(tmp).sortByKey(True, 2).collect()

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [195]:
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()

[('a', 3),
 ('fleece', 7),
 ('had', 2),
 ('lamb', 5),
 ('little', 4),
 ('Mary', 1),
 ('was', 8),
 ('white', 9),
 ('whose', 6)]

In [196]:
#takeSample(withReplacement, num, seed=None)
#Return a fixed-size sampled subset of this RDD.
rdd = sc.parallelize(range(0, 10))
len(rdd.takeSample(True, 20, 1))

20

In [197]:
len(rdd.takeSample(False, 5, 2))

5

In [198]:
len(rdd.takeSample(False, 15, 3))

10

In [199]:
#Get the top N elements from an RDD
sc.parallelize([10, 4, 2, 12, 3]).top(1)
sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)

[4, 3, 2]

In [201]:
#union(other):Return the union of this RDD and another one.
rdd = sc.parallelize([1, 1, 2, 3])
rdd.union(rdd).collect()

[1, 1, 2, 3, 1, 1, 2, 3]

In [202]:
#values():Return an RDD with the values of each tuple.
m = sc.parallelize([(1, 2), (3, 4)]).values()
m.collect()

[2, 4]

In [203]:
#zip: Zips this RDD with another one, returning key-value pairs with the first element in each RDD second element in each 
#RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
x.zip(y).collect()

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

In [204]:
#zipWithIndex(): Zips this RDD with its element indices.
sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()


[('a', 0), ('b', 1), ('c', 2), ('d', 3)]

In [205]:
#zipWithUniqueId(): Zips this RDD with generated unique Long ids.
sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()

[('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]

In [207]:
data = [12,32,45,65,67,89]

In [208]:
print data

[12, 32, 45, 65, 67, 89]


In [209]:
#A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of 
#elements that can be operated on in parallel.
#parallelization
rdd1 = sc.parallelize(data)

In [210]:
print rdd1

ParallelCollectionRDD[391] at parallelize at PythonRDD.scala:475


In [211]:
rdd1.collect()

[12, 32, 45, 65, 67, 89]

In [212]:
rdd1.count()

6

In [30]:
sc.version

u'2.1.0'

In [31]:
sc.pythonVer

'2.7'

In [32]:
sc.master

u'local[2]'

In [33]:
rdd1.getNumPartitions()

2

In [34]:
rdd1.count()

6

In [35]:
sc.parallelize([]).isEmpty()

True

In [36]:
sc.parallelize([data]).isEmpty()

False

In [37]:
# basic statistics
rdd1.collect()

[12, 32, 45, 65, 67, 89]

In [38]:
rdd1.max()

89

In [39]:
rdd1.min()

12

In [40]:
rdd1.stdev()

25.203615260954571

In [41]:
rdd1.mean()

51.666666666666664

In [42]:
rdd1.variance()

635.2222222222222

In [43]:
rdd1.stats()

(count: 6, mean: 51.6666666667, stdev: 25.203615261, max: 89.0, min: 12.0)

In [44]:
import numpy as np
from pyspark.mllib.stat import Statistics

In [45]:
mat = sc.parallelize(
    [np.array([10.1,12.4,14.5,16.8,21]),np.array([21.3,24.2,35.4,36.4,31.7]),np.array([21.1,23.,54.,65.,71.])]
)

In [46]:
print mat

ParallelCollectionRDD[15] at parallelize at PythonRDD.scala:475


In [47]:
summary=Statistics.colStats(mat)

In [48]:
summary.mean()

array([ 17.5       ,  19.86666667,  34.63333333,  39.4       ,  41.23333333])

In [49]:
summary.variance()

array([  41.08      ,   42.17333333,  390.50333333,  587.56      ,
        693.16333333])

In [50]:
summary.numNonzeros()

array([ 3.,  3.,  3.,  3.,  3.])

In [51]:
# calculate correlation
X = sc.parallelize([10.1,12.4,14.5,16.8,21])
Y = sc.parallelize([21.3,24.2,35.4,36.4,31.7])

In [52]:
corr = Statistics.corr(X,Y,method='pearson')

In [53]:
corr

0.6779641435411099

In [54]:
from pyspark.mllib.linalg import Matrices, Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics

In [55]:
vec = Vectors.dense(10.1,12.4,14.5,16.8,21,21.3,24.2,35.4,36.4,31.7)

In [65]:
vec

DenseVector([10.1, 12.4, 14.5, 16.8, 21.0, 21.3, 24.2, 35.4, 36.4, 31.7])

In [66]:
goodnestest = Statistics.chiSqTest(vec)

In [67]:
print (goodnestest)

Chi squared test summary:
method: pearson
degrees of freedom = 9 
statistic = 35.878284182305634 
pValue = 4.166733496191455E-5 
Very strong presumption against null hypothesis: observed follows the same distribution as expected..
