In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext("local", "first app")

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark Create RDD example").config("spark.some.config.option", "some-value").getOrCreate()

23/05/02 08:53:33 WARN Utils: Your hostname, msobhvm resolves to a loopback address: 127.0.1.1; using 192.168.238.129 instead (on interface ens33)
23/05/02 08:53:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/02 08:53:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"])

In [3]:
#action: count items
words.count()

                                                                                

8

In [4]:
#action: Generate local collection
words.collect()

['scala',
 'java',
 'hadoop',
 'spark',
 'akka',
 'spark vs hadoop',
 'pyspark',
 'pyspark and spark']

In [5]:
#action: for each item call custom function
def f(x): print(x)

words.foreach(f)

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark


In [6]:
#transform: apply filter
result_rdd = words.filter(lambda x: 'spark' in x)
result_rdd.collect()

['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

In [7]:
#transform: map items to another item with different form. EX: ('The') -> ('The', 1)
result_rdd = words.map(lambda x: (x, 1))
result_rdd.collect()

[('scala', 1),
 ('java', 1),
 ('hadoop', 1),
 ('spark', 1),
 ('akka', 1),
 ('spark vs hadoop', 1),
 ('pyspark', 1),
 ('pyspark and spark', 1)]

In [8]:
#action: extract data frame from RDD
result_rdd = words.map(lambda x: (x, 1))
df = result_rdd.toDF()
df.show()

[Stage 6:>                                                          (0 + 1) / 1]

+-----------------+---+
|               _1| _2|
+-----------------+---+
|            scala|  1|
|             java|  1|
|           hadoop|  1|
|            spark|  1|
|             akka|  1|
|  spark vs hadoop|  1|
|          pyspark|  1|
|pyspark and spark|  1|
+-----------------+---+



                                                                                

In [9]:
#transform: map item -> another items. EX: (4) -> (1), (2), (3)
rdd = sc.parallelize([2, 3, 4])
result_rdd = rdd.flatMap(lambda x: range(1, x))
result_rdd.collect()

[1, 1, 2, 1, 2, 3]

In [10]:
#action: generate local collection using dictionry map form
result_rdd = sc.parallelize([(1, 2), (3, 4)])
result_rdd.collectAsMap()

{1: 2, 3: 4}

In [11]:
#action: reduce
from operator import add
nums = sc.parallelize([1, 2, 3, 4, 5])
sum = nums.reduce(add)
print("Adding all the elements -> %i" % (sum))

Adding all the elements -> 15


In [12]:
#transform: map item -> another item/items pairs. EX: ('apple', [1, 2, 5]) -> ('apple', 1), ('apple', 2), ('apple', 5)
rdd = sc.parallelize([('apple', [1, 2, 5]), ('orange', [3, 5, 9]), ('banana', [2, 4])])
result_rdd = rdd.flatMapValues(lambda x: x)
result_rdd.collect()

[('apple', 1),
 ('apple', 2),
 ('apple', 5),
 ('orange', 3),
 ('orange', 5),
 ('orange', 9),
 ('banana', 2),
 ('banana', 4)]

In [13]:
#transform: join
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
result_rdd = x.join(y)
result_rdd.collect()

                                                                                

[('hadoop', (4, 5)), ('spark', (1, 2))]

In [14]:
#transform: sort/sort by key
data = [('apple', 15), ('orange', 22), ('banana', 12)]
result_rdd = sc.parallelize(data).sortBy(lambda x: x[1])
result_rdd.collect()

[('banana', 12), ('apple', 15), ('orange', 22)]

In [15]:
result_rdd = sc.parallelize(data).sortBy(lambda x: x[0], False)
result_rdd.collect()

[('orange', 22), ('banana', 12), ('apple', 15)]

In [16]:
#transform: keys/values
data = [('ahmed', 'math'), ('ahmed', 'physics'), ('mostafa', 'math'), ('said', 'math'), ('salme', 'history'), ('fady', 'math')]
rdd = sc.parallelize(data)

In [17]:
rdd.keys().collect()

['ahmed', 'ahmed', 'mostafa', 'said', 'salme', 'fady']

In [18]:
rdd.values().collect()

['math', 'physics', 'math', 'math', 'history', 'math']

In [19]:
#transform: operations
rdd1 = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
rdd2 = sc.parallelize([("a", 3), ("c", None)])
result_rdd = rdd1.subtract(rdd2)
result_rdd.collect()
result_rdd = rdd1.subtractByKey(rdd2)
result_rdd.collect()

                                                                                

[('b', 4), ('b', 5)]

In [20]:
#transform: group by
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result_rdd = rdd.groupBy(lambda x: x % 2)
result_rdd.toDF().show()

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|{[1, 1, 3, 5], 0, 4}|
|  0|      {[2, 8], 0, 2}|
+---+--------------------+



In [21]:
#action: count by key
data = [('ahmed', 'math'), ('ahmed', 'physics'), ('mostafa', 'math'), ('said', 'math'), ('salme', 'history'), ('fady', 'math')]
sc.parallelize(data).countByKey()

defaultdict(int, {'ahmed': 2, 'mostafa': 1, 'said': 1, 'salme': 1, 'fady': 1})

In [22]:
#action: statistics
rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
print("sum:", rdd.sum())
print("min:", rdd.min())
print("max:", rdd.max())
print("stdev:", rdd.stdev())
print("variance:", rdd.variance())
print("sampleStdev:", rdd.sampleStdev())
print("sampleVariance:", rdd.sampleVariance())
print("sumApprox:", rdd.sumApprox(1000, 0.95))
#sum till 1000 ms or 0.95 confidence

sum: 60.0
min: 2.0
max: 43.0
stdev: 16.416455159382004
variance: 269.5
sampleStdev: 18.95608961081724
sampleVariance: 359.3333333333333
sumApprox: 60.0


In [23]:
#word count example
#load
lines = sc.textFile("file:/home/msobh/pyspark/data/words.txt")
#transform
words = lines.flatMap(lambda line: line.split(" "))
#transform
map = words.map(lambda word: (word, 1))
#action
f = lambda accumulator, value: accumulator + value
result = map.reduceByKey(f)
#transform
sorted = result.sortBy(lambda x: x[1], False)
#action
df = sorted.toDF(["Word", "Count"]); df.show()

+---------+-----+
|     Word|Count|
+---------+-----+
|      The|    4|
|   Slaves|    3|
|        a|    3|
|       of|    3|
|      are|    3|
|       to|    3|
| servers.|    3|
|  Masters|    3|
|  contain|    2|
|     list|    2|
|   hosts,|    2|
|      one|    2|
|      per|    2|
|    line,|    2|
|     that|    2|
|     host|    2|
|      and|    2|
| NameNode|    2|
|     file|    2|
|Secondary|    2|
+---------+-----+
only showing top 20 rows

