In [1]:
!pip install pyspark




In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf

In [3]:
sparkConf = SparkConf()\
.setMaster('local[4]')\
.setAppName('RDD')\
.setExecutorEnv('spark.executor.memory', '4g')\
.setExecutorEnv('spark.driver.memory', '4g')


sc = SparkContext(conf = sparkConf)

## 🔍***PairRDD Transformations:***

---



---





### **📌filter()**





In [4]:
age = [('Alex', 35), ('Anna', 26), ('Max', 42)]

age_rdd = sc.parallelize(age)
age_rdd.filter(lambda key_value : key_value[1] < 30).collect()

[('Anna', 26)]

In [5]:
age_rdd.filter(lambda key_value : key_value[0] == 'Max').collect()

[('Max', 42)]

### **📌reduceByKey()**

In [6]:
rdd = sc.parallelize([(1,2), (3,4), (3,6)])

rdd.reduceByKey(lambda x,y : x+y).collect()

[(1, 2), (3, 10)]

### **📌groupByKey()**

In [7]:
rdd.groupByKey().collect()

[(1, <pyspark.resultiterable.ResultIterable at 0x7f3a9ed89950>),
 (3, <pyspark.resultiterable.ResultIterable at 0x7f3a9a92d750>)]

### **📌combineByKey()**

