#### Next two cells pull data needed for the assignment from a Github repository.

In [0]:
import urllib 
urllib.request.urlretrieve("https://github.com/donghwakim/umbc_data_603/blob/main/lecture_7/data/procedure_files.zip?raw=true", "/tmp/procedure_files.zip")

In [0]:
%sh
mkdir -p /tmp/lecture7_data
unzip /tmp/procedure_files.zip -d /tmp/lecture7_data

#### NOTE: The following cmd cell can run for a while. Take a coffee break and come back.

In [0]:
dbutils.fs.mv("file:/tmp/lecture7_data/files/", "/FileStore/tables/procedures/", True)  

##### Listing extracted files

In [0]:
dbutils.fs.ls('/FileStore/tables/procedures')

##### Inspecting the content of one of the files

In [0]:
dbutils.fs.head('/FileStore/tables/procedures/procedures_1327.csv')

#### For the assignment, complete following cells.

In [0]:
from pyspark.sql.types import *

inputPath = '/FileStore/tables/procedures/'

schema = StructType([StructField('DATE', TimestampType(), True ),
                    StructField('PATIENT', StringType(),True),
                    StructField('ENCOUNTER',StringType(),True),
                    StructField('CODE',StringType(),True ),
                    StructField('DESCRIPTION',StringType(),True),
                    StructField('BASE_COST',DoubleType(), True),
                    StructField('REASONCODE',StringType(),True),
                    StructField('REASONDESCRIPTION',StringType(),True)])

staticInputDF =  (spark.read.option("header", "true").schema(schema).csv(inputPath))

display(staticInputDF)

DATE,PATIENT,ENCOUNTER,CODE,DESCRIPTION,BASE_COST,REASONCODE,REASONDESCRIPTION
2019-09-04T20:07:15.000+0000,d4bb4105-800d-cbc8-3600-030ae1db7b0b,e5e6ad7c-8574-f92b-1137-10e6fe7e2997,434363004,Human epidermal growth factor receptor 2 gene detection by fluorescence in situ hybridization (procedure),2048.64,254837009,Malignant neoplasm of breast (disorder)
2013-12-12T21:01:33.000+0000,729cda30-9d5e-1d6e-fce7-75a63efdf4ff,4b8e74cd-e938-57eb-b194-53a1bf19a6b8,434363004,Human epidermal growth factor receptor 2 gene detection by fluorescence in situ hybridization (procedure),2544.77,254837009,Malignant neoplasm of breast (disorder)
2019-09-05T20:07:15.000+0000,d4bb4105-800d-cbc8-3600-030ae1db7b0b,e5e6ad7c-8574-f92b-1137-10e6fe7e2997,433114000,Human epidermal growth factor receptor 2 gene detection by immunohistochemistry (procedure),264.34,254837009,Malignant neoplasm of breast (disorder)
2013-12-13T21:01:33.000+0000,729cda30-9d5e-1d6e-fce7-75a63efdf4ff,4b8e74cd-e938-57eb-b194-53a1bf19a6b8,433114000,Human epidermal growth factor receptor 2 gene detection by immunohistochemistry (procedure),285.11,254837009,Malignant neoplasm of breast (disorder)
2014-08-22T18:25:14.000+0000,d6751ac9-4e2e-98ec-2c4c-16096d5a6c96,711c0d44-670b-808d-25c2-6dece2094876,90226004,Cytopathology procedure preparation of smear genital source (procedure),1787.08,254837009,Malignant neoplasm of breast (disorder)
2015-07-18T19:07:14.000+0000,d6751ac9-4e2e-98ec-2c4c-16096d5a6c96,f67f27b2-a9b5-530e-0fd1-8bc91b36676d,90226004,Cytopathology procedure preparation of smear genital source (procedure),1397.8,254837009,Malignant neoplasm of breast (disorder)
2016-06-12T19:56:14.000+0000,d6751ac9-4e2e-98ec-2c4c-16096d5a6c96,a5c08be7-bc0b-6012-6718-2a9dc0fc23a1,90226004,Cytopathology procedure preparation of smear genital source (procedure),2062.81,254837009,Malignant neoplasm of breast (disorder)
2017-06-07T20:33:14.000+0000,d6751ac9-4e2e-98ec-2c4c-16096d5a6c96,025d07fc-6867-de3e-d733-ccadda5d58c1,90226004,Cytopathology procedure preparation of smear genital source (procedure),2484.46,254837009,Malignant neoplasm of breast (disorder)
2018-05-03T21:17:14.000+0000,d6751ac9-4e2e-98ec-2c4c-16096d5a6c96,2f051a2e-7273-53e6-082a-598f887a2183,90226004,Cytopathology procedure preparation of smear genital source (procedure),2217.13,254837009,Malignant neoplasm of breast (disorder)
2019-04-28T22:00:14.000+0000,d6751ac9-4e2e-98ec-2c4c-16096d5a6c96,c4a588bf-9221-ba24-f1e0-5e077e0b29f0,90226004,Cytopathology procedure preparation of smear genital source (procedure),2047.45,254837009,Malignant neoplasm of breast (disorder)


