# tek RDD Basic Operations

In [1]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext

In [4]:
pyspark = SparkSession.builder \
.master("local[4]") \
.appName("rdd-ollusturma") \
.config("spark.executor.memory", "4g")\
.config("spark.driver.memory","2g")\
.getOrCreate()

In [5]:
sc = pyspark.sparkContext

In [2]:
liste = [1,2,3,4,5]

In [7]:
liste_rdd = sc.parallelize(liste)

In [8]:
liste_rdd.take(10)

[1, 2, 3, 4, 5]

In [9]:
liste_rdd.map(lambda x: x*x).take(4)

[1, 4, 9, 16]

In [12]:
liste_rdd.filter(lambda x: x == 4).take(5)

[4]

In [13]:
metin = ["mergaba emel" ,"ali ata bak", "ahmet tpu bırak"]

In [14]:
metin_rdd = sc.parallelize(metin)

In [15]:
metin_rdd.take(3)

['mergaba emel', 'ali ata bak', 'ahmet tpu bırak']

In [18]:
metin_rdd.map(lambda x: x.upper()).take(3)

['MERGABA EMEL', 'ALI ATA BAK', 'AHMET TPU BIRAK']

In [19]:
metin_rdd.flatMap(lambda x: x.upper()).take(3)

['M', 'E', 'R']

In [20]:
metin_rdd.flatMap(lambda x: x.split(" ")).map(lambda x: x.upper()).take(10)

['MERGABA', 'EMEL', 'ALI', 'ATA', 'BAK', 'AHMET', 'TPU', 'BIRAK']

In [23]:
liste_rdd.distinct()

PythonRDD[25] at RDD at PythonRDD.scala:53

In [24]:
# tekrarlı olanlar gitti
liste_rdd.distinct().take(4)

[4, 1, 5, 2]

In [25]:
# sample aldıktan sonra aldığımı geri bırakim mi? True(evet)
# fraction: ne kadarlık bir sample alsın: 0.5 yarısı
liste_rdd.sample(True, 0.5, 42).take(4)

[4]

In [27]:
liste_rdd.sample(True, 0.7, 42).take(4)

[2, 3, 4, 4]

In [29]:
liste_rdd.sample(True, 0.9, 42).take(4)

[2, 3, 4, 4]

# iki rdd basic transformations

In [31]:
rdd1 = sc.parallelize([1,2,9,5,4,36])

In [32]:
rdd2 = sc.parallelize([1,4,9,16,25,36])

In [33]:
rdd1.take(3)

[1, 2, 9]

In [34]:
rdd2.take(3)

[1, 4, 9]

In [36]:
rdd1.union(rdd2).take(12)

[1, 2, 9, 5, 4, 36, 1, 4, 9, 16, 25, 36]

In [38]:
rdd1.intersection(rdd2).take(10)

[1, 9, 4, 36]

In [39]:
rdd1.subtract(rdd2).take(10)

[2, 5]

In [40]:
rdd1.cartesian(rdd2).take(50)

[(1, 1),
 (1, 4),
 (1, 9),
 (1, 16),
 (1, 25),
 (1, 36),
 (2, 1),
 (9, 1),
 (2, 4),
 (2, 9),
 (9, 4),
 (9, 9),
 (2, 16),
 (9, 16),
 (2, 25),
 (2, 36),
 (9, 25),
 (9, 36),
 (5, 1),
 (5, 4),
 (5, 9),
 (5, 16),
 (5, 25),
 (5, 36),
 (4, 1),
 (36, 1),
 (4, 4),
 (4, 9),
 (36, 4),
 (36, 9),
 (4, 16),
 (36, 16),
 (4, 25),
 (4, 36),
 (36, 25),
 (36, 36)]

# tek rdd üzerinde basic operations


In [43]:
rdd = sc.parallelize([1,2,9,4,2,4,5,1,1,7])

In [44]:
rdd.collect()

[1, 2, 9, 4, 2, 4, 5, 1, 1, 7]

In [46]:
#metinlerde satı sayısını verir
rdd.count()

10

In [47]:
# tekrar edenleri ve sayılarını getirir
rdd.countByValue()

defaultdict(int, {1: 3, 2: 2, 9: 1, 4: 2, 5: 1, 7: 1})

In [48]:
# bunda nereden getireceği belli değil ama verilen sayı kadar getirir
rdd.take(3)

[1, 2, 9]

In [49]:
# üstten getirir
rdd.top(4)

[9, 7, 5, 4]

In [50]:
#sıralayıp getirir
rdd.takeOrdered(10)

[1, 1, 1, 2, 2, 4, 4, 5, 7, 9]

In [52]:
#rdd den sample bir sample alıp drivera getirir,sample() da rdd dönüyordu, sonucusu seed
rdd.takeSample(False, 5, 33)

[1, 2, 4, 2, 1]

In [53]:
rdd.takeSample(False, 5, 33)

[1, 2, 4, 2, 1]

In [55]:
rdd.reduce(lambda x,y: x+y)

36

In [56]:
rdd.fold(0, lambda x,y: x+y)


36

In [57]:
# acc-> accumulater(x), value(y) : toplamı veriyor ama bu toplamın kaç rakamdan oluştuğunuda veriyor
# 
rdd.aggregate( (0,0), (lambda acc, value: ( acc[0] +value, acc[1]+1)), ( lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])))

(36, 10)