In [1]:
import findspark

In [2]:
!ls /usr

bin    hdp	lib64	 share	      src
etc    include	libexec  spark2.3     tmp
games  jdk64	local	 spark2.4.3   zeppelin-0.10.1-bin-all
go     lib	sbin	 spark-3.1.2


In [3]:
# Point findspark at the Spark 2.4.3 installation that appears in the /usr listing above.
# Per the note in the following cell, this has to run before pyspark is imported.
findspark.init('/usr/spark2.4.3')

In [4]:
# import pyspark # only run this after findspark.init()
from pyspark.sql import SparkSession, SQLContext
# from pyspark.context import SparkContext
# from pyspark.sql.functions import *
# from pyspark.sql.types import *

# getOrCreate() hands back the already-active SparkSession if there is one,
# otherwise it starts a new session with the application name "PysparkExample".
spark = SparkSession.builder.appName("PysparkExample").getOrCreate()
# SparkContext behind the session; the cells below use it for the RDD API
# (sc.textFile, sc.parallelize).
sc = spark.sparkContext


In [5]:
spark

In [6]:
!hadoop fs -ls /data/mr/wordcount

Found 3 items
-rw-r--r--   3 hdfs hdfs    6488666 2019-07-22 17:55 /data/mr/wordcount/big.txt
drwxr-xr-x   - hdfs hdfs          0 2019-07-22 17:55 /data/mr/wordcount/input
-rw-r--r--   3 hdfs hdfs      22690 2019-07-22 17:55 /data/mr/wordcount/sg-amr.jar


In [8]:
# Load the big.txt file listed by `hadoop fs -ls` above as an RDD of text lines;
# the take(10) and count() cells that follow show one element per line.
rdd = sc.textFile('/data/mr/wordcount/big.txt')

In [9]:
rdd.take(10)

['The Project Gutenberg EBook of The Adventures of Sherlock Holmes',
 'by Sir Arthur Conan Doyle',
 '(#15 in our series by Sir Arthur Conan Doyle)',
 '',
 'Copyright laws are changing all over the world. Be sure to check the',
 'copyright laws for your country before downloading or redistributing',
 'this or any other Project Gutenberg eBook.',
 '',
 'This header should be the first thing seen when viewing this Project',
 'Gutenberg file.  Please do not remove it.  Do not change or edit the']

In [10]:
!hadoop fs -cat /data/mr/wordcount/big.txt|wc -l

128457


In [11]:
rdd.count()

128457

In [51]:
rddnum = sc.parallelize([1,2,3,4,5])

In [16]:
rddnum.count()

5

In [26]:
# help(rddnum)
# map, flatMap, reduce, reduceByKey, take, collect,
# filter


In [44]:
def avg(x, y):
    """Return the mean of two numbers and print a trace line for the call.

    Pairwise averaging is not associative: avg(avg(a, b), c) generally differs
    from avg(a, avg(b, c)). It is used here to show what happens when such a
    function is handed to RDD.reduce.

    Args:
        x: first number.
        y: second number.

    Returns:
        (x + y) / 2
    """
    mean = (x + y) / 2
    # Trace every invocation so the order in which values get combined is visible.
    print("computing avg of ", x, y, ": ", mean)
    return mean

In [45]:
avg(2, 3)

computing avg of  2 3 :  2.5


2.5

In [46]:
rddnum.reduce(avg)

computing avg of  1 2 :  1.5
computing avg of  1.5 3 :  2.25
computing avg of  2.25 4 :  3.125
computing avg of  3.125 5 :  4.0625


4.0625

In [48]:
# Same call as the previous cell, but the recorded result is 2.875 instead of 4.0625.
# avg is not associative, so the answer depends on how the elements were grouped:
# the trace below combines 1.5 (= avg(1, 2)) with 4.25 (= avg(avg(3, 4), 5)).
rddnum.reduce(avg)

computing avg of  1.5 4.25 :  2.875


2.875

In [49]:
def add(x, y):
    """Return x + y and print a trace line for the call.

    Args:
        x: left operand.
        y: right operand.

    Returns:
        The sum of the two operands.
    """
    summed = x + y
    # Trace each call so the sequence of pairwise additions is visible.
    print("Add called on ", x, y, ": ", summed)
    return summed


