In [10]:
# Spark setup: start a local session and expose the handles (`spark`, `sc`)
# that the task cells below rely on.
# Dependencies (install once into the kernel's environment, not on every run):
#   %pip install pyspark findspark
import findspark
# findspark locates the Spark installation and puts pyspark on sys.path,
# so it has to run before the pyspark imports below.
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName('Basics').getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Render a DataFrame as a table when it is a cell's last expression

from pyspark import SparkConf 
from pyspark.context import SparkContext
# NOTE: the star import shadows Python builtins such as sum/min/max/round/abs
# with Spark column functions. It is kept because later cells may use names it
# provides (e.g. `split`); prefer explicit imports in new code.
from pyspark.sql.functions import *
import pandas as pd
import numpy as np 

# The session above already owns a SparkContext. Asking SparkContext.getOrCreate
# for one with a fresh SparkConf would just hand back that same context and
# ignore the conf, so take it from the session directly.
sc = spark.sparkContext

In [11]:
# Task1
# Question: on days where a patient logged an exercise event in the afternoon,
# how does the average post-dinner blood glucose compare with the overall
# post-dinner average?
from pyspark.sql.functions import avg, col, concat_ws, split
from pyspark.sql.types import IntegerType,BooleanType,DateType

# Codes are compared as strings because the CSV is read without inferSchema.
# NOTE(review): in the UCI diabetes code table 69/70 are exercise codes, but 68
# is "less-than-usual meal ingestion" - confirm that 68 belongs in this list.
EXERCISE_CODES = ['68', '69', '70']
POST_DINNER_CODE = '63'
AFTERNOON_HOURS = (12, 18)  # inclusive bounds on the hour of day


def keyed_by_patient_day(frame):
    """Project `frame` onto the analysis columns, keyed by index+date.

    `indexDate` is the patient index and the date concatenated without a
    separator. That is only unambiguous while `date` is fixed-width, as it
    appears to be in the sample rows (MM-DD-YYYY) - TODO confirm for the
    whole file.
    """
    return frame.select(
        concat_ws('', col('index'), col('date')).alias("indexDate"),
        "time", "code", "hour", 'minute', 'value',
    )


# `spark` is the session created in the setup cell.
df = spark.read.csv('diabetes.csv',header=True)
df = df.withColumn("value",df.value.cast(IntegerType()))

# Split "H:MM" into hour and minute. Only the hour is needed numerically (for
# the afternoon filter), so minute is left as a string.
split_col = split(df['time'], ':')
df_t = (
    df
    .withColumn('hour', split_col.getItem(0).cast(IntegerType()))
    .withColumn('minute', split_col.getItem(1))
)
# NOTE(review): this prints every column, including Name / D_O_B /
# Phone_Number - consider selecting only the analysis columns before sharing.
df_t.show(truncate=False)

# Exercise events logged in the afternoon
df_t_se = df_t.filter(
    col('code').isin(EXERCISE_CODES) & col('hour').between(*AFTERNOON_HOURS)
)

# Same projection for the exercise events and for all events
df_t_se_2 = keyed_by_patient_day(df_t_se)
df_t_2 = keyed_by_patient_day(df_t)

# Patient-days with afternoon exercise. This stays a distributed DataFrame:
# collecting the keys to the driver and feeding them back through isin()
# builds one giant IN-list and does not scale with the data.
exercise_days = df_t_se_2.select("indexDate").distinct()

# Post-dinner readings on those patient-days. A left-semi join keeps each
# left-hand row at most once and adds no columns, i.e. it selects exactly the
# rows an isin() over the same keys would.
df_se_bg = (
    df_t_2
    .filter(col('code') == POST_DINNER_CODE)
    .join(exercise_days, on="indexDate", how="left_semi")
)

# Average post-dinner value on afternoon-exercise days
avg_ex = df_se_bg.select(avg('value').alias('avg_with_exercise'))
avg_ex.show()

# Average post-dinner value over ALL patient-days.
# NOTE(review): despite the 'avg_no_exercise' label this baseline still
# includes the exercise days; use a left_anti join against `exercise_days`
# if a strict no-exercise baseline is intended.
df_afdbg = df.filter(col('code') == POST_DINNER_CODE)
avg_no_ex = df_afdbg.select(avg('value').alias('avg_no_exercise'))
avg_no_ex.show()


