# This notebook is a collection of expectations

In [1]:
import datetime

import pandas as pd

import great_expectations as ge
import great_expectations.jupyter_ux
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.data_context.types.resource_identifiers import ExpectationSuiteIdentifier
from great_expectations.exceptions import DataContextError

# Load the Great Expectations data context used by every cell below.
context = ge.data_context.DataContext()

# Name of the suite this notebook edits. Pointing it at another name works on
# that suite instead and leaves the previously named one untouched.
expectation_suite_name = "one"

try:
    suite = context.get_expectation_suite(
        expectation_suite_name=expectation_suite_name
    )
except DataContextError:
    # The lookup failed (presumably the suite does not exist yet), so create it.
    suite = context.create_expectation_suite(
        expectation_suite_name=expectation_suite_name
    )
    print(f'Created ExpectationSuite "{suite.expectation_suite_name}".')
else:
    print(f'Loaded ExpectationSuite "{suite.expectation_suite_name}" containing {len(suite.expectations)} expectations.')

2022-12-03T19:17:07+0100 - INFO - Great Expectations logging enabled at 20 level by JupyterUX module.
Loaded ExpectationSuite "one" containing 11 expectations.


# Table Expectations

In [5]:
# The table must hold exactly 5 rows: lower and upper bound are both 5.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_table_row_count_to_be_between",
    kwargs={
        "min_value": 5,
        "max_value": 5,
    },
    meta={"metric_name": "table.row_count"},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_table_row_count_to_be_between", "meta": {"metric_name": "table.row_count"}, "kwargs": {"min_value": 5, "max_value": 5}}

In [6]:
# The table's columns must match this set of nine column names.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_table_columns_to_match_set",
    kwargs={
        "column_set": [
            "destination_city",
            "arrival_time",
            "days_left",
            "class",
            "source_city",
            "stops",
            "departure_time",
            "airline",
            "duration",
        ],
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_table_columns_to_match_set", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column_set": ["destination_city", "arrival_time", "days_left", "class", "source_city", "stops", "departure_time", "airline", "duration"]}}

# Column Expectations

In [17]:
# Null check for a single column: `destination_city` must contain no nulls.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "destination_city",
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column": "destination_city"}}

In [18]:
# `arrival_time` must contain no null values.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "arrival_time",
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column": "arrival_time"}}

In [19]:
# `days_left` must contain no null values.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "days_left",
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column": "days_left"}}

In [20]:
# `class` must contain no null values.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "class",
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column": "class"}}

In [21]:
# `source_city` must contain no null values.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "source_city",
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column": "source_city"}}

In [22]:
# `stops` must contain no null values.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "stops",
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column": "stops"}}

In [23]:
# `departure_time` must contain no null values.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "departure_time",
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column": "departure_time"}}

In [24]:
# `airline` must contain no null values.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "airline",
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column": "airline"}}

In [25]:
# `duration` must contain no null values.
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={
        "column": "duration",
    },
    meta={"profiler_details": {"success_ratio": 1.0}},
)
# Left as the last expression so the cell displays the returned configuration.
suite.add_expectation(expectation_configuration=expectation_configuration)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {"profiler_details": {"success_ratio": 1.0}}, "kwargs": {"column": "duration"}}

# Save Expectations

In [2]:
# Reports the total number of expectations currently held by the suite.
print(
    f'Added {len(suite.expectations)} expectations '
    f'to ExpectationSuite "{suite.expectation_suite_name}".'
)

Added 11 expectations to ExpectationSuite "one".


In [3]:
# Persist the in-memory suite under the configured suite name.
context.save_expectation_suite(
    expectation_suite=suite,
    expectation_suite_name=expectation_suite_name,
)

# Build the Data Docs for this suite's identifier, then open them.
suite_identifier = ExpectationSuiteIdentifier(
    expectation_suite_name=expectation_suite_name
)
context.build_data_docs(resource_identifiers=[suite_identifier])
context.open_data_docs(resource_identifier=suite_identifier)

# Validate Expectations

In [2]:
import datetime

import pandas as pd