In [8]:
result = rdd.combineByKey((lambda value : (value,1)),(lambda acc, value: (acc[0]+ value, acc[1] + 1)), (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
result.collect()

[(1, (2, 1)), (3, (10, 2))]

### **📌mapValues()**

In [9]:
rdd.mapValues(lambda x:x*100).collect()

[(1, 200), (3, 400), (3, 600)]

In [10]:
rdd.keys().collect()

[1, 3, 3]

In [11]:
rdd.values().collect()

[2, 4, 6]

### **📌sortByKey()**

In [12]:
rdd.sortByKey().collect()

[(1, 2), (3, 4), (3, 6)]

### **📌subtractByKey()**


In [13]:
rdd2 = sc.parallelize([(3,9)])
rdd.subtractByKey(rdd2).collect()

[(1, 2)]

### **📌join()**

In [14]:
#sadece ortak keyi olanlari join edecek!
rdd.join(rdd2).collect()

[(3, (4, 9)), (3, (6, 9))]

### **📌rightOuterJoin()**

In [15]:
rdd.rightOuterJoin(rdd2).collect()

[(3, (4, 9)), (3, (6, 9))]

### **📌leftOuterJoin()**

In [16]:
rdd.leftOuterJoin(rdd2).collect()

[(1, (2, None)), (3, (4, 9)), (3, (6, 9))]

### **📌cogroup()**

In [17]:
rdd.cogroup(rdd2).collect()

[(1,
  (<pyspark.resultiterable.ResultIterable at 0x7f3a9a953b90>,
   <pyspark.resultiterable.ResultIterable at 0x7f3aa16f9390>)),
 (3,
  (<pyspark.resultiterable.ResultIterable at 0x7f3a9a93d090>,
   <pyspark.resultiterable.ResultIterable at 0x7f3a9a9537d0>))]

## ***⏳RDD Filter and Map Transformation***

In [18]:
retailRdd = sc.textFile('/content/OnlineRetail.csv')

In [19]:
retailRdd.take(5)

['InvoiceNo;StockCode;Description;Quantity;InvoiceDate;UnitPrice;CustomerID;Country',
 '536365;85123A;WHITE HANGING HEART T-LIGHT HOLDER;6;1.12.2010 08:26;2,55;17850;United Kingdom',
 '536365;71053;WHITE METAL LANTERN;6;1.12.2010 08:26;3,39;17850;United Kingdom',
 '536365;84406B;CREAM CUPID HEARTS COAT HANGER;8;1.12.2010 08:26;2,75;17850;United Kingdom',
 '536365;84029G;KNITTED UNION FLAG HOT WATER BOTTLE;6;1.12.2010 08:26;3,39;17850;United Kingdom']

***`without header`***

In [20]:
firstline = retailRdd.first()
firstlinerdd = sc.parallelize([firstline])
firstlinerdd.collect()

['InvoiceNo;StockCode;Description;Quantity;InvoiceDate;UnitPrice;CustomerID;Country']

In [21]:
 retail_Rdd = retailRdd.subtract(firstlinerdd)
 retail_Rdd.take(2)

['536367;84969;BOX OF 6 ASSORTED COLOUR TEASPOONS;6;1.12.2010 08:34;4,25;13047;United Kingdom',
 '536369;21756;BATH BUILDING BLOCK WORD;3;1.12.2010 08:35;5,95;13047;United Kingdom']

***Let's filter orders with InvoiceNo=536367***



In [22]:
retail_Rdd.map(lambda x : x.split(';')).filter(lambda line : line[0] == '536367').take(1)

[['536367',
  '84969',
  'BOX OF 6 ASSORTED COLOUR TEASPOONS',
  '6',
  '1.12.2010 08:34',
  '4,25',
  '13047',
  'United Kingdom']]

***Let's filter the unit price higher than 30***

In [23]:
retail_Rdd.map(lambda x : x.split(';')).filter(lambda line : int(line[3]) > 30).take(2)

[['536378',
  '85183B',
  'CHARLIE & LOLA WASTEPAPER BIN FLORA',
  '48',
  '1.12.2010 09:37',
  '1,25',
  '14688',
  'United Kingdom'],
 ['536381',
  '22719',
  'GUMBALL MONOCHROME COAT RACK',
  '36',
  '1.12.2010 09:41',
  '1,06',
  '15311',
  'United Kingdom']]

In [24]:
retail_Rdd.filter(lambda line : int(line.split(";")[3]) >30).take(2)



['536378;85183B;CHARLIE & LOLA WASTEPAPER BIN FLORA;48;1.12.2010 09:37;1,25;14688;United Kingdom',
 '536381;22719;GUMBALL MONOCHROME COAT RACK;36;1.12.2010 09:41;1,06;15311;United Kingdom']

***Filter out COFFEE in description***

In [25]:
retail_Rdd.filter(lambda line :'COFFEE' in line.split(";")[2]).take(2)

['536739;85159A;BLACK TEA,COFFEE,SUGAR JARS;2;2.12.2010 13:08;6,35;14180;United Kingdom',
 '536750;37370;RETRO COFFEE MUGS ASSORTED;6;2.12.2010 14:04;1,06;17850;United Kingdom']

***filter with function***

In [26]:
def QuantandDescrip(x):
  quantity = int(x.split(';')[3])
  description = x.split(';')[2]

  return (quantity > 2000) & ('PAPER' in description)

In [27]:
retail_Rdd.filter(lambda x : QuantandDescrip(x)).take(8)

['578841;84826;ASSTD DESIGN 3D PAPER STICKERS;12540;25.11.2011 15:57;0;13256;United Kingdom',
 '581483;23843;PAPER CRAFT , LITTLE BIRDIE;80995;9.12.2011 09:15;2,08;16446;United Kingdom']

***sum of canceled sales***

In [28]:
def canceled_sales(line):
  is_canceled = True if (line.split(';')[0].startswith('C')) else False
  quantity = float(line.split(";")[3])
  price = float(line.split(';')[5].replace(',','.'))
  total = quantity * price

  return (is_canceled, total)

In [29]:
retailTotal = retail_Rdd.map(canceled_sales).filter(lambda x:x[0]==True)
retailTotal.take(5)

[(True, -6.959999999999999),
 (True, -6.959999999999999),
 (True, -1.7),
 (True, -7.8),
 (True, -8.25)]

In [30]:
reducedTotal = retailTotal.reduceByKey(lambda x,y:x+y)

In [31]:
reducedTotal.map(lambda x: x[1]).take(2)

[-896812.4899999979]

***Calculate average salary by occupation***

In [32]:
#sirano,isim,yas,meslek,sehir,aylik_gelir

rdd_3 = sc.parallelize([
                        
(1,'Cemal',35,'Isci','Ankara',3500),
(2,'Ceyda',42,'Memur','Kayseri',4200),
(3,'Timur',30,'Müzisyen','Istanbul',9000),
(4,'Burcu',29,'Pazarlamaci','Ankara',4200),
(5,'Yasemin',23,'Pazarlamaci','Bursa',4800),
(6,'Ali',33,'Memur','Ankara',4250),
(7,'Dilek',29,'Pazarlamaci','Istanbul',7300),
(8,'Murat',31,'Müzisyen','Istanbul',12000),
(9,'Ahmet',33,'Doktor','Ankara',18000),
(10,'Muhittin',46,'Berber','Istanbul',12000),
(11,'Hicaziye',47,'Tuhafiyeci','Ankara',4800),
(12,'Harun',43,'Tornacı','Ankara',4200),
(13,'Hakkı',33,'Memur','Çorum',3750),
(14,'Gülizar',37,'Doktor','İzmir',14250),
(15,'Şehmuz',41,'Müzisyen','Ankara',8700)

])

In [33]:
rdd_3.collect()

[(1, 'Cemal', 35, 'Isci', 'Ankara', 3500),
 (2, 'Ceyda', 42, 'Memur', 'Kayseri', 4200),
 (3, 'Timur', 30, 'Müzisyen', 'Istanbul', 9000),
 (4, 'Burcu', 29, 'Pazarlamaci', 'Ankara', 4200),
 (5, 'Yasemin', 23, 'Pazarlamaci', 'Bursa', 4800),
 (6, 'Ali', 33, 'Memur', 'Ankara', 4250),
 (7, 'Dilek', 29, 'Pazarlamaci', 'Istanbul', 7300),
 (8, 'Murat', 31, 'Müzisyen', 'Istanbul', 12000),
 (9, 'Ahmet', 33, 'Doktor', 'Ankara', 18000),
 (10, 'Muhittin', 46, 'Berber', 'Istanbul', 12000),
 (11, 'Hicaziye', 47, 'Tuhafiyeci', 'Ankara', 4800),
 (12, 'Harun', 43, 'Tornacı', 'Ankara', 4200),
 (13, 'Hakkı', 33, 'Memur', 'Çorum', 3750),
 (14, 'Gülizar', 37, 'Doktor', 'İzmir', 14250),
 (15, 'Şehmuz', 41, 'Müzisyen', 'Ankara', 8700)]

In [34]:
def avg_salary(x):
    meslek = x[3]
    maas = float(x[5])
    
    return (meslek,maas)

In [35]:
avg_salary_pairRDD = rdd_3.map(avg_salary)

In [36]:
avg_salary_pairRDD.take(3)

[('Isci', 3500.0), ('Memur', 4200.0), ('Müzisyen', 9000.0)]

In [37]:
avg_salary = avg_salary_pairRDD.mapValues(lambda x: (x,1))

In [38]:
avg_salary.take(5)

[('Isci', (3500.0, 1)),
 ('Memur', (4200.0, 1)),
 ('Müzisyen', (9000.0, 1)),
 ('Pazarlamaci', (4200.0, 1)),
 ('Pazarlamaci', (4800.0, 1))]

In [39]:
avg_salary_reduce =avg_salary.reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1]))

