In [1]:
import pyspark

In [2]:
# Create a Spark Context object

sc = pyspark.SparkContext('local[*]',appName='Python_Examples')

In [8]:
simple_rdd = sc.parallelize(range(20),4)

In [14]:
simple_rdd.getNumPartitions()

4

In [20]:
rdd_2 = simple_rdd.filter(lambda x:x%5==0)
rdd_3 = rdd_2.map(lambda x:x**2)
print rdd_3.collect()

[0, 25, 100, 225]


In [21]:
# Map is just like Python MAP

l = [1,2,3,5]
map(lambda x:x**3,l)

[1, 8, 27, 125]

In [22]:
# Difference between a map and a flatmap

numrdd = sc.parallelize([[1,2],[4,5],[9,4]])
numrdd.map(lambda x:x*2).collect()

[[1, 2, 1, 2], [4, 5, 4, 5], [9, 4, 9, 4]]

In [23]:
numrdd.flatMap(lambda x:x*2).collect()

[1, 2, 1, 2, 4, 5, 4, 5, 9, 4, 9, 4]

In [24]:
numrdd.flatMap(lambda x:[i*2 for i in x]).collect()

[2, 4, 8, 10, 18, 8]

In [27]:
a = sc.parallelize(range(1000000),4)

In [28]:
b = a.filter(lambda x:x%3353 == 0)

In [29]:
# How Lazy is Spark?

# list + "str" should error out, why is this not erroring out?
# What is Lazy Evaluation?

numrdd_2 = numrdd.flatMap(lambda x:x+"pop")

In [None]:
# Will this work?`
#numrdd_2.count()

map(lambda x:(x,1),"lkjdlfs sdfsd fsd fsd sdf".split())



In [34]:
a = sc.parallelize(['a','b','a','c','b'])
a.map(lambda x: (x,1)).collect()

[('a', 1), ('b', 1), ('a', 1), ('c', 1), ('b', 1)]

In [35]:
from stop_words import get_stop_words
rdd = sc.textFile(r"H:\notebooks\Edureka\Class 8\Relativity.txt",4)
(
    rdd
    .flatMap(lambda x:x.split())
    .filter(lambda x:x not in get_stop_words('en'))
    .map(lambda x: (x,1))
    .reduceByKey(lambda x,y:x+y)
    .sortBy(lambda x:x[1],ascending=False)
    .take(10)
)

[(u'Project', 78),
 (u'The', 62),
 (u'Gutenberg-tm', 53),
 (u'theory', 44),
 (u'may', 36),
 (u'work', 35),
 (u'one', 32),
 (u'can', 30),
 (u'Einstein', 28),
 (u'electronic', 27)]

In [36]:
num1 = sc.parallelize(range(10),4)
num2 = sc.parallelize(range(10,0,-1)) 

In [37]:
num1.intersection(num2).count()

9

In [38]:
num1.glom().collect()

[[0, 1], [2, 3], [4, 5], [6, 7, 8, 9]]

In [39]:
%%time
# Coalesce reuses partitions if the number of partitions is decreased
num1.coalesce(3).glom().collect()

Wall time: 7.48 s


[[0, 1], [2, 3], [4, 5, 6, 7, 8, 9]]

In [40]:
%%time
num1.repartition(3).glom().collect()

Wall time: 7.18 s


[[6, 7], [0, 1, 2, 3, 8, 9], [4, 5]]

In [41]:
rdd3 = num1.groupBy(lambda x: x%3)

In [42]:
%%time
rdd3.collect()

Wall time: 14.5 s


[(0, <pyspark.resultiterable.ResultIterable at 0x64612e8>),
 (1, <pyspark.resultiterable.ResultIterable at 0x3b1c588>),
 (2, <pyspark.resultiterable.ResultIterable at 0x637c358>)]

In [44]:
%%time
rdd3.collect()

Wall time: 8.26 s


[(0, <pyspark.resultiterable.ResultIterable at 0x64699e8>),
 (1, <pyspark.resultiterable.ResultIterable at 0x64699b0>),
 (2, <pyspark.resultiterable.ResultIterable at 0x637cf28>)]

In [45]:
rdd3 = rdd3.persist()

In [48]:
%%time
[(x,list(y)) for x,y in  rdd3.collect()]

Wall time: 1.22 s


[(0, [0, 3, 6, 9]), (1, [1, 4, 7]), (2, [2, 5, 8])]

### Broadcast and Accumulator

In [49]:
lookup_country = sc.broadcast({2:'India',3:'US'})

In [50]:
lookup_country.value

{2: 'India', 3: 'US'}

In [51]:
rec = sc.parallelize([{'name':'Ganesh','age':30,'Country':2},{'name':'John','Country':3,'age':35}],2)

In [52]:
rec.map(lambda x:(x['name'],lookup_country.value[x['Country']])).collect()

[('Ganesh', 'India'), ('John', 'US')]

In [53]:
rdd = sc.parallelize(range(30),4)

In [57]:
## Find the number of elements divisible by 3

def is3(x,counter):
    if x%3 == 0:
        counter += 1
        print counter

i = 0
accum = sc.accumulator(0)

rdd.foreach(lambda x:is3(x,i))
rdd.foreach(lambda x:is3(x,accum))



In [58]:
print accum.value , i

10 0


### Spark SQL

In [59]:
sql = pyspark.SQLContext(sc)

In [60]:
l = [('Ganesh',34), ('John', 43)]
sdf = sql.createDataFrame(l,schema=('Name','age'))

In [61]:
sdf.show()

+------+---+
|  Name|age|
+------+---+
|Ganesh| 34|
|  John| 43|
+------+---+



In [62]:
sdf.select('Name').show()
sdf.filter(sdf.age>40).show()

+------+
|  Name|
+------+
|Ganesh|
|  John|
+------+

+----+---+
|Name|age|
+----+---+
|John| 43|
+----+---+



In [65]:
sdf1 = sdf.filter(sdf.age>40)

In [63]:
df = sdf.toPandas()

In [66]:
type(df)

pandas.core.frame.DataFrame

In [67]:
df.head()

Unnamed: 0,Name,age
0,Ganesh,34
1,John,43


In [69]:
sdf.registerTempTable('people')

In [70]:
sql.sql('select *  from people where age<40').show()

+------+---+
|  Name|age|
+------+---+
|Ganesh| 34|
+------+---+



In [71]:
iris = sql.read.csv("H:/iris.csv",header=True)
#sql.read.csv?

In [72]:
iris.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|          3|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|           5|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|           5|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|          3|         1.4|        0.1| setosa|
|         4.3|          3|         1.1| 

In [73]:
json = sc.parallelize([{'name':'Ganesh','Age':34}])

In [74]:
jdf = sql.read.json(json)
#sql.read.json?

In [75]:
jdf.show()

+---+------+
|Age|  name|
+---+------+
| 34|Ganesh|
+---+------+



In [76]:
sc.stop()