import great_expectations as ge
import great_expectations.jupyter_ux
from great_expectations.core.batch import BatchRequest
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.exceptions import DataContextError

# Load the Great Expectations data context for the validation cells.
context = ge.data_context.DataContext()

# Suite to validate against. Pointing this at another name works on that suite
# instead and leaves the previously named one untouched.
expectation_suite_name = "one"

try:
    suite = context.get_expectation_suite(
        expectation_suite_name=expectation_suite_name
    )
except DataContextError:
    # The lookup failed (presumably the suite does not exist yet), so create it.
    suite = context.create_expectation_suite(
        expectation_suite_name=expectation_suite_name
    )
    print(f'Created ExpectationSuite "{suite.expectation_suite_name}".')
else:
    print(f'Loaded ExpectationSuite "{suite.expectation_suite_name}" containing {len(suite.expectations)} expectations.')

# NOTE(review): the recorded run of this cell ended with
# InvalidBatchRequestError (empty batch_list) for this data asset name.
# Confirm the asset is visible to the data connector before relying on it.
batch_request = {
    'datasource_name': 'my_datasource',
    'data_connector_name': 'default_inferred_data_connector_name',
    'data_asset_name': '2022-46-03_15-46-38.csv',
    'limit': 1000,
}

validator = context.get_validator(
    expectation_suite_name=expectation_suite_name,
    batch_request=BatchRequest(**batch_request),
)

2022-12-04T17:36:18+0100 - INFO - Great Expectations logging enabled at 20 level by JupyterUX module.
Loaded ExpectationSuite "one" containing 11 expectations.


InvalidBatchRequestError: Validator could not be created because BatchRequest returned an empty batch_list.
                Please check your parameters and try again.

In [40]:
import os

# Validate every entry of `output_data` against the suite and report the
# outcome per file.
for file in os.listdir('output_data'):
    batch_request = {
        'datasource_name': 'my_datasource',
        'data_connector_name': 'default_inferred_data_connector_name',
        'data_asset_name': file,
        'limit': 1000,
    }
    validator = context.get_validator(
        expectation_suite_name=expectation_suite_name,
        batch_request=BatchRequest(**batch_request),
    )

    results = validator.validate()
    # Only the overall success flag of the validation result is reported.
    if not results['success']:
        print(f"Validation of {file} failed.")
    else:
        print(f"Validation of {file} was successful!")
    

2022-12-03T22:13:43+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-46-03_15-46-38.csv was successful!
2022-12-03T22:13:44+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-46-03_15-46-55.csv was successful!
2022-12-03T22:13:45+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-47-03_15-47-03.csv was successful!
2022-12-03T22:13:46+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-47-03_15-47-28.csv was successful!
2022-12-03T22:13:46+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-47-03_15-47-41.csv was successful!
2022-12-03T22:13:47+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-47-03_15-47-49.csv was successful!
2022-12-03T22:13:48+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-48-03_15-48-11.csv was successful!
2022-12-03T22:13:49+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-48-03_15-48-25.csv was successful!
2022-12-03T22:13:50+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-48-03_15-48-39.csv was successful!
2022-12-03T22:13:50+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-48-03_15-48-47.csv was successful!
2022-12-03T22:13:51+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-49-03_15-49-11.csv was successful!
2022-12-03T22:13:52+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of 2022-49-03_15-49-27.csv was successful!
2022-12-03T22:13:53+0100 - INFO - 	11 expectation(s) included in expectation_suite.


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Validation of Flights50.csv failed.


In [1]:
import logging
from datetime import datetime
from datetime import timedelta
import warnings
import pandas as pd
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
import great_expectations as ge
from great_expectations.core.batch import BatchRequest
# Suppresses all warnings from here on: no category or module filter is given.
warnings.filterwarnings("ignore")


