Welcome to exercise two of “Apache Spark for Scalable Machine Learning on BigData”. In this exercise you’ll apply the basics of functional and parallel programming. 

Again, please use the following two links for your reference:
https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
https://spark.apache.org/docs/latest/rdd-programming-guide.html

Let’s create a Python function that decides whether a value is greater than 50 (True) or not (False).

This notebook is designed to run in an IBM Watson Studio Apache Spark runtime. If you are running it in an IBM Watson Studio standard runtime or outside Watson Studio, we install Apache Spark in local mode for test purposes only. Please don't use it in production.

In [20]:
!pip install --upgrade pip

Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.
Collecting pip
  Using cached pip-20.0.2-py2.py3-none-any.whl (1.4 MB)
Installing collected packages: pip
Successfully installed pip-20.0.2


In [23]:
if not ('sc' in locals() or 'sc' in globals()):
    print('It seems you are not running in an IBM Watson Studio Apache Spark notebook. You might be running in an IBM Watson Studio default runtime or outside IBM Watson Studio. Therefore installing a local Apache Spark environment for you. Please do not use it in production.')
    
    # pip.main() is not available in pip 10 and later, so invoke pip as a module
    import subprocess
    import sys
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'pyspark==2.4.5'])
    
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession

    sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
    
    spark = SparkSession \
        .builder \
        .getOrCreate()


In [1]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200424220730-0000
KERNEL_ID = 4aad9d72-d1c6-4cc7-a3e8-a5028f4d9797
--2020-04-24 22:07:35--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet [following]
--2020-04-24 22:07:35--  https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet [following]
--2020-04-24 22:07:36--  https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.gi

In [15]:
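from pyspark.sql.functions import rand
# use the SparkSession created above and a separate name so the HMP dataframe in df is not overwritten
df_rand = spark.range(0, 10).withColumn('rand1', rand(seed=10)).withColumn('rand2', rand(seed=27))

In [16]:
df_rand.show()
df_rand.printSchema()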

+---+-------------------+-------------------+
| id|              rand1|              rand2|
+---+-------------------+-------------------+
|  0|0.41371264720975787|  0.714105256846827|
|  1| 0.7311719281896606| 0.8143487574232506|
|  2| 0.9031701155118229| 0.5282207324381174|
|  3|0.09430205113458567| 0.4420100497826609|
|  4|0.38340505276222947| 0.9387162206758006|
|  5| 0.1982919638208397|0.19369846818250636|
|  6|0.12714181165849525| 0.0838940132767162|
|  7| 0.7604318153406678| 0.9895050223719429|
|  8|   0.83487085888236| 0.8394775938142013|
|  9| 0.3142596916968412| 0.5809927918049477|
+---+-------------------+-------------------+

root
 |-- id: long (nullable = false)
 |-- rand1: double (nullable = false)
 |-- rand2: double (nullable = false)



In [7]:
# a count grouped by class, in ascending order
spark.sql('select class, count(*) as count from df group by class order by count').show()

+--------------+-----+
|         class|count|
+--------------+-----+
|      Eat_soup| 6683|
|   Liedown_bed|11446|
| Use_telephone|15225|
|Descend_stairs|15375|
|     Comb_hair|23504|
| Sitdown_chair|25036|
| Standup_chair|25417|
|   Brush_teeth|29829|
|      Eat_meat|31236|
|  Climb_stairs|40258|
|    Pour_water|41673|
|   Drink_glass|42792|
|     Getup_bed|45801|
|          Walk|92254|
+--------------+-----+



In [8]:
df.groupBy('class').count().orderBy('count').show()

+--------------+-----+
|         class|count|
+--------------+-----+
|      Eat_soup| 6683|
|   Liedown_bed|11446|
| Use_telephone|15225|
|Descend_stairs|15375|
|     Comb_hair|23504|
| Sitdown_chair|25036|
| Standup_chair|25417|
|   Brush_teeth|29829|
|      Eat_meat|31236|
|  Climb_stairs|40258|
|    Pour_water|41673|
|   Drink_glass|42792|
|     Getup_bed|45801|
|          Walk|92254|
+--------------+-----+



In [9]:
import pixiedust
from pyspark.sql.functions import col
counts = df.groupBy('class').count()
display(counts)

class,count
Use_telephone,15225
Standup_chair,25417
Eat_meat,31236
Getup_bed,45801
Drink_glass,42792
Pour_water,41673
Comb_hair,23504
Walk,92254
Climb_stairs,40258
Sitdown_chair,25036


In [6]:
from pyspark.sql.functions import col, min, max, mean, stddev

df \
    .groupBy('class') \
    .count() \
    .select([ 
        min(col("count")).alias('min'), 
        max(col("count")).alias('max'), 
        mean(col("count")).alias('mean'), 
        stddev(col("count")).alias('stddev') 
    ]) \
    .select([
        col('*'),
        (col("max") / col("min")).alias('minmaxratio')
    ]) \
    .show()

+----+-----+------------------+------------------+-----------------+
| min|  max|              mean|            stddev|      minmaxratio|
+----+-----+------------------+------------------+-----------------+
|6683|92254|31894.928571428572|21284.893716741157|13.80427951518779|
+----+-----+------------------+------------------+-----------------+
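The summary row above can be cross-checked without Spark. The following is a minimal sketch in plain Python that assumes the per-class counts printed earlier in this notebook; the dictionary `class_counts` and the `summary` variable are illustrative names, not part of the exercise. It relies on the fact that Spark's `stddev` is the sample standard deviation, which is also what `statistics.stdev` computes.

```python
import builtins
import statistics

# Per-class row counts, copied from the grouped count output earlier in this notebook.
class_counts = {
    "Eat_soup": 6683, "Liedown_bed": 11446, "Use_telephone": 15225,
    "Descend_stairs": 15375, "Comb_hair": 23504, "Sitdown_chair": 25036,
    "Standup_chair": 25417, "Brush_teeth": 29829, "Eat_meat": 31236,
    "Climb_stairs": 40258, "Pour_water": 41673, "Drink_glass": 42792,
    "Getup_bed": 45801, "Walk": 92254,
}
counts = list(class_counts.values())

# builtins.min / builtins.max are used explicitly because the notebook cell above
# imports Spark column functions named min and max, which shadow the built-ins.
smallest = builtins.min(counts)
largest = builtins.max(counts)

summary = {
    "min": smallest,
    "max": largest,
    "mean": statistics.mean(counts),
    "stddev": statistics.stdev(counts),   # sample standard deviation (n - 1 in the denominator)
    "minmaxratio": largest / smallest,    # how many times larger the biggest class is
}
for name, value in summary.items():
    print(name, value)
```

Note that the column called minmaxratio is computed as max divided by min, so a value near 13.8 means the largest class (Walk) has almost fourteen times as many rows as the smallest (Eat_soup).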



In [11]:
from pyspark.sql.functions import col, min as spark_min

# create a list of the distinct classes in the dataset
classes = [row[0] for row in df.select('class').distinct().collect()]

# compute the number of elements of the smallest class in order to limit the number of samples per class
min_count = df.groupBy('class').count().select(spark_min('count')).first()[0]

# define the result dataframe variable
df_balanced = None

# iterate over distinct classes
for cls in classes:
    
    # only select examples for the specific class within this iteration
    # sample without replacement (a fraction of 1.0 keeps every row and does not guarantee a random order)
    # return only the first n samples
    df_temp = df \
        .filter(col('class') == cls) \
        .sample(False, 1.0) \
        .limit(min_count)
    
    # on first iteration, assign df_temp to the empty df_balanced
    if df_balanced is None:
        df_balanced = df_temp
    # afterwards, append vertically
    else:
        df_balanced = df_balanced.union(df_temp)
df_balanced

DataFrame[x: int, y: int, z: int, source: string, class: string]
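The idea behind the balancing loop (cap every class at the size of the smallest one) can be pictured on plain Python lists. This is only a sketch under stated assumptions: `balance_by_undersampling`, `label_of` and the toy rows are hypothetical names and data chosen for illustration, and `random.sample` stands in for whatever row selection Spark performs.

```python
import builtins
import random
from collections import Counter, defaultdict

def balance_by_undersampling(rows, label_of, seed=0):
    """Return a list with the same number of rows per label (the smallest class size)."""
    rng = random.Random(seed)

    # Group rows by their class label.
    by_label = defaultdict(list)
    for row in rows:
        by_label[label_of(row)].append(row)

    # builtins.min is explicit because the notebook rebinds the name min.
    smallest = builtins.min(len(group) for group in by_label.values())

    # Draw `smallest` rows at random from each class and concatenate the results.
    balanced = []
    for group in by_label.values():
        balanced.extend(rng.sample(group, smallest))
    return balanced

# Toy data: nine "Walk" rows and three "Eat_soup" rows.
rows = [("Walk", i) for i in range(9)] + [("Eat_soup", i) for i in range(3)]
balanced = balance_by_undersampling(rows, label_of=lambda r: r[0])
print(Counter(label for label, _ in balanced))
```

After balancing, both toy classes contribute three rows each, which mirrors what `limit` applied per class does in the Spark version.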

In [18]:
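from pyspark.sql import Row

# build a small dataframe from Row objects
# (the second row is an assumed placeholder; the queries below only rely on id=1 having value 'value1' and on two rows in total)
df = spark.createDataFrame([Row(id=1, value='value1'), Row(id=2, value='value2')])

# let's have a look what's inside
df.show()

# let's print the schema
df.printSchema()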

In [26]:
# register dataframe as query table
df.createOrReplaceTempView('df_view')

# execute SQL query
df_result = spark.sql('select value from df_view where id=1')

# examine contents of result
df_result.show()

# get result as string
df_result.first().value

+------+
| value|
+------+
|value1|
+------+



'value1'

In [29]:
df_result = spark.sql('select count(*) from df_view')
df_result.first()

Row(count(1)=2)

In [8]:
def gt50(i):
    if i > 50:
        return True
    else:
        return False

In [5]:
print(gt50(4))
print(gt50(51))

False
True


Let’s simplify this function

In [6]:
def gt50(i):
    return i > 50

In [7]:
print(gt50(4))
print(gt50(51))

False
True


Now let’s use the lambda notation to define the function.

In [9]:
gt50 = lambda i: i > 50

In [10]:
print(gt50(4))
print(gt50(51))

False
True
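The named function and the lambda are interchangeable wherever a function is expected. As a small local illustration (plain Python, no Spark needed), both can be passed to the built-in `filter`; the names `gt50_lambda`, `values`, `kept_named` and `kept_lambda` are only chosen for this sketch.

```python
def gt50(i):
    # Named function: True when i is greater than 50.
    return i > 50

# The same predicate written as a lambda expression.
gt50_lambda = lambda i: i > 50

values = [4, 51, 50, 99]

# filter keeps the elements for which the predicate returns True.
kept_named = list(filter(gt50, values))
kept_lambda = list(filter(gt50_lambda, values))

print(kept_named)   # [51, 99]
print(kept_lambda)  # [51, 99]
```

Note that 50 itself is dropped, because the comparison is strictly greater than.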


In [11]:
#let's shuffle our list to make it a bit more interesting
from random import shuffle
l = list(range(100))
shuffle(l)
rdd = sc.parallelize(l)

Let’s filter out the values in our list that are less than or equal to 50 by applying our “gt50” function to the list using the “filter” function. Note that by calling the “collect” function, all elements are returned to the Apache Spark driver. This is not a good idea for BigData; please use “.sample(False, 0.1).collect()” or “take(n)” instead.

In [12]:
rdd.filter(gt50).collect()

[84,
 64,
 80,
 60,
 52,
 83,
 73,
 95,
 66,
 69,
 68,
 92,
 78,
 59,
 86,
 72,
 65,
 57,
 96,
 77,
 63,
 87,
 53,
 54,
 61,
 99,
 62,
 93,
 55,
 89,
 75,
 97,
 56,
 94,
 71,
 98,
 51,
 85,
 58,
 67,
 90,
 74,
 82,
 79,
 91,
 76,
 88,
 81,
 70]
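To get a feeling for why sampling or taking a few elements is cheaper than collecting everything, here is a rough local picture in plain Python. It is a sketch, not Spark's implementation: `bernoulli_sample` and `take` are hypothetical helpers that mimic sampling without replacement (each element is kept independently with probability equal to the fraction) and returning only the first n elements.

```python
import random
from itertools import islice

def bernoulli_sample(items, fraction, seed=None):
    """Keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]

def take(items, n):
    """Return at most the first n elements without touching the rest."""
    return list(islice(items, n))

values = list(range(100))
over_50 = [v for v in values if v > 50]

preview = take(over_50, 5)
sampled = bernoulli_sample(over_50, 0.1, seed=42)

print(preview)       # [51, 52, 53, 54, 55]
print(len(sampled))  # size varies with the seed; about 10% of the 49 elements is expected
```

Because every element is decided independently, the sample size is only approximately fraction times the input size, which is why a sample is a preview rather than an exact subset of fixed length.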

We can also use the lambda function directly.

In [13]:
rdd.filter(lambda i: i > 50).collect()

[84,
 64,
 80,
 60,
 52,
 83,
 73,
 95,
 66,
 69,
 68,
 92,
 78,
 59,
 86,
 72,
 65,
 57,
 96,
 77,
 63,
 87,
 53,
 54,
 61,
 99,
 62,
 93,
 55,
 89,
 75,
 97,
 56,
 94,
 71,
 98,
 51,
 85,
 58,
 67,
 90,
 74,
 82,
 79,
 91,
 76,
 88,
 81,
 70]

Let’s consider the same list of integers. Now we want to compute the sum for elements in that list which are greater than 50 but less than 75. Please implement the missing parts. 

In [15]:
rdd.filter(lambda x: x>50).filter(lambda x: x<75).reduce(lambda a,x: a+x)

1500

You should see "1500" as the answer. Now we want to know the sum of all elements. Again, please have a look at the API documentation and complete the code below in order to get the sum.

In [16]:

rdd = sc.parallelize(range(100))

In [18]:
rdd.reduce(lambda a,x: a+x)

4950

In [19]:
rdd.count()

100
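The sums above work in parallel because addition is associative and commutative, which is what the RDD programming guide requires of the function passed to “reduce”. The following plain-Python sketch imitates this by reducing each chunk separately and then combining the partial results; `chunked` and `parallel_style_reduce` are hypothetical helpers, and the chunks only stand in for Spark partitions.

```python
from functools import reduce

def chunked(seq, n_chunks):
    """Split seq into up to n_chunks consecutive pieces of similar size."""
    size = -(-len(seq) // n_chunks)  # ceiling division
    return [seq[i:i + size] for i in range(0, len(seq), size)]

def parallel_style_reduce(seq, op, n_chunks=4):
    """Reduce each chunk on its own, then reduce the partial results."""
    partials = [reduce(op, chunk) for chunk in chunked(seq, n_chunks)]
    return reduce(op, partials)

values = list(range(100))
add = lambda a, x: a + x
subtract = lambda a, x: a - x

total = parallel_style_reduce(values, add)
filtered_total = parallel_style_reduce([v for v in values if 50 < v < 75], add)
print(total)           # 4950
print(filtered_total)  # 1500

# Subtraction is not associative, so the chunked result differs from the sequential one.
sequential_sub = reduce(subtract, values)
chunked_sub = parallel_style_reduce(values, subtract)
print(sequential_sub, chunked_sub)
```

With addition the chunking does not matter; with subtraction the answer depends on how the data is split, which is why such functions should not be passed to “reduce”.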

In [58]:
from math import sqrt
from pyspark.mllib.stat import Statistics

# mean and (population) variance of a single RDD
rddX = sc.parallelize([34,1,23,4,3,3,12,4,3,1])
Meanx = rddX.sum()/rddX.count()
VARX = rddX.map(lambda x: pow(x-Meanx,2)).sum()/rddX.count()
print('e1',Meanx,VARX)

rddX = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rddY = sc.parallelize([7,6,5,4,5,6,7,8,9,10])
XY = rddX.zip(rddY)

# built-in Pearson correlation matrix, for comparison (result not displayed)
Statistics.corr(XY)
Meanx = rddX.sum()/rddX.count()
Meany = rddY.sum()/rddY.count()
print(Meanx,Meany)
# covariance of X and Y
cov = XY.map(lambda x_y : (x_y[0]-Meanx)*(x_y[1]-Meany)).sum()/XY.count()
print(cov)
# standard deviations are the square roots of the variances
SDX = sqrt(rddX.map(lambda x: pow(x-Meanx,2)).sum()/rddX.count())
SDY = sqrt(rddY.map(lambda x: pow(x-Meany,2)).sum()/rddY.count())

# correlation = covariance divided by the product of the standard deviations
COR = cov/(SDX*SDY)
print(round(COR, 4))

e1 8.8 111.55999999999999
5.5 6.7
3.65
0.7093
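Pearson's correlation coefficient is the covariance divided by the product of the two standard deviations, where each standard deviation is the square root of the corresponding variance. As a cross-check that does not need Spark, the sketch below recomputes these quantities for the same two ten-element lists in plain Python; the variable names are chosen for this illustration only.

```python
import math

xs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ys = [7, 6, 5, 4, 5, 6, 7, 8, 9, 10]
n = len(xs)

mean_x = sum(xs) / n
mean_y = sum(ys) / n

# Population variance and covariance (divide by n, as in the RDD code).
var_x = sum((x - mean_x) ** 2 for x in xs) / n
var_y = sum((y - mean_y) ** 2 for y in ys) / n
cov_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / n

# Standard deviation is the square root of the variance.
sd_x = math.sqrt(var_x)
sd_y = math.sqrt(var_y)

# Pearson correlation: covariance normalised by both standard deviations.
corr = cov_xy / (sd_x * sd_y)

print(mean_x, mean_y)   # 5.5 6.7
print(round(corr, 4))   # 0.7093
```

Dividing by the product of the variances instead of the standard deviations would not give a value on the usual scale from -1 to 1, so it is worth checking which of the two a hand-written formula uses.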
