Checking Java Version

In [None]:
# Show which Java runtime is currently first on the PATH.
!java -version

openjdk version "11.0.9.1" 2020-11-04
OpenJDK Runtime Environment (build 11.0.9.1+1-Ubuntu-0ubuntu1.18.04)
OpenJDK 64-Bit Server VM (build 11.0.9.1+1-Ubuntu-0ubuntu1.18.04, mixed mode, sharing)


Installing Java 8

In [None]:
# Install the headless OpenJDK 8 JDK; -qq plus the redirect to /dev/null keeps the cell output empty.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Downloading Spark

In [None]:
!wget -q http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz

Extracting Spark Files

In [None]:
# Unpack the downloaded Spark tarball into the current working directory.
!tar xf spark-3.0.1-bin-hadoop3.2.tgz

Installing FindSpark

In [None]:
# Install findspark quietly (-q); it is imported when the Spark session is created.
!pip install -q findspark

Setting the JAVA_HOME and SPARK_HOME environment variables

In [None]:
import os

# Point the process at the Java 8 install and at the unpacked Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop3.2",
})

Creating Spark Session

In [None]:
import findspark
# init() runs before the pyspark import below; presumably it makes the SPARK_HOME
# installation importable, so keep this ordering.
findspark.init()
from pyspark.sql import SparkSession
# Build (or reuse) a session whose master is local[*].
spark = SparkSession.builder.master("local[*]").getOrCreate()

Stopping the session

In [None]:
# Stop the session; a separate SparkContext is created further down in the notebook.
spark.stop()

Check the pyspark version

In [None]:
import pyspark
# Print the version string of the pyspark package that was imported.
print(pyspark.__version__)

3.0.1


# First Spark Job

Importing PySpark and creating a SparkContext

In [None]:
from pyspark import SparkConf
from pyspark import SparkContext

# Configuration for the context: local master, application name 'spark-basis'.
conf = SparkConf()
conf.setMaster('local')
conf.setAppName('spark-basis')

# getOrCreate returns the already-running context when this cell is executed a
# second time; calling the SparkContext constructor again would raise because
# only one context may be active at a time.
sc = SparkContext.getOrCreate(conf=conf)

Function to calculate mod

In [None]:
def mod(x):
  """Return the pair (x, x mod 2), with the remainder computed by numpy."""
  # numpy is imported inside the function body, as in the original.
  import numpy as np
  remainder = np.mod(x, 2)
  return (x, remainder)

Creating an RDD

In [None]:
# Apply mod to 0..999 and bring the first ten (value, value mod 2) pairs to the driver.
# Note: take() hands back a plain list, so `rdd` is a list here rather than an RDD.
rdd = (
    sc.parallelize(range(1000))
    .map(mod)
    .take(10)
)
print(rdd)

[(0, 0), (1, 1), (2, 0), (3, 1), (4, 0), (5, 1), (6, 0), (7, 1), (8, 0), (9, 1)]


Creating an RDD using list

In [None]:
# The integers 1 through 5 as a local list, then distributed as an RDD.
values = list(range(1, 6))
rdd = sc.parallelize(values)

Printing all 5 elements of the RDD

In [None]:
 # Return up to five elements of `rdd` as a list.
 rdd.take(5)

[1, 2, 3, 4, 5]

Uploading Files to Colab

In [None]:
from google.colab import files
# Prompt for a file upload through Colab; later cells read the uploaded Spark.txt by relative path.
uploaded = files.upload()

Saving Spark.txt to Spark.txt


Loading a text file to Spark

In [None]:
# Load Spark.txt as an RDD; the collect() in the next cell shows one string per line.
rdd = sc.textFile("Spark.txt")

Print the rdd data

In [None]:
# Bring every element of the RDD back to the driver as a list.
rdd.collect()

["Hi I'm Trishla ", 'How are you?', 'What are you doin?']

RDD Persistence

In [None]:
# Odd numbers 1, 3, ..., 9999.
aba = sc.parallelize(range(1, 10000, 2))
# Mark the RDD for persistence; no storage level is passed, so the default applies.
aba.persist()

PythonRDD[7] at RDD at PythonRDD.scala:53

RDD Caching

In [None]:
textFile = sc.textFile("Spark.txt")
# Mark the RDD to be cached; the cell displays the RDD that cache() returns.
textFile.cache()

