# Scalable Machine Learning on Big Data using Apache Spark

This notebook is designed to run in an IBM Watson Studio default runtime, NOT the Watson Studio Apache Spark runtime, because the default runtime with 1 vCPU is free of charge. Therefore, we install Apache Spark in local mode for test purposes only. Please don't use this setup in production.

In case you are facing issues, please read the following two documents first:

https://github.com/IBM/skillsnetwork/wiki/Environment-Setup

https://github.com/IBM/skillsnetwork/wiki/FAQ

Then, please feel free to ask:

https://coursera.org/learn/machine-learning-big-data-apache-spark/discussions/all

Please make sure to follow the guidelines before asking a question:

https://github.com/IBM/skillsnetwork/wiki/FAQ#im-feeling-lost-and-confused-please-help-me

This notebook should also work outside Watson Studio. If you are running in an existing Apache Spark context outside Watson Studio, please remove the Apache Spark setup from the first notebook cells.

## Part 1

In [1]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))


if ('sc' in locals() or 'sc' in globals()):
    printmd('<<<<<!!!!! It seems that you are running in an IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200901025206-0000
KERNEL_ID = 2e4aa8fe-9a3f-42a9-9f8a-5d31cac522c0


# <span style="color:red"><<<<<!!!!! It seems that you are running in an IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>></span>

In [2]:
!pip install pyspark==2.4.5

Collecting pyspark==2.4.5
  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
    100% |████████████████████████████████| 217.8MB 94kB/s
Collecting py4j==0.10.7 (from pyspark==2.4.5)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
    100% |████████████████████████████████| 204kB 6.8MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Stored in directory: /home/spark/shared/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing c

In [3]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [4]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

In [5]:
rdd = sc.parallelize(range(100))

In [7]:
# please replace $$ with the correct characters
# rdd.c$$$t()
rdd.count()

100

You should see "100" as the answer. Now we want to know the sum of all elements. Again, please have a look at the API documentation and complete the code below to get the sum.

In [8]:
# rdd.s$$()
rdd.sum()

4950

You should get "4950" as the answer.
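To see why `count()` and `sum()` scale well, here is a plain-Python sketch (no Spark needed) of the idea behind them. It assumes the data is split into four partitions; the partition count and the `split_into_partitions` helper are hypothetical and only stand in for how Spark distributes an RDD. Each partition is aggregated on its own, as an executor would do, and only the small partial results are combined at the end, as on the driver.

```python
from functools import reduce
import operator

def split_into_partitions(data, num_partitions):
    """Hypothetical helper: slice a list into roughly equal contiguous chunks."""
    size = len(data)
    return [data[i * size // num_partitions:(i + 1) * size // num_partitions]
            for i in range(num_partitions)]

data = list(range(100))
partitions = split_into_partitions(data, 4)

# "Executor" side: every partition is aggregated independently.
partial_counts = [sum(1 for _ in part) for part in partitions]
partial_sums = [sum(part) for part in partitions]

# "Driver" side: only the small per-partition results are combined.
total_count = reduce(operator.add, partial_counts, 0)
total_sum = reduce(operator.add, partial_sums, 0)

print(partial_sums)            # [300, 925, 1550, 2175]
print(total_count, total_sum)  # 100 4950
```

Because addition is associative and commutative, the order in which partitions finish does not change the result, which is what lets Spark compute these aggregations in parallel.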

## Part 2

Welcome to exercise two of “Apache Spark for Scalable Machine Learning on BigData”. In this exercise you’ll apply the basics of functional and parallel programming.

Again, please use the following two links for your reference:

https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

https://spark.apache.org/docs/latest/rdd-programming-guide.html
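The RDD methods used in this exercise, such as `map`, `filter` and `reduce`, follow the same functional style as Python's built-in higher-order functions. As a local, Spark-free sketch of that style:

```python
from functools import reduce

numbers = list(range(10))

squares = list(map(lambda x: x * x, numbers))       # transform every element
evens = list(filter(lambda x: x % 2 == 0, numbers))  # keep elements matching a predicate
total = reduce(lambda a, b: a + b, numbers)         # combine elements pairwise

print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(evens)    # [0, 2, 4, 6, 8]
print(total)    # 45
```

With an RDD, the same lambdas can be passed to `rdd.map(...)`, `rdd.filter(...)` and `rdd.reduce(...)`. The differences are that Spark evaluates transformations such as `map` and `filter` lazily, runs them across partitions, and expects the function given to `reduce` to be associative and commutative.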

Let’s create a Python function that decides whether a value is greater than 50 (True) or not (False).

In [9]:
def gt50(i):
    if i > 50:
        return True
    else:
        return False

In [10]:
print(gt50(4))
print(gt50(51))

False
True


Let’s simplify this function.

In [11]:
def gt50(i):
    return i > 50

In [12]:
print(gt50(4))
print(gt50(51))

False
True


Now let’s use the lambda notation to define the function.

In [13]:
gt50 = lambda i: i > 50

In [14]:
print(gt50(4))
print(gt50(51))

False
True


Let's shuffle our list to make it a bit more interesting.

In [15]:
from random import shuffle
l = list(range(100))
shuffle(l)
rdd = sc.parallelize(l)

Let’s filter out the values in our list that are less than or equal to 50 by passing our “gt50” function to the “filter” function. Note that calling the “collect” function returns all elements to the Apache Spark driver. This is not a good idea for big data; please use “.sample(False, 0.1).collect()” or “.take(n)” instead.
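To make the difference between `sample` and `take` concrete, here is a plain-Python sketch (no Spark needed). The helper names `bernoulli_sample` and `take` are hypothetical; the first mimics the without-replacement behaviour of `RDD.sample(False, fraction)`, where each element is kept independently with probability `fraction`, so the sample size is only approximately `fraction * N`. The second returns just the first `n` elements without materializing the rest.

```python
import random
from itertools import islice

def bernoulli_sample(data, fraction, seed=None):
    """Keep each element independently with probability `fraction`
    (mimics sampling without replacement as in RDD.sample(False, fraction))."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]

def take(data, n):
    """Return the first n elements, consuming no more of `data` than needed."""
    return list(islice(data, n))

data = range(100)
sample = bernoulli_sample(data, 0.1, seed=42)
print(len(sample))    # roughly 10, but not guaranteed to be exactly 10
print(take(data, 5))  # [0, 1, 2, 3, 4]
```

In both cases only a small, bounded amount of data ends up on the driver, which is the point of avoiding `collect()` on large RDDs.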

In [16]:
rdd.filter(gt50).collect()

[85,
 64,
 67,
 55,
 97,
 54,
 82,
 74,
 52,
 69,
 71,
 62,
 87,
 68,
 80,
 72,
 98,
 86,
 58,
 84,
 92,
 95,
 51,
 61,
 59,
 93,
 88,
 83,
 79,
 81,
 99,
 56,
 77,
 90,
 57,
 75,
 89,
 78,
 53,
 94,
 76,
 65,
 73,
 60,
 63,
 70,
 66,
 96,
 91]

We can also use the lambda function directly.

In [17]:
rdd.filter(lambda i: i > 50).collect()

[85,
 64,
 67,
 55,
 97,
 54,
 82,
 74,
 52,
 69,
 71,
 62,
 87,
 68,
 80,
 72,
 98,
 86,
 58,
 84,
 92,
 95,
 51,
 61,
 59,
 93,
 88,
 83,
 79,
 81,
 99,
 56,
 77,
 90,
 57,
 75,
 89,
 78,
 53,
 94,
 76,
 65,
 73,
 60,
 63,
 70,
 66,
 96,
 91]

Let’s consider the same list of integers. Now we want to compute the sum of the elements in that list that are greater than 50 but less than 75. Please implement the missing parts.

In [19]:
# rdd.filter(lambda x: $$).filter(lambda x: $$).$$()
rdd.filter(lambda x: x > 50).filter(lambda x: x < 75).sum()

1500
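Chaining two filters selects the same elements as a single filter whose predicate combines both conditions. A plain-Python check of that equivalence on a shuffled list, mirroring the cell above without Spark:

```python
from random import shuffle

l = list(range(100))
shuffle(l)

# Two chained filters: first keep x > 50, then keep x < 75.
chained_sum = sum(filter(lambda x: x < 75, filter(lambda x: x > 50, l)))

# One filter with a combined predicate selects the same elements.
combined_sum = sum(filter(lambda x: 50 < x < 75, l))

print(chained_sum, combined_sum)  # 1500 1500
```

In Spark, `rdd.filter(lambda x: 50 < x < 75).sum()` gives the same result. Because consecutive narrow transformations such as `filter` are pipelined within a single stage, the chained version does not make an extra pass over the data, so the choice is mostly about readability. The shuffle does not affect the sum, since addition does not depend on element order.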

You should see "1500" as the answer.

## Part 3

Welcome to exercise three of “Apache Spark for Scalable Machine Learning on BigData”. In this exercise you’ll create a DataFrame, register a temporary query table and issue SQL commands against it.

Let’s create a small DataFrame:

In [20]:
from pyspark.sql import Row

df = spark.createDataFrame([Row(id=1, value='value1'), Row(id=2, value='value2')])

# let's have a look at what's inside
df.show()

# let's print the schema
df.printSchema()

+---+------+
| id| value|
+---+------+
|  1|value1|
|  2|value2|
+---+------+

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)



Now we register this DataFrame as a query table (a temporary view) and issue an SQL statement against it. Note that executing the SQL statement returns a new DataFrame that we can work with.

In [21]:
# register dataframe as query table
df.createOrReplaceTempView('df_view')

# execute SQL query
df_result = spark.sql('select value from df_view where id=2')

# examine contents of result
df_result.show()

# get result as string
df_result.first().value

+------+
| value|
+------+
|value2|
+------+



'value2'
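Spark SQL is its own engine, but registering a temporary view and querying it works much like querying a table in any SQL database. As a rough local analogy, assuming Python's built-in `sqlite3` module as a stand-in for Spark (this is not how Spark executes the query), the same lookup looks like this:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute("CREATE TABLE df_view (id INTEGER, value TEXT)")
conn.executemany("INSERT INTO df_view VALUES (?, ?)",
                 [(1, "value1"), (2, "value2")])

# Same query as the Spark cell; a parameter placeholder avoids string formatting.
row = conn.execute("SELECT value FROM df_view WHERE id = ?", (2,)).fetchone()
print(row[0])  # value2

conn.close()
```

Unlike `sqlite3`, `spark.sql(...)` returns a DataFrame rather than a cursor, and a view created with `createOrReplaceTempView` lives only as long as the SparkSession that created it.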

Please try to find a way to count the rows in this DataFrame by looking at the API documentation. No worries, we’ll cover DataFrames in more detail next week.

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [25]:
# df.$$$
df.count()

2