In [22]:
# import pyspark 
import pyspark as ps
from pyspark import SparkContext

In [23]:
# Create (or reuse) the SparkContext that every later cell refers to as `sc`.
# getOrCreate() hands back the already-running context when there is one, so
# re-running this cell does not try to start a second context.
# Note: the master URL 'local' must be written in lowercase.
spark_conf = ps.SparkConf().setMaster('local').setAppName('Lecture_Example')
sc = SparkContext.getOrCreate(spark_conf)

### Creating RDDs 

In [24]:
# Build an RDD from an in-memory Python list with parallelize().
data = [
    ('emp1', 2000),
    ('emp2', 2500),
    ('emp3', 3000),
]
rdd = sc.parallelize(data)

# An RDD can also be created from a text file through the textFile() method.
data_file = sc.textFile('data.txt')

### Printing RDDs

In [25]:
# collect() returns all the elements of an RDD as a Python list.
arr_data = rdd.collect()
file_data = data_file.collect()

# Display the two lists, each on its own line.
for collected in (arr_data, file_data):
    print(collected)

[('emp1', 2000), ('emp2', 2500), ('emp3', 3000)]
['emp1, 2000', 'emp2, 2500', 'emp3, 3000']


In [26]:
# Walk through the collected elements one by one:
# first the tuples from arr_data, then the text lines from file_data.
for item in [*arr_data, *file_data]:
    print(item)

('emp1', 2000)
('emp2', 2500)
('emp3', 3000)
emp1, 2000
emp2, 2500
emp3, 3000


In [27]:
# arr_data already holds two-element tuples, so each element unpacks directly.
for e, s in arr_data:
    print(e, s)


def split_employee_line(line: str):
    """Split one comma-separated text line into a two-element tuple of strings.

    Raises ValueError when the line does not contain exactly one comma.
    """
    name, salary = line.split(',')
    return (name, salary)


# Turn the raw text lines into tuples. The list is rebuilt from the RDD instead of
# being patched in place: mutating file_data while iterating over it would make the
# cell fail on a second run, because its elements would already be tuples.
# NOTE: the fields are kept exactly as split, so the second field is still a string
# and keeps any whitespace that follows the comma (e.g. ' 2000').
file_data = [split_employee_line(line) for line in data_file.collect()]

for e, s in file_data:
    print(e, s)

print(file_data)

emp1 2000
emp2 2500
emp3 3000
emp1  2000
emp2  2500
emp3  3000
[('emp1', ' 2000'), ('emp2', ' 2500'), ('emp3', ' 3000')]


### RDD Transformations

In [32]:
# map() returns a new RDD created by applying a function to each element of the
# RDD it is called on. Start with an RDD of two sentences.
string_data = [
    "PySpark is Python API",
    "PySpark supports Spark’s features",
]
rdd_string = sc.parallelize(string_data)
# Printing the RDD object itself shows its description, not its elements.
print(rdd_string)

def splitString(string: str):
    """Split ``string`` on every single space character and return the pieces as a list."""
    pieces = string.split(' ')
    return pieces

# Apply splitString to every sentence; collect() turns the resulting RDD into a list.
rdd2 = rdd_string.map(splitString)
words_per_sentence = rdd2.collect()
print(words_per_sentence)

ParallelCollectionRDD[15] at readRDDFromFile at PythonRDD.scala:289
[['PySpark', 'is', 'Python', 'API'], ['PySpark', 'supports', 'Spark’s', 'features']]


In [33]:
# flatMap() applies the function to every element and flattens the results,
# so the per-sentence word lists end up as one flat list of words.
rdd3 = rdd_string.flatMap(splitString)
flat_words = rdd3.collect()
print(flat_words)

['PySpark', 'is', 'Python', 'API', 'PySpark', 'supports', 'Spark’s', 'features']


In [34]:
# reduceByKey() combines the values that share the same key using the given function.
newData = [
    ("key1", 2000),
    ("key1", 2500),
    ("key2", 3000),
]
newDataRdd = sc.parallelize(newData)  # turn the list into an RDD
reducedRDD = newDataRdd.reduceByKey(lambda left, right: left + right)
rddToList = reducedRDD.collect()
print(rddToList)

[('key1', 4500), ('key2', 3000)]


In [35]:
# filter() keeps only the elements for which the predicate is true;
# here, the sentences that contain 'API'.
filtering = rdd_string.filter(lambda sentence: 'API' in sentence)
api_sentences = filtering.collect()
print(api_sentences)

['PySpark is Python API']


In [37]:
# filter() on an RDD of (sentence, number) tuples: the predicate receives the whole tuple.
data = [
    ("PySpark is Python API", 10),
    ("PySpark supports Spark’s features", 15),
]
rdd = sc.parallelize(data)
rdd2 = rdd.filter(lambda pair: 'API' in pair[0])  # matches the first tuple only
rdd3 = rdd.filter(lambda pair: pair[1] > 10)  # matches the second tuple only
for filtered in (rdd2, rdd3):
    print(filtered.collect())


[('PySpark is Python API', 10)]
[('PySpark supports Spark’s features', 15)]


### RDD Actions

In [40]:
# reduce() aggregates the elements of an RDD with a two-argument function.
# Example: summing the numbers 1..5, which should give 5 * 6 / 2 = 15.
data = list(range(1, 6))

# Turn the list into an RDD.
rdd_data = sc.parallelize(data)

# Reducer that defines how two elements are combined when summing.
def sumElm(x1: int, x2: int):
    """Return ``x1 + x2``."""
    combined = x1 + x2
    return combined

# Apply the reduction. reduce() hands back the final value directly,
# so there is nothing to collect() afterwards.
# The result is stored as `total` rather than `sum` so that the built-in
# sum() is not shadowed for the rest of the notebook.
total = rdd_data.reduce(sumElm)

print(total)

15


In [41]:
# Reducer that keeps the larger of two elements.
def getMax(x1: int, x2: int):
    """Return the larger of ``x1`` and ``x2`` (``x1`` when they compare equal).

    The comparison is written out instead of calling the built-in ``max`` so the
    helper keeps working even if the name ``max`` is rebound in the notebook
    namespace.
    """
    return x2 if x2 > x1 else x1

# Reduce the RDD to its largest element.
# The result is stored as `max_value` rather than `max` so that the built-in
# max() is not shadowed; rebinding `max` to a number would break any later
# call to max(...) in the notebook.
max_value = rdd_data.reduce(getMax)

print(max_value)

5


In [49]:
# Sorting elements with takeOrdered(n, key=...): up to n elements are returned,
# ordered by the value the key function produces; negating the key reverses the order.
unordered = [(1, 20), (2, 40)]
ordered = sc.parallelize(unordered)

sort_keys = (
    lambda x: x[0],   # sort by keys (ascending)
    lambda x: -x[0],  # sort by keys (descending)
    lambda x: x[1],   # sort by values (ascending)
    lambda x: -x[1],  # sort by values (descending)
)
for sort_key in sort_keys:
    print(ordered.takeOrdered(5, key=sort_key))



[(1, 20), (2, 40)]
[(2, 40), (1, 20)]
[(1, 20), (2, 40)]
[(2, 40), (1, 20)]
