In [1]:
#Installing packages
import pymongo
import pandas as pd
import json

In [2]:
!pip install pandas
!pip install sodapy

import pandas as pd
from sodapy import Socrata

# Unauthenticated client only works with public data sets. Note 'None'
# in place of application token, and no username or password:
client = Socrata("health.data.ny.gov", None)

# Example authenticated client (needed for non-public datasets):
# client = Socrata(health.data.ny.gov,
#                  MyAppToken,
#                  userame="user@example.com",
#                  password="AFakePassword")

# First 2000 results, returned as JSON from API / converted to Python list of
# dictionaries by sodapy.
results = client.get("jw46-jpb7",limit=200000)

results_df = pd.DataFrame.from_records(results)





In [3]:
len(results_df)

149793

In [4]:
df=results_df

## Data Cleaning

In [5]:
to_drop = ['total_staffed_acute_care',
 'total_staffed_acute_care_1',
 'total_staffed_icu_beds_1',
 'total_staffed_icu_beds_3',
 'patients_admitted_due_to_covid',
 'patients_admitted_not_due_to_covid',
 'total_new_admissions_reported', 'patients_positive_after'] # Unnecessary columns also with nearly half NaNs

df = df.drop(to_drop, axis = 1)

df = df[results_df['total_staffed_beds_currently'].notna()] # Select only non null rows for our key column
age_distribution = ['patients_age_lt1',
 'patients_age_1_4',
 'patients_age_5_19',
 'patients_age_20_44',
 'patients_age_45_54',
 'patients_age_55_64',
 'patients_age_65_74',
 'patients_age_75_84',
 'patients_age_greater_85','hospitalized_indicator'] # Hospitalized patients ages distribution columns and the indicator column

df = df.drop(age_distribution, axis = 1) # Dropped ages distribution columns

# Change date column from strings to datetime, add year month day columns
df['as_of_date'] = pd.to_datetime(df['as_of_date'])
df['year']= df['as_of_date'].dt.year
df['month']= df['as_of_date'].dt.month
df['day']= df['as_of_date'].dt.day

In [6]:
# Check numer of NA columns as of now
df.isna().sum()

as_of_date                          0
facility_pfi                        0
facility_name                       0
doh_region                          0
facility_county                     0
facility_network                    0
ny_forward_region                   0
patients_currently                 14
patients_newly_admitted           142
patients_discharged               146
patients_currently_in_icu         146
patients_currently_icu            147
patients_expired                  143
cumulative_covid_19_discharges      0
cumulative_covid_19_fatalities      0
total_staffed_beds                  0
total_staffed_beds_currently        0
total_staffed_icu_beds              4
total_staffed_icu_beds_2           10
year                                0
month                               0
day                                 0
dtype: int64

In [7]:
cleaned_df = df.dropna() # Drop all rows that has NaNs since these don't affect the entirety of the data that much
cleaned_df.isna().sum()

as_of_date                        0
facility_pfi                      0
facility_name                     0
doh_region                        0
facility_county                   0
facility_network                  0
ny_forward_region                 0
patients_currently                0
patients_newly_admitted           0
patients_discharged               0
patients_currently_in_icu         0
patients_currently_icu            0
patients_expired                  0
cumulative_covid_19_discharges    0
cumulative_covid_19_fatalities    0
total_staffed_beds                0
total_staffed_beds_currently      0
total_staffed_icu_beds            0
total_staffed_icu_beds_2          0
year                              0
month                             0
day                               0
dtype: int64

In [8]:
cleaned_df.head(10)

