<a href="https://colab.research.google.com/github/ChetanKnowIt/pyspark-colab/blob/main/pyspark_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install a headless Java 8 JDK (presumably the JVM that PySpark runs on — confirm).
# -qq and the redirect to /dev/null silence apt's output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m22.7 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=742e343084a6d6beaa7f275baa28f6b090b026da5c57350fe37a6ba098cb11e1
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

# Read data from text files

In [None]:
import pyspark
from pyspark.sql import SparkSession

In [None]:

# Build (or reuse) a local Spark session that runs on 4 worker threads.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName('chetan')
    .getOrCreate()
)

In [None]:
# Keep a handle on the session's SparkContext; the RDD cells below create RDDs through it.
sc = spark.sparkContext

In [None]:
# Load two text files into a single RDD of lines; both paths are passed
# as one comma-separated string.
rdd = sc.textFile('fruits.txt,fruits1.txt')
# collect() returns every line to the driver as a Python list.
rdd.collect()

['Apple Orange Mango',
 'Orange Grapes Plum',
 'Apple Plum Mango',
 'Apple Apple Plum',
 'Pineapple Apple Mango',
 'Mango Banana Berry',
 'Cherry Mango Apple',
 'Banana Apple Cherry']

# Transformation functions:

### flatMap

In [None]:
# flatMap: split each line on whitespace and flatten the per-line word
# lists into one RDD of individual words.
rdd2 = rdd.flatMap(lambda line: line.split())
rdd2.collect()

['Apple',
 'Orange',
 'Mango',
 'Orange',
 'Grapes',
 'Plum',
 'Apple',
 'Plum',
 'Mango',
 'Apple',
 'Apple',
 'Plum',
 'Pineapple',
 'Apple',
 'Mango',
 'Mango',
 'Banana',
 'Berry',
 'Cherry',
 'Mango',
 'Apple',
 'Banana',
 'Apple',
 'Cherry']

### map

In [None]:
# map: turn every word into a (word, 1) pair so occurrences can be summed per word.
rdd3 = rdd2.map(lambda word: (word, 1))
rdd3.collect()

[('Apple', 1),
 ('Orange', 1),
 ('Mango', 1),
 ('Orange', 1),
 ('Grapes', 1),
 ('Plum', 1),
 ('Apple', 1),
 ('Plum', 1),
 ('Mango', 1),
 ('Apple', 1),
 ('Apple', 1),
 ('Plum', 1),
 ('Pineapple', 1),
 ('Apple', 1),
 ('Mango', 1),
 ('Mango', 1),
 ('Banana', 1),
 ('Berry', 1),
 ('Cherry', 1),
 ('Mango', 1),
 ('Apple', 1),
 ('Banana', 1),
 ('Apple', 1),
 ('Cherry', 1)]

### reduceByKey

In [None]:
# reduceByKey: add up the 1s belonging to the same word, giving (word, total) pairs.
rdd4 = rdd3.reduceByKey(lambda left, right: left + right)
rdd4.collect()

[('Apple', 7),
 ('Orange', 2),
 ('Pineapple', 1),
 ('Berry', 1),
 ('Cherry', 2),
 ('Mango', 5),
 ('Grapes', 1),
 ('Plum', 3),
 ('Banana', 2)]

### sortByKey

In [None]:
# Swap each (word, total) pair to (total, word) so that sortByKey orders
# the records by their count.
rdd5 = (
    rdd4
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey()
)
rdd5.collect()

[(1, 'Pineapple'),
 (1, 'Berry'),
 (1, 'Grapes'),
 (2, 'Orange'),
 (2, 'Cherry'),
 (2, 'Banana'),
 (3, 'Plum'),
 (5, 'Mango'),
 (7, 'Apple')]

In [None]:
# rdd4 is already keyed by word, so it can be sorted directly; the previous
# map(lambda x: (x[0], x[1])) rebuilt identical pairs and only added an
# extra pass over the data.
rdd5 = rdd4.sortByKey()
rdd5.collect()

