# Data Analytics in Healthcare

## Week 3 - Map Reduce Framework

Objective: To learn about the MapReduce paradigm and work with a simple example

## Let's get started

Go to `SparkHome/bin` and run `./pyspark`.

You should see:

```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.6 (v2.7.6:3a1db0d2747e, Nov 10 2013 00:42:54)

```


In [2]:
# print Spark version
print("pyspark version: " + str(sc.version))

pyspark version: 2.1.0


## Once you run PySpark, it defines a Spark context object (sc) and a SQL context (sqlCtx)

In [3]:
sc

<pyspark.context.SparkContext object at 0x7f387d3f8490>

In [4]:
rdd=sc.parallelize(range(1,1000))
rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475

In [None]:
# take
x = sc.parallelize([1,2,3,4,5])
y = x.take(num = 3)
print(y)

### `rdd.collect()` converts an RDD into a Python list on the driver, i.e. it retrieves all the values in the RDD

In [None]:
# collect

x = sc.parallelize([1,2,3,4,5])
y = x.collect()
print(y)  # y is a plain Python list on the driver, no longer distributed

In [None]:
# first
y = x.first()
print(y)

In [None]:
# filter
y = x.filter(lambda n: n % 2 == 1)  # keeps the odd elements (drops the even ones)
print(y.collect())

In [None]:
# map
y = x.map(lambda x: (x,x**2))
y.collect()


In [None]:
# flatMap
x = sc.parallelize([1,2,3,4,5])
y1 = x.map(lambda x: (x, 100*x, x**2))
y2 = x.flatMap(lambda x: (x, 100*x, x**2))
print(x.collect())
print(y1.collect())
print(y2.collect())
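
The difference between `map` and `flatMap` can be checked without a cluster. The following is a minimal plain-Python sketch (it does not use Spark); the helper name `expand` is an assumption introduced only for illustration and mirrors the lambda used in the cell above.

```python
# Plain-Python sketch of map vs. flatMap semantics (no Spark involved).
from itertools import chain

data = [1, 2, 3, 4, 5]


def expand(n):
    """Hypothetical helper: same function as the lambda in the flatMap cell."""
    return (n, 100 * n, n ** 2)


# map: exactly one output element per input element (here, one tuple each)
mapped = [expand(n) for n in data]

# flatMap: apply the function, then flatten the results by one level
flat_mapped = list(chain.from_iterable(expand(n) for n in data))

print(mapped)       # a list of 5 tuples
print(flat_mapped)  # a flat list of 15 numbers
```

`map` keeps the tuple structure, while `flatMap` yields three separate elements for every input element.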

In [None]:
# reduce
y = x.reduce(lambda accumulated, obj: accumulated + obj)  # sums all elements; the function must be commutative and associative
print(y)

In [None]:
# reduceByKey
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
y = x.reduceByKey(lambda v1, v2: v1 + v2)
print(y.collect())
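
To see what `reduceByKey` computes, here is a minimal plain-Python sketch of the same semantics. The function `reduce_by_key` is a hypothetical helper, not part of PySpark, and it runs sequentially, whereas Spark combines values within each partition before merging across partitions.

```python
# Plain-Python sketch of reduceByKey semantics (no Spark involved).
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Hypothetical helper: group values by key, then fold each group with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}


pairs = [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
totals = reduce_by_key(pairs, lambda v1, v2: v1 + v2)
print(sorted(totals.items()))  # [('A', 12), ('B', 3)]
```

In Spark, the order of the pairs returned by `collect()` is not guaranteed, which is why the sketch sorts before printing.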

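## MapReduce

In [None]:
# Map each record to (field, 1), then sum the 1s per key.
# data[1] is the field being counted (for example, a gender column).
x.map(lambda data: (data[1], 1)).reduceByKey(lambda a, b: a + b).collect()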

In [None]:
# union
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
z = x.union(y)
print(z.collect())

## Reading from Files

In [None]:
inputTxt=sc.textFile("austen-sense.txt")


## Each line is a separate element in the RDD

In [None]:
inputTxt.take(10)

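## Saving to Files


`.saveAsTextFile()` saves an RDD as text files, using the string representation of each element. There is also `.saveAsPickleFile()`, which saves the elements as serialized (pickled) Python objects.

`.saveAsNewAPIHadoopDataset()` writes a key-value RDD to any Hadoop-supported storage system, such as HDFS.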

## Reading inputs from S3 and counting the words


In [None]:
#text = sc.textFile("s3://YOUR_S3_BUCKET/austen-sense.txt")

text=sc.textFile("austen-sense.txt")





In [None]:
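# myFunc is a user-defined predicate (not shown here)
lines = sc.textFile("mytext", 5)
comments = lines.filter(myFunc)
print(lines.count())     # reads "mytext" from disk
print(comments.count())  # reads "mytext" again, because nothing was cached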

## How to solve this problem?

In [None]:
lines = sc.textFile("mytext", 5)
lines.cache()  # save, don't recompute!
comments = lines.filter(myFunc)
print(lines.count())
print(comments.count())

## Spark Program Lifecycle

1. Create RDDs from external data or parallelize a collection in your driver program
2. Lazily transform them into new RDDs
3. `cache()` some RDDs for reuse
4. Perform actions to execute parallel computation and produce results
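
The effect of lazy transformations and `cache()` can be illustrated with a small plain-Python sketch. Everything here is an assumption made for illustration: `LazyDataset`, `load_lines`, and `is_comment` are hypothetical names, and the class only imitates the idea of an RDD; it is not how Spark is implemented.

```python
# Plain-Python sketch of lazy evaluation and caching (no Spark involved).
class LazyDataset:
    """Hypothetical stand-in for an RDD: stores how to compute data, runs only on an action."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument function returning a list
        self._use_cache = False
        self._cached = None

    def filter(self, predicate):
        # Transformation: returns a new lazy dataset; nothing is computed yet
        return LazyDataset(lambda: [x for x in self._materialize() if predicate(x)])

    def cache(self):
        # Mark this dataset to be kept in memory after its first computation
        self._use_cache = True
        return self

    def _materialize(self):
        if not self._use_cache:
            return self._compute()
        if self._cached is None:
            self._cached = self._compute()
        return self._cached

    def count(self):
        # Action: triggers the computation
        return len(self._materialize())


load_calls = {"n": 0}


def load_lines():
    """Hypothetical loader that counts how often the source is read."""
    load_calls["n"] += 1
    return ["# a comment", "x = 1", "# another comment"]


def is_comment(line):
    return line.startswith("#")


# Without cache(): each action re-reads the source
lines = LazyDataset(load_lines)
comments = lines.filter(is_comment)
counts_uncached = (lines.count(), comments.count())
loads_uncached = load_calls["n"]

# With cache(): the source is read once and reused
load_calls["n"] = 0
lines = LazyDataset(load_lines).cache()
comments = lines.filter(is_comment)
counts_cached = (lines.count(), comments.count())
loads_cached = load_calls["n"]

print(counts_uncached, loads_uncached)  # (3, 2) 2
print(counts_cached, loads_cached)      # (3, 2) 1
```

Both runs return the same counts; only the number of times the source is read changes.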

In [None]:
words = text.flatMap(lambda line: line.split()) \
            .map(lambda word: word.lower())

In [None]:
words.take(10)

In [None]:
mapw=words.map(lambda word: (word, 1))
               

In [None]:
mapw.take(3)

In [None]:
count=mapw.reduceByKey(lambda x,y: x + y)

In [None]:
count.take(3)

In [None]:
def wordcounts(words_rdd):
    # Count each word, swap to (count, word), and sort by count in descending order
    return words_rdd.map(lambda word: (word, 1)) \
                .reduceByKey(lambda x, y: x + y) \
                .map(lambda pair: (pair[1], pair[0])) \
                .sortByKey(False)

In [None]:
wordcounts(words).take(10)
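
For small inputs, the word-count pipeline (split, lowercase, count, rank by frequency) can be cross-checked in plain Python. This is a minimal sketch; `sample_lines` is made-up data standing in for the text file, not its actual contents.

```python
# Plain-Python cross-check of the word-count pipeline (no Spark involved).
from collections import Counter

# Hypothetical sample input; the real notebook reads austen-sense.txt
sample_lines = ["Sense and Sensibility", "sense AND sense"]

# flatMap(split) followed by map(lower)
words = [w.lower() for line in sample_lines for w in line.split()]

# map to (word, 1) and reduceByKey(+) collapse into a Counter
counts = Counter(words)

# Swap to (count, word) and sort in descending order of count
ranked = sorted(((c, w) for w, c in counts.items()), reverse=True)
print(ranked)  # [(3, 'sense'), (2, 'and'), (1, 'sensibility')]
```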

In [None]:
sc

## How do I write a program that uses Spark?

In [5]:
from pyspark import SparkContext, SparkConf

#Optional Config
ProgramName="Myp"
master="local"
config = SparkConf().setAppName(ProgramName).setMaster(master)


sc = SparkContext(conf=config)

Name: org.apache.toree.interpreter.broker.BrokerException
Message: Traceback (most recent call last):
  File "/tmp/kernel-PySpark-882eb562-9795-4fff-a47c-b15f85f7f196/pyspark_runner.py", line 194, in <module>
    eval(compiled_code)
  File "<string>", line 5, in <module>
  File "/usr/local/spark/python/pyspark/context.py", line 115, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/usr/local/spark/python/pyspark/context.py", line 272, in _ensure_initialized
    callsite.function, callsite.file, callsite.linenum))
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Apache Toree, master=local[*]) created by __init__ at /tmp/kernel-PySpark-882eb562-9795-4fff-a47c-b15f85f7f196/pyspark_runner.py:143 

StackTrace: org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
scala.Optio

The error above occurs because the notebook (like the `pyspark` shell) has already created a SparkContext, available as `sc`, and only one SparkContext can run at a time. A standalone program creates its own context as shown; in an interactive session, reuse the existing `sc` or call `sc.stop()` before creating a new one.

In [None]:
!cat student.py

In [None]:
./pyspark --py-files student.py

In [None]:
sc

In [None]:
import student

In [None]:
!cat records.txt

In [None]:
students=sc.textFile("records.txt")

In [None]:
students.take(10)

In [None]:
students=sc.textFile("records.txt").map(lambda rec: student.Student().parse(rec))

In [None]:
students.first()

In [None]:
P_Group=students.map(lambda p: (p.program,1)).reduceByKey(lambda x,y: x + y)

In [None]:
P_Group.collect()

## Joins: take two RDDs of (k, v1) and (k, v2) pairs and create a joined RDD of (k, (v1, v2))

In [None]:
students_courses=[("Alex","w205"),("Mike","info290"),("Ross","w205")]

In [None]:
program_rec=[("Alex","MIDS"),("Mike","MIMS"),("Ross","MIDS")]

In [None]:
SC_rdd=sc.parallelize(students_courses)

In [None]:
P_rdd=sc.parallelize(program_rec)

In [None]:
Comp_rec=SC_rdd.join(P_rdd)


In [None]:
Comp_rec.collect()
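
The shape of the joined result can be previewed with a plain-Python sketch of an inner join on keys. `inner_join` is a hypothetical helper written for illustration, not a PySpark function; the input lists are the ones defined in the cells above.

```python
# Plain-Python sketch of an inner join on (key, value) pairs (no Spark involved).
from collections import defaultdict


def inner_join(left, right):
    """Hypothetical helper: pair up values that share a key, as (k, (v1, v2))."""
    right_index = defaultdict(list)
    for key, value in right:
        right_index[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_index.get(key, [])]


students_courses = [("Alex", "w205"), ("Mike", "info290"), ("Ross", "w205")]
program_rec = [("Alex", "MIDS"), ("Mike", "MIMS"), ("Ross", "MIDS")]

joined = inner_join(students_courses, program_rec)
print(joined)
# [('Alex', ('w205', 'MIDS')), ('Mike', ('info290', 'MIMS')), ('Ross', ('w205', 'MIDS'))]
```

Spark returns the same pairs, but the order of the elements from `collect()` may differ. Keys that appear in only one of the two inputs are dropped.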

## DataTables (Spark DataFrames): RDDs with Schema

- Similar to tables in SQL.

- Each row is a plain Python object (without methods) whose fields you can access.

- All rows in a DataTable should have the same fields and types, like JSON objects with the same fields (unlike RDDs, whose elements can be arbitrary). Rows may contain nulls or arrays.

- The schema can come from semi-structured files such as JSON, but you can enforce a schema if the input does not provide one.

- No code translation. Most of the code is executed in Scala, so you do not pay the performance overhead that PySpark has, except for the small cost of sending the initial requests and getting the results.

- There is a query optimizer that does not go through the Python interface, so there is no overhead except for showing the results.



In [None]:
!cat records.json

In [None]:
students=sqlCtx.read.json("records.json")  # jsonFile() is deprecated; read.json() is its replacement



#Generates some folders and files to keep track of schemas,...

## `students` is backed by an RDD (available as `students.rdd`), so you can do all the things you did with RDDs, but...

In [None]:
students.show()  # shows the first 20 rows

In [None]:
students.select('program').show()

In [None]:
students.select(students.program).show()

In [None]:
students.filter(students.age > 27).show()

## Since it uses a variation of DataFrame, you can also filter with bracket indexing

In [None]:
students[students.program == 'MIDS'].show()

In [None]:
students.groupBy(students.program).count().show()

In [None]:
students.groupBy(students.program).avg('age').show()

## Prefer SQL? You can run SQL in Spark

In [None]:
students.registerTempTable("st")

In [None]:
sqlCtx.sql("select name, program FROM st").show()

In [None]:
sqlCtx.sql("select program,avg(age) AS AverageAge FROM st GROUP BY program").show()
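
The GROUP BY query itself is ordinary SQL, so it can be tried without Spark. The sketch below runs the same query in an in-memory SQLite database from the Python standard library; the three rows are made-up sample data (the contents of `records.json` are not shown here), and only the column names `name`, `program`, and `age` come from the cells above.

```python
# Sketch: the same aggregate query against SQLite instead of Spark SQL.
import sqlite3

# Hypothetical sample rows: (name, program, age)
rows = [("Alex", "MIDS", 30), ("Mike", "MIMS", 25), ("Ross", "MIDS", 28)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE st (name TEXT, program TEXT, age INTEGER)")
conn.executemany("INSERT INTO st VALUES (?, ?, ?)", rows)

# ORDER BY is added only to make the printed result deterministic
result = conn.execute(
    "SELECT program, AVG(age) AS AverageAge FROM st GROUP BY program ORDER BY program"
).fetchall()
conn.close()

print(result)  # [('MIDS', 29.0), ('MIMS', 25.0)]
```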


In [None]:
from pyspark.sql import functions as funcs

# Average and maximum age per program (alias names must not contain trailing spaces)
AvgMax=students.groupBy('program').agg(funcs.avg('age').alias('AverageAge'),funcs.max('age').alias('MaximumAge'))

AvgMax.show()

## How the queries are optimized

In [None]:
sqlCtx.sql("select name, program FROM st").explain()