## 📋 Task Index: Implementing Data Expectations

1. **Database Connection in the Notebook** 🔗  
   - Connect the Jupyter notebook to the PostgreSQL database to access data from the tables (`CardioTrain`, `CardioTrainNormalize`, `CauseOfDeaths`).
   - Verify the connection and data loading.

2. **Data Exploration (EDA)** 🔍  
   - Perform a brief exploratory analysis of each table to understand data distribution and detect any possible anomalies or null values.
   - Identify key columns and ranges that could be useful for defining expectations.

3. **Initial Expectation Definition** 🛠️  
   - Create expectations for basic validations, such as:
     - Non-null values in critical columns.
     - Expected data types.
     - Value ranges (e.g., age range or blood pressure).
     - Relationships between columns (e.g., `ap_hi` should always be greater than `ap_lo` in `CardioTrain`).

4. **Great Expectations Setup** 🧰  
   - Set up the Great Expectations environment in the notebook and create an expectation suite for each table.
   - Test the expectations in the notebook to ensure they work with data extracted from the database.

5. **Export Expectation Suites** 📤  
   - Save the created expectation suites so they can be used in Airflow DAGs.

6. **Integration into Airflow** 🌐  
   - Modify validation nodes in Airflow (`validate_cardio`, `validate_deaths`, etc.) to use the exported expectation suites.
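For step 6, a validation node such as `validate_cardio` needs to stop the DAG when the data is bad. Airflow marks a task as failed when its Python callable raises an exception, so one simple pattern is to turn a failed validation result into an exception. This is a minimal sketch under stated assumptions: the result has already been converted to a plain dictionary with a top-level `success` flag and a `results` list (the same shape as the JSON the notebook prints), and `DataValidationError` and `enforce_validation` are hypothetical names.

```python
class DataValidationError(Exception):
    """Raised when a dataset fails its expectation suite (hypothetical name)."""


def enforce_validation(data_name: str, result_dict: dict) -> dict:
    """Raise DataValidationError if validation failed; otherwise return the result unchanged.

    `result_dict` is assumed to be a plain-dict validation result with a
    top-level "success" flag and a "results" list of per-expectation dicts.
    """
    if not result_dict.get("success", False):
        # Count how many individual expectations failed, for a useful task log message
        failed = sum(1 for r in result_dict.get("results", []) if not r.get("success", False))
        raise DataValidationError(f"{data_name}: {failed} expectation(s) failed")
    return result_dict
```

Raising rather than returning `False` lets Airflow's normal retry and failure handling take over without any extra branching in the DAG.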

---


In [1]:
```python
import sys
import os
import pandas as pd
import great_expectations as gx
import great_expectations.expectations as gxe
from dotenv import load_dotenv
from sqlalchemy import inspect
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
```

In [2]:
```python
load_dotenv()

# Configuration when NOT running in Docker: read WORK_DIR from the .env file
# work_dir = os.getenv('WORK_DIR')
# sys.path.append(work_dir)

# Temporarily override WORK_DIR inside the notebook (path used in the Docker container)
os.environ['WORK_DIR'] = '/home/jovyan/work'
sys.path.append(os.getenv('WORK_DIR'))
sys.path.append(f"{os.getenv('WORK_DIR')}/src")
sys.path.append(f"{os.getenv('WORK_DIR')}/transform")
from src.database.dbconnection import getconnection
```

Connect to the database with SQLAlchemy. If the connection fails, check that your `.env` file defines the correct environment variables and try again.
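`getconnection()` lives in `src/database/dbconnection.py`, which is not shown here. As a hedged illustration of what it needs from `.env`, the sketch below assembles a PostgreSQL connection URL from environment variables and reports every missing variable at once. The variable names (`PG_USER`, `PG_PASSWORD`, `PG_HOST`, `PG_PORT`, `PG_DATABASE`), the `postgresql+psycopg2` driver prefix and the `build_postgres_url` helper are assumptions, not the project's actual settings.

```python
import os
from urllib.parse import quote_plus

# Hypothetical variable names; adjust them to match your own .env file.
REQUIRED_VARS = ("PG_USER", "PG_PASSWORD", "PG_HOST", "PG_PORT", "PG_DATABASE")


def build_postgres_url(env=os.environ):
    """Build a SQLAlchemy-style PostgreSQL URL from environment variables.

    Raises KeyError listing every missing variable, which makes a broken
    .env file easier to diagnose than a failed connection attempt.
    """
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    # Percent-encode credentials so characters like '@' or ':' do not break the URL
    user = quote_plus(env["PG_USER"])
    password = quote_plus(env["PG_PASSWORD"])
    return (
        f"postgresql+psycopg2://{user}:{password}"
        f"@{env['PG_HOST']}:{env['PG_PORT']}/{env['PG_DATABASE']}"
    )
```

The resulting string is the kind of URL that `sqlalchemy.create_engine` accepts.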

### 1. **Database Connection in the Notebook** 🔗  
   - Connect the Jupyter notebook to the PostgreSQL database to access data from the tables (`CardioTrain`, `CardioTrainNormalize`, `CauseOfDeaths`).
   - Verify the connection and data loading.

In [3]:
```python
engine = getconnection()
Session = sessionmaker(bind=engine)
session = Session()
```

Conected successfully to database airflow!


In [4]:
```python
# Inspect the database tables
inspector = inspect(engine)
tables = inspector.get_table_names()
print("Tables in the database:", tables)
```


Tables in the database: ['CardioTrain', 'CauseOfDeaths', 'GlucoseTypes', 'CholesterolTypes', 'CardioTrainNormalize']


In [5]:
```python
# Helper: load a whole database table into a pandas DataFrame
def load_table_to_df(table_name, engine):
    try:
        df = pd.read_sql_table(table_name, con=engine)
        print(f"Data from {table_name} loaded successfully.")
        return df
    except SQLAlchemyError as e:
        print(f"Error loading {table_name}: {e}")
        return None

# Load the database tables into DataFrames
df_cardio = load_table_to_df("CardioTrain", engine)
df_deaths = load_table_to_df("CauseOfDeaths", engine)

# Load the OWID data from its CSV file
owid_path = os.path.join(os.getenv('WORK_DIR'), 'data', 'owid.csv')
try:
    df_owid = pd.read_csv(owid_path)
    print("OWID data loaded successfully from owid.csv.")
except FileNotFoundError:
    print("File owid.csv not found in the data directory.")
    df_owid = None

# Show a few rows of each DataFrame that loaded (skip any that failed)
if df_cardio is not None:
    print("\nSample data from CardioTrain table:")
    print(df_cardio.head())

if df_deaths is not None:
    print("\nSample data from CauseOfDeaths table:")
    print(df_deaths.head())

if df_owid is not None:
    print("\nSample data from OWID file:")
    print(df_owid.head())
```

Data from CardioTrain loaded successfully.
Data from CauseOfDeaths loaded successfully.
OWID data loaded successfully from owid.csv.

Sample data from CardioTrain table:
   id    age  gender  height  weight  ap_hi  ap_lo  cholesterol  gluc  smoke  \
0   0  18393       2     168    62.0    110     80            1     1      0   
1   1  20228       1     156    85.0    140     90            3     1      0   
2   2  18857       1     165    64.0    130     70            3     1      0   
3   3  17623       2     169    82.0    150    100            1     1      0   
4   4  17474       1     156    56.0    100     60            1     1      0   

   alco  active  cardio  
0     0       1       0  
1     0       1       1  
2     0       0       1  
3     0       1       1  
4     0       0       0  

Sample data from CauseOfDeaths table:
   id      Country Code  Year  Cardiovascular  TotalDeaths
0   1  Afghanistan  AFG  1990           44899       147971
1   2  Afghanistan  AFG  1991       

### 2. **Data Exploration (EDA)** 🔍  
   - Perform a brief exploratory analysis of each table to understand data distribution and detect any possible anomalies or null values.
   - Identify key columns and ranges that could be useful for defining expectations.
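The EDA cell prints `describe()` output and null counts separately. A compact alternative, sketched here with pandas, gathers the figures that feed expectations (dtype, min, max, null count, distinct values) into one table per dataset. `profile_for_expectations` is a hypothetical helper, not part of the notebook.

```python
import pandas as pd


def profile_for_expectations(df: pd.DataFrame) -> pd.DataFrame:
    """Summarize each column with the statistics used to write expectations."""
    rows = []
    for col in df.columns:
        series = df[col]
        numeric = pd.api.types.is_numeric_dtype(series)
        rows.append({
            "column": col,
            "dtype": str(series.dtype),
            # min/max are only computed for numeric columns here (NaN is skipped)
            "min": series.min() if numeric else None,
            "max": series.max() if numeric else None,
            "nulls": int(series.isna().sum()),
            "distinct": int(series.nunique(dropna=True)),
        })
    return pd.DataFrame(rows).set_index("column")
```

Calling `profile_for_expectations(df_cardio)` would put the `age`, `height` and blood-pressure extremes side by side with the null counts.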



In [6]:

```python
# Column names of each dataset
print("CardioTrain columns:", df_cardio.columns.tolist())
print("CauseOfDeaths columns:", df_deaths.columns.tolist())
print("OWID columns:", df_owid.columns.tolist())

# Summary statistics
print("\nCardioTrain summary statistics:")
print(df_cardio.describe())

print("\nCauseOfDeaths summary statistics:")
print(df_deaths.describe())

print("\nOWID data summary statistics:")
print(df_owid.describe())

# Null counts per column
print("\nNull values in CardioTrain:")
print(df_cardio.isnull().sum())

print("\nNull values in CauseOfDeaths:")
print(df_deaths.isnull().sum())

print("\nNull values in OWID data:")
print(df_owid.isnull().sum())
```


CardioTrain columns: ['id', 'age', 'gender', 'height', 'weight', 'ap_hi', 'ap_lo', 'cholesterol', 'gluc', 'smoke', 'alco', 'active', 'cardio']
CauseOfDeaths columns: ['id', 'Country', 'Code', 'Year', 'Cardiovascular', 'TotalDeaths']
OWID columns: ['Country', 'Code', 'Year', 'CardiovascularDeaths', 'nitrogen_oxide(NOx)', 'sulphur_dioxide(SO2)', 'carbon_monoxide(CO)', 'black_carbon(BC)', 'ammonia(NH3)', 'non_methane_volatile_organic_compounds', 'gdp_per_capita', 'obesity_prevalence_percentage', 'diabetes_prevalence_percentage', 'population', 'TotalDeaths']

CardioTrain summary statistics:
                 id           age        gender        height        weight  \
count  70000.000000  70000.000000  70000.000000  70000.000000  70000.000000   
mean   49972.419900  19468.865814      1.349571    164.359229     74.205690   
std    28851.302323   2467.251667      0.476838      8.210126     14.395757   
min        0.000000  10798.000000      1.000000     55.000000     10.0

### 3. **Initial Expectation Definition** 🛠️  
   - Establish expectations for basic validations, including:
     - Non-null values in critical columns.
     - Expected data types.
     - Value ranges (e.g., age range or blood pressure).
     - Relationships between columns (e.g., `ap_hi` should always be greater than `ap_lo` in `CardioTrain`).

#### 1. CardioTrain 🩺
- **Column Order and Presence**:
  - The table should contain columns in the following order: `['id', 'age', 'gender', 'height', 'weight', 'ap_hi', 'ap_lo', 'cholesterol', 'gluc', 'smoke', 'alco', 'active', 'cardio']`.
  
- **Distribution and Value Range**:
  - `age`: values between `10798` and `23713` (age is stored in days, so this is roughly 30 to 65 years).
  - `gender`: limited to `1` (female) and `2` (male).
  - `height`: between `55` and `250` (cm).
  - `weight`: between `10` and `200` (kg).
  - `ap_hi` (systolic pressure) and `ap_lo` (diastolic pressure): need attention because of extreme values (minimums of `-150` and `-70`, maximums of `16020` and `11000`), which likely call for data cleaning or stricter limits.
  - `cholesterol` and `gluc`: categorical codes from `1` to `3`.
  - `smoke`, `alco`, `active`, `cardio`: binary values (`0` or `1`).

- **Null Values**:
  - No null values allowed in any column.
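Several of these rules (the binary flags, the `1` to `3` codes, no nulls, and the `ap_hi` > `ap_lo` relationship from the task list) are not yet encoded in the Great Expectations suite defined later. As a library-independent cross-check, the pandas sketch below counts the rows that break each rule; the rule names and the `check_cardio_rules` helper are illustrative assumptions.

```python
import pandas as pd

CARDIO_COLUMNS = ["id", "age", "gender", "height", "weight", "ap_hi", "ap_lo",
                  "cholesterol", "gluc", "smoke", "alco", "active", "cardio"]


def check_cardio_rules(df: pd.DataFrame) -> dict:
    """Return the number of rows violating each CardioTrain rule (0 means it holds)."""
    violations = {
        # between() is inclusive on both ends; NaN counts as a violation
        "age_range": int((~df["age"].between(10798, 23713)).sum()),
        "gender_set": int((~df["gender"].isin([1, 2])).sum()),
        "height_range": int((~df["height"].between(55, 250)).sum()),
        "weight_range": int((~df["weight"].between(10, 200)).sum()),
        "cholesterol_set": int((~df["cholesterol"].isin([1, 2, 3])).sum()),
        "gluc_set": int((~df["gluc"].isin([1, 2, 3])).sum()),
        # Systolic pressure should always be greater than diastolic pressure
        "ap_hi_gt_ap_lo": int((df["ap_hi"] <= df["ap_lo"]).sum()),
        # No nulls allowed anywhere: count rows with at least one null
        "rows_with_nulls": int(df[CARDIO_COLUMNS].isna().any(axis=1).sum()),
    }
    # smoke, alco, active and cardio must be binary
    for col in ("smoke", "alco", "active", "cardio"):
        violations[f"{col}_binary"] = int((~df[col].isin([0, 1])).sum())
    return violations
```

In Great Expectations the same rules would map onto expectations such as `ExpectColumnValuesToBeInSet`, `ExpectColumnValuesToNotBeNull` and `ExpectColumnPairValuesAToBeGreaterThanB`.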

#### 2. CauseOfDeaths ⚰️
- **Column Order and Presence**:
  - The table should contain columns in the following order: `['id', 'Country', 'Code', 'Year', 'Cardiovascular', 'TotalDeaths']`.

- **Distribution and Value Range**:
  - `Year`: values within the range of `1990` to `2019`.
  - `Cardiovascular` and `TotalDeaths`: values should stay within the observed ranges; the upper bounds (observed maximums of `4,584,273` for `Cardiovascular` and `10,442,560` for `TotalDeaths`) serve to flag unusually high values.

- **Null Values**:
  - No null values allowed in any column.

#### 3. OWID Data 🌍
- **Column Order and Presence**:
  - The table should contain columns in the following order: `['Country', 'Code', 'Year', 'CardiovascularDeaths', 'nitrogen_oxide(NOx)', 'sulphur_dioxide(SO2)', 'carbon_monoxide(CO)', 'black_carbon(BC)', 'ammonia(NH3)', 'non_methane_volatile_organic_compounds', 'gdp_per_capita', 'obesity_prevalence_percentage', 'diabetes_prevalence_percentage', 'population', 'TotalDeaths']`.

- **Distribution and Value Range**:
  - `Year`: between `1990` and `2019`.
  - `CardiovascularDeaths`: values within the range of `4` to `4,584,273`.
  - `gdp_per_capita`: reasonable values between `246.74` and `169,200.27`.
  - `obesity_prevalence_percentage`: between `0.3` and `61`.
  - `diabetes_prevalence_percentage`: values between `1.9` and `29.8`.
  - Emission columns (`nitrogen_oxide(NOx)`, `sulphur_dioxide(SO2)`, `carbon_monoxide(CO)`, etc.) should not contain negative values.

- **Null Values**:
  - Emission columns and the economic and health indicator columns (`gdp_per_capita`, `obesity_prevalence_percentage`, `diabetes_prevalence_percentage`, `population`) may contain null values, so their expectations should tolerate some missing values rather than require none.
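One way to express "permissible nulls" is a maximum null fraction per column, combined with the non-negativity rule for emissions. The pandas sketch below assumes a 20% null tolerance purely for illustration; the null counts observed in the EDA should drive the real threshold. Great Expectations offers a similar knob through the `mostly` argument on column expectations. `check_owid_nulls_and_emissions` is a hypothetical helper.

```python
import pandas as pd

EMISSION_COLUMNS = ["nitrogen_oxide(NOx)", "sulphur_dioxide(SO2)", "carbon_monoxide(CO)",
                    "black_carbon(BC)", "ammonia(NH3)", "non_methane_volatile_organic_compounds"]
INDICATOR_COLUMNS = ["gdp_per_capita", "obesity_prevalence_percentage",
                     "diabetes_prevalence_percentage", "population"]


def check_owid_nulls_and_emissions(df: pd.DataFrame, max_null_fraction: float = 0.2) -> list:
    """Return human-readable problems; an empty list means every check passed."""
    problems = []
    for col in EMISSION_COLUMNS + INDICATOR_COLUMNS:
        null_fraction = df[col].isna().mean()
        if null_fraction > max_null_fraction:
            problems.append(f"{col}: {null_fraction:.1%} nulls exceeds {max_null_fraction:.0%}")
    for col in EMISSION_COLUMNS:
        # Nulls are tolerated above; only actual negative numbers are flagged here
        negatives = int((df[col] < 0).sum())
        if negatives:
            problems.append(f"{col}: {negatives} negative values")
    return problems
```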

### 4. **Great Expectations Setup** 🧰  
   - Set up the Great Expectations environment in the notebook and create an expectation suite for each table.
   - Test the expectations in the notebook to ensure they work with data extracted from the database.

In [7]:
```python
def validate_data(context, df, data_name, expectation_suite_name, expectations):
    # Make sure the 'pandas' data source is added only once
    try:
        data_source = context.data_sources.add_pandas("pandas")
    except gx.exceptions.DataContextError:
        data_source = context.data_sources.get("pandas")

    # Create the data asset on every call (the earlier existence check was removed
    # to keep the flow simple); re-running with the same data_name in the same
    # context may therefore fail
    data_asset = data_source.add_dataframe_asset(name=data_name)

    # Reuse the batch definition if it exists, otherwise create it
    batch_definition_name = f"batch definition {data_name}"
    try:
        batch_definition = data_asset.get_batch_definition(batch_definition_name)
    except KeyError:
        batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)

    # Build the batch to validate from the in-memory DataFrame
    batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

    # Create the Expectation Suite, or reset it if one with this name already exists
    try:
        expectation_suite = context.suites.add(gx.ExpectationSuite(name=expectation_suite_name))
    except gx.exceptions.DataContextError:
        context.suites.delete(name=expectation_suite_name)
        expectation_suite = context.suites.add(gx.ExpectationSuite(name=expectation_suite_name))

    # Add the expectations to the suite
    for expectation in expectations:
        expectation_suite.add_expectation(expectation)

    # Validate the batch against the Expectation Suite
    validation_result = batch.validate(expectation_suite)

    # Print the validation results
    print(f"Validation for '{data_name}' passed: {validation_result['success']}")
    for result in validation_result["results"]:
        print(result)
```
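`validate_data` prints every result, which quickly gets long. A hedged alternative is to report only the failing expectations. The sketch assumes the results have been converted to plain dictionaries shaped like the JSON printed below (for example via the result object's `to_json_dict()` method); `summarize_failures` is a hypothetical helper.

```python
def summarize_failures(result_dict: dict) -> list:
    """List (expectation type, column) pairs for every failed expectation.

    `result_dict` is assumed to follow the structure printed by validate_data:
    a top-level "results" list whose items hold "success" and
    "expectation_config" -> {"type", "kwargs"}.
    """
    failures = []
    for item in result_dict.get("results", []):
        if not item.get("success", False):
            config = item.get("expectation_config", {})
            kwargs = config.get("kwargs", {})
            # Table-level expectations have no "column" kwarg
            failures.append((config.get("type"), kwargs.get("column", "<table>")))
    return failures
```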


#### 2. **Create Expectations**

##### 1. **Expectation Suite** `CardioTrain`

In [8]:
```python
cardio_expectations = [
    gxe.ExpectTableColumnsToMatchOrderedList(
        column_list=[
            "id", "age", "gender", "height", "weight", "ap_hi", "ap_lo",
            "cholesterol", "gluc", "smoke", "alco", "active", "cardio"
        ]
    ),
    gxe.ExpectColumnValuesToBeBetween(column="age", min_value=10798, max_value=23713),
    gxe.ExpectColumnValuesToBeBetween(column="height", min_value=55, max_value=250),
    gxe.ExpectColumnValuesToBeBetween(column="weight", min_value=10, max_value=200),
    gxe.ExpectColumnValuesToBeOfType(column="id", type_="int"),
    gxe.ExpectColumnValuesToBeOfType(column="gender", type_="int"),
]
```


##### 2. **Expectation Suite** `CauseOfDeaths`

In [9]:
```python
cause_of_deaths_expectations = [
    gxe.ExpectTableColumnsToMatchOrderedList(
        column_list=['id', 'Country', 'Code', 'Year', 'Cardiovascular', 'TotalDeaths']
    ),
    gxe.ExpectColumnValuesToBeBetween(column="Year", min_value=1990, max_value=2019),
    gxe.ExpectColumnValuesToBeBetween(column="Cardiovascular", min_value=4 * 0.95, max_value=4584273 * 1.05),
    gxe.ExpectColumnValuesToBeBetween(column="TotalDeaths", min_value=7 * 0.95, max_value=10442560 * 1.05),
]
```
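The bounds above widen the observed minimum and maximum by a 5% margin (`min * 0.95`, `max * 1.05`). That multiplicative form only widens the range when the bound is positive: a minimum of `0` gets no slack, and a negative minimum such as the `-150` observed in `ap_hi` would actually be tightened (`-150 * 0.95 = -142.5`). The hypothetical helper below pads by a fraction of each bound's absolute value, so the range always widens; for positive bounds like those in `CauseOfDeaths` it gives the same numbers as the cell above.

```python
def padded_bounds(observed_min: float, observed_max: float, margin: float = 0.05) -> tuple:
    """Widen [observed_min, observed_max] by `margin` times each bound's magnitude."""
    if observed_min > observed_max:
        raise ValueError("observed_min must not exceed observed_max")
    low = observed_min - abs(observed_min) * margin
    high = observed_max + abs(observed_max) * margin
    return low, high
```

The returned pair could then be passed as `min_value` and `max_value` to `gxe.ExpectColumnValuesToBeBetween`.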


##### 3. **Expectation Suite** `OWID`

In [10]:
```python
owid_expectations = [
    gxe.ExpectTableColumnsToMatchOrderedList(
        column_list=[
            'Country', 'Code', 'Year', 'CardiovascularDeaths', 'nitrogen_oxide(NOx)', 'sulphur_dioxide(SO2)',
            'carbon_monoxide(CO)', 'black_carbon(BC)', 'ammonia(NH3)', 'non_methane_volatile_organic_compounds',
            'gdp_per_capita', 'obesity_prevalence_percentage', 'diabetes_prevalence_percentage', 'population', 'TotalDeaths'
        ]
    ),
    gxe.ExpectColumnValuesToBeBetween(column="Year", min_value=1990, max_value=2019),
    gxe.ExpectColumnValuesToBeBetween(column="CardiovascularDeaths", min_value=4 * 0.95, max_value=4584273 * 1.05),
    gxe.ExpectColumnValuesToBeBetween(column="gdp_per_capita", min_value=246.74 * 0.95, max_value=169200.27 * 1.05),
    gxe.ExpectColumnValuesToBeBetween(column="obesity_prevalence_percentage", min_value=0.3 * 0.95, max_value=61 * 1.05),
    gxe.ExpectColumnValuesToBeBetween(column="diabetes_prevalence_percentage", min_value=1.9 * 0.95, max_value=29.8 * 1.05),
]
```




### 5. **Validations** 📤  
   - Run each expectation suite against its dataset to confirm it passes before saving the suites for use in Airflow DAGs.
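The cells below run the validations but write nothing to disk. One hedged way to hand a suite to an Airflow DAG is to store its expectations as plain JSON (type plus keyword arguments, the same shape shown in the validation output) and rebuild the suite inside the task. The JSON layout and the `suite_to_json` / `suite_from_json` helpers are assumptions for illustration. With a file-backed Great Expectations context, suites added through `context.suites.add` should be persisted by GX itself, which may be the simpler route.

```python
import json

# Hypothetical example mirroring part of the CauseOfDeaths suite
cause_of_deaths_spec = [
    {"type": "expect_column_values_to_be_between",
     "kwargs": {"column": "Year", "min_value": 1990, "max_value": 2019}},
]


def suite_to_json(suite_name: str, expectations: list) -> str:
    """Serialize a suite as {"name": ..., "expectations": [{"type": ..., "kwargs": {...}}]}."""
    return json.dumps({"name": suite_name, "expectations": expectations}, indent=2)


def suite_from_json(text: str) -> tuple:
    """Parse the JSON back into (suite_name, list of expectation dicts)."""
    payload = json.loads(text)
    return payload["name"], payload["expectations"]
```

The string can be written with `pathlib.Path(...).write_text(...)` to a location the DAG can read; inside the task, each `type` would still need to be mapped back to the matching `gxe` class.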

In [11]:
```python
# Initialize the Great Expectations data context
context = gx.get_context()
```

In [12]:
```python
# Validate CardioTrain
validate_data(context, df_cardio, "cardio data", "cardio_train_expectations", cardio_expectations)
```

Validation for 'cardio data' passed: True
{
  "success": true,
  "expectation_config": {
    "type": "expect_table_columns_to_match_ordered_list",
    "kwargs": {
      "batch_id": "pandas-cardio data",
      "column_list": [
        "id",
        "age",
        "gender",
        "height",
        "weight",
        "ap_hi",
        "ap_lo",
        "cholesterol",
        "gluc",
        "smoke",
        "alco",
        "active",
        "cardio"
      ]
    },
    "meta": {},
    "id": "dc9adfff-382c-4740-a138-4ea7f8f37784"
  },
  "result": {
    "observed_value": [
      "id",
      "age",
      "gender",
      "height",
      "weight",
      "ap_hi",
      "ap_lo",
      "cholesterol",
      "gluc",
      "smoke",
      "alco",
      "active",
      "cardio"
    ]
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
{
  "success": true,
  "expectation_config": {
    "type": "expect_column_values_t

In [13]:
```python
# Validate CauseOfDeaths
validate_data(context, df_deaths, "cause_of_deaths data", "cause_of_deaths_expectations", cause_of_deaths_expectations)
```

Validation for 'cause_of_deaths data' passed: True
{
  "success": true,
  "expectation_config": {
    "type": "expect_table_columns_to_match_ordered_list",
    "kwargs": {
      "batch_id": "pandas-cause_of_deaths data",
      "column_list": [
        "id",
        "Country",
        "Code",
        "Year",
        "Cardiovascular",
        "TotalDeaths"
      ]
    },
    "meta": {},
    "id": "5d52e951-18c0-4a2b-9890-d42015df6b5d"
  },
  "result": {
    "observed_value": [
      "id",
      "Country",
      "Code",
      "Year",
      "Cardiovascular",
      "TotalDeaths"
    ]
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
{
  "success": true,
  "expectation_config": {
    "type": "expect_column_values_to_be_between",
    "kwargs": {
      "batch_id": "pandas-cause_of_deaths data",
      "column": "Year",
      "min_value": 1990.0,
      "max_value": 2019.0
    },
    "meta": {},
    "id": 

In [14]:
```python
# Validate the OWID dataset with `validate_data`
validate_data(context, df_owid, "owid data", "owid_expectations", owid_expectations)
```

Validation for 'owid data' passed: True
{
  "success": true,
  "expectation_config": {
    "type": "expect_table_columns_to_match_ordered_list",
    "kwargs": {
      "batch_id": "pandas-owid data",
      "column_list": [
        "Country",
        "Code",
        "Year",
        "CardiovascularDeaths",
        "nitrogen_oxide(NOx)",
        "sulphur_dioxide(SO2)",
        "carbon_monoxide(CO)",
        "black_carbon(BC)",
        "ammonia(NH3)",
        "non_methane_volatile_organic_compounds",
        "gdp_per_capita",
        "obesity_prevalence_percentage",
        "diabetes_prevalence_percentage",
        "population",
        "TotalDeaths"
      ]
    },
    "meta": {},
    "id": "7f44ae45-a8b3-4035-a00e-853cd490aa85"
  },
  "result": {
    "observed_value": [
      "Country",
      "Code",
      "Year",
      "CardiovascularDeaths",
      "nitrogen_oxide(NOx)",
      "sulphur_dioxide(SO2)",
      "carbon_monoxide(CO)",
      "black_carbon(BC)",
      "ammonia(NH3)",
      "non_m