# ***Running PySpark in Colab***

To run Spark in Colab, we first need to install the dependencies in the Colab environment: Apache Spark 2.4.4 with Hadoop 2.7, Java 8, and findspark, which is used to locate Spark on the system. The installation can be carried out inside the Colab notebook itself. Follow the steps to install the dependencies:

In [0]:
spark

# The SparkSession
* A Spark Application is controlled through a driver process called the SparkSession.
* The SparkSession instance is the way Spark executes user-defined manipulations across the cluster.
* There is a one-to-one correspondence between a SparkSession and a Spark Application.
* In Scala and Python, the variable is available as spark when you start the console.
* When you start Spark in interactive mode, you implicitly create a SparkSession that manages the Spark Application.
* When you start it through a standalone application, you must create the SparkSession object yourself in your application code.
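For the standalone case, a minimal sketch of creating the session yourself might look like the following. The helper name create_session, the application name "rdd-demo", and the local[*] master are assumptions made for illustration; they are not part of the original notebook. The import sits inside the function so that defining the helper does not itself require Spark to be installed.

```python
def create_session(app_name="rdd-demo", master="local[*]"):
    """Build (or reuse) a SparkSession for a standalone script.

    app_name and master are illustrative defaults, not values
    taken from the notebook.
    """
    # Imported lazily: Spark is only needed once the helper is called.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master(master)       # run locally, using all available cores
        .appName(app_name)    # name shown in the Spark UI
        .getOrCreate()        # reuse an existing session if one is active
    )
```

Calling spark = create_session() in a script would then give the same kind of spark object that the interactive console provides, and spark.sparkContext exposes the low-level API used in the rest of this notebook.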





# Resilient Distributed Datasets (RDDs)

* There are two sets of low-level APIs:
    * For manipulating distributed data (RDDs)
    * For distributing and manipulating distributed shared variables
        * broadcast variables
        * accumulators
* All Spark workloads compile down to these fundamental primitives.
* A SparkContext is the entry point for low-level API functionality.
* The SparkContext can be accessed through the SparkSession, which is the tool you use to perform computation across a Spark cluster.








## RDD

* Spark introduces the concept of an RDD (Resilient Distributed Dataset), an immutable, fault-tolerant, distributed collection of objects that can be operated on in parallel.
* An RDD can contain any type of object and is created by loading an external dataset or distributing a collection from the driver program.


## There are three ways to create an RDD in Spark.

* Parallelizing an existing collection in the driver program.
* Referencing a dataset in an external storage system (e.g. HDFS, HBase, shared file system).
* Creating an RDD from existing RDDs.

In [7]:
sc = spark.sparkContext
sc

In [0]:
data_range = range(1,101)

In [5]:
type(data_range)

range

In [0]:
data_RDD = sc.parallelize(data_range,10)

In [0]:
data_RDD.take(5)

[1, 2, 3, 4, 5]

In [0]:
data_RDD.getNumPartitions()

10

In [0]:
data_pair = [("maths",52),("english",75),("science",82), ("computer",65),("maths",85)]

In [0]:
type(data_pair)

list

In [0]:
rdd1 = sc.parallelize(data_pair)

In [0]:
rdd1.collect()

[('maths', 52),
 ('english', 75),
 ('science', 82),
 ('computer', 65),
 ('maths', 85)]

In [0]:
from google.colab import files

In [0]:
files.upload()

Saving temp_data.txt to temp_data.txt


{'temp_data.txt': b'1901\t-78\t1\n1901\t-72\t1\n1901\t-94\t1\n1901\t-61\t1\n1901\t-56\t1\n1901\t-28\t1\n1901\t-67\t1\n1901\t-33\t1\n1901\t-28\t1\n1901\t-33\t1\n1901\t-44\t1\n1901\t-39\t1\n1901\t0\t1\n1901\t6\t1\n1901\t0\t1\n1901\t6\t1\n1901\t6\t1\n1901\t-11\t1\n1901\t-33\t1\n1901\t-50\t1\n1901\t-44\t1\n1901\t-28\t1\n1901\t-33\t1\n1901\t-33\t1\n1901\t-50\t1\n1901\t-33\t1\n1901\t-28\t1\n1901\t-44\t1\n1901\t-44\t1\n1901\t-44\t1\n1901\t-39\t1\n1901\t-50\t1\n1901\t-44\t1\n1901\t-39\t1\n1901\t-33\t1\n1901\t-22\t1\n1901\t0\t1\n1901\t-6\t1\n1901\t-17\t1\n1901\t-44\t1\n1901\t-39\t1\n1901\t-33\t1\n1901\t-6\t1\n1901\t17\t1\n1901\t22\t1\n1901\t28\t1\n1901\t28\t1\n1901\t11\t1\n1901\t-17\t1\n1901\t-28\t1\n1901\t-56\t1\n1901\t-44\t1\n1901\t-44\t1\n1901\t-67\t1\n1901\t-44\t1\n1901\t-39\t1\n1901\t-22\t1\n1901\t-22\t1\n1901\t-22\t1\n1901\t-39\t1\n1901\t-17\t1\n1901\t-17\t1\n1901\t-22\t1\n1901\t-17\t1\n1901\t-6\t1\n1901\t-6\t1\n1901\t0\t1\n1901\t11\t1\n1901\t0\t1\n1901\t-6\t1\n1901\t17\t1\n1901\t6\t1\n19

In [0]:
!ls temp_data.txt

temp_data.txt


In [0]:
rdd2 = sc.textFile("temp_data.txt") 

In [0]:
rdd2.take(3)

['1901\t-78\t1', '1901\t-72\t1', '1901\t-94\t1']

In [0]:
rdd3 = rdd2.map(lambda s : s.split('\t'))

In [0]:
rdd3.take(3)

[['1901', '-78', '1'], ['1901', '-72', '1'], ['1901', '-94', '1']]

In [0]:
rdd3.getNumPartitions()

2

## RDDs support two types of operations:

* Transformations are operations (such as map, filter, join, and union) that are performed on an RDD and yield a new RDD containing the result.
* Actions are operations (such as reduce, count, and first) that return a value after running a computation on an RDD.
* Transformations in Spark are “lazy”, meaning that they do not compute their results right away.
* They just “remember” the operation to be performed and the dataset (e.g., a file) on which it is to be performed.
* The transformations are only computed when an action is called, and the result is then returned to the driver program.
* This design enables Spark to run more efficiently. For example, if a big file was transformed in various ways and passed to the first() action, Spark would only process and return the result for the first line, rather than do the work for the entire file.
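The lazy behaviour described above can be imitated in plain Python with generators. This is only an analogy, not Spark code: read_lines and the processed log are hypothetical names introduced here to show which lines are actually touched.

```python
def read_lines(lines, log):
    """Stand-in for a lazily read data source: yields one line at a time."""
    for line in lines:
        log.append(line)  # record every line that is really read
        yield line


processed = []
source = read_lines(["a b", "c d", "e f"], processed)

# "Transformations": building the pipeline reads no data yet.
upper = (line.upper() for line in source)
tokens = (line.split(" ") for line in upper)
assert processed == []

# "Action": asking for the first result pulls a single line through.
first = next(tokens)
print(first)      # ['A', 'B']
print(processed)  # ['a b']
```

Only one of the three lines is read, which mirrors the point about first() not processing the entire file.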

## Transformations

* coalesce(numPartitions) - Returns a new RDD that is reduced into numPartitions partitions.
* glom() - Returns an RDD created by coalescing all elements within each partition into a list.

In [0]:
RDD = sc.parallelize(range(30), 5)

In [12]:
RDD.glom().collect()

[[0, 1, 2, 3, 4, 5],
 [6, 7, 8, 9, 10, 11],
 [12, 13, 14, 15, 16, 17],
 [18, 19, 20, 21, 22, 23],
 [24, 25, 26, 27, 28, 29]]

In [0]:
RDD.getNumPartitions()

5

In [0]:
RDD = RDD.coalesce(2)

In [14]:
RDD.glom().collect()

[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
 [12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]]

In [0]:
RDD_par = RDD.repartition(20)

In [20]:
RDD_par.glom().collect()

[[],
 [0, 1, 2, 3, 4, 5],
 [18, 19, 20, 21, 22, 23],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [],
 [12, 13, 14, 15, 16, 17],
 [6, 7, 8, 9, 10, 11],
 [24, 25, 26, 27, 28, 29],
 [],
 [],
 [],
 [],
 []]

* Note: Internally, repartition uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.
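One way to picture why coalesce can avoid a shuffle is that it merges whole existing partitions instead of re-dealing individual elements. The plain-Python sketch below is an illustration only, not Spark's implementation; the function name coalesce_sketch and its contiguous grouping rule are assumptions, chosen because they reproduce the coalesce(2) result shown earlier in this section.

```python
def coalesce_sketch(partitions, n):
    """Merge whole partitions into n contiguous groups (assumes n <= len(partitions))."""
    total = len(partitions)
    merged = []
    for i in range(n):
        start = i * total // n
        end = (i + 1) * total // n
        # Concatenate the partitions in this group; elements keep their order.
        merged.append([x for part in partitions[start:end] for x in part])
    return merged


# Five partitions of six elements each, like sc.parallelize(range(30), 5).
parts = [list(range(i, i + 6)) for i in range(0, 30, 6)]
result = coalesce_sketch(parts, 2)
print([len(p) for p in result])  # [12, 18]
```

The first group holds elements 0 to 11 and the second holds 12 to 29, matching the glom() output after coalesce(2).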

In [0]:
intRdd = sc.parallelize([10, 20, 30, 40, 50])

* map (Transformation):
* The map operation returns a new RDD by applying a function to each element of the RDD.

In [0]:
mapRDD = intRdd.map(lambda x : x**2)
mapRDD.collect()

[100, 400, 900, 1600, 2500]

* filter (Transformation):

* The filter operation evaluates a Boolean function for each data item of the RDD and puts the items for which the function returns true into the resulting RDD. filter is a transformation; collect is an action.

In [0]:
numRdd = sc.parallelize([11,12,13,14,15,16,17,18])
filterRdd1 = numRdd.filter(lambda x : x%2 == 1)
filterRdd1.collect()

[11, 13, 15, 17]

In [0]:
filterRdd2 = numRdd.filter(lambda x : x%2 == 0)
filterRdd2.collect()


[12, 14, 16, 18]

* reduceByKey (Transformation):
* Spark's reduceByKey function merges the values for each key using an associative and commutative reduce function. It works only on RDDs whose elements are key-value pairs (in PySpark, two-element tuples).
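A rough plain-Python picture of what reduceByKey computes, and of why the reduce function should be associative, is sketched below. reduce_by_key_sketch and the two sample "partitions" are hypothetical and exist only for illustration; Spark's real execution is distributed.

```python
from operator import add


def reduce_by_key_sketch(pairs, func):
    """Fold the values of each key together with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


partition_a = [("a", 1), ("b", 2), ("a", 3)]
partition_b = [("b", 4), ("a", 5)]

# Reduce each partition on its own, then merge the partial results.
partial_a = reduce_by_key_sketch(partition_a, add)
partial_b = reduce_by_key_sketch(partition_b, add)
combined = reduce_by_key_sketch(
    list(partial_a.items()) + list(partial_b.items()), add
)

# Reducing everything in one pass gives the same answer.
all_at_once = reduce_by_key_sketch(partition_a + partition_b, add)
print(combined)  # {'a': 9, 'b': 6}
```

Because addition is associative and commutative, combining per-partition partial sums yields the same totals as a single pass over all the pairs.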

In [0]:
x = sc.parallelize([("comp", 2), ("tab", 1), ("comp", 1), ("comp", 1),
("tab", 1), ("tab", 1), ("tab", 1), ("tab", 1)])

In [22]:
x.collect()

[('comp', 2),
 ('tab', 1),
 ('comp', 1),
 ('comp', 1),
 ('tab', 1),
 ('tab', 1),
 ('tab', 1),
 ('tab', 1)]

In [23]:
x.reduceByKey(lambda a, b: a + b).collect()

[('comp', 4), ('tab', 5)]

In [0]:
y = x.reduceByKey(lambda a, b: a + b)

In [0]:
y.collect()

[('comp', 4), ('tab', 5)]

* flatMap (Transformation):
* Spark's flatMap function returns a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
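The difference between map and flatMap can be sketched without Spark using ordinary lists. The helper names map_sketch and flat_map_sketch are made up for this illustration.

```python
from itertools import chain


def map_sketch(items, func):
    """One output element per input element."""
    return [func(x) for x in items]


def flat_map_sketch(items, func):
    """Apply func, then flatten the resulting iterables by one level."""
    return list(chain.from_iterable(func(x) for x in items))


sentences = ["to be", "or not to be"]
nested = map_sketch(sentences, str.split)
flat = flat_map_sketch(sentences, str.split)
print(nested)  # [['to', 'be'], ['or', 'not', 'to', 'be']]
print(flat)    # ['to', 'be', 'or', 'not', 'to', 'be']
```

map keeps one list per sentence, while flatMap produces a single flat sequence of words.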

In [0]:
sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()

[range(1, 3), range(1, 4), range(1, 5)]

In [0]:
sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()

[1, 2, 1, 2, 3, 1, 2, 3, 4]

In [0]:
sentRdd = sc.parallelize(["Hello TO BE Data Scientists at INSOFE", "Welcome to Big Data Training.", "We are doing pySpark Activity"])

In [0]:
sentRdd.collect()

['Hello TO BE Data Scientists at INSOFE',
 'Welcome to Big Data Training.',
 'We are doing pySpark Activity']

In [0]:
sentRdd.map(lambda x: x.split(' ')).collect()

[['Hello', 'TO', 'BE', 'Data', 'Scientists', 'at', 'INSOFE'],
 ['Welcome', 'to', 'Big', 'Data', 'Training.'],
 ['We', 'are', 'doing', 'pySpark', 'Activity']]

In [0]:
wordlist = sentRdd.flatMap(lambda x: x.split(' ')).collect()

In [0]:
wordlist


['Hello',
 'TO',
 'BE',
 'Data',
 'Scientists',
 'at',
 'INSOFE',
 'Welcome',
 'to',
 'Big',
 'Data',
 'Training.',
 'We',
 'are',
 'doing',
 'pySpark',
 'Activity']

* groupByKey (Transformation):
* Spark's groupByKey function returns a new RDD of (key, values) pairs, where the values for each key are wrapped in an iterable object (a ResultIterable in PySpark). To see the grouped values as a list, call list() on that iterable.
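As a plain-Python analogy, grouping by key amounts to collecting every value under its key. group_by_key_sketch is a hypothetical helper for illustration; unlike Spark, it returns ordinary lists rather than lazy iterables.

```python
from collections import defaultdict


def group_by_key_sketch(pairs):
    """Collect the values of each key into a list."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


grouped = group_by_key_sketch([("x", 1), ("y", 2), ("x", 3)])
print(grouped)  # {'x': [1, 3], 'y': [2]}
```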

In [0]:
example = sc.parallelize([('x',1), ('x',1), ('y', 1), ('z', 1)])

In [0]:
example.groupByKey().collect()

[('y', <pyspark.resultiterable.ResultIterable at 0x7f801f540710>),
 ('x', <pyspark.resultiterable.ResultIterable at 0x7f801f540780>),
 ('z', <pyspark.resultiterable.ResultIterable at 0x7f801f5407b8>)]

In [0]:
itRdd = example.groupByKey()

In [0]:
itRdd.map(lambda x :(x[0], list(x[1]))).collect()

[('y', [1]), ('x', [1, 1]), ('z', [1])]

* groupBy (Transformation):
* The groupBy function returns a new RDD of grouped items. Each element pairs a key (the group) with an iterable over the items in that group. The order of elements within a group is not guaranteed to be the same each time you apply the operation to the same RDD.

In [0]:
namesRdd = sc.parallelize(["Joseph", "Jimmy", "Tina","Thomas","James","Cory","Christine", "Jackeline", "Juan"])

In [0]:
namesRdd.collect()

['Joseph',
 'Jimmy',
 'Tina',
 'Thomas',
 'James',
 'Cory',
 'Christine',
 'Jackeline',
 'Juan']

In [0]:
result = namesRdd.groupBy(lambda word: word[0]).collect()

In [0]:
for x in result:
    x = (x[0],sorted(x[1]))
    print(x)

('J', ['Jackeline', 'James', 'Jimmy', 'Joseph', 'Juan'])
('C', ['Christine', 'Cory'])
('T', ['Thomas', 'Tina'])


In [0]:
[(x, sorted(y)) for (x, y) in result]

[('J', ['Jackeline', 'James', 'Jimmy', 'Joseph', 'Juan']),
 ('C', ['Christine', 'Cory']),
 ('T', ['Thomas', 'Tina'])]

* mapValues (Transformation):
* Applies a function to each value of a pair RDD without changing the key.

In [0]:
namesRdd = sc.parallelize(["dog", "tiger", "lion", "cat", "panther","eagle"])
pairRdd = namesRdd.map(lambda x :(len(x), x))

In [0]:
namesRdd.collect()

['dog', 'tiger', 'lion', 'cat', 'panther', 'eagle']

In [0]:
pairRdd.collect()

[(3, 'dog'),
 (5, 'tiger'),
 (4, 'lion'),
 (3, 'cat'),
 (7, 'panther'),
 (5, 'eagle')]

In [0]:
result = pairRdd.mapValues(lambda y: "Animal name is " + y)
result.collect()

[(3, 'Animal name is dog'),
 (5, 'Animal name is tiger'),
 (4, 'Animal name is lion'),
 (3, 'Animal name is cat'),
 (7, 'Animal name is panther'),
 (5, 'Animal name is eagle')]

In [0]:
rdd1 = sc.parallelize([("red",20),("red",30),("blue", 100)])
rdd2 = sc.parallelize([("red",40),("red",50),("yellow", 10000)])

In [0]:
rdd1.join(rdd2).collect()

[('red', (20, 40)), ('red', (20, 50)), ('red', (30, 40)), ('red', (30, 50))]
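The join result above contains every combination of left and right values that share a key, and drops keys that appear on only one side. A nested-loop sketch in plain Python makes that pairing explicit. inner_join_sketch is a hypothetical helper; Spark does not join this way internally, and it does not guarantee the order of the output.

```python
def inner_join_sketch(left, right):
    """Pair each left value with each right value that has the same key."""
    joined = []
    for left_key, left_value in left:
        for right_key, right_value in right:
            if left_key == right_key:
                joined.append((left_key, (left_value, right_value)))
    return joined


left = [("red", 20), ("red", 30), ("blue", 100)]
right = [("red", 40), ("red", 50), ("yellow", 10000)]
pairs = inner_join_sketch(left, right)
print(pairs)
# [('red', (20, 40)), ('red', (20, 50)), ('red', (30, 40)), ('red', (30, 50))]
```

Two red values on each side give 2 x 2 = 4 output pairs, while blue and yellow have no partner and do not appear.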

In [0]:
rdd1 = sc.parallelize([("Mercedes", "E-Class"), ("Toyota", "Corolla"),("Renault", "Duster")])
rdd2 = sc.parallelize([("Mercedes", "C-Class"), ("Toyota", "Prius"),("Toyota", "Etios"),("Volkswagen", "Polo")])

In [0]:
innerJoinRdd = rdd1.join(rdd2)
innerJoinRdd.collect()

[('Mercedes', ('E-Class', 'C-Class')),
 ('Toyota', ('Corolla', 'Prius')),
 ('Toyota', ('Corolla', 'Etios'))]

In [0]:
outerJoinRdd = rdd1.leftOuterJoin(rdd2)
outerJoinRdd.collect()

[('Mercedes', ('E-Class', 'C-Class')),
 ('Renault', ('Duster', None)),
 ('Toyota', ('Corolla', 'Prius')),
 ('Toyota', ('Corolla', 'Etios'))]

In [0]:
rdd1.rightOuterJoin(rdd2).collect()

[('Volkswagen', (None, 'Polo')),
 ('Mercedes', ('E-Class', 'C-Class')),
 ('Toyota', ('Corolla', 'Prius')),
 ('Toyota', ('Corolla', 'Etios'))]

In [0]:
rdd1.fullOuterJoin(rdd2).collect()

[('Volkswagen', (None, 'Polo')),
 ('Mercedes', ('E-Class', 'C-Class')),
 ('Renault', ('Duster', None)),
 ('Toyota', ('Corolla', 'Prius')),
 ('Toyota', ('Corolla', 'Etios'))]

In [0]:
d1= [('k1', 1), ('k2', 2), ('k3', 5)]
d2= [('k1', 3), ('k2',4), ('k4', 8)]


In [0]:
d1_RDD = sc.parallelize(d1)
d2_RDD = sc.parallelize(d2)

In [0]:
d1_RDD.union(d2_RDD).collect()

[('k1', 1), ('k2', 2), ('k3', 5), ('k1', 3), ('k2', 4), ('k4', 8)]

In [0]:
files.upload()

Saving input.txt to input.txt


{'input.txt': b'2.3.0\nOverview\nProgramming Guides\nAPI Docs\nDeploying\nMore\nSpark Overview\nApache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.\n\nDownloading\nGet Spark from the downloads page of the project website. This documentation is for Spark version 2.3.0. Spark uses Hadoop\xe2\x80\x99s client libraries for HDFS and YARN. Downloads are pre-packaged for a handful of popular Hadoop versions. Users can also download a \xe2\x80\x9cHadoop free\xe2\x80\x9d binary and run Spark with any Hadoop version by augmenting Spark\xe2\x80\x99s classpath. Scala and Java users can include Spark in their projects using its Maven coordinates and in the future Python users c

In [0]:
text_file = sc.textFile('input.txt')

In [0]:
text_file.take(5)

['2.3.0', 'Overview', 'Programming Guides', 'API Docs', 'Deploying']

In [0]:
x = text_file.flatMap(lambda line: line.split(" "))

In [0]:
word_counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

In [0]:
word_counts.take(20)

[('2.3.0', 2),
 ('Overview', 2),
 ('Programming', 3),
 ('Guides', 1),
 ('Docs', 1),
 ('Spark', 48),
 ('Apache', 3),
 ('is', 4),
 ('general-purpose', 1),
 ('It', 3),
 ('provides', 4),
 ('high-level', 1),
 ('APIs', 2),
 ('in', 12),
 ('Java,', 3),
 ('Scala,', 2),
 ('Python', 8),
 ('an', 2),
 ('optimized', 1),
 ('engine', 1)]

In [0]:
my_collection = "Postgraduate Program in Big Data Analytics and Optimization"\
  .split(" ")
    
words = sc.parallelize(my_collection)

In [0]:
words.collect()

['Postgraduate',
 'Program',
 'in',
 'Big',
 'Data',
 'Analytics',
 'and',
 'Optimization']

In [0]:
supplementalData = {"Postgraduate":1000, "Analytics":200, "Optimization": 400,
                    "Big":-300, "Data": 100, "Program":100}

In [0]:
suppBroadcast = sc.broadcast(supplementalData)

In [0]:
suppBroadcast.value

{'Analytics': 200,
 'Big': -300,
 'Data': 100,
 'Optimization': 400,
 'Postgraduate': 1000,
 'Program': 100}

In [0]:
words.map(lambda word: (word,suppBroadcast.value.get(word))).collect()

[('Postgraduate', 1000),
 ('Program', 100),
 ('in', None),
 ('Big', -300),
 ('Data', 100),
 ('Analytics', 200),
 ('and', None),
 ('Optimization', 400)]

In [0]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0))).collect()

[('Postgraduate', 1000),
 ('Program', 100),
 ('in', 0),
 ('Big', -300),
 ('Data', 100),
 ('Analytics', 200),
 ('and', 0),
 ('Optimization', 400)]

In [0]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0)))\
  .sortBy(lambda wordPair: wordPair[1])\
  .collect()

[('Big', -300),
 ('in', 0),
 ('and', 0),
 ('Program', 100),
 ('Data', 100),
 ('Analytics', 200),
 ('Optimization', 400),
 ('Postgraduate', 1000)]

In [0]:
count = sc.accumulator(0)

In [0]:
count.value

0

In [0]:
num = sc.parallelize([1,2,3])

In [0]:
num.foreach(lambda x: count.add(1))

In [0]:
count.value

3