In [0]:
%pip install great_expectations


Chargement des zip

In [0]:
import requests
import zipfile
from azure.storage.blob import BlobServiceClient
from io import BytesIO
import os

DefaultEndpointsProtocol = os.environ["DefaultEndpointsProtocol"]
AccountName = os.environ["AccountName"]
AccountKey = os.environ["AccountKey"]
EndpointSuffix = os.environ["EndpointSuffix"]

# 1. Définir les variables
urls = ["https://www.data.gouv.fr/api/1/datasets/r/6994a9f1-3f4b-4e15-a4dc-0e358a6aac13","https://www.data.gouv.fr/api/1/datasets/r/c0350599-a041-4724-9942-ad4c2ba9a7b3","https://www.data.gouv.fr/api/1/datasets/r/96452cf0-329a-4908-8adb-8f061adcca4c","https://www.data.gouv.fr/api/1/datasets/r/77d3151a-739e-4aab-8c34-7a15d7fea55d","https://www.data.gouv.fr/api/1/datasets/r/3c5ebbd9-f6b5-4837-a194-12bfeda7f38e"]

blob_connecting_string = f"DefaultEndpointsProtocol={DefaultEndpointsProtocol};AccountName={AccountName};AccountKey={AccountKey};EndpointSuffix={EndpointSuffix}"

container_name = "lnd-fichiers-source"

# 2. Envoyer dans ton Blob
blob_service_client = BlobServiceClient.from_connection_string(blob_connecting_string)

container_client = blob_service_client.get_container_client(container_name)

for annee, url in zip(["2025","2024","2023","2022","2021"],urls):
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"Erreur lors du téléchargement du fichier : {response.status_code}")
    zip_bytes = BytesIO(response.content)
    zip_name = f"dis-{annee}-dept.zip"
    container_client.upload_blob(name=zip_name, data=zip_bytes, overwrite=True)

    print(f"{zip_name} envoyé dans le blob ✅")


Dézippage dans Azure

In [0]:
import zipfile

# Connexion à Azure
blob_service_client = BlobServiceClient.from_connection_string(blob_connecting_string)
source_container = blob_service_client.get_container_client("lnd-fichiers-source")
dest_container_name = "lnd-fichiers-dezip"

# Créer le conteneur de destination si nécessaire
if not blob_service_client.get_container_client(dest_container_name).exists():
    blob_service_client.create_container(dest_container_name)

dest_container = blob_service_client.get_container_client(dest_container_name)

# Lister les blobs ZIP
blobs_list = source_container.list_blobs()
for blob in blobs_list:
    if blob.name.endswith(".zip"):
        print(f"Décompression de {blob.name}...")

        # Télécharger le ZIP en mémoire
        blob_data = source_container.download_blob(blob.name).readall()
        zip_bytes = BytesIO(blob_data)

        # Dézipper
        with zipfile.ZipFile(zip_bytes) as z:
            for file_name in z.namelist():
                # Lire le fichier dans la mémoire
                file_data = z.read(file_name)

                # Envoyer le fichier dans le blob de destination
                dest_blob_name = file_name  # tu peux rajouter un préfixe si tu veux
                dest_container.upload_blob(name=dest_blob_name, data=file_data, overwrite=True)
                print(f"  - {file_name} envoyé dans {dest_container_name} ✅")


Récupération d'un fichier spécifique

In [0]:
import pandas as pd
from io import BytesIO

# Supposons que tu as déjà ton blob client
blob_name = "DIS_COM_UDI_2021.txt"
blob_data = dest_container.download_blob(blob_name).readall()  # récupère les octets

# Convertir les octets en flux pour pandas
df = pd.read_csv(BytesIO(blob_data), sep=",", encoding="utf-8")
df.head()



Initialisation de Great Expectation

In [0]:
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest

# 1️⃣ Récupérer le contexte (local notebook)
context = ge.get_context()

# 2️⃣ Créer un datasource Pandas
data_source_name = "my_data_source"
data_source = context.data_sources.add_pandas(name=data_source_name)
data_asset_name = "my_dataframe_data_asset"
data_asset = data_source.add_dataframe_asset(name=data_asset_name)


Ajouter des attentes

In [0]:
from great_expectations import expectations as gxe

batch_definition = data_asset.add_batch_definition_whole_dataframe("batch_definition_unique")
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

# Pas de valeurs nulles dans la colonne 'departement'
not_null_expectation = gxe.ExpectColumnValuesToNotBeNull(column="inseecommune")
print(not_null_expectation)

# Les inseecommune doivent être entre 0 et 97900
df["inseecommune"] = pd.to_numeric(df["inseecommune"], errors="coerce")

between_expectations = gxe.ExpectColumnMaxToBeBetween(column="inseecommune", min_value=0, max_value=97900)


Valider les attentes

In [0]:
validation_result1 = batch.validate(not_null_expectation)

print(validation_result1)


In [0]:
validation_result2 = batch.validate(between_expectations)

print(validation_result2)

Rapport de résultats

In [0]:
from datetime import datetime
import pandas as pd

# --- Simule tes résultats GE (tu peux remplacer par tes vraies variables si elles existent encore)
results = []

# Résultat 1 - not null
res1 = {
    "success": True,  # tu peux le récupérer directement depuis ton expectation_result.success
    "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {"column": "inseecommune"},
    },
    "result": {},
}
results.append(res1)

# Résultat 2 - bornes INSEE
res2 = {
    "success": True,
    "expectation_config": {
        "type": "expect_column_max_to_be_between",
        "kwargs": {
            "column": "inseecommune",
            "min_value": 0.0,
            "max_value": 97900.0,
        },
    },
    "result": {"observed_value": 97801.0},
}
results.append(res2)

# --- Transformation en tableau propre
records = []
for res in results:
    record = {
        "date_execution": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "nom_analyse": res["expectation_config"]["type"],
        "colonne": res["expectation_config"]["kwargs"].get("column"),
        "borne_min": res["expectation_config"]["kwargs"].get("min_value"),
        "borne_max": res["expectation_config"]["kwargs"].get("max_value"),
        "valeur_observee": res["result"].get("observed_value"),
        "resultat": "SUCCES" if res["success"] else "ECHEC",
    }
    records.append(record)

df_results = pd.DataFrame(records)
display(df_results)



Mise en table

In [0]:
# Convertir en Spark DF
spark_df = spark.createDataFrame(pd.DataFrame(records))

# Déclarer la Live Table
@dlt.table(
    name="data_quality_results",
    comment="Résultats des contrôles de qualité des données (Great Expectations)"
)
def data_quality_results():
    return (
        spark_df
        .withColumn("source", lit("qualite_eau"))
        .withColumn("date_insertion", current_timestamp())