In [59]:
total = rddnum.reduce(add)

Add called on  1 2 :  3
Add called on  3 3 :  6
Add called on  6 4 :  10
Add called on  10 5 :  15


In [53]:
def to_count(x):
    """Map any element to the constant 1 so that summing the results counts elements.

    Args:
        x: the element being counted; it is only echoed in the trace line.

    Returns:
        Always 1.
    """
    # Trace which element this call was made for.
    print("To count: called on ", x)
    one = 1
    return one


In [58]:
# Count the elements map/reduce style: turn every element into 1, then sum the ones with add.
cnt = rddnum.map(to_count).reduce(add)

Add called on  1 1 :  2
Add called on  2 1 :  3
Add called on  3 1 :  4
Add called on  4 1 :  5


In [60]:
total/cnt

3.0

In [62]:
def totuple(x):
    """Pair a value with a count of one.

    Args:
        x: the value to wrap.

    Returns:
        The 2-tuple (x, 1).
    """
    pair = (x, 1)
    return pair


In [66]:
tuple_rdd = rddnum.map(totuple)

In [73]:
def addtuple(t1, t2):
    """Add two pairs position by position, printing the inputs and the result.

    Args:
        t1: indexable whose items 0 and 1 are the first pair's components.
        t2: indexable whose items 0 and 1 are the second pair's components.

    Returns:
        The tuple (t1[0] + t2[0], t1[1] + t2[1]).
    """
    print("Adding: ", t1, t2)
    # Combine position 0 with position 0 and position 1 with position 1.
    summed = (t1[0] + t2[0], t1[1] + t2[1])
    print("    => ", summed)
    return summed

In [68]:
addtuple((4, 2), (10, 3))

(14, 5)

In [131]:
tuple_rdd.take(10)