Now we can compute the number of "open" and "close" actions with one hour windows. To do this, we will group by the `action` column and 1 hour windows over the `time` column.

In [0]:
from pyspark.sql.functions import *

staticCountsDF = (
  staticInputDF
      .groupBy(
       staticInputDF.PATIENT, 
       window(staticInputDF.DATE, "1 day"))    
       .count())
 
staticCountsDF.cache()

# Register the DataFrame as table 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")

In [0]:
%sql select * from static_counts

PATIENT,window,count
4391aad9-685f-d670-338a-31f4fc5486bc,"List(1997-06-25T00:00:00.000+0000, 1997-06-26T00:00:00.000+0000)",8
7506c239-6262-efea-5675-9742e73e5afb,"List(2017-05-02T00:00:00.000+0000, 2017-05-03T00:00:00.000+0000)",5
58da0ec2-8979-132a-8f08-0140c5797b7b,"List(2020-03-13T00:00:00.000+0000, 2020-03-14T00:00:00.000+0000)",2
ca8b83dc-573f-3fa9-6f49-3443a490e420,"List(1978-08-11T00:00:00.000+0000, 1978-08-12T00:00:00.000+0000)",1
7a804ef4-6e99-e9ad-8349-487fa6c9135f,"List(2013-08-15T00:00:00.000+0000, 2013-08-16T00:00:00.000+0000)",1
f63bb0a0-d482-2e98-6aa9-efc17d4e5f70,"List(2020-10-11T00:00:00.000+0000, 2020-10-12T00:00:00.000+0000)",1
0f2b180c-6a8a-d698-6c87-ee93940b22b3,"List(1994-02-24T00:00:00.000+0000, 1994-02-25T00:00:00.000+0000)",1
4a91f400-df1d-bc36-ebfd-d08ef243c977,"List(2020-02-06T00:00:00.000+0000, 2020-02-07T00:00:00.000+0000)",1
4b5e7dc8-7301-6475-6d7a-67fba54fe302,"List(2015-08-11T00:00:00.000+0000, 2015-08-12T00:00:00.000+0000)",1
e5e170b9-0165-65ee-3f37-4105fa5a2d5c,"List(2015-06-11T00:00:00.000+0000, 2015-06-12T00:00:00.000+0000)",2


In [0]:
%sql select distinct PATIENT from static_counts 

PATIENT
7a804ef4-6e99-e9ad-8349-487fa6c9135f
7506c239-6262-efea-5675-9742e73e5afb
4a91f400-df1d-bc36-ebfd-d08ef243c977
4391aad9-685f-d670-338a-31f4fc5486bc
729cda30-9d5e-1d6e-fce7-75a63efdf4ff
58da0ec2-8979-132a-8f08-0140c5797b7b
4b5e7dc8-7301-6475-6d7a-67fba54fe302
ca8b83dc-573f-3fa9-6f49-3443a490e420
f63bb0a0-d482-2e98-6aa9-efc17d4e5f70
0f2b180c-6a8a-d698-6c87-ee93940b22b3


In [0]:
%sql select * from static_counts

