In [1]:
import imp
from pyspark.sql import SparkSession
from pyspark import SparkContext
from loader import readDataAsTable, readDataIntoRDD
from first_culture import firstCulture
import first_antibiotic
import pyspark.sql.functions as func

import os

# Directory holding the input CSVs. It can be overridden with the PATH_TO_DATA
# environment variable instead of editing the notebook. os.path.join(..., '')
# guarantees a trailing separator, because file paths below are built by
# plain string concatenation onto this prefix.
PATH_TO_DATA = os.path.join(os.environ.get('PATH_TO_DATA', '/home/jovyan/work/data/'), '')

# Local Spark session shared by every cell below.
# NOTE(review): with master 'local[*]' all work runs inside the driver JVM, so
# spark.executor.memory is likely ignored and the 2G driver heap is the real
# limit; confirm whether the two memory values were meant the other way round.
# NOTE(review): spark.kryoserializer.buffer without a unit is read as KiB, so
# '24' means 24 KiB (below Spark's 64k default); confirm whether '24m' was meant.
ss = (
    SparkSession.builder
    .master('local[*]')
    .appName('Big Data Project')
    .config('spark.executor.memory', '8G')
    .config('spark.driver.memory', '2G')
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.kryoserializer.buffer', '24')
    .getOrCreate()
)

In [2]:
# Load t1.csv (registered as 't1'). The last argument is presumably the CSV
# header flag ('false' here), so the columns are named explicitly afterwards.
t1 = (
    readDataAsTable(ss, f'{PATH_TO_DATA}t1.csv', 't1', 'false')
    .toDF('SUBJECT_ID', 'DATE')
)


In [4]:
# Load PATIENTS.csv and register it as 'patients'.
patients = readDataAsTable(
    ss,
    f'{PATH_TO_DATA}PATIENTS.csv',
    'patients',
    'true',
)

In [8]:
# Load ADMISSIONS.csv under its own name. The third argument appears to be the
# registered table name (compare the 't1' and 'patients' loads). It was
# previously 'patients', which would shadow the PATIENTS table registered earlier.
admissions = readDataAsTable(ss, '{0}ADMISSIONS.csv'.format(PATH_TO_DATA), 'admissions', 'true')

In [6]:
# Row count of the loaded PATIENTS table.
patients.count()

46520

In [7]:
# Row count of the t1 table.
t1.count()

39364

In [9]:
# Row count of the loaded ADMISSIONS table.
admissions.count()

58976

In [11]:
# Earliest ADMITTIME per SUBJECT_ID, exposed as FIRST_ADMIT.
firstAdmit = (
    admissions
    .groupBy('SUBJECT_ID')
    .agg(func.min('ADMITTIME').alias('FIRST_ADMIT'))
)

In [12]:
# Peek at a few (SUBJECT_ID, FIRST_ADMIT) rows.
firstAdmit.take(4)

[Row(SUBJECT_ID=148, FIRST_ADMIT=datetime.datetime(2107, 9, 5, 14, 58)),
 Row(SUBJECT_ID=463, FIRST_ADMIT=datetime.datetime(2198, 10, 5, 16, 43)),
 Row(SUBJECT_ID=471, FIRST_ADMIT=datetime.datetime(2122, 7, 22, 14, 4)),
 Row(SUBJECT_ID=833, FIRST_ADMIT=datetime.datetime(2137, 5, 23, 4, 46))]

In [19]:
# Attach each patient's first admission time. Joining on the shared column
# name (an inner join) keeps a single SUBJECT_ID column. The expression join
# `patients.SUBJECT_ID == firstAdmit.SUBJECT_ID` kept both copies, which makes
# patientsAndAdmissions['SUBJECT_ID'] ambiguous.
patientsAndAdmissions = patients.join(firstAdmit, 'SUBJECT_ID')

In [33]:
def extractAgeAndGender(r):
    """Map one joined patient/admission row to (SUBJECT_ID, age, GENDER).

    Args:
        r: a row supporting name lookup (e.g. a pyspark Row) with
            'FIRST_ADMIT' and 'DOB' datetimes plus 'SUBJECT_ID' and 'GENDER'.

    Returns:
        A tuple (SUBJECT_ID, age_in_years, GENDER). Age is the whole number of
        days between DOB and FIRST_ADMIT divided by the mean Gregorian year
        length (365.2425 days).
    """
    elapsed = r['FIRST_ADMIT'] - r['DOB']
    years = elapsed.days / 365.2425
    return r['SUBJECT_ID'], years, r['GENDER']

