## Week 1 : Scalable Machine Learning on Big Data using Apache Spark

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.0.tar.gz (204.7MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


## Parallel data processing strategies of Apache Spark

In [9]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError:
    # The import fails if the kernel was not restarted after installing pyspark
    print('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [3]:

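# Reuse the running SparkContext if there is one, otherwise start a new one
sc = SparkContext.getOrCreate()
# Distribute the integers 0..99 as an RDD
rdd = sc.parallelize(range(100))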

In [10]:
spark = SparkSession \
    .builder \
    .getOrCreate()

In [4]:
rdd.count()

100

In [5]:
rdd.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [11]:
rdd.sum()

4950

In [8]:
# rdd.collect()
# Note: avoid collect() on large data; it copies every element to the driver and can cause a memory error.



Summary
* Apache Spark programs are implicitly parallel
* The same code can process 1 KB or 1 PB
* The RDD is the central API
* Data and task distribution is transparent
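
The idea behind these bullets can be illustrated without a cluster. The following is a minimal pure-Python sketch, not Spark's actual scheduler: it splits the data into chunks (standing in for partitions), sums each chunk on a worker thread, and combines the partial results. The helper names `split_into_partitions` and `parallel_sum` are hypothetical and exist only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into num_partitions contiguous chunks of near-equal size."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `remainder` chunks get one extra element
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def parallel_sum(data, num_partitions=4):
    """Sum each partition on its own worker, then combine the partial sums."""
    partitions = split_into_partitions(list(data), num_partitions)
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partial_sums = list(pool.map(sum, partitions))
    return sum(partial_sums)


total = parallel_sum(range(100))
print(total)  # 4950, the same value rdd.sum() returned
```

The calling code never mentions chunks or workers, which is the sense in which distribution is "transparent" to the programmer.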





## Functional Programming (FP)

Lambda Calculus : https://en.wikipedia.org/wiki/Lambda_calculus

- var output = function (fn,x,y,z) { return fn(x,y,z); };
- var sum = function(a,b,c) { return (a+b+c); };
- alert(output(sum,1,2,3));

Function creation: f(x) = x + 1
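
For readers following along in Python, here is a rough equivalent of the JavaScript snippet. The name `add_three` replaces the JavaScript `sum` to avoid shadowing Python's built-in `sum`, and `increment` is an illustrative name for f(x) = x + 1; neither name comes from the original notes.

```python
def output(fn, x, y, z):
    """Higher-order function: takes another function and applies it."""
    return fn(x, y, z)


def add_three(a, b, c):
    return a + b + c


# f(x) = x + 1 written as an anonymous function
increment = lambda x: x + 1

result = output(add_three, 1, 2, 3)
print(result)         # 6
print(increment(41))  # 42
```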


In [21]:
rdd = sc.parallelize(range(100))
rdd.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [20]:
rdd.map(lambda x: x+1).take(10)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [24]:
# Sum of all integers from 1 to 100
sc.parallelize(range(1, 101)).reduce(lambda a, b: a + b)


5050
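
`reduce` combines elements pairwise until a single value remains. The sketch below uses Python's `functools.reduce` as a local stand-in for the RDD method. In Spark the combining function should be associative and commutative, because partial results from different partitions are merged in no guaranteed order; the second half of the sketch mimics that by reducing two chunks separately and merging them in reverse order.

```python
from functools import reduce

add = lambda a, b: a + b
numbers = list(range(1, 101))

# Sequential reduction over the whole list
sequential = reduce(add, numbers)

# Reduce two "partitions" independently, then merge the partial results
first_half = reduce(add, numbers[:50])
second_half = reduce(add, numbers[50:])
merged = add(second_half, first_half)

print(sequential, merged)  # 5050 5050
```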

In [None]:
Apache Spark parallelises computations using the lambda calculus
All spark 

In [25]:
## Let’s actually create a python function which decides whether a value is greater than 50 (True) or not (False).

def gt50(i):
  if i>50:
    return True
  else:
    return False

In [27]:
print(gt50(100))
print(gt50(34))

True
False


In [28]:
# The same function, simplified: the comparison itself already evaluates to True or False
def gt50(i):
  return i>50

In [29]:
print(gt50(100))
print(gt50(34))

True
False


In [30]:
gt50 = lambda i: i > 50

In [31]:
print(gt50(100))
print(gt50(34))

True
False


In [32]:
from random import shuffle
l = list(range(100))
shuffle(l)
rdd = sc.parallelize(l)

In [34]:
rdd.take(10)
# Let’s keep only the values from our list which are greater than 50 by applying our “gt50” function to the list using the “filter” function.
# Note that by calling the “collect” function, all elements are returned to the Apache Spark driver.
# This is not a good idea for big data; please use “.sample(False, 0.1).collect()” or “take(n)” instead.


[82, 54, 12, 53, 44, 14, 96, 89, 84, 93]

In [35]:
rdd.filter(gt50).collect()

[82,
 54,
 53,
 96,
 89,
 84,
 93,
 81,
 60,
 68,
 95,
 62,
 78,
 67,
 90,
 70,
 87,
 57,
 97,
 99,
 66,
 63,
 88,
 80,
 71,
 64,
 75,
 65,
 52,
 98,
 86,
 94,
 61,
 55,
 58,
 85,
 59,
 74,
 79,
 77,
 72,
 91,
 56,
 76,
 73,
 51,
 92,
 83,
 69]

In [36]:
rdd.filter(lambda i: i > 50).collect()

[82,
 54,
 53,
 96,
 89,
 84,
 93,
 81,
 60,
 68,
 95,
 62,
 78,
 67,
 90,
 70,
 87,
 57,
 97,
 99,
 66,
 63,
 88,
 80,
 71,
 64,
 75,
 65,
 52,
 98,
 86,
 94,
 61,
 55,
 58,
 85,
 59,
 74,
 79,
 77,
 72,
 91,
 56,
 76,
 73,
 51,
 92,
 83,
 69]

In [37]:
# Now we compute the sum of the elements in that list which are greater than 50 but less than 75.

rdd.filter(lambda x: x > 50).filter(lambda x: x < 75).sum()

1500
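
As a sanity check, the same result can be reproduced in plain Python. This sketch is not part of the original exercise; it also shows that two chained filters are equivalent to a single predicate that combines both conditions.

```python
values = range(100)

# Two chained filters, mirroring the RDD expression
chained = sum(filter(lambda x: x < 75, filter(lambda x: x > 50, values)))

# One combined predicate
combined = sum(x for x in values if 50 < x < 75)

print(chained, combined)  # 1500 1500
```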

### Resilient Distributed Datasets and DataFrames - Apache Spark SQL

- RDDs are schemaless; DataFrames add a schema (column names and types) on top of the data

In [38]:
from pyspark.sql import Row

df = spark.createDataFrame([Row(id=1, value='value1'),Row(id=2, value='value2')])

# let's have a look what's inside
df.show()

# let's print the schema
df.printSchema()

+---+------+
| id| value|
+---+------+
|  1|value1|
|  2|value2|
+---+------+

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)



In [39]:
# register dataframe as query table
df.createOrReplaceTempView('df_view')

# execute SQL query
df_result = spark.sql('select value from df_view where id=2')

# examine contents of result
df_result.show()

# get result as string
df_result.first().value

+------+
| value|
+------+
|value2|
+------+



'value2'

In [40]:
df_result.count()

1

In [41]:
df.columns

['id', 'value']

References 
- https://www.coursera.org/learn/machine-learning-big-data-apache-spark/home/welcome
- https://github.com/IBM/skillsnetwork/tree/master/coursera_bd