In [40]:
avg_salary_reduce.take(5)

[('Tornacı', (4200.0, 1)),
 ('Doktor', (32250.0, 2)),
 ('Berber', (12000.0, 1)),
 ('Memur', (12200.0, 3)),
 ('Pazarlamaci', (16300.0, 3))]

In [41]:
average = avg_salary_reduce.mapValues(lambda x: x[0] / x[1]) 
average.collect()

[('Tornacı', 4200.0),
 ('Doktor', 16125.0),
 ('Berber', 12000.0),
 ('Memur', 4066.6666666666665),
 ('Pazarlamaci', 5433.333333333333),
 ('Tuhafiyeci', 4800.0),
 ('Isci', 3500.0),
 ('Müzisyen', 9900.0)]

## ***RDD Join***

In [42]:
order_items_rdd = sc.textFile('/content/order_items.csv')\
.filter(lambda x: 'orderItemName' not in x)\
.repartition(4)

In [43]:
order_items_rdd.take(5)

['11,5,1014,2,99.96,49.98',
 '12,5,957,1,299.98,299.98',
 '13,5,403,1,129.99,129.99',
 '14,7,1073,1,199.99,199.99',
 '15,7,957,1,299.98,299.98']

In [44]:
product_rdd = sc.textFile('/content/products.csv')\
.filter(lambda x: 'productCategoryId' not in x)\
.repartition(4)

