In [10]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()



In [11]:
rdd = sc.parallelize([1,2,3]) 
#parellelize(): a new distributed data set is created with specified number of partitions and 
#the elements of the collection are copied to the RDD.
rdd.map(lambda x: x*x).take(3)

[1, 4, 9]

In [9]:
numbers = range(5)
print(list(numbers))

rdd = sc.parallelize(numbers)

def square(a):
    return [a,a*a,a*a*a]

newRDD = rdd.map(lambda x: square(x))
newArray = newRDD.collect()
print(newArray)

[0, 1, 2, 3, 4]
[[0, 0, 0], [1, 1, 1], [2, 4, 8], [3, 9, 27], [4, 16, 64]]


In [12]:
pointsRdd = sc.textFile("datasets/points.txt")

def splitAndMakeTuple(line):
    arr = line.split(",")
    x = int(arr[0])
    y = int(arr[1])
    return x,y

print(pointsRdd.collect()) 
#collect(): gathered the splitted data
newpointsRdd = pointsRdd.map(lambda x : splitAndMakeTuple(x))
print("Before Sorted: ")
print(newpointsRdd.collect())
print("After Sorted: ")
sortedRdd = newpointsRdd.sortBy(lambda arr : arr[1], ascending = True )
print(sortedRdd.collect())


['5,5', '15,17', '15,16', '5,6', '6,6', '16,16', '7,7', '7,6', '17,17', '6,7', '7,8', '18,18', '8,6', '15,18', '5,8', '17,16', '8,8', '16,17', '17,18', '18,16']
Before Sorted: 
[(5, 5), (15, 17), (15, 16), (5, 6), (6, 6), (16, 16), (7, 7), (7, 6), (17, 17), (6, 7), (7, 8), (18, 18), (8, 6), (15, 18), (5, 8), (17, 16), (8, 8), (16, 17), (17, 18), (18, 16)]
After Sorted: 
[(5, 5), (5, 6), (6, 6), (7, 6), (8, 6), (7, 7), (6, 7), (7, 8), (5, 8), (8, 8), (15, 16), (16, 16), (17, 16), (18, 16), (15, 17), (17, 17), (16, 17), (18, 18), (15, 18), (17, 18)]


In [17]:
worldRDD = sc.textFile("datasets/world.txt")

def splitAndMakeTuple(line):
    arr = line.split(",")
    country = arr[0]
    population = int(arr[3])
    return(country,population)

print(worldRDD.take(3))
worldRdd = worldRDD.map(lambda line: splitAndMakeTuple(line))
print("Before Sorted: ")
print(worldRdd.take(10))
print("After Sorted: ")
sortedWorld = worldRdd.sortBy(lambda arr: arr[1],ascending = False)
print(sortedWorld.take(10))

['Afghanistan,Asia,Kabul,29863010', 'Albania,Europe,Tirana,3129678', 'Algeria,Africa,Algiers,32853800']
Before Sorted: 
[('Afghanistan', 29863010), ('Albania', 3129678), ('Algeria', 32853800), ('Andorra', 67151), ('Angola', 15941390), ('Antigua and Barbuda', 81479), ('Argentina', 38747150), ('Armenia', 3016312), ('Australia', 20155130), ('Austria', 8189444)]
After Sorted: 
[("China People's Republic of", 1315844000), ('India', 1103371000), ('United States', 298212900), ('Indonesia', 222781500), ('Brazil', 186404900), ('Pakistan', 157935100), ('Russia', 143201600), ('Bangladesh', 141822300), ('Nigeria', 131529700), ('Japan', 128084700)]


In [18]:
#Reduce

rdd1 = sc.parallelize([4,3,6,7])
result = rdd1.reduce(lambda x,y: x+y )
print(result)

20


In [21]:
#ReduceByKey

rdd2 = sc.parallelize([('a',1),('b',2),('a',3),('a',4),('b',10),('c',5),('c',8)])
result2 = rdd2.reduceByKey(lambda x,y: x+y)
print(result2.collect())

[('b', 12), ('c', 13), ('a', 8)]


In [23]:
#Transformations

pointsRdd = sc.textFile("datasets/points.txt")

def splitAndMakeTuple(line):
    arr = line.split(",")
    number1 = int(arr[0])
    number2 = int(arr[1])
    return (number1,number2)

pointsRdd = pointsRdd.map(lambda x: splitAndMakeTuple(x))
#print(pointsRdd.take(20))

pointsRdd = pointsRdd.filter(lambda a: (a[0] + a[1]) > 15 )
pointsRdd = pointsRdd.sortBy(lambda c: c[0], ascending = True)