Unnamed: 0,as_of_date,facility_pfi,facility_name,doh_region,facility_county,facility_network,ny_forward_region,patients_currently,patients_newly_admitted,patients_discharged,...,patients_expired,cumulative_covid_19_discharges,cumulative_covid_19_fatalities,total_staffed_beds,total_staffed_beds_currently,total_staffed_icu_beds,total_staffed_icu_beds_2,year,month,day
0,2020-03-26,1,ALBANY MEDICAL CENTER HOSPITAL,CAPITAL DISTRICT REGIONAL OFFICE,ALBANY,ALBANY MEDICAL CENTER,CAPITAL REGION,12,1,1,...,0,6,0,748,236,60,36,2020,3,26
1,2020-03-26,5,ST PETERS HOSPITAL,CAPITAL DISTRICT REGIONAL OFFICE,ALBANY,ST. PETERS HEALTH PARTNERS,CAPITAL REGION,10,0,0,...,0,1,0,447,145,20,7,2020,3,26
2,2020-03-26,37,CUBA MEMORIAL HOSPITAL INC,WESTERN REGIONAL OFFICE,ALLEGANY,INDEPENDENT,WESTERN NEW YORK,0,0,0,...,0,1,0,20,16,0,0,2020,3,26
3,2020-03-26,39,MEMORIAL HOSP OF WM F AND GERTRUDE F JONES AKA...,WESTERN REGIONAL OFFICE,ALLEGANY,UNIVERSITY OF ROCHESTER MEDICAL CENTER,WESTERN NEW YORK,0,0,0,...,0,0,0,49,29,6,2,2020,3,26
4,2020-03-26,42,UNITED HEALTH SERVICES HOSPITALS INC - BINGHAM...,CENTRAL REGIONAL OFFICE,BROOME,"UNITED HEALTH SERVICES HOSPITALS, INC.",SOUTHERN TIER,0,0,0,...,0,0,0,143,59,8,6,2020,3,26
5,2020-03-26,43,OUR LADY OF LOURDES MEMORIAL HOSPITAL INC,CENTRAL REGIONAL OFFICE,BROOME,ASCENSION HEALTH,SOUTHERN TIER,3,0,1,...,0,1,2,132,63,10,7,2020,3,26
6,2020-03-26,58,"UNITED HEALTH SERVICES HOSPITAL, INC. - WILSON...",CENTRAL REGIONAL OFFICE,BROOME,"UNITED HEALTH SERVICES HOSPITALS, INC.",SOUTHERN TIER,3,0,0,...,0,0,0,211,98,30,4,2020,3,26
7,2020-03-26,66,OLEAN GENERAL HOSPITAL,WESTERN REGIONAL OFFICE,CATTARAUGUS,KALEIDA HEALTH,WESTERN NEW YORK,0,0,0,...,0,0,0,111,58,14,9,2020,3,26
8,2020-03-26,85,AUBURN MEMORIAL HOSPITAL,CENTRAL REGIONAL OFFICE,CAYUGA,INDEPENDENT,CENTRAL NEW YORK,0,0,0,...,0,1,0,86,43,10,8,2020,3,26
9,2020-03-26,98,"BROOKS-TLC HOSPITAL SYSTEM, INC. (DUNKIRK)",WESTERN REGIONAL OFFICE,CHAUTAUQUA,INDEPENDENT,WESTERN NEW YORK,0,0,0,...,0,0,0,38,31,6,4,2020,3,26


In [9]:
# Check how many unique facilities
unique_pfi = len(cleaned_df['facility_pfi'].unique())
unique_pfi

197

In [10]:
#Check how many rows remained after cleaning
len(cleaned_df)

80711

## MongoDB

In [11]:
data = cleaned_df.to_dict(orient='records')
len(data)

80711

In [12]:
#Connecting to mongodb
from pymongo import MongoClient
client = MongoClient('localhost',27017)
db = client.apan5400final

In [13]:
#Loading in into the data
collection = db.data

In [14]:
db.data.insert_many(data)

<pymongo.results.InsertManyResult at 0x7f93ca738cd0>

In [15]:
total_docs = collection.count_documents({})
total_docs

80711

In [16]:
#MongoDB test sample
query = {
    "facility_county": {
        "$regex": 'ALBANY'
    }    
}
cur = collection.find(query)

In [17]:
#Appending the result into the list
a  = []

for i in cur:
    a.append(i)

len(a)

840

In [18]:
#Visualizing the data
dat = pd.DataFrame(a)
dat = dat.iloc[:,1:61]
most_recent = dat[dat['as_of_date']==dat['as_of_date'].max()]


In [19]:
most_recent

Unnamed: 0,as_of_date,facility_pfi,facility_name,doh_region,facility_county,facility_network,ny_forward_region,patients_currently,patients_newly_admitted,patients_discharged,...,patients_expired,cumulative_covid_19_discharges,cumulative_covid_19_fatalities,total_staffed_beds,total_staffed_beds_currently,total_staffed_icu_beds,total_staffed_icu_beds_2,year,month,day
838,2021-05-19,1,ALBANY MEDICAL CENTER HOSPITAL,CAPITAL DISTRICT REGIONAL OFFICE,ALBANY,ALBANY MEDICAL CENTER,CAPITAL REGION,15,2,0,...,2,1423,245,762,94,120,14,2021,5,19
839,2021-05-19,5,ST PETERS HOSPITAL,CAPITAL DISTRICT REGIONAL OFFICE,ALBANY,ST. PETERS HEALTH PARTNERS,CAPITAL REGION,10,2,5,...,0,1404,229,450,79,28,5,2021,5,19


## Spark

In [20]:
#ApacheSpark
!pip install pyspark



In [21]:
#Connecting pyspark
from pyspark.sql.types import *
from datetime import datetime

from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

In [22]:
sc = SparkContext() 
config = sc.getConf()
config.set('spark.cores.max','4')
config.set('spark.executor.memory', '8G')
config.set('spark.driver.maxResultSize', '8g')
config.set('spark.kryoserializer.buffer.max', '512m')
config.set("spark.driver.cores", "4")
sc.stop()

sc = SparkContext(conf = config) 
sqlContext = SQLContext(sc)
    # print("Using Apache Spark Version", sc.version)



In [23]:
#Functions to convert pandas dataframe to spark dataframe

def equivalent_type(f):
      if f == 'datetime64[ns]': return TimestampType()
      elif f == 'int64': return LongType()
      elif f == 'int32': return IntegerType()
      elif f == 'float64': return FloatType()
      else: return StringType()
      
def define_structure(string, format_type):
      try: typo = equivalent_type(format_type)
      except: typo = StringType()
      return StructField(string, typo)
      
