Welcome to exercise one of “Apache Spark for Scalable Machine Learning on BigData”. In this exercise we’ll apply the basics of functional and parallel programming. 

Let’s start with a simple example. Let’s consider a list of integers.

Let’s find out what the size of this list is.

In [7]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))


if 'sc' in locals() or 'sc' in globals():
    printmd('<<<<<!!!!! It seems that you are running in an IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')


# <span style="color:red"><<<<<!!!!! It seems that you are running in an IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>></span>

In [8]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [9]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

In [11]:
rdd = sc.parallelize(range(100))

In [12]:
rdd.count()

100

You should see "100" as the answer. Now we want to know the sum of all elements.

In [6]:
rdd.sum()

4950

You should get "4950" as the answer, since 0 + 1 + ... + 99 = 99 * 100 / 2 = 4950.
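Both “count” and “sum” can run in parallel because each partition can compute a partial result independently, and the partial results are then combined with an associative operator (addition). The following is a minimal plain-Python model of that idea, not Spark’s actual implementation; the “partition” helper and the choice of 4 partitions are hypothetical and only loosely mimic how “sc.parallelize” slices a local collection.

```python
from functools import reduce


def partition(data, num_slices):
    """Hypothetical helper: split data into num_slices contiguous chunks,
    loosely mimicking how sc.parallelize slices a local collection."""
    data = list(data)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


partitions = partition(range(100), 4)

# Each "worker" computes a partial result for its own partition ...
partial_counts = [len(p) for p in partitions]
partial_sums = [sum(p) for p in partitions]

# ... and the partial results are combined with an associative operator.
total_count = reduce(lambda a, b: a + b, partial_counts)
total_sum = reduce(lambda a, b: a + b, partial_sums)

print(total_count, total_sum)  # 100 4950
```

Because addition is associative and commutative, the final result does not depend on how the data is split or in which order the partial results arrive.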


Let’s now create a Python function that decides whether a value is greater than 50 (True) or not (False).


In [13]:
def gt50(i):
    if i > 50:
        return True
    else:
        return False

print(gt50(4))
print(gt50(51))

Let’s simplify this function. The comparison “i > 50” already evaluates to True or False, so we can return it directly.


In [15]:
def gt50(i):
    return i > 50

In [16]:
print(gt50(4))
print(gt50(51))

False
True


Now let’s use the lambda notation to define the function.

In [17]:
gt50 = lambda i: i > 50

In [18]:
print(gt50(4))
print(gt50(51))

False
True
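A lambda is just an anonymous function, so it can be passed to any higher-order function that expects a predicate. As a local illustration using only Python built-ins (the sample values below are made up), the same “gt50” lambda works with Python’s own “filter” and “map”, which follow the same functional idea as Spark’s RDD “filter” and “map”:

```python
gt50 = lambda i: i > 50

values = [4, 51, 50, 99]  # arbitrary sample values for illustration

# filter keeps only the elements for which the predicate returns True
print(list(filter(gt50, values)))  # [51, 99]

# map applies the function to every element and keeps all results
print(list(map(gt50, values)))     # [False, True, False, True]
```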


In [19]:
# Shuffle our list to make it a bit more interesting
from random import shuffle
l = list(range(100))
shuffle(l)
rdd = sc.parallelize(l)

Let’s keep only the values from our list that are greater than 50 (that is, filter out those equal to or less than 50) by passing our “gt50” function to the “filter” function. Note that calling the “collect” function returns all elements to the Apache Spark driver. This is not a good idea for BigData; please use “.sample(False, 0.1).collect()” or “take(n)” instead.


In [20]:
rdd.filter(gt50).collect()

[96, 86, 77, 71, 94, 81, 97, 68, 59, 89,
 79, 88, 58, 92, 98, 54, 75, 51, 64, 67,
 53, 69, 78, 83, 95, 66, 72, 85, 70, 56,
 76, 52, 73, 80, 55, 74, 57, 60, 63, 82,
 90, 84, 93, 62, 91, 99, 65, 87, 61]
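The difference between “collect”, “take(n)” and “sample” can be modelled locally. This is only a plain-Python analogy, assuming a lazy generator stands in for Spark’s lazily evaluated transformations: “itertools.islice” plays the role of “take(n)”, and keeping each element independently with probability 0.1 (Bernoulli sampling) roughly approximates “sample(False, 0.1)”. The seed value 42 is arbitrary.

```python
import random
from itertools import islice

data = list(range(100))
random.shuffle(data)

# Lazy pipeline: nothing is evaluated until elements are pulled out of it.
gt50_stream = (x for x in data if x > 50)

# Analogue of take(5): stop after the first 5 matching elements
# instead of materializing the whole filtered result.
first_five = list(islice(gt50_stream, 5))

# Rough analogue of sample(False, 0.1): keep each matching element
# independently with probability 0.1, so the result size varies.
rng = random.Random(42)
sampled = [x for x in data if x > 50 and rng.random() < 0.1]

print(first_five)
print(sampled)
```

In both cases only a small fraction of the filtered data is returned, which is the reason to prefer these calls over “collect” on large datasets.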

We can also pass the lambda expression directly to “filter”.


In [21]:
rdd.filter(lambda i: i > 50).collect()

[96, 86, 77, 71, 94, 81, 97, 68, 59, 89,
 79, 88, 58, 92, 98, 54, 75, 51, 64, 67,
 53, 69, 78, 83, 95, 66, 72, 85, 70, 56,
 76, 52, 73, 80, 55, 74, 57, 60, 63, 82,
 90, 84, 93, 62, 91, 99, 65, 87, 61]

Let’s consider the same list of integers. Now we want to compute the sum of the elements in that list that are greater than 50 but less than 75.

In [22]:
rdd.filter(lambda x: x > 50).filter(lambda x: x < 75).sum()

1500

You should see "1500" as the answer.
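Two chained filters select exactly the same elements as a single filter with a combined predicate, so the Spark call could equally be written as “rdd.filter(lambda x: 50 < x < 75).sum()”. The plain-Python check below (no Spark required) confirms this and verifies the expected answer with the arithmetic-series formula.

```python
data = list(range(100))

# Two chained filters, as in the Spark cell above ...
chained = [x for x in data if x > 50]
chained = [x for x in chained if x < 75]

# ... select the same elements as one filter with a combined predicate.
combined = [x for x in data if 50 < x < 75]
assert chained == combined

# Closed-form check: 51 + 52 + ... + 74 is an arithmetic series
# with n = 24 terms, so its sum is n * (first + last) / 2.
n = 74 - 51 + 1
print(sum(combined), n * (51 + 74) // 2)  # 1500 1500
```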

In the next part of this exercise you’ll create a DataFrame, register it as a temporary query table and issue SQL commands against it.

Let’s create a small DataFrame:


In [23]:
from pyspark.sql import Row

df = spark.createDataFrame([Row(id=1, value='value1'),Row(id=2, value='value2')])

# let's have a look what's inside
df.show()

# let's print the schema
df.printSchema()

+---+------+
| id| value|
+---+------+
|  1|value1|
|  2|value2|
+---+------+

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)
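The schema above was inferred from the Python values in the “Row” objects: Python “int” becomes Spark’s “long” (64-bit) type and “str” becomes “string”. The sketch below imitates this inference in plain Python. It is a hypothetical simplification: the “infer_schema” helper and the type-name table are illustrative only, a “namedtuple” stands in for “pyspark.sql.Row”, and Spark’s real inference is more involved (for example, it can merge types across several rows).

```python
from collections import namedtuple

# A namedtuple stands in for pyspark.sql.Row: both are tuples with named fields.
RowSketch = namedtuple("RowSketch", ["id", "value"])
rows = [RowSketch(id=1, value="value1"), RowSketch(id=2, value="value2")]

# Hypothetical mapping from Python types to the Spark SQL type names
# that printSchema() reports.
SPARK_TYPE_NAMES = {int: "long", str: "string", float: "double", bool: "boolean"}


def infer_schema(rows):
    """Hypothetical helper: derive (column, type name) pairs from the first row."""
    first = rows[0]
    return [(name, SPARK_TYPE_NAMES[type(getattr(first, name))])
            for name in first._fields]


print("root")
for name, type_name in infer_schema(rows):
    print(f" |-- {name}: {type_name} (nullable = true)")
```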



Now we register this DataFrame as a query table (a temporary view) and issue an SQL statement against it. Note that executing the SQL statement returns a new DataFrame that we can work with.


In [24]:
# register dataframe as query table
df.createOrReplaceTempView('df_view')

# execute SQL query
df_result = spark.sql('select value from df_view where id=2')

# examine contents of result
df_result.show()

# get result as string
df_result.first().value

+------+
| value|
+------+
|value2|
+------+



'value2'
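The register-then-query pattern is not specific to Spark. As an analogy that runs without a Spark installation, Python’s built-in “sqlite3” module can reproduce the same steps: an in-memory SQLite table plays the role of the temporary view “df_view”, and the table name, columns and SQL statement are copied from the Spark example. Unlike Spark, SQLite executes on a single machine, so this only illustrates the SQL side, not the distributed execution.

```python
import sqlite3

# An in-memory SQLite database plays the role of Spark's session catalog here.
conn = sqlite3.connect(":memory:")

# Analogue of createOrReplaceTempView('df_view'): make the data queryable by name.
conn.execute("CREATE TABLE df_view (id INTEGER, value TEXT)")
conn.executemany("INSERT INTO df_view VALUES (?, ?)",
                 [(1, "value1"), (2, "value2")])

# Same SQL statement as in the Spark cell above.
cursor = conn.execute("SELECT value FROM df_view WHERE id = 2")
result = cursor.fetchone()[0]  # analogue of df_result.first().value
conn.close()

print(result)  # value2
```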