<a href="https://colab.research.google.com/github/a-nagar/python-workshop/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Python Map-Reduce for Parallel and Distributed Computing

The first step is to install PySpark, a library for cluster computing. We will use it later; for now, let's learn about the map and reduce operations.

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 41 kB/s 
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 32.2 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=b851a76671b48950553d621bff3c530b030deb4f89d6fcb185c413d8a0a76c0a
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
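# creating and storing values
x = 1
y = [1, 2, 3]

# Functions
# lambda syntax: define an anonymous function and call it immediately
(lambda x: 2*x)(2)  # evaluates to 4; only a cell's last expression is displayed
# a lambda can also be bound to a name (this rebinds x from the integer above)
x = lambda a: a + 10
x(5)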


15

In [None]:
# Named functions
def addOne(x):
    return x + 1
print(addOne(1))


def add(x, y):
    return x + y
add(2, 3)


2


5

In [None]:
# map functions
salaries = [20000, 70000, 40000]
g = lambda x: 2*x
# map applies g to every element; list() collects the results into a list
doubled = list(map(g, salaries))



In [None]:
def doubleIt(x):
  return 2*x

list(map(doubleIt, salaries))

[40000, 140000, 80000]
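
The cells above wrap `map` in `list()` because, in Python 3, `map` returns a lazy iterator rather than a list. The following is a minimal sketch of that behavior, reusing the `salaries` values from above; the variable names `lazy`, `first`, `rest`, and `leftover` are illustrative only.

```python
salaries = [20000, 70000, 40000]

lazy = map(lambda x: 2 * x, salaries)  # nothing is computed yet
first = next(lazy)                     # computes only the first element
rest = list(lazy)                      # consumes the remaining elements
leftover = list(lazy)                  # the iterator is now exhausted

print(first)     # 40000
print(rest)      # [140000, 80000]
print(leftover)  # []
```

A map object can be consumed only once, so store the result of `list(map(...))` if you need it more than once.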

In [None]:
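# map works on any iterable; this evaluates to ['CAT', 'DOG', 'COW'],
# but only the last expression in a cell is displayed
list(map(lambda x: x.upper(), ['cat', 'dog', 'cow']))

# filter keeps only the elements for which the function returns a truthy value
num_list = [2, 3, 4, 5, 6]
odd_num = list(filter(lambda x: x % 2 != 0, num_list))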
odd_num



[3, 5]

In [None]:
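def isOdd(x):
    return bool(x % 2)  # bool() is optional for filter, which only needs a truthy value

# map returns the predicate's result for every element:
# [False, True, False, True, False] (not displayed)
list(map(isOdd, num_list))

# filter keeps only the elements for which the predicate is true
list(filter(isOdd, num_list))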

[3, 5]

In [None]:
# List comprehension
num_list = [2,3,4,5,6]
sq_list = [x*x for x in num_list if x%2==0]
sq_list

[4, 16, 36]
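
A list comprehension with a condition combines a `map` step and a `filter` step in one expression. As a small sketch (the names `via_comprehension` and `via_map_filter` are illustrative), the two forms below should produce the same list:

```python
num_list = [2, 3, 4, 5, 6]

# comprehension: square the even numbers
via_comprehension = [x * x for x in num_list if x % 2 == 0]

# same result with filter (keep evens) followed by map (square)
evens = filter(lambda x: x % 2 == 0, num_list)
via_map_filter = list(map(lambda x: x * x, evens))

print(via_comprehension == via_map_filter)  # True
```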

In [None]:
# Reduce operator
from functools import reduce
product = reduce(lambda x, y: x * y, [1, 2, 3, 4])

# The command above is equivalent to:
product = 1
nums = [1, 2, 3, 4]  # not named `list`, which would shadow the built-in list()
for num in nums:
    product = product * num

In [None]:
product

24
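
The three operators can be chained into a small map-filter-reduce pipeline. The sketch below is an illustration only: the 50000 threshold is an arbitrary value chosen for the example, and the third argument to `reduce` is an initial value, which also makes the call safe on an empty sequence.

```python
from functools import reduce

salaries = [20000, 70000, 40000]

doubled = map(lambda s: 2 * s, salaries)           # 40000, 140000, 80000
large = filter(lambda s: s > 50000, doubled)       # 140000, 80000
total = reduce(lambda acc, s: acc + s, large, 0)   # start accumulating from 0

# with an initial value, reducing an empty sequence returns that value
empty_total = reduce(lambda acc, s: acc + s, [], 0)

print(total)        # 220000
print(empty_total)  # 0
```

Without the initial value, `reduce` raises a `TypeError` on an empty sequence.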

## Working with key-value pairs

In [None]:
from pyspark.context import SparkContext 
from pyspark.sql import SparkSession
from pyspark import SparkConf

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [None]:
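# More examples on key-value pairs
kv = [(1, 2), (3, 4), (3, 6)]
rdd = sc.parallelize(kv)
# Transformations are lazy and return new RDDs; these two results are
# discarded because no action such as collect() is called on them
rdd.reduceByKey(lambda x, y: x + y)
rdd.groupByKey().map(lambda x: (x[0], list(x[1])))
# sort by key
rdd.sortByKey().collect()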

[(1, 2), (3, 4), (3, 6)]

In [None]:
# sort by value
rdd.sortBy(lambda x: x[1]).collect()

[(1, 2), (3, 4), (3, 6)]

In [None]:
rdd.keys().collect()

[1, 3, 3]

In [None]:
rdd.values().collect()

[2, 4, 6]
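
To make the semantics of `reduceByKey` and `groupByKey` concrete, here is a single-machine sketch in plain Python. The helpers `reduce_by_key` and `group_by_key` are hypothetical names written for this illustration, not part of PySpark, and they ignore partitioning and distribution entirely. The results are sorted by key only to make the output deterministic.

```python
from collections import defaultdict

kv = [(1, 2), (3, 4), (3, 6)]

def reduce_by_key(pairs, func):
    """Combine all values that share a key using func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return sorted(result.items())

def group_by_key(pairs):
    """Collect all values that share a key into a list."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())

summed = reduce_by_key(kv, lambda x, y: x + y)
grouped = group_by_key(kv)

print(summed)   # [(1, 2), (3, 10)]
print(grouped)  # [(1, [2]), (3, [4, 6])]
```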

In [41]:
!wget "https://www.gutenberg.org/files/1661/1661-0.txt"

--2022-07-21 13:04:53--  https://www.gutenberg.org/files/1661/1661-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 607430 (593K) [text/plain]
Saving to: ‘1661-0.txt.1’


2022-07-21 13:04:54 (1.15 MB/s) - ‘1661-0.txt.1’ saved [607430/607430]



In [43]:
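input = sc.textFile("1661-0.txt")  # RDD with one element per line (the name shadows the built-in input())
input.collect()  # brings every line back to the driver; fine for a small file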

['The Project Gutenberg eBook of The Adventures of Sherlock Holmes, by Arthur Conan Doyle',
 '',
 'This eBook is for the use of anyone anywhere in the United States and',
 'most other parts of the world at no cost and with almost no restrictions',
 'whatsoever. You may copy it, give it away or re-use it under the terms',
 'of the Project Gutenberg License included with this eBook or online at',
 'www.gutenberg.org. If you are not located in the United States, you',
 'will have to check the laws of the country where you are located before',
 'using this eBook.',
 '',
 'Title: The Adventures of Sherlock Holmes',
 '',
 'Author: Arthur Conan Doyle',
 '',
 'Release Date: November 29, 2002 [eBook #1661]',
 '[Most recently updated: May 20, 2019]',
 '',
 'Language: English',
 '',
 'Character set encoding: UTF-8',
 '',
 'Produced by: an anonymous Project Gutenberg volunteer and Jose Menendez',
 '',
 '*** START OF THE PROJECT GUTENBERG EBOOK THE ADVENTURES OF SHERLOCK HOLMES ***',
 '',
 'cover',

In [44]:
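# split each line on spaces and lowercase every word
words = input.flatMap(lambda x: x.split(" ")).map(lambda x: x.lower())
# keep only words longer than 5 characters
longWords = words.filter(lambda x: len(x) > 5)
# emit (word, 1) pairs, sum the counts per word, and sort by descending count
wordPairs = longWords.map(lambda x: (x, 1))
wordCounts = wordPairs.reduceByKey(lambda x, y: x + y).sortBy(lambda x: -x[1])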
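
Splitting on a single space leaves punctuation attached to words, so tokens such as `holmes`, `holmes,` and `holmes.` are counted as different keys. One possible refinement, sketched here in plain Python on two invented sample lines (not text from the book), is to strip punctuation from each token before counting. The same `strip` call could be applied inside the `map` step of the Spark pipeline.

```python
import string
from collections import Counter

# invented sample lines, for illustration only
lines = [
    "Holmes, said Watson, is right.",
    "Holmes. Holmes is always right!",
]

# ASCII punctuation plus the curly quotes that appear in the book's text
strip_chars = string.punctuation + "“”‘’"

words = [w.lower().strip(strip_chars) for line in lines for w in line.split()]
long_words = [w for w in words if len(w) > 5]
counts = Counter(long_words).most_common()  # sorted by descending count

print(counts)  # [('holmes', 3), ('watson', 1), ('always', 1)]
```

Note that `strip` only removes characters at the ends of a token, so hyphenated words such as `gutenberg-tm` stay intact.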

In [45]:
wordCounts.collect()

[('little', 257),
 ('should', 208),
 ('holmes', 200),
 ('before', 141),
 ('holmes,', 126),
 ('sherlock', 100),
 ('however,', 97),
 ('nothing', 87),
 ('through', 86),
 ('project', 84),
 ('without', 80),
 ('holmes.', 78),
 ('matter', 75),
 ('“well,', 67),
 ('having', 67),
 ('seemed', 65),
 ('thought', 64),
 ('himself', 61),
 ('though', 60),
 ('rather', 60),
 ('something', 60),
 ('between', 58),
 ('always', 55),
 ('gutenberg-tm', 54),
 ('myself', 53),
 ('against', 52),
 ('looked', 51),
 ('hardly', 50),
 ('morning', 50),
 ('turned', 50),
 ('cannot', 50),
 ('friend', 49),
 ('business', 49),
 ('brought', 49),
 ('within', 48),
 ('father', 47),
 ('anything', 46),
 ('already', 44),
 ('behind', 43),
 ('enough', 41),
 ('better', 41),
 ('whether', 41),
 ('looking', 41),
 ('really', 41),
 ('watson,', 40),
 ('strange', 40),
 ('then?”', 39),
 ('passed', 38),
 ('perhaps', 38),
 ('during', 37),
 ('suddenly', 36),
 ('across', 35),
 ('“there', 34),
 ('yourself', 33),
 ('asked.', 33),
 ('rushed', 33),
 ('