In [45]:
product_rdd.take(5)

['11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set',
 "12,2,Under Armour Men's Highlight MC Alter Ego Fla,,139.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Alter+Ego+Flash+Football...",
 "13,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat",
 '14,2,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy',
 "15,2,Under Armour Kids' Highlight RM Alter Ego Sup,,59.99,http://images.acmesports.sports/Under+Armour+Kids%27+Highlight+RM+Alter+Ego+Superman+Football..."]

In [46]:
# order_items pair_rdd yapma
def make_order_items_pair_rdd(line):
    orderItemName = line.split(",")[0]
    orderItemOrderId = line.split(",")[1]
    orderItemProductId = line.split(",")[2]
    orderItemQuantity = line.split(",")[3]
    orderItemSubTotal = line.split(",")[4]
    orderItemProductPrice = line.split(",")[5]
    
    return (orderItemProductId, (orderItemName, orderItemOrderId, orderItemQuantity, 
                                 orderItemSubTotal,orderItemProductPrice))

In [47]:
order_item_pair_rdd = order_items_rdd.map(make_order_items_pair_rdd)
order_item_pair_rdd.take(5)

[('1014', ('11', '5', '2', '99.96', '49.98')),
 ('957', ('12', '5', '1', '299.98', '299.98')),
 ('403', ('13', '5', '1', '129.99', '129.99')),
 ('1073', ('14', '7', '1', '199.99', '199.99')),
 ('957', ('15', '7', '1', '299.98', '299.98'))]

In [48]:
# products için pair rdd yapma
def make_products_pair_rdd(line):
    productId = line.split(",")[0]
    productCategoryId = line.split(",")[1]
    productName = line.split(",")[2]
    productDescription = line.split(",")[3]
    productPrice = line.split(",")[4]
    productImage = line.split(",")[5]
    
    return (productId,(productCategoryId, productName, productDescription, productPrice, productImage))

In [49]:
products_pair_rdd = product_rdd.map(make_products_pair_rdd)
products_pair_rdd.take(3)

[('11',
  ('2',
   'Fitness Gear 300 lb Olympic Weight Set',
   '',
   '209.99',
   'http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set')),
 ('12',
  ('2',
   "Under Armour Men's Highlight MC Alter Ego Fla",
   '',
   '139.99',
   'http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Alter+Ego+Flash+Football...')),
 ('13',
  ('2',
   "Under Armour Men's Renegade D Mid Football Cl",
   '',
   '89.99',
   'http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat'))]

In [50]:
order_items_product_pair_rdd = order_item_pair_rdd.join(products_pair_rdd)

In [51]:
order_items_product_pair_rdd.take(1)

[('957',
  (('12', '5', '1', '299.98', '299.98'),
   ('43',
    "Diamondback Women's Serene Classic Comfort Bi",
    '',
    '299.98',
    'http://images.acmesports.sports/Diamondback+Women%27s+Serene+Classic+Comfort+Bike+2014')))]