Spark.txt MapPartitionsRDD[9] at textFile at NativeMethodAccessorImpl.java:0

Map

In [None]:
x = sc.parallelize(["spark", "rdd", "example", "sample", "example"])
# Pair each word with the count 1; duplicates such as "example" stay separate entries.
y = x.map(lambda word: (word, 1))
y.collect()

[('spark', 1), ('rdd', 1), ('example', 1), ('sample', 1), ('example', 1)]

FlatMap

In [None]:
rdd = sc.parallelize([2, 3, 4])
# Each element n expands to 1..n-1; the pieces are flattened into one RDD and sorted locally.
sorted(
    rdd.flatMap(lambda upper: range(1, upper))
    .collect()
)

[1, 1, 1, 2, 2, 3]

Filter

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Keep only the even values.
rdd.filter(lambda n: n % 2 == 0).collect()

[2, 4]

Sample1

In [None]:
parallel = sc.parallelize(range(9))
# Sample with replacement at an expected fraction of 0.2 and count the draws.
# A fixed seed makes the count reproducible across runs; without it the result
# changes every time the cell is executed.
parallel.sample(True, .2, seed=42).count()

1

Sample2

In [None]:
# Without replacement at fraction 1: the recorded output contains every element exactly once.
parallel.sample(False, 1).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8]

Union

In [None]:
parallel = sc.parallelize(range(1, 9))   # 1..8
par = sc.parallelize(range(5, 15))       # 5..14
# union keeps duplicates: 5..8 appear once from each input.
parallel.union(par).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

Intersection

In [None]:
parallel = sc.parallelize(range(1, 9))   # 1..8
par = sc.parallelize(range(5, 15))       # 5..14
# Elements present in both RDDs; the recorded result is not in sorted order.
parallel.intersection(par).collect()

[6, 8, 5, 7]

Distinct

In [None]:
parallel = sc.parallelize(range(1, 9))   # 1..8
par = sc.parallelize(range(5, 15))       # 5..14
# distinct() removes the duplicates that union leaves in; the recorded result is not in sorted order.
parallel.union(par).distinct().collect()

[2, 4, 6, 8, 10, 12, 14, 1, 3, 5, 7, 9, 11, 13]

SortBy1

In [None]:
y = sc.parallelize([5, 7, 1, 3, 2, 1])
# Sort by the element itself, smallest first.
y.sortBy(lambda value: value, ascending=True).collect()

[1, 1, 2, 3, 5, 7]

SortBy2

In [None]:
z = sc.parallelize([("H", 10), ("A", 26), ("Z", 1), ("L", 5)])
# The whole (letter, number) tuple is the sort key; descending order puts "Z" first.
z.sortBy(lambda pair: pair, ascending=False).collect()

[('Z', 1), ('L', 5), ('H', 10), ('A', 26)]

MapPartitions

In [None]:
rdd = sc.parallelize([1, 2, 3, 4], 2)

def f(iterator):
  """Yield a single value: the sum of the elements in one partition."""
  yield sum(iterator)

# One sum per partition (two partitions were requested above).
rdd.mapPartitions(f).collect()

[3, 7]

MapPartitions - WithIndex

In [None]:
rdd = sc.parallelize([1, 2, 3, 4], 4)

def f(splitIndex, iterator):
  """Yield only the partition's index; the partition's contents are ignored."""
  yield splitIndex

# Four partitions were requested, so the indices 0 + 1 + 2 + 3 add up to 6.
rdd.mapPartitionsWithIndex(f).sum()

6

GroupBy

In [None]:
rdd = sc.parallelize([1, 2, 3, 5, 8])
# Group by parity: key 0 collects the even values, key 1 the odd ones.
result = rdd.groupBy(lambda value: value % 2).collect()
# Sort both the groups and their members so the display is stable.
sorted([(parity, sorted(members)) for (parity, members) in result])

[(0, [2, 8]), (1, [1, 3, 5])]

KeyBy (combined with Cogroup)

In [None]:
# Key each of 0, 1, 2 by its square, giving keys 0, 1, 4.
x = sc.parallelize(range(0, 3)).keyBy(lambda n: n * n)
# Pairs (0, 0) .. (4, 4).
y = sc.parallelize(zip(range(0, 5), range(0, 5)))
# For every key, list the values from x and from y side by side.
[
    (key, list(map(list, groups)))
    for key, groups in sorted(x.cogroup(y).collect())
]