def get_data_to_ingest_from_local_file(
    filepath: str = 'D:/EPITA/dsp/flight_v3/dsp-project-dsp-flight/airflow/input_data/Flights50.csv',
    nb_rows: int = 5,
    random_state=None,
) -> pd.DataFrame:
    """Read a CSV file and return a random sample of its rows.

    Called without arguments this behaves as before: it reads the original
    hard-coded file and draws 5 rows at random.

    Args:
        filepath: CSV file to read.
        nb_rows: Number of rows to draw.
        random_state: Forwarded to ``DataFrame.sample``. Pass a fixed value
            to make the draw repeatable; ``None`` keeps it random.

    Returns:
        A DataFrame holding ``nb_rows`` rows sampled from the file.

    Raises:
        ValueError: If the file holds fewer than ``nb_rows`` rows.
    """
    input_data_df = pd.read_csv(filepath)

    # Fail with an explicit message instead of pandas' generic sampling error.
    if nb_rows > len(input_data_df):
        raise ValueError(
            f"Cannot sample {nb_rows} rows from {filepath}: "
            f"it only holds {len(input_data_df)} rows."
        )

    return input_data_df.sample(n=nb_rows, random_state=random_state)

def save_data(data_to_ingest_df: pd.DataFrame) -> None:
    """Write the frame to folder_A, validate it, then copy it to folder_C or folder_B.

    The file name is the current timestamp plus ``.csv``. The frame is always
    written to folder_A first. It is then written a second time to folder_C
    when ``validate_data`` reports success, and to folder_B otherwise.

    Args:
        data_to_ingest_df: Frame to persist.
    """
    file = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".csv"
    data_to_ingest_df.to_csv(
        f'D:/EPITA/dsp/flight_v3/dsp-project-dsp-flight/airflow/folder_A/{file}',
        index=False,
    )
    print(file)

    # Pick the destination and message first so the write happens in one place.
    if validate_data(file):
        filepath = f'D:/EPITA/dsp/flight_v3/dsp-project-dsp-flight/airflow/folder_C/{file}'
        message = "Data saved in Folder C"
    else:
        filepath = f'D:/EPITA/dsp/flight_v3/dsp-project-dsp-flight/airflow/folder_B/{file}'
        message = "Data saved in Folder B"

    data_to_ingest_df.to_csv(filepath, index=False)
    print(message)

def validate_data(file):
    """Validate one data asset against the "one" expectation suite.

    Args:
        file: Data asset name passed to the batch request.

    Returns:
        The ``success`` entry of the validation result.
    """
    ge_context = ge.data_context.DataContext()

    request_fields = {
        'datasource_name': 'my_datasource',
        'data_connector_name': 'default_inferred_data_connector_name',
        'data_asset_name': file,
        'limit': 1000,
    }
    suite_validator = ge_context.get_validator(
        expectation_suite_name="one",
        batch_request=BatchRequest(**request_fields),
    )

    outcome = suite_validator.validate()
    return outcome['success']

# Run the pipeline once: sample rows from the input file, then save them.
data_to_ingest = get_data_to_ingest_from_local_file()
save_data(data_to_ingest)