print(pointsRdd.take(50))

[(8, 8), (15, 17), (15, 16), (15, 18), (16, 16), (16, 17), (17, 17), (17, 16), (17, 18), (18, 18), (18, 16)]


In [24]:
eaRdd = sc.textFile("datasets/DepremVerileri-2019-Nisan.txt")
eaRDD = eaRdd.filter(lambda line: "Deprem Kodu" not in line)
#firstly, perform filtering to get better results
def splitAndSelect(line):
    arr = line.split("\t")
    date = arr[2]
    xM = float(arr[7])
    place = arr[14]
    return (place,date,xM)
eaRdd = eaRDD.map(lambda x: splitAndSelect(x))
eaRdd = eaRdd.sortBy(lambda arr: arr[2], ascending = False)

print(eaRdd.take(10))


[('KURUTILEK- (ERZINCAN) [North East  3.0 km]', '1939.12.26', 7.9), ('ONIKI ADALAR (AKDENIZ)', '1926.06.26', 7.7), ('T�RKIYE-IRAN SINIR B�LGESI', '1930.05.06', 7.6), ('YENIYAKA-CALDIRAN (VAN) [South East  1.9 km]', '1976.11.24', 7.5), ('BASISKELE (KOCAELI) [North East  2.0 km]', '1999.08.17', 7.4), ('ERIKLICE-SARKOY (TEKIRDAG) [South East  4.3 km]', '1912.08.09', 7.3), ('YEMLICE- (VAN) [North West  1.5 km]', '2011.10.23', 7.2), ('UGUR- (DUZCE) [North East  0.3 km]', '1999.11.12', 7.2), ('SOGUCAK-YENICE (�ANAKKALE) [South West  2.3 km]', '1953.03.18', 7.2), ('AKDENIZ', '1948.02.09', 7.2)]


In [26]:
earthRdd = sc.textFile("datasets/DepremVerileri-2019-Nisan.txt")
earthRdd = earthRdd.filter(lambda line: "Deprem Kodu" not in line)

def splitAndSelect(line):
    arr = line.split("\t")
    long = float(arr[4])
    lat = float(arr[5])
    return (long,lat)

earthRdd = earthRdd.map(lambda x: splitAndSelect(x))

def findArea(x):
    long = x[0]
    lat = x[1]
    area = "Area1"
    
    if (long > 39):
        if (lat <= 31):
            area = "Area1"
        elif(lat <= 36):
            area = "Area2"
        elif (lat <= 41):
            area = "Area3"
        else:
            area = "Area4"
    else:
        if (lat <= 31):
            area = "Area5"
        elif(lat <= 36):
            area = "Area6"
        elif (lat <= 41):
            area = "Area7"
        else:
            area = "Area8"
    return (area,1)

earthRdd = earthRdd.map(lambda x: findArea(x))
earthRdd = earthRdd.reduceByKey(lambda x,y: x+y)

earthRdd = earthRdd.sortBy(lambda a: a[1], ascending = False)
#earthRdd.saveAsTextFile("acb")

print(earthRdd.take(10))
    
    

[('Area5', 7504), ('Area1', 2628), ('Area8', 1450), ('Area6', 1233), ('Area7', 1172), ('Area2', 959), ('Area3', 932), ('Area4', 718)]


In [29]:
#FlatMap

rdd3 = sc.textFile("datasets/text.txt")
rdd3 = rdd3.flatMap(lambda x: x.split(" "))
print(rdd3.take(500))

['Sabancı', 'Üniversitesi,', 'Bilişim', 'Teknolojileri', '(BT)', 'Yüksek', 'Lisans', 'Programı', '', '(Professional', "Masters'", 'Degree', 'in', 'IT)', 'sektöre', 'nitelikli', 'iş', 'gücü', 'kazandırma', '', 'misyonu', 'ile', '2001', 'yılında', 'çalışmalarına', 'başlamış', 've', '2002', 'yılı', 'Güz', 'döneminde', '', 'eğitime', 'başlamıştır.', 'Program,', 'hızla', 'değişen', 'endüstri', 'gereksinimlerine', 'cevap', '', 'vermek', 'üzere', 'katılımcılarına', 'teknik', 'altyapı,', 'çözüm', 'üretme', 'yeteneği', 've', '', 'rekabet', 'gücü,', 'endüstriye', 'ise', 'bu', 'becerilere', 'sahip', 'profesyoneller', 'kazandırmayı', '', 'hedeflemektedir.', 'Eğitim', 'programının', 'tasarımında', 'endüstri', 'beklentileri', 've', 'yeni', '', 'teknolojiler', 'dikkate', 'alınarak', 'teorik', 'bilgilerin', 'uygulamalarla', 'desteklendiği,', '', 'laboratuvar', 'çalışmalarıyla', 'zenginleştirilmiş', 'bir', 'içerik', 've', 'format', 'benimsenmiştir.']