def pandas_to_spark(pandas_df):
      columns = list(pandas_df.columns)
      types = list(pandas_df.dtypes)
      struct_list = []
      for column, typo in zip(columns, types): 
        struct_list.append(define_structure(column, typo))
      p_schema = StructType(struct_list)
      return sqlContext.createDataFrame(pandas_df, p_schema)

In [24]:
spark_df = pandas_to_spark(most_recent)

In [25]:
spark_df.columns

['as_of_date',
 'facility_pfi',
 'facility_name',
 'doh_region',
 'facility_county',
 'facility_network',
 'ny_forward_region',
 'patients_currently',
 'patients_newly_admitted',
 'patients_discharged',
 'patients_currently_in_icu',
 'patients_currently_icu',
 'patients_expired',
 'cumulative_covid_19_discharges',
 'cumulative_covid_19_fatalities',
 'total_staffed_beds',
 'total_staffed_beds_currently',
 'total_staffed_icu_beds',
 'total_staffed_icu_beds_2',
 'year',
 'month',
 'day']

In [26]:
#test
spark_df.show()

+-------------------+------------+--------------------+--------------------+---------------+--------------------+-----------------+------------------+-----------------------+-------------------+-------------------------+----------------------+----------------+------------------------------+------------------------------+------------------+----------------------------+----------------------+------------------------+----+-----+---+
|         as_of_date|facility_pfi|       facility_name|          doh_region|facility_county|    facility_network|ny_forward_region|patients_currently|patients_newly_admitted|patients_discharged|patients_currently_in_icu|patients_currently_icu|patients_expired|cumulative_covid_19_discharges|cumulative_covid_19_fatalities|total_staffed_beds|total_staffed_beds_currently|total_staffed_icu_beds|total_staffed_icu_beds_2|year|month|day|
+-------------------+------------+--------------------+--------------------+---------------+--------------------+-----------------+-

In [27]:
#test
cb_rdd = spark_df.select('*').rdd.map(lambda row: [str(row[i]) for i in ['facility_county','as_of_date','doh_region','facility_network','ny_forward_region','total_staffed_beds_currently','total_staffed_icu_beds']])
cb_sdf2 = sqlContext.createDataFrame(cb_rdd,['facility_county','as_of_date','doh_region','facility_network','ny_forward_region','total_staffed_beds_currently','total_staffed_icu_beds'])

In [28]:
cb_sdf2.show()

+---------------+-------------------+--------------------+--------------------+-----------------+----------------------------+----------------------+
|facility_county|         as_of_date|          doh_region|    facility_network|ny_forward_region|total_staffed_beds_currently|total_staffed_icu_beds|
+---------------+-------------------+--------------------+--------------------+-----------------+----------------------------+----------------------+
|         ALBANY|2021-05-19 00:00:00|CAPITAL DISTRICT ...|ALBANY MEDICAL CE...|   CAPITAL REGION|                          94|                   120|
|         ALBANY|2021-05-19 00:00:00|CAPITAL DISTRICT ...|ST. PETERS HEALTH...|   CAPITAL REGION|                          79|                    28|
+---------------+-------------------+--------------------+--------------------+-----------------+----------------------------+----------------------+



## Flask

In [100]:
#!pip install flask
start_time = datetime.now()
from flask import Flask, request, render_template
app = Flask("ETL-Pipeline")

@app.route('/')
def my_form():
    return render_template('index.html')

@app.route('/', methods=['POST'])
def my_form_post():
    val = request.form['userinput']
    query = {
    "facility_county": {
        "$regex": val} }
    cur = collection.find(query)
    a  = []
    for i in cur:
        a.append(i)
    dat = pd.DataFrame(a)
    dat = dat.iloc[:,1:61]
    # Most recent records
    most_recent = dat[dat['as_of_date']==dat['as_of_date'].max()]

    spark_df = pandas_to_spark(most_recent)
    cb_rdd = spark_df.select('*').rdd.map(lambda row: [str(row[i]) for i in ['facility_county','as_of_date','doh_region','facility_network','ny_forward_region','total_staffed_beds_currently','total_staffed_icu_beds']])
    cb_sdf2 = sqlContext.createDataFrame(cb_rdd,['facility_county','as_of_date','doh_region','facility_network','ny_forward_region','total_staffed_beds_currently','total_staffed_icu_beds'])
    final_df = cb_sdf2.toPandas()

   
    return render_template('table.html',  tables=[final_df.to_html(classes='data')], titles='') #displaying results on the webpage
end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))

Duration: 0:00:00.005179


In [102]:
app.run(host='localhost', port=5001)

 * Serving Flask app "ETL-Pipeline" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off


INFO:werkzeug: * Running on http://localhost:5001/ (Press CTRL+C to quit)
INFO:werkzeug:127.0.0.1 - - [28/Apr/2022 01:02:42] "[37mGET / HTTP/1.1[0m" 200 -
INFO:werkzeug:127.0.0.1 - - [28/Apr/2022 01:02:45] "[37mPOST / HTTP/1.1[0m" 200 -