[(1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]

In [129]:
tot, cnt = tuple_rdd.reduce(addtuple)

Adding:  (1, 1) (2, 1)
    =>  (3, 2)
Adding:  (3, 2) (3, 1)
    =>  (6, 3)
Adding:  (6, 3) (4, 1)
    =>  (10, 4)
Adding:  (10, 4) (5, 1)
    =>  (15, 5)


In [130]:
# fold is reduce with an explicit starting value. (0, 0) leaves any pair unchanged
# under addtuple, so the final pair is the same (15, 5) as in the reduce above; the
# extra "Adding" lines in the trace are the zero value being merged in.
tot, cnt = tuple_rdd.fold((0,0), addtuple)

Adding:  (0, 0) (0, 0)
    =>  (0, 0)
Adding:  (0, 0) (1, 1)
    =>  (1, 1)
Adding:  (1, 1) (2, 1)
    =>  (3, 2)
Adding:  (3, 2) (3, 1)
    =>  (6, 3)
Adding:  (6, 3) (4, 1)
    =>  (10, 4)
Adding:  (10, 4) (5, 1)
    =>  (15, 5)


In [71]:
tot/cnt

3.0

In [77]:
# rddnum.aggregate?

In [78]:
# sc.parallelize([]).aggregate((0, 0), seqOp, combOp)

In [86]:
def seqOp(tpl, x):
    """Fold one element into a running (total, count) accumulator.

    Passed as the seqOp argument of RDD.aggregate in this notebook.

    Args:
        tpl: the accumulator, a 2-item (total, count) pair.
        x: the next element to absorb.

    Returns:
        A new pair (total + x, count + 1).
    """
    print("SeqOp: ", tpl, x)
    running_total, running_count = tpl
    return (running_total + x, running_count + 1)

In [89]:
def combOp(t1, t2):
    """Merge two (total, count) accumulators into one.

    Passed as the combOp argument of RDD.aggregate in this notebook.

    Args:
        t1: first 2-item (total, count) pair.
        t2: second 2-item (total, count) pair.

    Returns:
        The pair (total1 + total2, count1 + count2).
    """
    print("comOp: ", t1, t2)
    left_total, left_count = t1
    right_total, right_count = t2
    merged_total = left_total + right_total
    merged_count = left_count + right_count
    return (merged_total, merged_count)

In [95]:
rddnum = sc.parallelize([1,2,3,4,5])

In [96]:
# aggregate builds a (total, count) pair: seqOp absorbs elements, combOp merges partial pairs.
# NOTE(review): only combOp's prints appear in the recorded output; presumably seqOp runs
# inside worker processes whose stdout is not shown in the notebook — TODO confirm.
rddnum.aggregate((0,0), seqOp, combOp)

comOp:  (0, 0) (0, 0)
comOp:  (0, 0) (1, 1)
comOp:  (1, 1) (2, 1)
comOp:  (3, 2) (3, 1)
comOp:  (6, 3) (4, 1)
comOp:  (10, 4) (5, 1)


(15, 5)

In [127]:
rddnum.take(10)
# rddnum.fold(zeroValue, op)

[1, 2, 3, 4, 5]

In [125]:
def myfold_add2tpl(t1, t2):
    """Fold operator that accumulates a running (tot, cnt) pair.

    The previous body ignored both arguments and returned the names ``cnt`` and
    ``tot``, which are not defined inside the function. On a fresh kernel that is
    a NameError; in a live kernel it silently returned whatever globals of those
    names happened to exist.

    Each operand may be either a (tot, cnt) tuple or a bare number. A bare number
    is treated as one observation of that value, i.e. (x, 1), the same convention
    as ``totuple``. This lets the function be used with
    ``rdd_of_numbers.fold((0, 0), myfold_add2tpl)``: inside a partition the
    accumulator is a tuple and the element is a number, while partition results
    are merged tuple with tuple.

    Args:
        t1: (tot, cnt) tuple or a bare number.
        t2: (tot, cnt) tuple or a bare number.

    Returns:
        The tuple (tot1 + tot2, cnt1 + cnt2), in the same (tot, cnt) order used
        by addtuple / seqOp / combOp.
    """
    # Lift bare numbers to (value, 1) so both operands share one shape.
    tot1, cnt1 = t1 if isinstance(t1, tuple) else (t1, 1)
    tot2, cnt2 = t2 if isinstance(t2, tuple) else (t2, 1)
    return (tot1 + tot2, cnt1 + cnt2)

In [126]:
# NOTE(review): the output recorded below contains "Adding ... ;; ...", "tot:" and "Result:"
# lines that the current myfold_add2tpl definition does not print, so it appears to come
# from an earlier version of the function. Re-run this cell after redefining it.
rddnum.fold((0,0), myfold_add2tpl)

Adding  (0, 0) ;; (0, 0)
tot:  0
Result:  (0, 0)
Adding  (0, 0) ;; (0, 0)
tot:  0
Result:  (0, 0)
Adding  (0, 0) ;; (0, 0)
tot:  0
Result:  (0, 0)
Adding  (0, 0) ;; (0, 0)
tot:  0
Result:  (0, 0)
Adding  (0, 0) ;; (0, 0)
tot:  0
Result:  (0, 0)
Adding  (0, 0) ;; (0, 0)
tot:  0
Result:  (0, 0)


(0, 0)

In [128]:
# rddnum.fold?   # interactive docstring lookup (IPython-only syntax); kept commented out
#                # like the other lookups in this notebook so the file runs top to bottom.

In [132]:
myrdd = sc.parallelize(range(1,10), 2)

In [133]:
myrdd.collect()


[1, 2, 3, 4, 5, 6, 7, 8, 9]

In [135]:

# Scala equivalent: myrdd.map(_.toString)
# Convert every element to its string form; str is passed directly because the
# lambda wrapper around it added nothing.
myrdd1 = myrdd.map(str)

In [136]:
myrdd1.collect()

['1', '2', '3', '4', '5', '6', '7', '8', '9']

In [137]:
# def concat(s:String, n:String):String = s + n

def concat(s, n):
    """Return s followed by n, joined with the + operator.

    Args:
        s: left operand (the accumulated value when used with fold).
        n: right operand (the next value).

    Returns:
        s + n
    """
    joined = s + n
    return joined

In [138]:
s = "_"

In [139]:
# fold uses the zero value once per partition and once more when merging the partition
# results. myrdd was created with 2 partitions, so "_" shows up three times:
# ("_" + "_1234") + "_56789". "_" is not a neutral element for concat, which is why
# it leaks into the result.
myrdd1.fold(s, concat)

'__1234_56789'