In [35]:
#Union

rdd4 = sc.parallelize([1,1,1])
rdd5 = sc.parallelize([2,2,2])
print(rdd4.union(rdd5).take(5))

#Join

rdd6 = sc.parallelize([("a",1),("b",1),("c",1),("d",1)])
rdd7 = sc.parallelize([("a",2),("b",2),("c",2),("e",2)])
print(rdd6.join(rdd7).take(10))

#Cartesian

print(rdd6.cartesian(rdd7).take(10))
print("-------")
print(rdd6.cartesian(rdd7).map(lambda x: (x[0][0],x[0][1],x[1][0],x[1][1])).take(10))


[1, 1, 1, 2, 2]
[('a', (1, 2)), ('b', (1, 2)), ('c', (1, 2))]
[(('a', 1), ('a', 2)), (('a', 1), ('b', 2)), (('a', 1), ('c', 2)), (('a', 1), ('e', 2)), (('b', 1), ('a', 2)), (('b', 1), ('b', 2)), (('b', 1), ('c', 2)), (('b', 1), ('e', 2)), (('c', 1), ('a', 2)), (('c', 1), ('b', 2))]
-------
[('a', 1, 'a', 2), ('a', 1, 'b', 2), ('a', 1, 'c', 2), ('a', 1, 'e', 2), ('b', 1, 'a', 2), ('b', 1, 'b', 2), ('b', 1, 'c', 2), ('b', 1, 'e', 2), ('c', 1, 'a', 2), ('c', 1, 'b', 2)]


In [37]:

#Intersection

rdd1 = sc.parallelize([("a",1),("b",1),("c",1),("d",1)])
rdd2 = sc.parallelize([("a",2),("b",2),("c",1),("e",2)])

print(rdd1.intersection(rdd2).take(10))

#GroupByKey

rdd3 = sc.parallelize([("a",1),("b",1),("c",1),("d",1),("a",5),("b",3)])
print(rdd3.groupByKey().map(lambda x: (x[0],list(x[1]))).take(10))

[('c', 1)]
[('b', [1, 3]), ('c', [1]), ('a', [1, 5]), ('d', [1])]


In [41]:
#Word Count

rdd=sc.textFile("datasets/JamesJoyce-Ulyses.txt")
rdd = rdd.filter(lambda line: len(line)>10)
print(rdd.count())

rdd = rdd.map(lambda line: line.replace(',','')
             .replace('.','').replace('[','')
             .replace('!','').replace(']','')
             .replace('#','').replace('{','')
             .replace('-','').replace('}',''))
rdd = rdd.map(lambda line: line.lower())
rdd = rdd.flatMap(lambda line: line.split(" "))
rdd = rdd.map(lambda x: (x,1))
rdd = rdd.reduceByKey(lambda x,y: x+y)
rdd = rdd.sortBy(lambda x : x[1],ascending=False)
print(rdd.take(100))


24652
[('the', 14946), ('of', 8239), ('and', 7251), ('a', 6470), ('to', 4991), ('in', 4888), ('he', 3778), ('his', 3295), ('i', 2658), ('that', 2554), ('with', 2507), ('', 2443), ('it', 2256), ('was', 2116), ('on', 2098), ('for', 1931), ('you', 1880), ('her', 1735), ('him', 1466), ('is', 1436), ('all', 1316), ('at', 1291), ('by', 1282), ('as', 1179), ('said', 1176), ('from', 1084), ('she', 1052), ('or', 1022), ('they', 1004), ('me', 899), ('be', 886), ('out', 880), ('not', 880), ('my', 826), ('what', 826), ('had', 812), ('up', 802), ('like', 723), ('their', 714), ('mr', 712), ('have', 695), ('but', 691), ('one', 682), ('there', 675), ('an', 654), ('them', 649), ('no', 628), ('so', 597), ('bloom', 589), ('then', 560), ('are', 549), ('if', 548), ('when', 544), ('about', 535), ('which', 509), ('were', 505), ('your', 490), ('old', 484), ('this', 482), ('says', 469), ('who', 463), ('down', 442), ('do', 436), ('over', 436), ('too', 427), ('we', 424), ('after', 421), ('now', 408), ('see', 407