In [None]:
# My actual submission is based on pandas because the data is relatively small, but
# since PySpark is popular I have also prepared a PySpark alternative.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, udf, lag, lit
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window
import requests
from requests.exceptions import HTTPError
import logging
from datetime import datetime

def fetch_data(batch_size=1000, timeout=30):
    """Fetch every row of the HealthData.gov dataset g62h-syeh via paged API calls.

    Args:
        batch_size: Rows requested per page (sent as ``$limit``); also the
            amount the offset advances after each non-empty page.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        list[dict]: All records from every page, in the order the API returned them.

    Raises:
        requests.exceptions.HTTPError: If a page request returns an error status.
        Exception: Any other request or JSON-decoding error, after it is logged.
    """
    offset = 0
    all_data = []

    while True:
        # Explicit $limit keeps the page size equal to the offset step (instead of
        # relying on the server's default), and $order=:id gives a stable row order
        # so consecutive pages neither overlap nor skip rows.
        url = (
            "https://healthdata.gov/resource/g62h-syeh.json"
            f"?$limit={batch_size}&$offset={offset}&$order=:id"
        )
        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            data = response.json()
        except HTTPError as http_err:
            logging.error(f"HTTP error occurred: {http_err}")
            raise
        except Exception as err:
            logging.error(f"Other error occurred: {err}")
            raise

        # An empty page means we have read past the last row.
        if not data:
            break
        all_data.extend(data)
        offset += batch_size

    return all_data


# Min-max scaling exposed as a PySpark UDF. Python UDFs are evaluated row by row
# in Python worker processes, so native column arithmetic is usually faster.
@udf(FloatType())
def normalize_data(value, min_value, max_value):
    """Linearly rescale ``value`` so ``min_value`` maps to 0.0 and ``max_value`` to 1.0.

    Returns:
        ``None`` (Spark null) when the value or either bound is null;
        ``0.0`` when the column is constant (``max_value <= min_value``);
        otherwise ``(value - min_value) / (max_value - min_value)`` as a float.
    """
    # Null inputs would otherwise raise TypeError inside the Python worker.
    if value is None or min_value is None or max_value is None:
        return None
    if max_value > min_value:
        return float((value - min_value) / (max_value - min_value))
    # Must be a float literal: a Python int returned from a FloatType UDF is
    # converted to null by Spark rather than to 0.0.
    return 0.0

def clean_process_pyspark():
    """Fetch the dataset, then select, type-convert, difference and min-max normalize it with PySpark.

    The Spark session created here is always stopped before returning, even if
    a transformation fails. Returns ``None``. Spark evaluates lazily, so the
    final ``working_df`` plan only executes if an action (e.g.
    ``working_df.show()``) runs before the session is stopped.
    """
    # Local import keeps these aggregate/conditional helpers scoped to this
    # function without changing the module's top-level imports.
    from pyspark.sql import functions as F

    logging.basicConfig(level=logging.INFO)

    # Data fetching cannot easily be parallelized, so it is kept separate from the Spark parts.
    data = fetch_data()
    if not data:
        # spark.createDataFrame cannot infer a schema from an empty list.
        logging.info("No data was collected.")
        return

    spark = (
        SparkSession.builder
        .appName("Hire Aquil Data Processing")
        .getOrCreate()
    )
    try:
        df = spark.createDataFrame(data)

        keeping_cols = [
            'state', 'date', 'inpatient_beds', 'inpatient_beds_used', 'staffed_adult_icu_bed_occupancy',
            'inpatient_beds_used_covid', 'inpatient_beds_used_covid_coverage', 'critical_staffing_shortage_today_yes',
            'critical_staffing_shortage_today_no', 'hospital_onset_covid', 'previous_day_admission_adult_covid_confirmed',
            'total_staffed_adult_icu_beds', 'hospital_onset_covid_coverage', 'total_patients_hospitalized_confirmed_influenza_and_covid',
            'total_patients_hospitalized_confirmed_influenza', 'deaths_covid', 'previous_day_deaths_influenza'
        ]
        working_df = df.select(*keeping_cols)

        # Convert and clean data
        working_df = working_df.withColumn("date", to_date(col("date")))
        numeric_cols = [
            "inpatient_beds", "inpatient_beds_used", "staffed_adult_icu_bed_occupancy",
            "inpatient_beds_used_covid", "critical_staffing_shortage_today_yes",
            "critical_staffing_shortage_today_no", "hospital_onset_covid",
            "previous_day_admission_adult_covid_confirmed", "total_staffed_adult_icu_beds",
            "total_patients_hospitalized_confirmed_influenza_and_covid"
        ]
        for col_name in numeric_cols:
            working_df = working_df.withColumn(col_name, col(col_name).cast('float'))

        # Differencing: value minus the value 14 rows earlier within the same
        # state, ordered by date (null for a state's first 14 rows).
        window_spec = Window.partitionBy("state").orderBy("date")
        working_df = working_df.withColumn(
            "inpatient_beds_used_diff",
            col("inpatient_beds_used") - lag("inpatient_beds_used", 14).over(window_spec),
        )

        # One aggregation job for every column's min and max, instead of two jobs per column.
        bounds = working_df.agg(
            *[F.min(c).alias(f"min_{c}") for c in numeric_cols],
            *[F.max(c).alias(f"max_{c}") for c in numeric_cols],
        ).first()

        # Min-max normalization with native column expressions (no Python UDF round trip).
        for col_name in numeric_cols:
            low = bounds[f"min_{col_name}"]
            high = bounds[f"max_{col_name}"]
            if low is None or high is None:
                # Column is entirely null: nothing to scale.
                normalized = lit(None)
            elif high > low:
                normalized = (col(col_name) - lit(low)) / lit(high - low)
            else:
                # Constant column: 0.0 for present values, nulls stay null.
                normalized = F.when(col(col_name).isNotNull(), lit(0.0))
            working_df = working_df.withColumn(
                f'normalized_{col_name}', normalized.cast(FloatType())
            )

        # Show example data (must happen before spark.stop(), since Spark is lazy)
        # working_df.show()
    finally:
        # Stop the session even if a transformation above raised.
        spark.stop()