[(0, [[0], [0]]),
 (1, [[1], [1]]),
 (2, [[], [2]]),
 (3, [[], [3]]),
 (4, [[2], [4]])]

Zip

In [None]:
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
# Pair the i-th element of x with the i-th element of y.
x.zip(y).collect()

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

Zip - WithIndex

In [None]:
# Pair each element with its position; the recorded indices run 0..3 across the 3 partitions.
sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3)]

Repartition


In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
# glom() yields one list per partition, showing how the 7 elements are spread over the 4 partitions.
sorted(rdd.glom().collect())

[[1], [2, 3], [4, 5], [6, 7]]

In [None]:
# After repartition(2) there is one glom() list per partition, i.e. 2.
len(rdd.repartition(2).glom().collect())

2

Coalesce

In [None]:
# Baseline: partition layout of five elements over 3 partitions.
sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()

[[1], [2, 3], [4, 5]]

In [None]:
# Same data coalesced down to 2 partitions; compare the layout with the previous cell.
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(2).glom().collect()

[[1], [2, 3, 4, 5]]

Reduce

In [None]:
from operator import add
# Fold the elements together with addition: 1 + 2 + 3 + 4 + 5.
sc.parallelize([1, 2, 3, 4, 5]).reduce(add)

15

In [None]:
# Ten elements, each mapped to 1, so the reduced sum equals the element count.
# `add` comes from the `from operator import add` in the previous cell.
sc.parallelize(2 for _ in range(10)).map(lambda _: 1).cache().reduce(add)

10

First

In [None]:
# Return the first element of the RDD.
sc.parallelize([2, 3, 4]).first()

2

TakeOrdered

In [None]:
nums = sc.parallelize([1, 5, 3, 9, 4, 0, 2])
# The 5 smallest elements, in ascending order.
nums.takeOrdered(5)

[0, 1, 2, 3, 4]

Take


In [None]:
nums = sc.parallelize([1, 5, 3, 9, 4, 0, 2])
# The first 5 elements in their original order (no sorting, unlike takeOrdered).
nums.take(5)

[1, 5, 3, 9, 4]

Count

In [None]:
nums = sc.parallelize([1, 5, 3, 9, 4, 0, 2])
# Number of elements in the RDD.
nums.count()

7

Collect

In [None]:
c = sc.parallelize(["Gun", "Cat", "Rat", "Dog", "Gun", "Rat"], 2)
# Return all elements, duplicates included, as a list.
c.collect()

['Gun', 'Cat', 'Rat', 'Dog', 'Gun', 'Rat']

Distinct

In [None]:
c = sc.parallelize(["Gun", "Cat", "Rat", "Dog", "Gun", "Rat"], 2)
# Drop the repeated "Gun" and "Rat" entries.
c.distinct().collect()

['Gun', 'Cat', 'Rat', 'Dog']

CollectAsMap

In [None]:
alphanumerics = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
# Collect the (key, value) pairs into a dict on the driver.
alphanumerics.collectAsMap()

{1: 'a', 2: 'b', 3: 'c'}

SaveAsTextFile

In [None]:
a = sc.parallelize(range(1, 10000), 3)
# Write under /content, the working directory this notebook already uses for
# Spark and its other outputs, rather than into the system /usr/bin directory.
a.saveAsTextFile("/content/mydata_ai")

In [None]:
x = sc.parallelize([1,2,3,4,5,6,6,7,8,9,10,21], 3)
# Write under /content, the working directory this notebook already uses for
# Spark and its other outputs, rather than into the system /usr/bin directory.
x.saveAsTextFile("/content/sample3.txt")

Foreach

In [None]:
def f(x):
  """Print a single RDD element."""
  print(x)

# Run f once for every element of the RDD.
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

ForeachPartition

In [None]:
def f(iterator):
  """Print every element of one partition.

  iterator: an iterator over the elements of a single partition.
  """
  for x in iterator:
    print(x)

# The driver call belongs at top level (inside f it would never run), and it
# must hand f to foreachPartition so f receives each partition's iterator.
sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)

Mathematical Actions

In [None]:
# The integers 1..99, used by the statistics cells that follow.
numbers = sc.parallelize(range(1, 100))