[[34m2022-12-05 14:23:32,178[0m] {[34mmetadatasource.py:[0m47} INFO[0m - Datasources: 1[0m
[[34m2022-12-05 14:23:32,178[0m] {[34mtype_lookup.py:[0m128} INFO[0m - Beginning TypeLookup transaction[0m
[[34m2022-12-05 14:23:32,178[0m] {[34msources.py:[0m134} INFO[0m - 2b. Registering `DataAsset` `TableAsset` as table[0m
[[34m2022-12-05 14:23:32,185[0m] {[34msources.py:[0m102} INFO[0m - 2a. Registering PostgresDatasource as postgres with add_postgres() factory[0m
[[34m2022-12-05 14:23:32,187[0m] {[34msources.py:[0m113} INFO[0m - 'postgres' added to `type_lookup`[0m
[[34m2022-12-05 14:23:32,187[0m] {[34mtype_lookup.py:[0m139} INFO[0m - Transaction committing items[0m
[[34m2022-12-05 14:23:32,187[0m] {[34mtype_lookup.py:[0m140} INFO[0m - Completed TypeLookup transaction[0m
2022-12-05_14-23-37.csv
[[34m2022-12-05 14:23:37,814[0m] {[34mvalidator.py:[0m1498} INFO[0m - 	11 expectation(s) included in expectation_suite.[0m


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Data saved in Folder C


In [2]:
import logging
from datetime import datetime
from datetime import timedelta
import warnings
import pandas as pd
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
import great_expectations as ge
from great_expectations.core.batch import BatchRequest
# Suppresses all warnings from here on: no category or module filter is given.
warnings.filterwarnings("ignore")


def get_data_to_ingest_from_local_file(
    filepath: str = 'D:/EPITA/dsp/flight_v3/dsp-project-dsp-flight/airflow/input_data/Flights50.csv',
    nb_rows: int = 5,
    random_state=None,
) -> pd.DataFrame:
    """Read a CSV file and return a random sample of its rows.

    Called without arguments this behaves as before: it reads the original
    hard-coded file and draws 5 rows at random.

    Args:
        filepath: CSV file to read.
        nb_rows: Number of rows to draw.
        random_state: Forwarded to ``DataFrame.sample``. Pass a fixed value
            to make the draw repeatable; ``None`` keeps it random.

    Returns:
        A DataFrame holding ``nb_rows`` rows sampled from the file.

    Raises:
        ValueError: If the file holds fewer than ``nb_rows`` rows.
    """
    input_data_df = pd.read_csv(filepath)

    # Fail with an explicit message instead of pandas' generic sampling error.
    if nb_rows > len(input_data_df):
        raise ValueError(
            f"Cannot sample {nb_rows} rows from {filepath}: "
            f"it only holds {len(input_data_df)} rows."
        )

    return input_data_df.sample(n=nb_rows, random_state=random_state)

def save_data(data_to_ingest_df: pd.DataFrame) -> None:
    """Write the frame to folder_A under a timestamped name, then validate it.

    The file name is the current timestamp plus ``.csv``. After the write, the
    name and the frame are handed to ``validate_data``.

    Args:
        data_to_ingest_df: Frame to persist.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    file = timestamp + ".csv"
    data_to_ingest_df.to_csv(
        f'D:/EPITA/dsp/flight_v3/dsp-project-dsp-flight/airflow/folder_A/{file}',
        index=False,
    )
    print(file)
    validate_data(file, data_to_ingest_df)

def validate_data(file, data_to_ingest_df):
    """Validate a data asset and store the frame according to the outcome.

    The asset named ``file`` is validated against the "one" expectation suite.
    The frame is then written to folder_C when validation succeeds and to
    folder_B otherwise.

    Args:
        file: Data asset name; also used as the output file name.
        data_to_ingest_df: Frame to write to the chosen folder.

    Returns:
        bool: The validation outcome. Earlier versions returned ``None``, so
        callers that ignore the return value are unaffected.
    """
    context = ge.data_context.DataContext()
    expectation_suite_name = "one"
    batch_request = {
        'datasource_name': 'my_datasource',
        'data_connector_name': 'default_inferred_data_connector_name',
        'data_asset_name': file,
        'limit': 1000,
    }
    validator = context.get_validator(
        batch_request=BatchRequest(**batch_request),
        expectation_suite_name=expectation_suite_name,
    )
    results = validator.validate()

    # Rely on truthiness rather than comparing against True.
    is_valid = bool(results['success'])
    if is_valid:
        filepath = f'D:/EPITA/dsp/flight_v3/dsp-project-dsp-flight/airflow/folder_C/{file}'
        message = "Data saved in Folder C"
    else:
        filepath = f'D:/EPITA/dsp/flight_v3/dsp-project-dsp-flight/airflow/folder_B/{file}'
        message = "Data saved in Folder B"

    # Single write and print shared by both outcomes.
    data_to_ingest_df.to_csv(filepath, index=False)
    print(message)
    return is_valid

# Run the pipeline once: sample rows from the input file, then save them.
data_to_ingest = get_data_to_ingest_from_local_file()
save_data(data_to_ingest)

2022-12-05_14-24-03.csv
[[34m2022-12-05 14:24:04,274[0m] {[34mvalidator.py:[0m1498} INFO[0m - 	11 expectation(s) included in expectation_suite.[0m


Calculating Metrics:   0%|          | 0/30 [00:00<?, ?it/s]

Data saved in Folder B