+-----+----------+-----+----+-----+----+---------------------------+------+--------------+----------------+-----------+---------------+---------------+-------------+---------------------------+------------+-------+----------+----+------+
|index|date      |time |code|value|Name|D_O_B                      |Gender|Phone_Number  |Type_of_diabetes|hba1c      |Time_of_reading|glucose_reading|carb_counting|days_of_excercise_in_a_week|carbs_intake|alcohol|cigarettes|hour|minute|
+-----+----------+-----+----+-----+----+---------------------------+------+--------------+----------------+-----------+---------------+---------------+-------------+---------------------------+------------+-------+----------+----+------+
|1    |04-21-1991|9:09 |58  |100  |1   |Tuesday, September 30, 1197|Female|6-887-057-5123|2               |9.440355992|10:50 AM       |484            |1            |4                          |364.9345547 |1.0    |1         |9   |09    |
|1    |04-21-1991|9:09 |33  |9    |2   |Tuesday,

In [17]:
# Task 2
# Per patient (`index`): compare the mean pre-meal reading with the mean
# post-meal reading and report the relative change (post - pre) / post.
import numpy as np
from pyspark.sql.functions import avg, col

readings_raw = spark.read.csv("diabetes.csv", header=True, inferSchema = True)

## Make sure value is an integer even if schema inference left it as a string
readings = readings_raw.withColumn("value", col("value").cast("Integer"))

#Pre-meal blood glucose measurement: mean value per patient
pre_meal_avg = (
    readings
    .filter("code == 58 or code == 60 or code == 62")
    .groupBy("index")
    .agg(avg("value").alias("pre_avg"))
)

#Post-meal blood glucose measurement: mean value per patient
post_meal_avg = (
    readings
    .filter("code == 59 or code == 61 or code == 63")
    .groupBy("index")
    .agg(avg("value").alias("post_avg"))
)

##Comparison between pre and post
# Join the two aggregates on the patient index instead of union + groupByKey:
# the order of values inside a groupByKey group is not guaranteed, so
# "element 0 is pre, element 1 is post" could silently flip per patient and
# invert the sign of the result. Named columns make the roles explicit.
# The inner join keeps only patients that have both kinds of reading, and
# ordering by index makes the printed list reproducible between runs.
paired_avg = pre_meal_avg.join(post_meal_avg, on="index", how="inner").orderBy("index")

outcome = paired_avg.collect()  # small: at most one row per patient
num = 0          # patients with both a pre- and a post-meal average
diff = []        # relative change, (post - pre) / post
direction = []   # True when the post-meal average is higher than the pre-meal one
for row in outcome:
    pre, post = row["pre_avg"], row["post_avg"]
    # avg() yields None when every value in the group is null, and a zero
    # post-meal mean would divide by zero; skip such patients.
    if pre is None or not post:
        continue
    num += 1
    direction.append(post > pre)
    diff.append((post - pre) / post)
print(diff)

[0.38280060882800615, 0.10064072511329888, 0.23204799248310504, 0.12285836229498198, 0.18006612104171946, 0.31424429140993115, 0.006577710956587196, -1.1322970811675332, 0.5865952032916464, 0.09150103188354615, 0.2814737814737815, 0.3917842791335882, 0.19274761186525896, 0.3576113111995344, 0.024603174603174557, -0.2291605215031561, 0.003165140870058793, 0.35328575676479335, 0.44261763115197406, 0.2938205386076657, 0.12901155327342742, -0.2917306052855924, 0.060987170164138745, 0.04216155361666202, 0.23355668721522377, 0.09395018984060081, 0.3188992310805341, 0.43473673103302735, 0.21994404309516857, 0.08287842658100635, 0.40379629629629626, 0.3149823248076523, -2.7258880228476228]


In [18]:
# Task 3
# Mean reading value per gender, restricted to a fixed set of event codes.

# Load the CSV and let Spark infer the column types
readings_raw = spark.read.csv("diabetes.csv", inferSchema=True, header=True)

# cast value -> keep the needed columns -> keep the selected codes -> average.
# The codes listed are presumably the blood-glucose measurement codes -
# confirm against the dataset's code table.
gender_avg = (
    readings_raw
    .withColumn("value", col("value").cast("Integer"))
    .select("index", "code", "value", "Gender")
    .filter("code == 48 or code == 57 or code == 58 or code ==59 or code ==60 or code ==61 or code ==62 or code ==63 or code ==64")
    .groupBy("Gender")
    .avg("value")
)
gender_avg.show()


+------+------------------+
|Gender|        avg(value)|
+------+------------------+
|Female|160.15859798605732|
|  Male| 160.3111616222036|
+------+------------------+