In [None]:
# Sum of all elements in `numbers`.
numbers.sum()

4950

In [None]:
# Smallest element in `numbers`.
numbers.min()

1

In [None]:
# Variance of `numbers`; the recorded value matches the population variance (divide by n) of 1..99.
numbers.variance()

816.6666666666666

In [None]:
# Largest element in `numbers`.
numbers.max()

99

In [None]:
# Arithmetic mean of `numbers`.
numbers.mean()

50.0

In [None]:
# Standard deviation of `numbers`; the recorded value is the square root of the variance shown above.
numbers.stdev()

28.577380332470412

CountByValue


In [None]:
a = sc.parallelize([1,2,3,4,5,6,7,8,2,4,2,3,3,3,1,1,1])
# Map each distinct value to the number of times it occurs.
a.countByValue()

defaultdict(int, {1: 4, 2: 3, 3: 4, 4: 2, 5: 1, 6: 1, 7: 1, 8: 1})

toDebugString

In [None]:
a = sc.parallelize(range(1, 19), 3)   # 1..18
b = sc.parallelize(range(1, 13), 3)   # 1..12
# Elements of a that do not appear in b.
c = a.subtract(b)
# Describe the chain of RDDs behind c; the recorded output is a bytes object.
c.toDebugString()

b'(6) PythonRDD[166] at RDD at PythonRDD.scala:53 []\n |  MapPartitionsRDD[165] at mapPartitions at PythonRDD.scala:133 []\n |  ShuffledRDD[164] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(6) PairwiseRDD[163] at subtract at <ipython-input-105-d303d0f7bae3>:3 []\n    |  PythonRDD[162] at subtract at <ipython-input-105-d303d0f7bae3>:3 []\n    |  UnionRDD[161] at union at NativeMethodAccessorImpl.java:0 []\n    |  PythonRDD[159] at RDD at PythonRDD.scala:53 []\n    |  ParallelCollectionRDD[157] at readRDDFromFile at PythonRDD.scala:262 []\n    |  PythonRDD[160] at RDD at PythonRDD.scala:53 []\n    |  ParallelCollectionRDD[158] at readRDDFromFile at PythonRDD.scala:262 []'

Creating Pair RDDs

In [None]:
rdd = sc.parallelize([("a1", "b1", "c1", "d1", "e1"), ("a2","b2","c2","d2","e2")])
# Turn each 5-tuple into (first field, list of the remaining fields).
result = rdd.map(lambda row: (row[0], list(row[1:])))
result.collect()

[('a1', ['b1', 'c1', 'd1', 'e1']), ('a2', ['b2', 'c2', 'd2', 'e2'])]

WordCount using RDD concepts

In [None]:
# Reload Spark.txt as the input of the word count.
rdd = sc.textFile("Spark.txt")

In [None]:
# Discard zero-length lines before tokenising.
nonempty_lines = rdd.filter(lambda line: len(line) > 0)

In [None]:
# Split every line into words. split() with no argument splits on any run of
# whitespace and never yields empty strings, so trailing or doubled spaces no
# longer produce a bogus '' "word" in the counts (split(' ') did).
words = nonempty_lines.flatMap(lambda line: line.split())

In [None]:
# Count each word, flip the pairs to (count, word), and sort by count, largest first.
wordcount = (
    words.map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(False)
)

In [None]:
# Each item is a (count, word) tuple, despite the loop variable's name.
for word in wordcount.collect():
  print(word)

(2, 'are')
(1, 'Hi')
(1, "I'm")
(1, 'Trishla')
(1, '')
(1, 'How')
(1, 'you?')
(1, 'What')
(1, 'you')
(1, 'doin?')


In [None]:
# Write the (count, word) pairs out as text under /content/Wordcount.
wordcount.saveAsTextFile("/content/Wordcount")

Passing Functions to Spark

In [None]:
rdd = sc.parallelize([1,2,3,4,5])
# Pass an inline function to map: every element is increased by 2.
rdd.map(lambda n: n + 2).collect()

[3, 4, 5, 6, 7]

Anonymous function (i.e., one with no name)
- This lambda returns the given integer plus 2.

In [None]:
# A bare lambda expression: the cell displays the function object itself; nothing is called.
lambda x: x+2

<function __main__.<lambda>>