# Connecting Google Drive to Colab

Notebook created by Rosa Filgueira (https://github.com/rosafilgueira/Seminar_MUIA), materials adapted by Daniel Garijo

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Installing Apache Spark

In [2]:
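# Install Java 11 (Spark runs on the JVM)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
# Download Spark 3.3.2 (older releases are served from archive.apache.org)
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark
# Pin PySpark to the same version as the downloaded Spark distribution
!pip install pyspark==3.3.2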

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 4.5 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 KB 18.4 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=21f42b161b7732134d1289a69006ccd88c4272a63230afca582c743737416a9f
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
!ls

drive  sample_data  spark-3.3.2-bin-hadoop3  spark-3.3.2-bin-hadoop3.tgz


In [4]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.18
Branch HEAD
Compiled by user liangchi on 2023-02-10T19:57:40Z
Revision 5103e00c4ce5fcc4264ca9c4df12295d42557af6
Url https://github.com/apache/spark
Type --help for more information.


In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

import findspark
findspark.init()
findspark.find()

'/content/spark-3.3.2-bin-hadoop3'

# Creating an Apache Spark session

In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

In [7]:
spark

In [8]:
!ls

drive  sample_data  spark-3.3.2-bin-hadoop3  spark-3.3.2-bin-hadoop3.tgz


# Creating my first RDDs (following the exercise in class)

A simple way to create an RDD is to take an existing collection and load it into Spark using the SparkContext's parallelize() method. We start by creating a sequence of the integers 1 to 100 with Python's range() function. We then create our first RDD by using parallelize() to distribute these numbers across 8 partitions.
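To build some intuition for what distributing a collection "across 8 partitions" means, the plain-Python sketch below splits the same 100 numbers into 8 contiguous, near-equal slices. The helper split_into_partitions is hypothetical and only an analogy; Spark's own slicing may differ in detail. On the real RDD, numbersRDD.getNumPartitions() reports the partition count and numbersRDD.glom().collect() returns one list per partition.

```python
def split_into_partitions(data, num_slices):
    """Split a sequence into num_slices contiguous, near-equal chunks (hypothetical helper)."""
    n = len(data)
    # Chunk boundaries at floor(i * n / num_slices) for i = 0..num_slices
    bounds = [i * n // num_slices for i in range(num_slices + 1)]
    return [list(data[bounds[i]:bounds[i + 1]]) for i in range(num_slices)]

partitions = split_into_partitions(range(1, 101), 8)
print([len(p) for p in partitions])  # [12, 13, 12, 13, 12, 13, 12, 13]
```

Every number lands in exactly one chunk, and concatenating the chunks in order gives back the original sequence.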

In [11]:
# Create a range of one hundred integers (1 to 100)
numbers = range(1, 101)

sc = spark.sparkContext

# Create an RDD by dividing the numbers into 8 partitions
numbersRDD = sc.parallelize(numbers, 8)

Each RDD has a unique identifier.

In [12]:
# Display the id of the RDD
numbersRDD.id()

1

A name can be set to provide a more meaningful way of identifying an RDD.

In [13]:
# Set the name of the RDD
numbersRDD.setName('Range of integers')

# Print the name of the RDD
print(numbersRDD.name())

Range of integers


### Caching RDDs and simple actions
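Since we will be reusing the RDD many times, we ask Spark to cache it in memory. Caching is lazy: the data is stored the first time an action computes the RDD, and later actions reuse the stored partitions instead of recomputing them.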
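The toy class below is a plain-Python analogy (not Spark's implementation) of why caching helps: without it, every action recomputes the data from scratch; with it, the first action computes and stores the data and later actions reuse it. The class LazyDataset and its compute_calls counter are hypothetical names used only for illustration. On a real RDD, the flag numbersRDD.is_cached shows whether cache() has been requested.

```python
class LazyDataset:
    """Toy stand-in for an RDD: recomputes on every action unless cached (hypothetical)."""

    def __init__(self, compute):
        self._compute = compute   # function that produces the data
        self._stored = None       # materialized data, kept only when cached
        self.is_cached = False
        self.compute_calls = 0    # how many times the data was (re)computed

    def cache(self):
        # Like RDD.cache(), this only marks the dataset; nothing is computed yet
        self.is_cached = True
        return self

    def _materialize(self):
        if self.is_cached and self._stored is not None:
            return self._stored
        self.compute_calls += 1
        data = list(self._compute())
        if self.is_cached:
            self._stored = data
        return data

    def collect(self):
        return self._materialize()

    def count(self):
        return len(self._materialize())


uncached = LazyDataset(lambda: range(1, 101))
uncached.collect()
uncached.count()
print(uncached.compute_calls)  # 2

cached = LazyDataset(lambda: range(1, 101)).cache()
cached.collect()
cached.count()
print(cached.compute_calls)  # 1
```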

In [14]:
# Cache the RDD
numbersRDD.cache()

Range of integers PythonRDD[1] at RDD at PythonRDD.scala:53

For small datasets, we can use collect() to retrieve and view the entire RDD. Because collect() copies every element to the driver program, it should be avoided on large RDDs.

In [15]:
# Retrieve all the elements in the RDD to the driver program
print(numbersRDD.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]


Finally, let's verify that our RDD contains one hundred elements.

In [16]:
# Count the number of elements in the RDD
numbersRDD.count()

100

### Simple transformations and actions
(3a) Element-wise transformation using map

We first look at map(), a transformation that applies a function to each element in the RDD. In this exercise, complete the function addOne, which increases an integer by one. Then call map() on numbersRDD, passing addOne as the function. Notice that transformations do not mutate RDDs; instead, they produce new RDDs.
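As a plain-Python analogy of an element-wise transformation, the sketch below applies a different function (doubling, so the exercise is left for you) to every element with Python's built-in map(). Like an RDD transformation, it builds a new collection and leaves the input untouched. The function name double is only illustrative.

```python
def double(number):
    """Example element-wise function (deliberately different from the exercise)."""
    return number * 2

numbers = list(range(1, 11))
# map() applies double to each element; list() builds a new list from the results
doubled = list(map(double, numbers))

print(numbers)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]  (unchanged)
print(doubled)  # [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
```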


In [None]:
# EXERCISE Replace <FILL IN> with the proper code

def addOne(number):
    """ Increases a number by one
    Args:
        number (int): an integer to increase
    Returns:
        int: the number increased by one
    """
    return <FILL IN>

numbersIncreasedRDD = numbersRDD.map(<FILL IN>)

# RDDs are immutable
print("The id of numbersRDD is:", numbersRDD.id())
print("The id of numbersIncreasedRDD is: ", numbersIncreasedRDD.id())
# Verify that the range of numbers has been increased by one
print("The RDD contains the numbers:", numbersIncreasedRDD.collect())

### Lambda expressions

Next, repeat the transformation from (3a), this time by passing a lambda expression to map() instead of a named function. Lambda expressions provide a concise way of writing short, anonymous functions without a full def block. A lambda takes a number of parameters and a single expression, and creates a function that returns the value of that expression: lambda parameters: expression(parameters)
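The sketch below contrasts a named function with an equivalent lambda expression, using squaring rather than the exercise's add-one. Assigning a lambda to a name (square_lambda) is done here only to show the equivalence; in practice lambdas are usually passed inline, as in the last line.

```python
# A named function defined with def...
def square(x):
    return x * x

# ...and an equivalent anonymous function written as a lambda expression
square_lambda = lambda x: x * x

values = [1, 2, 3, 4]
from_def = list(map(square, values))
from_lambda = list(map(square_lambda, values))
# Lambdas are most often written inline, directly as the argument to map()
inline = list(map(lambda x: x * x, values))

print(from_def, from_lambda, inline)  # [1, 4, 9, 16] three times
```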

In [None]:
# Replace <FILL IN> with the proper code

# Increases each element by one using a lambda function
numbersIncreasedRDD = numbersRDD.map(<FILL IN>)

# Verify that the range of numbers has been increased by one
print(numbersIncreasedRDD.collect())



### Additional transformations

Very often it is desirable to remove erroneous elements or elements not required for the desired calculations. filter() takes a function and keeps only the elements for which that function returns True. Next, use filter() together with a lambda function to filter out all the elements that are not evenly divisible by 2: supply filter() with a lambda that returns True for every input divisible by 2 and False otherwise.
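A plain-Python analogy of filter() with a lambda predicate is sketched below. It keeps multiples of 3 instead of multiples of 2, so the exercise is still yours to solve; the names are illustrative only.

```python
numbers = range(1, 101)

# Keep only the elements for which the predicate returns True
multiples_of_three = list(filter(lambda x: x % 3 == 0, numbers))

print(len(multiples_of_three))  # 33
print(multiples_of_three[:5])   # [3, 6, 9, 12, 15]
```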


In [None]:
# Replace <FILL IN> with a lambda function

# Filters out all elements not evenly divisible by 2
filteredNumbersRDD = numbersRDD.filter(<FILL IN>)

# Print all elements evenly divisible by 2
print(filteredNumbersRDD.collect())

In [None]:
# Test
assert filteredNumbersRDD.count() == 50, "The number of filtered elements is wrong!"



Some functions, such as range(), return a sequence of elements rather than a single value. When such a function is applied to each element of an RDD with map(), the result is a nested structure, which, depending on the application, may be undesirable. In these cases, flatMap() can be used to 'flatten' the resulting structure into a single sequence of elements.


In [None]:
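nestedRDD = sc.parallelize([1, 2, 3])

# map() produces one list per input element: a nested structure
print(nestedRDD.map(lambda x: list(range(x))).collect())
# flatMap() concatenates the per-element sequences into a single list
print(nestedRDD.flatMap(lambda x: range(x)).collect())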

Consider the difference between using map() and flatMap(). Notice how the output from map() contains nested lists, while the output from flatMap() has been "flattened" to a single list.
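The same map-versus-flatMap distinction can be reproduced in plain Python, as a rough analogy: a list comprehension over the inputs gives one inner list per element, while itertools.chain.from_iterable concatenates the per-element sequences into one flat list.

```python
from itertools import chain

data = [1, 2, 3]

# map-style: one output sequence per input element -> nested lists
mapped = [list(range(x)) for x in data]
# flatMap-style: the per-element sequences are concatenated into one list
flat_mapped = list(chain.from_iterable(range(x) for x in data))

print(mapped)       # [[0], [0, 1], [0, 1, 2]]
print(flat_mapped)  # [0, 0, 1, 0, 1, 2]
```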


### Actions

reduce() is a common action: it takes a function that operates on two elements and returns a new element of the same type, and applies it repeatedly until a single value remains. Because Spark reduces each partition separately and then combines the partial results, the function should be commutative and associative. A common use is to sum up the elements of an RDD: sum up the elements in the numbersRDD dataset using reduce(). Lambda expressions with more than one parameter can be written as: lambda x1, x2, x3, ... : expression(x1, x2, x3, ...)
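Why the function must be associative can be seen with a plain-Python imitation of a partitioned reduce: reduce each "partition" on its own, then reduce the partial results. The helper partitioned_reduce is hypothetical and only mimics the idea; Spark may also combine the partials in any order, which is why commutativity matters too. Multiplication (not the exercise's sum) gives the same answer either way; subtraction does not.

```python
from functools import reduce

def partitioned_reduce(func, partitions):
    """Reduce each partition on its own, then reduce the partial results (hypothetical helper)."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)

data = [1, 2, 3, 4, 5, 6]
partitions = [[1, 2, 3], [4, 5, 6]]  # the same data split into two "partitions"

# Multiplication is commutative and associative: both strategies agree
product_seq = reduce(lambda a, b: a * b, data)                    # 720
product_par = partitioned_reduce(lambda a, b: a * b, partitions)  # 720

# Subtraction is not associative: the partitioned result differs
diff_seq = reduce(lambda a, b: a - b, data)                       # -19
diff_par = partitioned_reduce(lambda a, b: a - b, partitions)     # (-4) - (-7) = 3

print(product_seq, product_par, diff_seq, diff_par)
```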


In [None]:
# Replace <FILL IN> with the proper code

# Sum up the elements in numbersRDD
numbersSum = numbersRDD.reduce(<FILL IN>)

In [None]:
# Test
assert numbersSum == 5050, "The sum is incorrect!"

In addition to using collect(), Spark provides a number of actions to retrieve a limited set of results.


In [None]:
print(numbersRDD.take(5))
print(numbersRDD.first())
print(numbersRDD.top(5))

take() and first() return elements in the order in which they are stored across the partitions, which is not guaranteed to be sorted (for example, after a shuffle), and top() returns the largest elements in descending order. takeOrdered() returns the smallest elements in natural (ascending) order. Additionally, a key function may be supplied to change the ordering as desired. For instance, to obtain the numbers in descending order, each number can simply be negated by a lambda function.
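Python's heapq module offers a close plain-Python analogy to these ordered actions: heapq.nsmallest behaves like takeOrdered() and heapq.nlargest like top(), and both accept a key function. To leave the exercise open, the key below orders numbers by their distance from 50 rather than reversing them; this choice is purely illustrative.

```python
import heapq

numbers = range(1, 101)

# Smallest five in natural (ascending) order, like takeOrdered(5)
smallest = heapq.nsmallest(5, numbers)
# Largest five in descending order, like top(5)
largest = heapq.nlargest(5, numbers)
# A key function changes the ordering: here, distance from 50 (ties keep input order)
closest_to_50 = heapq.nsmallest(5, numbers, key=lambda x: abs(x - 50))

print(smallest)       # [1, 2, 3, 4, 5]
print(largest)        # [100, 99, 98, 97, 96]
print(closest_to_50)  # [50, 49, 51, 48, 52]
```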


In [None]:
# Replace <FILL IN> with the proper code

# Print the numbers in natural order
print(numbersRDD.takeOrdered(5))

# Supply a lambda function to return the elements in descending order
print(numbersRDD.takeOrdered(5, <FILL IN>))


### Chaining expressions

Since transformations return new RDDs, several operations can be chained together to form a pipeline, e.g. RDD.transformation1().transformation2().action(). The cell below shows two ways of writing the same chain: both perform the same operations, but the second, with one operation per line inside parentheses, is easier to read.
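For comparison, the same add-one-then-keep-below-10 pipeline written with plain Python built-ins reads inside-out, whereas a generator-based version reads top to bottom, closer to RDD method chaining. This is only a local analogy; nothing here is distributed.

```python
numbers = range(1, 101)

# Nested built-ins read inside-out: map runs first, then filter
nested = list(filter(lambda x: x < 10, map(lambda x: x + 1, numbers)))

# A generator-based pipeline reads top to bottom, one step per line
incremented = (x + 1 for x in numbers)
pipelined = [x for x in incremented if x < 10]

print(nested)     # [2, 3, 4, 5, 6, 7, 8, 9]
print(pipelined)  # [2, 3, 4, 5, 6, 7, 8, 9]
```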

In [None]:
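# One-line form: add one to every number, keep the results below 10, and collect them
numbersFiltered = numbersRDD.map(lambda x: x + 1).filter(lambda x: x < 10).collect()

# Equivalent multi-line form: the parentheses let the chain span several lines
numbersFiltered = (numbersRDD
                   .map(lambda x: x + 1)
                   .filter(lambda x: x < 10)
                   .collect())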

print(numbersFiltered)

### Back to the class example

In [None]:
file_path="./sample_data/"
sc = spark.sparkContext
textFile = sc.textFile(file_path+"README.md")
textFile.count()

19

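In [None]:
# Keep only the lines that mention "csv" (a lazy transformation: nothing runs yet)
csvLines = textFile.filter(lambda line: "csv" in line)
csvLines

In [None]:
# count() is an action, so this is where the filter is actually computed
csvLines.count()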

# WordCount
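In this final part of the exercise, we count how often each word appears in the text file loaded above. Each line is split on whitespace into words with flatMap(), each word is mapped to a (word, 1) pair, and reduceByKey() sums the counts for each word. Note that this simple tokenization does not remove punctuation or normalize case, which is why tokens such as started. and (1): appear in the output below.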
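A tokenizer that also normalizes the text could look like the plain-Python sketch below: it lowercases each line, splits on anything that is not a letter, digit, or apostrophe, and drops empty strings, then counts words with collections.Counter. The function name tokenize, the splitting rule, and the sample lines are all assumptions for illustration, not the contents of README.md.

```python
import re
from collections import Counter

def tokenize(line):
    """Lowercase a line, split on non-alphanumeric characters (keeping apostrophes), drop empties."""
    return [word for word in re.split(r"[^a-z0-9']+", line.lower()) if word]

# Hypothetical sample lines
lines = ["The cat sat.", "The cat, the hat!"]

counts = Counter(word for line in lines for word in tokenize(line))
print(counts.most_common())  # [('the', 3), ('cat', 2), ('sat', 1), ('hat', 1)]
```

Because tokenize takes one line and returns a list of words, it could be passed to flatMap() in place of lambda line: line.split(), e.g. textFile.flatMap(tokenize).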

In [None]:
wordCounts = (textFile
              .flatMap(lambda line: line.split())  # split each line into words
              .map(lambda word: (word, 1))         # pair each word with a count of 1
              .reduceByKey(lambda a, b: a + b))    # sum the counts for each word
output = wordCounts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))

directory: 1
datasets: 1
*: 3
`california_housing_data*.csv`: 1
is: 4
housing: 1
1990: 1
more: 1
at:: 2
https://developers.google.com/machine-learning/crash-course/california-housing-data-description: 1
`mnist_*.csv`: 1
of: 2
[Anscombe's: 1
was: 2
originally: 1
in: 2
Anscombe,: 1
F.: 1
'Graphs: 1
American: 1
Statistician.: 1
(1):: 1
prepared: 1
This: 1
includes: 1
a: 3
few: 1
sample: 2
to: 1
get: 1
you: 1
started.: 1
California: 1
data: 1
from: 1
the: 3
US: 1
Census;: 1
information: 1
available: 1
small: 1
[MNIST: 1
database](https://en.wikipedia.org/wiki/MNIST_database),: 1
which: 1
described: 2
http://yann.lecun.com/exdb/mnist/: 1
`anscombe.json`: 1
contains: 1
copy: 2
quartet](https://en.wikipedia.org/wiki/Anscombe%27s_quartet);: 1
it: 1
J.: 1
(1973).: 1
Statistical: 1
Analysis'.: 1
27: 1
17-21.: 1
JSTOR: 1
2682899.: 1
and: 1
our: 1
by: 1
[vega_datasets: 1
library](https://github.com/altair-viz/vega_datasets/blob/4f67bdaad10f45e3549984e17e1b3088c731503d/vega_datasets/_data/anscombe.

### Removing stop words

When performing text analysis, it is often desirable to remove common words called 'stop words', such as 'the', 'a', and 'is'. Define a lambda function and apply a transformation to wordCounts that filters out the five stop words 'the', 'and', 'i', 'to', and 'of'. Since wordCounts contains (word, count) pairs, the lambda receives a pair and should test its first element.
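Filtering (word, count) pairs works like filtering plain values, except that the predicate inspects the first element of each tuple. The plain-Python sketch below uses hypothetical pairs and a different, illustrative stop-word set ({"a", "is"}) so the exercise remains open; storing stop words in a set makes each membership test fast.

```python
# Hypothetical (word, count) pairs, shaped like the elements of wordCounts
word_counts = [("a", 3), ("cat", 2), ("is", 4), ("hat", 1)]

# Illustrative stop-word set (not the one required by the exercise)
stop_words = {"a", "is"}

# Each element is a (word, count) tuple, so the predicate looks at pair[0]
filtered = list(filter(lambda pair: pair[0] not in stop_words, word_counts))
print(filtered)  # [('cat', 2), ('hat', 1)]
```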

In [None]:
# Replace <FILL IN> with the proper code

filteredWordCount = wordCounts.filter(<FILL IN>)

print(filteredWordCount.count())

print(filteredWordCount.take(30))