PATIENT,window,count
4391aad9-685f-d670-338a-31f4fc5486bc,"List(1997-06-25T00:00:00.000+0000, 1997-06-26T00:00:00.000+0000)",8
7506c239-6262-efea-5675-9742e73e5afb,"List(2017-05-02T00:00:00.000+0000, 2017-05-03T00:00:00.000+0000)",5
58da0ec2-8979-132a-8f08-0140c5797b7b,"List(2020-03-13T00:00:00.000+0000, 2020-03-14T00:00:00.000+0000)",2
ca8b83dc-573f-3fa9-6f49-3443a490e420,"List(1978-08-11T00:00:00.000+0000, 1978-08-12T00:00:00.000+0000)",1
7a804ef4-6e99-e9ad-8349-487fa6c9135f,"List(2013-08-15T00:00:00.000+0000, 2013-08-16T00:00:00.000+0000)",1
f63bb0a0-d482-2e98-6aa9-efc17d4e5f70,"List(2020-10-11T00:00:00.000+0000, 2020-10-12T00:00:00.000+0000)",1
0f2b180c-6a8a-d698-6c87-ee93940b22b3,"List(1994-02-24T00:00:00.000+0000, 1994-02-25T00:00:00.000+0000)",1
4a91f400-df1d-bc36-ebfd-d08ef243c977,"List(2020-02-06T00:00:00.000+0000, 2020-02-07T00:00:00.000+0000)",1
4b5e7dc8-7301-6475-6d7a-67fba54fe302,"List(2015-08-11T00:00:00.000+0000, 2015-08-12T00:00:00.000+0000)",1
e5e170b9-0165-65ee-3f37-4105fa5a2d5c,"List(2015-06-11T00:00:00.000+0000, 2015-06-12T00:00:00.000+0000)",2


Now we can directly use SQL to query the table. For example, here are the total counts across all the hours.

In [0]:
%sql select PATIENT, sum(count) as total_count from static_counts group by PATIENT

PATIENT,total_count
7a804ef4-6e99-e9ad-8349-487fa6c9135f,7
7506c239-6262-efea-5675-9742e73e5afb,205
4a91f400-df1d-bc36-ebfd-d08ef243c977,157
4391aad9-685f-d670-338a-31f4fc5486bc,48
729cda30-9d5e-1d6e-fce7-75a63efdf4ff,30
58da0ec2-8979-132a-8f08-0140c5797b7b,41
4b5e7dc8-7301-6475-6d7a-67fba54fe302,33
ca8b83dc-573f-3fa9-6f49-3443a490e420,15
f63bb0a0-d482-2e98-6aa9-efc17d4e5f70,9
0f2b180c-6a8a-d698-6c87-ee93940b22b3,39


How about a timeline of windowed counts?

In [0]:
%sql select PATIENT, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, PATIENT

PATIENT,time,count
05f58d10-2633-4082-0ac9-c59a9783f8af,Apr-01 00:00,1
3a1e0dad-75fc-94f9-814f-08880b6428e9,Apr-02 00:00,1
3af2c8b4-3586-d2dd-2066-e3f132468324,Apr-02 00:00,1
5ee3eb82-bb38-dbc5-2e6f-896d5764eb5a,Apr-02 00:00,2
94e71c85-e56d-4497-a77b-21b83df21b3d,Apr-02 00:00,4
28aedb46-f550-d440-78af-47f898fe08af,Apr-03 00:00,1
4a7c2e59-383e-69e6-3c2b-1a5b804a9e52,Apr-03 00:00,1
7506c239-6262-efea-5675-9742e73e5afb,Apr-04 00:00,2
b9fef523-5bbe-e88b-b7d5-728f3184231a,Apr-04 00:00,1
f16ee8c8-c574-8dc6-7c5d-d44533d14288,Apr-04 00:00,1


## Stream Processing

In [0]:
from pyspark.sql.functions import *

streamingInputDF = (
  spark
    .readStream                       
    .schema(schema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)  
    .csv(inputPath)
)

streamingCountsDF = (                
  streamingInputDF
        .groupBy(
        streamingInputDF.PATIENT, 
        window(streamingInputDF.DATE, "1 day"))    
        .count())
                 


streamingCountsDF.isStreaming

In [0]:
streamingInputDF.isStreaming

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table 
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

In [0]:
from time import sleep
sleep(5)  # wait a bit for computation to start