[('Apple', 7),
 ('Banana', 2),
 ('Berry', 1),
 ('Cherry', 2),
 ('Grapes', 1),
 ('Mango', 5),
 ('Orange', 2),
 ('Pineapple', 1),
 ('Plum', 3)]

### filter 

In [None]:
# filter: keep only the (word, 1) pairs whose word begins with 'B'.
rdd6 = rdd3.filter(lambda pair: pair[0].startswith('B'))
rdd6.collect()

[('Banana', 1), ('Berry', 1), ('Banana', 1)]

# Word Count in PySpark

In [None]:
# Word count over a single file: split lines into words, pair each word
# with 1, then sum the 1s per word.
text_file = sc.textFile("fruits.txt")
counts = (
    text_file
    # split() with no argument collapses runs of whitespace and ignores
    # leading/trailing blanks; split(" ") would emit empty-string "words"
    # for doubled spaces or blank lines and count them.
    .flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
)
counts.collect()

[('Apple', 4), ('Orange', 2), ('Mango', 2), ('Grapes', 1), ('Plum', 3)]

In [None]:
# Print one "word:count" line per collected (word, count) tuple.
for pair in counts.collect():
    print("%s:%i" % pair)

Apple:4
Orange:2
Mango:2
Grapes:1
Plum:3


# Average of Ages in PySpark

In [None]:
# Read the ages file, keep only the all-digit tokens, and emit a
# (1, age) pair for each of them.
ages_text_file = sc.textFile("ages.txt")
counts = (
    ages_text_file
    .flatMap(lambda line: line.split())
    .filter(lambda token: token.isdigit())
    .map(lambda token: (1, int(token)))
)
counts.collect()

[(1, 21), (1, 22), (1, 24), (1, 22), (1, 25), (1, 27), (1, 29), (1, 20)]

In [None]:
def avg(rdd):
    """Return a one-element RDD holding the mean of the values in a pair RDD.

    Parameters
    ----------
    rdd : RDD of (key, value) pairs
        Only the second element of each pair is averaged; the key is ignored.

    Returns
    -------
    RDD
        An RDD created on the same SparkContext as ``rdd`` whose single
        element is ``sum(values) / number_of_records``.

    Raises
    ------
    ZeroDivisionError
        If ``rdd`` contains no records.
    """
    # Fold the records into a (running_sum, running_count) pair through the
    # RDD's aggregate() rather than collect()-ing every record to the driver
    # and looping over them there.
    total, count = rdd.aggregate(
        (0, 0),
        lambda acc, pair: (acc[0] + pair[1], acc[1] + 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
    # Use the RDD's own context so the function no longer depends on a
    # notebook-global `sc` being defined.
    return rdd.context.parallelize([total / count])

In [None]:
# avg() returns a one-element RDD, so collect() is needed to see the number itself.
avg_age = avg(counts)
avg_age.collect()

[23.75]

# Basic RDD operations

In [None]:
# Distribute the integers 1 through 8 as an RDD (this rebinds the name `rdd`).
rdd = sc.parallelize(list(range(1, 9)))

In [None]:
# Number of elements in the RDD.
print("Count: ", rdd.count(), sep="")

Count: 8


In [None]:
# first() returns the first element of the RDD.
firstRec = rdd.first()
print("First Record: ", firstRec, sep="")

First Record: 1


In [None]:
# Largest element of the RDD.
print("Maximum: ", rdd.max())

Maximum:  8


In [None]:
# take(3) returns the first three elements as a Python list.
print("Take 3: ", rdd.take(3))

Take 3:  [1, 2, 3]


In [None]:
# Sum of all elements in the RDD.
rdd.sum()

36

In [None]:
# Arithmetic mean of the RDD's elements.
rdd.mean()

4.5