In [1]:
import pandas as pd
import requests
import unittest
import logging
import gdown
import io

In [2]:
#!pip install --upgrade --no-cache-dir gdown

In [3]:
# Downloading the heart_failure_clinical_records.csv dataset from Google Drive

!gdown --id 1QK7YSRLMYiIAxvCh0fmlkl8AP3q8D0Tu

Downloading...
From: https://drive.google.com/uc?id=1QK7YSRLMYiIAxvCh0fmlkl8AP3q8D0Tu
To: C:\Users\dahal\heart_failure_clinical_records.csv

  0%|          | 0.00/12.2k [00:00<?, ?B/s]
100%|##########| 12.2k/12.2k [00:00<?, ?B/s]


In [4]:
# Configure logging

logging.basicConfig(level=logging.INFO)

In [5]:
# Extracting data from the heart failure clinical dataset

heart_failure_url = "https://drive.google.com/uc?id=1QK7YSRLMYiIAxvCh0fmlkl8AP3q8D0Tu"
heart_failure_dataset = pd.read_csv(heart_failure_url)
logging.info(f"Number of data items extracted from heart failure clinical dataset: {len(heart_failure_dataset)}")

dataset_field = list(heart_failure_dataset.columns)

nice_print = pd.DataFrame(dataset_field)
nice_print.rename(columns={0: 'Heart Failure Dataset Field Names'}, inplace=True)

logging.info("Field names of the Heart Failure Clinical Records dataset:")
logging.info(nice_print)

def extract_from_heart_failure_dataset():
    try:
        logging.info(f"Requesting data from the heart failure dataset: {heart_failure_url}")
        response = requests.get(heart_failure_url)
        response.raise_for_status()
        heart_failure_dataset = pd.read_csv(io.StringIO(response.text))
        logging.info(f"Number of data items extracted from heart failure dataset: {len(heart_failure_dataset)}")
        return heart_failure_dataset

    except requests.exceptions.RequestException as e:
        logging.error(f"Error extracting data from the heart failure dataset: {e}")
        return None


INFO:root:Number of data items extracted from heart failure clinical dataset: 299
INFO:root:Field names of the Heart Failure Clinical Records dataset:
INFO:root:   Heart Failure Dataset Field Names
0                                age
1                            anaemia
2           creatinine_phosphokinase
3                           diabetes
4                  ejection_fraction
5                high_blood_pressure
6                          platelets
7                   serum_creatinine
8                       serum_sodium
9                                sex
10                           smoking
11                              time
12                       DEATH_EVENT


In [6]:
print(dataset_field)

['age', 'anaemia', 'creatinine_phosphokinase', 'diabetes', 'ejection_fraction', 'high_blood_pressure', 'platelets', 'serum_creatinine', 'serum_sodium', 'sex', 'smoking', 'time', 'DEATH_EVENT']


In [7]:
def transform_data_for_mage_ai(raw_data):
    try:
        logging.info("Transforming data for Mage AI...")

        if raw_data is not None and not raw_data.empty:
            field_names = ['age', 'anaemia', 'creatinine_phosphokinase', 'diabetes', 'ejection_fraction',
                       'high_blood_pressure', 'platelets', 'serum_creatinine', 'serum_sodium',
                       'sex', 'smoking', 'time', 'DEATH_EVENT']

            transformed_records = [dict(zip(field_names, row)) for _, row in raw_data.iterrows()]

            logging.info("Transformed records:")

            for record in transformed_records[:100]:
                logging.info(record)
                
            return transformed_records
        else:
            logging.error("No raw data provided for transformation.")
            return None
    
    except Exception as e:
        logging.error(f"Error transforming data for Mage AI: {e}")
        return None


In [8]:
transformed_records = transform_data_for_mage_ai(heart_failure_dataset)

if transformed_records:
    logging.info(f"Sample of transformed data:\n{transformed_records[:100]}")

INFO:root:Transforming data for Mage AI...
INFO:root:Transformed records:
INFO:root:{'age': 75.0, 'anaemia': 0.0, 'creatinine_phosphokinase': 582.0, 'diabetes': 0.0, 'ejection_fraction': 20.0, 'high_blood_pressure': 1.0, 'platelets': 265000.0, 'serum_creatinine': 1.9, 'serum_sodium': 130.0, 'sex': 1.0, 'smoking': 0.0, 'time': 4.0, 'DEATH_EVENT': 1.0}
INFO:root:{'age': 55.0, 'anaemia': 0.0, 'creatinine_phosphokinase': 7861.0, 'diabetes': 0.0, 'ejection_fraction': 38.0, 'high_blood_pressure': 0.0, 'platelets': 263358.03, 'serum_creatinine': 1.1, 'serum_sodium': 136.0, 'sex': 1.0, 'smoking': 0.0, 'time': 6.0, 'DEATH_EVENT': 1.0}
INFO:root:{'age': 65.0, 'anaemia': 0.0, 'creatinine_phosphokinase': 146.0, 'diabetes': 0.0, 'ejection_fraction': 20.0, 'high_blood_pressure': 0.0, 'platelets': 162000.0, 'serum_creatinine': 1.3, 'serum_sodium': 129.0, 'sex': 1.0, 'smoking': 1.0, 'time': 7.0, 'DEATH_EVENT': 1.0}
INFO:root:{'age': 50.0, 'anaemia': 1.0, 'creatinine_phosphokinase': 111.0, 'diabetes': 

In [9]:
class TestHeartFailureETLPipeline(unittest.TestCase):

    def test_extraction(self):
        extracted_data = extract_from_heart_failure_dataset()
        self.assertIsNotNone(extracted_data)
        self.assertIsInstance(extracted_data, pd.DataFrame)

    def test_transformation(self):
        test_raw_data = extract_from_heart_failure_dataset().head(100)
        transformed_data = transform_data_for_mage_ai(test_raw_data)
        self.assertIsNotNone(transformed_data)
        self.assertIsInstance(transformed_data, list)
        self.assertEqual(len(transformed_data), 100)

# Run the defined unit tests for the Heart Failure ETL Pipeline using the TextTestRunner

unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestHeartFailureETLPipeline))


INFO:root:Requesting data from the heart failure dataset: https://drive.google.com/uc?id=1QK7YSRLMYiIAxvCh0fmlkl8AP3q8D0Tu
INFO:root:Number of data items extracted from heart failure dataset: 299
.INFO:root:Requesting data from the heart failure dataset: https://drive.google.com/uc?id=1QK7YSRLMYiIAxvCh0fmlkl8AP3q8D0Tu
INFO:root:Number of data items extracted from heart failure dataset: 299
INFO:root:Transforming data for Mage AI...
INFO:root:Transformed records:
INFO:root:{'age': 75.0, 'anaemia': 0.0, 'creatinine_phosphokinase': 582.0, 'diabetes': 0.0, 'ejection_fraction': 20.0, 'high_blood_pressure': 1.0, 'platelets': 265000.0, 'serum_creatinine': 1.9, 'serum_sodium': 130.0, 'sex': 1.0, 'smoking': 0.0, 'time': 4.0, 'DEATH_EVENT': 1.0}
INFO:root:{'age': 55.0, 'anaemia': 0.0, 'creatinine_phosphokinase': 7861.0, 'diabetes': 0.0, 'ejection_fraction': 38.0, 'high_blood_pressure': 0.0, 'platelets': 263358.03, 'serum_creatinine': 1.1, 'serum_sodium': 136.0, 'sex': 1.0, 'smoking': 0.0, 'time'

<unittest.runner.TextTestResult run=2 errors=0 failures=0>