In [0]:
%sql select PATIENT, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, PATIENT

PATIENT,time,count
3af2c8b4-3586-d2dd-2066-e3f132468324,Apr-02 00:00,1
3a1e0dad-75fc-94f9-814f-08880b6428e9,Apr-05 00:00,1
4f289f9e-2ee4-4445-d32a-c74cbf5dab28,Apr-08 00:00,1
4a91f400-df1d-bc36-ebfd-d08ef243c977,Apr-26 00:00,1
6ef30752-2155-899d-eee0-56b340930da7,Aug-01 00:00,1
d741dd3a-335e-0473-851f-0651e4388eaa,Aug-01 00:00,1
2fdf6859-6000-ea7b-aa2c-5a8c0e3f56d2,Aug-04 00:00,1
3a1e0dad-75fc-94f9-814f-08880b6428e9,Aug-04 00:00,1
5ee3eb82-bb38-dbc5-2e6f-896d5764eb5a,Aug-06 00:00,1
a7b685ae-2c26-d101-1b7e-eb52eec13583,Aug-09 00:00,1


In [0]:
sleep(5)  # wait a bit more for more data to be computed

In [0]:
%sql select PATIENT, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, PATIENT

PATIENT,time,count
3af2c8b4-3586-d2dd-2066-e3f132468324,Apr-02 00:00,1
3a1e0dad-75fc-94f9-814f-08880b6428e9,Apr-05 00:00,1
4f289f9e-2ee4-4445-d32a-c74cbf5dab28,Apr-08 00:00,1
4a91f400-df1d-bc36-ebfd-d08ef243c977,Apr-26 00:00,1
6ef30752-2155-899d-eee0-56b340930da7,Aug-01 00:00,1
d741dd3a-335e-0473-851f-0651e4388eaa,Aug-01 00:00,1
2fdf6859-6000-ea7b-aa2c-5a8c0e3f56d2,Aug-04 00:00,1
3a1e0dad-75fc-94f9-814f-08880b6428e9,Aug-04 00:00,1
5ee3eb82-bb38-dbc5-2e6f-896d5764eb5a,Aug-06 00:00,1
a7b685ae-2c26-d101-1b7e-eb52eec13583,Aug-09 00:00,1


In [0]:
sleep(5)  # wait a bit more for more data to be computed

In [0]:
%sql select PATIENT, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, PATIENT

PATIENT,time,count
3af2c8b4-3586-d2dd-2066-e3f132468324,Apr-02 00:00,1
3a1e0dad-75fc-94f9-814f-08880b6428e9,Apr-05 00:00,1
4f289f9e-2ee4-4445-d32a-c74cbf5dab28,Apr-08 00:00,1
4a91f400-df1d-bc36-ebfd-d08ef243c977,Apr-26 00:00,1
7506c239-6262-efea-5675-9742e73e5afb,Apr-27 00:00,1
6ef30752-2155-899d-eee0-56b340930da7,Aug-01 00:00,1
d741dd3a-335e-0473-851f-0651e4388eaa,Aug-01 00:00,1
2fdf6859-6000-ea7b-aa2c-5a8c0e3f56d2,Aug-04 00:00,1
3a1e0dad-75fc-94f9-814f-08880b6428e9,Aug-04 00:00,1
5ee3eb82-bb38-dbc5-2e6f-896d5764eb5a,Aug-06 00:00,1


In [0]:
%sql select PATIENT, sum(count) as total_count from counts group by PATIENT order by PATIENT

PATIENT,total_count
009f6a5c-23eb-384d-e187-8dc937e31c10,1
084cff93-29fb-c03e-027a-17dcc8110496,1
0a17e3f7-4db7-0d12-6c03-2c455f08378e,1
0f2b180c-6a8a-d698-6c87-ee93940b22b3,2
1993083b-d751-c0b7-152f-8df873a26084,1
1fa34f88-5b1f-250c-c281-7235eb25af1f,2
2b0df232-afa1-13f8-9bfb-539639c30d08,2
2fdf6859-6000-ea7b-aa2c-5a8c0e3f56d2,3
31eda4d1-ca03-197d-689d-c99940f32aa3,2
32294f40-4dc1-5c09-785c-51bb54112993,1