In [39]:
# Per-patient (SUBJECT_ID, AGE, GENDER). AGE is computed row by row in Python
# through extractAgeAndGender.
ageAndGender = (
    patientsAndAdmissions.rdd
    .map(extractAgeAndGender)
    .toDF(['SUBJECT_ID', 'AGE', 'GENDER'])
)

In [40]:
# Peek at a couple of (SUBJECT_ID, AGE, GENDER) rows.
ageAndGender.take(2)

[Row(SUBJECT_ID=249, AGE=74.76402663983518, GENDER='F'),
 Row(SUBJECT_ID=250, AGE=23.877287007946773, GENDER='F')]

In [41]:
# Patients that appear in t1 (inner join; t1's DATE column is carried along).
# Joining on the column name yields one SUBJECT_ID column instead of two
# ambiguous copies.
infected = ageAndGender.join(t1, 'SUBJECT_ID')

In [42]:
# Patients with no matching SUBJECT_ID in t1. A left-anti join expresses
# "not in t1" directly and returns only ageAndGender's columns. The
# left-join + IS NULL form also carried t1's columns, which were all null.
notInfected = ageAndGender.join(t1, 'SUBJECT_ID', 'left_anti')

In [48]:
# Infected patients younger than 18 at first admission (the same bound as the
# seventeenAndUnder bucket in getStats). The previous expression
# `AGE <= 18 & AGE <= 18` parsed as the chained comparison
# `AGE <= (18 & AGE) <= 18`, because `&` binds tighter than `<=`. That forces
# bool() on a Column and raises; the condition was also duplicated.
infected.filter(infected['AGE'] < 18).count()

6772

In [44]:
# Number of patients with no row in t1.
notInfected.count()

7156

In [65]:
def getStats(df):
    """Print gender counts and age-bucket counts for a cohort DataFrame.

    Args:
        df: Spark DataFrame with a ``GENDER`` column (compared against 'F' and
            'M') and a numeric ``AGE`` column in years.

    Prints one "label: count" line per group, in the order listed below.
    Each count equals ``df.filter(condition).count()`` for that group. Rows
    whose GENDER/AGE is null, or whose GENDER is neither 'F' nor 'M', are not
    counted in the corresponding groups. All counts come from one aggregation
    pass instead of one Spark job per group.
    """
    age = df['AGE']
    gender = df['GENDER']
    # (label, condition) pairs; labels double as the printed text.
    buckets = [
        ('Women', gender == 'F'),
        ('males', gender == 'M'),
        ('seventeenAndUnder', age < 18),
        ('eighteen', (age >= 18) & (age < 30)),
        ('thirty', (age >= 30) & (age < 40)),
        ('forty', (age >= 40) & (age < 50)),
        ('fifty', (age >= 50) & (age < 60)),
        ('sixty', (age >= 60) & (age < 70)),
        # Open-ended. NOTE(review): any implausibly large ages (e.g. from
        # shifted/de-identified DOBs) also land here; confirm that is intended.
        ('seventy', age >= 70),
    ]
    # when() without otherwise() is null unless the condition is true, and
    # count() skips nulls. Each aggregate therefore matches filter(cond).count(),
    # and it is 0 (not null) on an empty DataFrame.
    counts = df.agg(
        *[func.count(func.when(cond, 1)).alias(label) for label, cond in buckets]
    ).first()

    for label, _ in buckets:
        print(f'{label}: {counts[label]}')

In [66]:
# Gender / age-bucket breakdown for patients present in t1.
getStats(infected)

Women: 17608
males: 21756
seventeenAndUnder: 6772
eighteen: 1608
thirty: 1732
forty: 3415
fifty: 5523
sixty: 6586
seventy: 13728


In [67]:
# Gender / age-bucket breakdown for patients absent from t1.
getStats(notInfected)

Women: 2791
males: 4365
seventeenAndUnder: 1200
eighteen: 381
thirty: 354
forty: 747
fifty: 1148
sixty: 1254
seventy: 2072
