In [1]:
pip install great expectations

Note: you may need to restart the kernel to use updated packages.


In [2]:
import great_expectations as gx
import pandas as pd
import os
import sys
current_dir = os.getcwd()
sys.path.append(os.path.abspath(os.path.join(current_dir, '..')))

In [4]:
context = gx.get_context()
print(type(context).__name__)

EphemeralDataContext


In [5]:
print(dir(context))

['DOLLAR_SIGN_ESCAPE_STRING', 'GLOBAL_CONFIG_PATHS', '_ETC_CONF_DIR', '_ETC_CONF_FILE', '_ROOT_CONF_DIR', '_ROOT_CONF_FILE', '__abstractmethods__', '__annotations__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__slots__', '__str__', '__subclasshook__', '__weakref__', '_abc_impl', '_add_datasource', '_add_fluent_datasource', '_attach_fluent_config_datasources_and_build_data_connectors', '_build_data_docs', '_build_store_from_config', '_checkpoints', '_clean_data_docs_site', '_config_provider', '_config_variables', '_construct_data_context_id', '_data_context_id', '_data_sources', '_datasource_store', '_datasources', '_delete_fluent_datasource', '_determine_analytics_enabled', '_get_batch_list_from_inputs', '_get_expectation_suite_from_

In [6]:
from influxdb_client import InfluxDBClient
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "token1"
INFLUX_ORG = "upa"
INFLUX_AQ_BUCKET = "air_quality"
INFLUX_INC_BUCKET = "gencat_incidents"



In [7]:
import great_expectations as ge

# Create an in-memory GX context
context = ge.get_context(mode="ephemeral")


In [8]:
def query_traffic():
    client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
    query = f'''
    from(bucket: "{INFLUX_INC_BUCKET}")
        |> range(start: -100y)
        |> filter(fn: (r) => r._measurement == "traffic_incidents")
        |> filter(fn: (r) => r._field == "lat" or r._field == "lon" or r._field == "value")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''
    tables = client.query_api().query(query)
    results = []
    for table in tables:
        for record in table.records:
            results.append({
                "time": record.get_time(),
                # "field": record.get_field(),  # <-- remove this, no longer available after pivot
                "value": record.values.get("value"),
                "lat": record.values.get("lat"),
                "lon": record.values.get("lon"),
                "description": record.values.get("description"),
                "cause": record.values.get("cause"),
                "direction": record.values.get("direction"),
                "incident_id": record.values.get("incident_id")
            })
    client.close()
    return pd.DataFrame(results)

df_traffic = query_traffic()
print(df_traffic)


                       time value        lat       lon description  \
0 2025-05-18 18:57:53+00:00  None  41.729851  2.622254        None   
1 2025-05-09 13:12:16+00:00  None  41.567044  1.963011        None   
2 2025-05-16 13:46:20+00:00  None  41.158315  1.226274        None   
3 2025-05-18 18:56:43+00:00  None  41.438174  2.002154        None   
4 2025-05-18 15:13:49+00:00  None  41.413690  2.222754        None   
5 2025-05-15 05:00:00+00:00  None  42.209008  2.539497        None   
6 2025-05-16 13:45:17+00:00  None  41.158315  1.226274        None   
7 2025-05-13 10:50:31+00:00  None  40.718611  0.562525        None   
8 2025-05-15 05:00:00+00:00  None  42.204180  2.704577        None   
9 2025-05-16 12:58:49+00:00  None  41.085776  1.076262        None   

                                    cause       direction incident_id  
0                                  Avaria      Decreixent   140097801  
1                             Carril BICI  Ambdos sentits   135087104  
2  Carril en 

In [9]:

data_source_name = "traffic_source"
data_source = context.data_sources.add_pandas(name= data_source_name)
data_source = context.data_sources.get(data_source_name)
data_asset_name = "df_traffic"
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
batch_definition_name = "traffic_batch_def"
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    batch_definition_name
)
batch_parameters = {"dataframe": df_traffic}
batch_definition = (
    context.data_sources.get(data_source_name)
    .get_asset(data_asset_name)
    .get_batch_definition(batch_definition_name)
)

try:
    batch_definition = data_asset.get_batch_definition(batch_definition_name)
except KeyError:
    batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)

batch = batch_definition.get_batch(batch_parameters=batch_parameters)



expectation_lat_notnull = ge.expectations.ExpectColumnValuesToNotBeNull(
    column="lat"
)
expectation_long_notnull = ge.expectations.ExpectColumnValuesToNotBeNull(
    column="long"
)
expectation_lat_values_between = ge.expectations.ExpectColumnValuesToBeBetween(
    column="lat", max_value=90, min_value=-90
)
expectation_long_values_between = ge.expectations.ExpectColumnValuesToBeBetween(
    column="long", max_value=180, min_value=-180
)
expectation_cause_notnull = ge.expectations.ExpectColumnValuesToNotBeNull(
    column="cause"
)
expectation_time_notnull = ge.expectations.ExpectColumnValuesToNotBeNull(
    column="time"
)

expectation_cause_type = ge.expectations.ExpectColumnValuesToBeOfType(
    column = "cause", type_="str"
)





In [10]:
batch = batch_definition.get_batch(batch_parameters=batch_parameters)

In [11]:
validation_results=[]
validation_results.append(batch.validate(expectation_lat_notnull))
validation_results.append(batch.validate(expectation_long_notnull))
validation_results.append(batch.validate(expectation_lat_values_between))
validation_results.append(batch.validate(expectation_long_values_between))
validation_results.append(batch.validate(expectation_cause_notnull))
validation_results.append(batch.validate(expectation_time_notnull))
validation_results.append(batch.validate(expectation_cause_type))


for i, result in enumerate(validation_results, start=1):
    print(f"Expectation {i}: {result['expectation_config']}")
    print(f"Success: {result['success']}")
    print(f"Result: {result['result']}")
    print("=" * 50)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/10 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/10 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/10 [00:00<?, ?it/s]

Expectation 1: {
  "type": "expect_column_values_to_not_be_null",
  "kwargs": {
    "batch_id": "traffic_source-df_traffic",
    "column": "lat"
  },
  "meta": {}
}
Success: True
Result: {'element_count': 10, 'unexpected_count': 0, 'unexpected_percent': 0.0, 'partial_unexpected_list': [], 'partial_unexpected_counts': [], 'partial_unexpected_index_list': []}
Expectation 2: {
  "type": "expect_column_values_to_not_be_null",
  "kwargs": {
    "column": "long",
    "batch_id": "traffic_source-df_traffic"
  },
  "meta": {}
}
Success: False
Result: {}
Expectation 3: {
  "type": "expect_column_values_to_be_between",
  "kwargs": {
    "batch_id": "traffic_source-df_traffic",
    "column": "lat",
    "min_value": -90.0,
    "max_value": 90.0
  },
  "meta": {}
}
Success: True
Result: {'element_count': 10, 'unexpected_count': 0, 'unexpected_percent': 0.0, 'partial_unexpected_list': [], 'missing_count': 0, 'missing_percent': 0.0, 'unexpected_percent_total': 0.0, 'unexpected_percent_nonmissing': 0.