In [None]:
# `spark` only exists inside clean_process_pyspark(), so a bare `spark.stop()`
# here raises NameError; stop whichever session (if any) is still active instead.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

In [None]:
# Run the PySpark pipeline (fetch, select, convert, difference, normalize);
# the function stops the Spark session it creates when it finishes.
clean_process_pyspark()

In [None]:
import pandas as pd
import requests
from requests.exceptions import HTTPError
from sklearn.preprocessing import StandardScaler
import logging
from datetime import datetime

def _fetch_healthcare_pages(batch_size=1000, timeout=30):
    """Download every page of dataset g62h-syeh and return the rows as one DataFrame.

    Returns an empty DataFrame when the API returns no rows. HTTP and other
    request/decoding errors are logged and re-raised.
    """
    pages = []
    offset = 0
    while True:
        # Explicit $limit keeps the page size equal to the offset step, and
        # $order=:id gives a stable order so pages neither overlap nor skip rows.
        url = (
            "https://healthdata.gov/resource/g62h-syeh.json"
            f"?$limit={batch_size}&$offset={offset}&$order=:id"
        )
        logging.info(f"Retrieving data with offset {offset}")
        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()  # Raises HTTPError for bad responses
            data = response.json()
        except HTTPError as http_err:
            logging.error(f"HTTP error occurred: {http_err}")  # Log HTTP error
            raise
        except Exception as err:
            logging.error(f"Other error occurred: {err}")  # Log other errors that may occur
            raise

        # An empty page means we have read past the last row.
        if not data:
            logging.info("All data collected.")
            break
        pages.append(pd.DataFrame(data))
        offset += batch_size

    # A single concat at the end avoids re-copying the accumulated rows on every page.
    return pd.concat(pages, ignore_index=True) if pages else pd.DataFrame()


