# Big Data - Spark Assignment

### Atlanta Liu
### Winter 2020

### 1. Written Questions

a) What are the two key processes in Hadoop?

- MapReduce and Hadoop Distributed File System (HDFS)

b) Is HDFS optimized for sequential or random reading of data?

- Sequential reading of data

c) By default, how many back-up copies does HDFS create?

- The default replication factor is 3, so each block is stored as three copies in total, placed on different data nodes

d) In HDFS, what entity maintains and manages the file system meta data?

- The NameNode maintains and manages the file system metadata (the namespace, the file-to-block mapping, and the block locations)

e) What is the purpose of Spark Driver?

- The Spark Driver creates the Spark Context and turns the application's transformations and actions into tasks that it schedules on the executors. It tracks the executors through periodic heartbeats, so it knows whether they are still running or have failed. If an executor is lost, the missing partitions can be recomputed from the RDD lineage (or read back from cached data), which provides fault tolerance. Once the driver exits, the resources held by its executors are released for use elsewhere.

f) What is the purpose of Spark Cluster Manager?

- The Spark Cluster Manager allocates resources on the worker nodes and launches executors for the driver's Spark Context. There are several types of cluster manager (standalone, YARN, etc.), but the main purpose of each is to ensure that resources are properly allocated across applications.

### Spark Instantiation

In [1]:
import os
import atexit
import sys

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

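#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    # Catch Exception rather than using a bare except, so that
    # KeyboardInterrupt and SystemExit are not swallowed during cleanup
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except Exception:
        pass
    try:
        print('Trapped Exit cleaning up Spark Job')
        sj.stop()
    except Exception:
        pass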

findspark.init()

#Parameters for the Spark cluster
nodes=3
tasks_per_node=8 
memory_per_task=1024 #1 gig per process, adjust accordingly
# Please estimate walltime carefully to keep unused Spark clusters from sitting 
# idle so that others may use the resources.
walltime="3:00" #3 hours (HH:MM)
os.environ['SBATCH_PARTITION']='single' #Set the appropriate ARC partition

sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

sj.wait_to_start()
sc = sj.start_spark()

#Register the exit handler                                                                                                     
atexit.register(exitHandler,sj,sc)

#You need this line if you want to use SparkSQL
sqlCtx=SQLContext(sc)

INFO:sparkhpc.sparkjob:Submitted batch job 5018898

INFO:sparkhpc.sparkjob:Submitted cluster 0
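
As a quick sanity check on the parameters above, the requested resources can be worked out by hand. This is only a sketch: it assumes `memory_per_core` is interpreted as megabytes per core, and the variable names `total_cores`, `executors`, and `total_memory_gib` are introduced here for illustration.

```python
# Same values as the cluster parameters in the cell above
nodes = 3
tasks_per_node = 8
memory_per_task = 1024  # assumed to mean MB per core

# ncores passed to sparkjob: every task slot on every node
total_cores = nodes * tasks_per_node

# cores_per_executor equals tasks_per_node, so one executor per node
executors = total_cores // tasks_per_node

# Total memory requested across the cluster, converted from MB to GiB
total_memory_gib = total_cores * memory_per_task / 1024

print(total_cores, executors, total_memory_gib)
```

Under these assumptions the job asks for 24 cores split into 3 executors, with 24 GiB of memory in total.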


### All files referenced are from MIMIC

2. Using INPUTEVENTS_MV.csv, perform the following tasks using RDD operations:

a) Sum all of the values in the amount column

b) Determine the total amount (with units) of each ITEMID

c) Determine the number of different categories for ORDERCATEGORYDESCRIPTION

d) Count how many unique SUBJECT_IDs there are with more than 100 rows

In [38]:
# Importing file
data = sc.textFile("./INPUTEVENTS_MV.csv")

In [40]:
# a) Column 7 is AMOUNT; [1:] drops the header row after collecting to the driver
lista = data.map(lambda x: x.split(",")[7]).collect()[1:]
print(sum(map(float, lista)))

792410333.703147
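
Splitting on every comma works only as long as no field contains a quoted comma, and collecting the whole column moves the summation onto the driver. A minimal sketch of a more defensive parser follows. The helper names `parse_fields` and `amount_or_none` and the sample lines are made up for illustration (they are not MIMIC data), and the sample uses a shortened four-column layout.

```python
import csv

def parse_fields(line):
    """Split one CSV line into fields, honouring double-quoted commas."""
    return next(csv.reader([line]))

def amount_or_none(line, amount_idx=7):
    """Return the amount field as a float, or None for header/blank/short rows."""
    fields = parse_fields(line)
    try:
        return float(fields[amount_idx])
    except (ValueError, IndexError):
        return None

# Made-up sample: the third line has a comma inside a quoted field
sample = [
    'ID,NOTE,AMOUNT,UNIT',
    '1,plain,1.5,ml',
    '2,"bolus, repeated",2.5,ml',
]

amounts = [amount_or_none(line, amount_idx=2) for line in sample]
total = sum(a for a in amounts if a is not None)

# A plain split(",") picks up the wrong field on the quoted line
naive_field = sample[2].split(",")[2]

print(total, repr(naive_field))
```

With the RDD from this notebook, the same helper could be applied on the executors, for example `data.map(amount_or_none).filter(lambda a: a is not None).sum()`, so that only the final number is returned to the driver.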


In [151]:
# b) ItemID = 6, amount = 7, amountunit = 8
# Determine the total amount (with units) of each ITEMID

listb=data.map(lambda x: (x.split(",")[6].replace('"','') + "-" + x.split(",")[8].replace('"',''), x.split(",")[7]))
reduced=listb.reduceByKey(lambda x,y: float(x)+float(y))
print(reduced.glom().collect())

[[('221385-mg', 28559.526306659995), ('226372-ml', 1313073.0), ('225833-mg', 292352.05831663054), ('226375-ml', 895491.0), ('225832-mg', 5379.869288173501), ('227535-mg', 4418.3418884044995), ('225848-mg', 515.000004), ('227528-ml', 232889.67719339993), ('225843-grams', 189.0), ('225890-mg', '250'), ('225148-mg', 14289.486826529303), ('221906-mcg', 1599.9999411000001), ('228140-ml', 14638.92231098), ('228351-ml', 378148.03714277), ('225876-mg', '250'), ('225844-mg', 1500.00012), ('226364-L', '6')], [('225166-mEq', 1474662.655897773), ('221623-mg', 38190.403038700046), ('225910-dose', 20719.0), ('225151-mg', 79181.06043505501), ('225853-dose', 1793.0), ('222062-mg', 9698.1242446088), ('227529-ml', 15165390.941652365), ('223259-units', 60809.5), ('225855-grams', 53.0), ('225862-mg', 270.000012), ('225862-dose', 265.0), ('226036-ml', 10358.90715326), ('228316-mg', 724775.0197100001), ('225888-mg', -4.999999900000001), ('225996-ml', 240.0), ('226049-ml', 10269.904191329999), ('227978-ml', 
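
The output above contains string totals such as `('225890-mg', '250')`. This happens because `reduceByKey` only calls the reducing function when a key occurs more than once, so a key with a single row keeps its original string value. The sketch below reproduces the effect locally; `reduce_by_key` is a hypothetical stand-in written for this example (it is not a Spark API) and the pairs are made up.

```python
def reduce_by_key(pairs, func):
    """Local stand-in for reduceByKey: func runs only when a key repeats."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return out

# Made-up (ITEMID-unit, amount) pairs; amounts are still strings, as after split(",")
pairs = [("111-mg", "1.5"), ("111-mg", "2.0"), ("222-ml", "250")]

# Converting inside the reducer leaves single-row keys as strings
as_written = reduce_by_key(pairs, lambda x, y: float(x) + float(y))

# Converting every value first gives floats for all keys
converted_first = reduce_by_key(
    [(key, float(value)) for key, value in pairs],
    lambda x, y: x + y,
)

print(as_written)
print(converted_first)
```

In the notebook, one way to apply the same idea would be `listb.mapValues(float).reduceByKey(lambda x, y: x + y)`, after first filtering out the header row, since `float` would fail on the header text.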

In [123]:
# c) Subtract 1 so that the header row is not counted as a category
data.map(lambda x: x.split(",")[16]).distinct().count()-1

5
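
A hard-coded position such as 16 depends on the exact column order of the file, so it can be worth looking the index up from the header instead. The following is a sketch under assumptions: `column_index` is a hypothetical helper, and the header and rows are shortened, made-up stand-ins for the real file.

```python
import csv

def column_index(header_line, name):
    """Return the 0-based position of `name` in a CSV header line."""
    columns = next(csv.reader([header_line]))
    return columns.index(name)

# Shortened, illustrative header and rows (not the real file layout)
header = '"ROW_ID","SUBJECT_ID","ITEMID","ORDERCATEGORYDESCRIPTION"'
rows = [
    '1,10,225158,"Bolus"',
    '2,10,225158,"Continuous IV"',
    '3,11,220949,"Bolus"',
]

idx = column_index(header, "ORDERCATEGORYDESCRIPTION")

# Distinct values of that column, with quotes stripped by the csv module
categories = {next(csv.reader([row]))[idx] for row in rows}

print(idx, len(categories))
```

With the RDD, the header line could be obtained with `data.first()` and passed to `column_index`, and the resulting index used in place of the literal 16.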

In [148]:
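# d) Count how many unique SUBJECT_IDs there are with more than 100 rows
# (the header row forms its own key with a count of 1, so the filter drops it)
row_counts = data.map(lambda x:(x.split(",")[1].replace('"',''),1)).reduceByKey(lambda x,y: x+y)
heavy_subjects = row_counts.filter(lambda x: x[1] > 100)
print(heavy_subjects.count())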

7175
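
The map, reduceByKey, filter, count chain above can be checked on a small scale with plain Python. This is a local sketch only: `subjects_over` is a hypothetical helper and the IDs are made up. It also makes the strict "more than" comparison explicit.

```python
from collections import Counter

def subjects_over(subject_ids, threshold):
    """Count distinct IDs that appear in strictly more than `threshold` rows."""
    counts = Counter(subject_ids)
    return sum(1 for n in counts.values() if n > threshold)

# Made-up IDs: "a" has 3 rows, "b" has 2 rows, "c" has 1 row
ids = ["a"] * 3 + ["b"] * 2 + ["c"]

print(subjects_over(ids, 2))  # only "a" has more than 2 rows
print(subjects_over(ids, 1))  # "a" and "b" have more than 1 row
```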


3. Using ADMISSIONS.csv and PROCEDUREEVENTS_MV.csv, perform the following tasks using Spark SQL:

a) Select all rows where the person has government insurance

b) Find the admission location that led to the most discharges to rehab/distinct part hosp

c) Find the most common ORDERCATEGORYNAME for patients with an admission type of emergency

In [129]:
# Importing file
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('app').getOrCreate()
admission = spark.read.csv('./ADMISSIONS.csv', inferSchema = True, header = True)
procedure = spark.read.csv('./PROCEDUREEVENTS_MV.csv', inferSchema = True, header = True)

In [130]:
admission.createOrReplaceTempView('admission')
procedure.createOrReplaceTempView('procedure')

In [131]:
# a)
sqlCtx.sql("""
SELECT * 
FROM 
    admission 
WHERE 
    INSURANCE = 'Government'
""").show(5)

+------+----------+-------+-------------------+-------------------+---------+--------------+--------------------+--------------------+----------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|DEATHTIME|ADMISSION_TYPE|  ADMISSION_LOCATION|  DISCHARGE_LOCATION| INSURANCE|LANGUAGE|         RELIGION|MARITAL_STATUS|           ETHNICITY|          EDREGTIME|          EDOUTTIME|           DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|
+------+----------+-------+-------------------+-------------------+---------+--------------+--------------------+--------------------+----------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|   468|       363| 196503|2176-03-01 15:26:00|2176-03-03 14:04:

In [6]:
# b)
sqlCtx.sql("""
SELECT
  ADMISSION_LOCATION,
  COUNT(*)
FROM
  admission
WHERE
  DISCHARGE_LOCATION = 'REHAB/DISTINCT PART HOSP'
GROUP BY
  ADMISSION_LOCATION
ORDER BY COUNT(*) DESC
LIMIT 1
""").show()

+--------------------+--------+
|  ADMISSION_LOCATION|count(1)|
+--------------------+--------+
|EMERGENCY ROOM ADMIT|    3399|
+--------------------+--------+



In [10]:
# c) Find the most common ORDERCATEGORYNAME for patients with an admission type of emergency
sqlCtx.sql("""
SELECT
    ORDERCATEGORYNAME,
    COUNT(*)
FROM

(SELECT
    admission.ADMISSION_TYPE,
    procedure.ORDERCATEGORYNAME
FROM
    admission
INNER JOIN
    procedure 
ON 
    admission.ROW_ID = procedure.ROW_ID
WHERE
    admission.ADMISSION_TYPE = 'EMERGENCY')

GROUP BY
    ORDERCATEGORYNAME
ORDER BY COUNT(*) DESC
LIMIT 1
""").show()

+-----------------+--------+
|ORDERCATEGORYNAME|count(1)|
+-----------------+--------+
|       Procedures|   11583|
+-----------------+--------+
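
One caveat about the join above: in MIMIC, ROW_ID is a row identifier local to each table, whereas HADM_ID identifies the hospital admission in both ADMISSIONS and PROCEDUREEVENTS_MV. Joining on ROW_ID therefore pairs rows that merely share a row number, so the count reported above may not reflect procedures for emergency admissions. The sketch below illustrates the difference with Python's built-in `sqlite3` rather than Spark SQL. The table `proc_events`, the helper `top_category`, and all rows are made up for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE admission (ROW_ID INTEGER, HADM_ID INTEGER, ADMISSION_TYPE TEXT);
CREATE TABLE proc_events (ROW_ID INTEGER, HADM_ID INTEGER, ORDERCATEGORYNAME TEXT);

-- Made-up rows: admission 100 is the emergency admission
INSERT INTO admission VALUES (1, 100, 'EMERGENCY'), (2, 200, 'ELECTIVE');
INSERT INTO proc_events VALUES
    (1, 200, 'Ventilation'),
    (2, 100, 'Procedures'),
    (3, 100, 'Procedures');
""")

def top_category(join_column):
    """Most common ORDERCATEGORYNAME for emergency admissions, joined on join_column."""
    query = f"""
        SELECT p.ORDERCATEGORYNAME, COUNT(*) AS n
        FROM admission a
        INNER JOIN proc_events p ON a.{join_column} = p.{join_column}
        WHERE a.ADMISSION_TYPE = 'EMERGENCY'
        GROUP BY p.ORDERCATEGORYNAME
        ORDER BY n DESC
        LIMIT 1
    """
    return conn.execute(query).fetchone()

print(top_category("HADM_ID"))  # rows that belong to the emergency admission
print(top_category("ROW_ID"))   # rows that only share a row number
```

In this toy data the two join keys give different answers. For the notebook query, the corresponding change would be `ON admission.HADM_ID = procedure.HADM_ID`, and the resulting count would need to be recomputed on the real data.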