def _clean_healthcare_frame(raw_df):
    """Select, type-convert, impute, normalize and difference the raw API rows.

    Args:
        raw_df: Non-empty DataFrame of raw API records.

    Returns:
        A new DataFrame sorted by ``date`` with the kept columns plus
        ``normalized_inpatient_beds_used`` and ``inpatient_beds_used_diff``.
    """
    # Define columns to keep
    keeping_cols = [
        'state', 'date', 'inpatient_beds', 'inpatient_beds_used', 'staffed_adult_icu_bed_occupancy',
        'inpatient_beds_used_covid', 'inpatient_beds_used_covid_coverage', 'critical_staffing_shortage_today_yes',
        'critical_staffing_shortage_today_no', 'hospital_onset_covid', 'previous_day_admission_adult_covid_confirmed',
        'total_staffed_adult_icu_beds', 'hospital_onset_covid_coverage', 'total_patients_hospitalized_confirmed_influenza_and_covid',
        'total_patients_hospitalized_confirmed_influenza', 'deaths_covid', 'previous_day_deaths_influenza'
    ]

    # Define columns for numeric conversion
    numeric_cols = [
        'inpatient_beds', 'inpatient_beds_used', 'staffed_adult_icu_bed_occupancy', 'inpatient_beds_used_covid',
        'inpatient_beds_used_covid_coverage', 'critical_staffing_shortage_today_yes', 'critical_staffing_shortage_today_no',
        'hospital_onset_covid', 'previous_day_admission_adult_covid_confirmed', 'total_staffed_adult_icu_beds',
        'hospital_onset_covid_coverage', 'total_patients_hospitalized_confirmed_influenza_and_covid',
        'total_patients_hospitalized_confirmed_influenza', 'deaths_covid', 'previous_day_deaths_influenza'
    ]

    # Selecting a fixed column list protects against new columns being added
    # unexpectedly. reindex also tolerates a column the API omitted (it becomes
    # all-NaN instead of raising KeyError). The explicit .copy() prevents
    # SettingWithCopyWarning on the assignments below.
    cleaned = raw_df.reindex(columns=keeping_cols).dropna(how='all').copy()

    # Convert fields
    cleaned['date'] = pd.to_datetime(cleaned['date'])
    cleaned[numeric_cols] = cleaned[numeric_cols].apply(pd.to_numeric, errors='coerce')

    # State-wise median filling, using the built-in groupby median rather than a per-group lambda.
    state_medians = cleaned.groupby('state')[numeric_cols].transform('median')
    cleaned[numeric_cols] = cleaned[numeric_cols].fillna(state_medians)

    # Per-state standardization of inpatient_beds_used; the scaler is refit on each state's group.
    scaler = StandardScaler()
    cleaned['normalized_inpatient_beds_used'] = cleaned.groupby('state')['inpatient_beds_used'].transform(
        lambda x: scaler.fit_transform(x.values.reshape(-1, 1)).flatten()
    )

    # Differencing for time series data: value minus the value 14 rows earlier
    # within the same state, after sorting by date.
    # NOTE(review): this equals a 14-day difference only if each state has exactly
    # one row per day with no gaps — confirm against the dataset.
    cleaned.set_index('date', inplace=True)
    cleaned.sort_index(inplace=True)
    cleaned['inpatient_beds_used_diff'] = cleaned.groupby('state')['inpatient_beds_used'].diff(periods=14)
    cleaned.reset_index(inplace=True)
    return cleaned


def fetch_data_pandas():
    """Fetch the healthcare dataset page by page and clean it with pandas.

    Returns:
        The cleaned DataFrame with a ``time_loaded`` column recording when the
        download started, or ``None`` when the API returned no rows.

    Raises:
        requests.exceptions.HTTPError: If any page request returns an error status.
    """
    # Set up logging
    logging.basicConfig(level=logging.INFO)

    # Captured before downloading so every row shares one load timestamp.
    time_loaded = datetime.now()

    raw_df = _fetch_healthcare_pages()
    if raw_df.empty:
        logging.info("No data was collected.")
        return None

    logging.info("Begin data processing")
    cleaned = _clean_healthcare_frame(raw_df)
    cleaned['time_loaded'] = time_loaded  # Adding time loaded for data traceability
    return cleaned

In [None]:
# Download and clean the dataset with pandas; `fetched` is the cleaned
# DataFrame, or None if the API returned no rows.
fetched=fetch_data_pandas()

In [None]:
# Full Airflow example
import logging
import sqlite3
from datetime import datetime
from datetime import timedelta

import pandas as pd
import requests
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from requests.exceptions import HTTPError
from sklearn.preprocessing import StandardScaler

# Defaults applied to every task of the DAG: not dependent on the previous run,
# email on failure (not on retry), and a single retry after five minutes.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['aquil.codes@gmail.com'],
    email_on_failure=True,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# Initialize the DAG: fetch data at 01:00 on Wednesdays and Fridays, when the dataset is updated
@dag(default_args=default_args, schedule_interval='0 1 * * 3,5', start_date=days_ago(1), catchup=False)
def healthcare_data_processing_flow():
    """Fetch the HealthData.gov dataset, clean it with pandas and load it into SQLite.

    Task graph: fetch_data -> transform_data -> load_to_db. Each task's return
    value is passed to the next task through XCom.

    NOTE(review): passing whole pandas DataFrames through XCom requires an XCom
    backend/serializer that accepts DataFrames, and it stores the full dataset in
    Airflow's metadata database — confirm this is acceptable for the deployment.
    """

    @task
    def fetch_data():
        """Download every page of the dataset and return it as one raw DataFrame.

        Returns an empty DataFrame when the API returns no rows. HTTP and other
        request/decoding errors are logged and re-raised so the task fails.
        """
        logging.basicConfig(level=logging.INFO)
        api_url = "https://healthdata.gov/resource/g62h-syeh.json"
        batch_size = 1000
        request_timeout = 30  # seconds; without a timeout a stalled request hangs the task
        pages = []
        offset = 0

        while True:
            # Explicit $limit keeps the page size equal to the offset step, and
            # $order=:id gives a stable order so pages neither overlap nor skip rows.
            url = f"{api_url}?$limit={batch_size}&$offset={offset}&$order=:id"
            logging.info(f"Retrieving data with offset {offset}")
            try:
                response = requests.get(url, timeout=request_timeout)
                response.raise_for_status()
                data = response.json()
            except HTTPError as http_err:
                logging.error(f"HTTP error occurred: {http_err}")
                raise
            except Exception as err:
                logging.error(f"Other error occurred: {err}")
                raise

            # An empty page means we have read past the last row.
            if not data:
                logging.info("All data collected.")
                break
            pages.append(pd.DataFrame(data))
            offset += batch_size

        # One concat at the end instead of re-copying the accumulated frame per page.
        return pd.concat(pages, ignore_index=True) if pages else pd.DataFrame()

    @task
    def transform_data(raw_df):
        """Clean the raw rows; returns None when nothing was fetched.

        Keeps a fixed column list, parses dates, coerces numeric columns, fills
        gaps with per-state medians, adds a per-state standardized
        inpatient_beds_used and its 14-row difference, and stamps the run time.
        """
        if raw_df.empty:
            logging.info("No data was collected.")
            return None

        logging.info("Begin data processing")
        time_transformed = datetime.now()
        keeping_cols = [
            'state', 'date', 'inpatient_beds', 'inpatient_beds_used', 'staffed_adult_icu_bed_occupancy',
            'inpatient_beds_used_covid', 'inpatient_beds_used_covid_coverage', 'critical_staffing_shortage_today_yes',
            'critical_staffing_shortage_today_no', 'hospital_onset_covid', 'previous_day_admission_adult_covid_confirmed',
            'total_staffed_adult_icu_beds', 'hospital_onset_covid_coverage', 'total_patients_hospitalized_confirmed_influenza_and_covid',
            'total_patients_hospitalized_confirmed_influenza', 'deaths_covid', 'previous_day_deaths_influenza'
        ]

        # Define columns for numeric conversion
        numeric_cols = [
            'inpatient_beds', 'inpatient_beds_used', 'staffed_adult_icu_bed_occupancy', 'inpatient_beds_used_covid',
            'inpatient_beds_used_covid_coverage', 'critical_staffing_shortage_today_yes', 'critical_staffing_shortage_today_no',
            'hospital_onset_covid', 'previous_day_admission_adult_covid_confirmed', 'total_staffed_adult_icu_beds',
            'hospital_onset_covid_coverage', 'total_patients_hospitalized_confirmed_influenza_and_covid',
            'total_patients_hospitalized_confirmed_influenza', 'deaths_covid', 'previous_day_deaths_influenza'
        ]

        # Selecting a fixed column list protects against new columns being added
        # unexpectedly. reindex also tolerates a column the API omitted, and
        # .copy() prevents SettingWithCopyWarning on the assignments below.
        cleaned = raw_df.reindex(columns=keeping_cols).dropna(how='all').copy()

        # Convert fields
        cleaned['date'] = pd.to_datetime(cleaned['date'])
        cleaned[numeric_cols] = cleaned[numeric_cols].apply(pd.to_numeric, errors='coerce')

        # State-wise median filling via the built-in groupby median.
        state_medians = cleaned.groupby('state')[numeric_cols].transform('median')
        cleaned[numeric_cols] = cleaned[numeric_cols].fillna(state_medians)

        # Per-state standardization; the scaler is refit on each state's group.
        scaler = StandardScaler()
        cleaned['normalized_inpatient_beds_used'] = cleaned.groupby('state')['inpatient_beds_used'].transform(
            lambda x: scaler.fit_transform(x.values.reshape(-1, 1)).flatten()
        )

        # Differencing: value minus the value 14 rows earlier within the same state,
        # after sorting by date.
        # NOTE(review): equals a 14-day difference only with one row per state per day — confirm.
        cleaned.set_index('date', inplace=True)
        cleaned.sort_index(inplace=True)
        cleaned['inpatient_beds_used_diff'] = cleaned.groupby('state')['inpatient_beds_used'].diff(periods=14)
        cleaned.reset_index(inplace=True)
        cleaned['time_transformed'] = time_transformed  # Transformation timestamp for data lineage

        return cleaned

    @task
    def load_to_db(cleaned_df):
        """Replace table ``healthcare_data`` in ``health_data.db`` with the cleaned rows; no-op for None."""
        if cleaned_df is None:
            return
        conn = sqlite3.connect('health_data.db')
        try:
            cleaned_df.to_sql('healthcare_data', conn, if_exists='replace', index=False)
        finally:
            # Close the connection even if the write fails.
            conn.close()

    # Task dependencies
    raw_data = fetch_data()
    cleaned_data = transform_data(raw_data)
    load_to_db(cleaned_data)

# Module-level DAG object for the scheduler to discover. Note: this rebinds the
# name `dag`, shadowing the imported `dag` decorator for any code below this line.
dag = healthcare_data_processing_flow()
