# mesures_cat_seuil
***

Essayons de mettre en pratique la proposition de Thomas dans la PR 89<br>
https://github.com/dataforgoodfr/13_pollution_eau/pull/89


In [1]:
import duckdb
from pipelines.tasks.config.common import DUCKDB_FILE

import pandas as pd

# Notebook-wide pandas display settings, applied in one pass.
for option_name, option_value in (
    ("display.max_columns", None),  # never hide columns
    ("display.max_colwidth", None),  # never truncate the content of a cell
    ("display.expand_frame_repr", False),  # keep columns side by side, no wrapping
):
    pd.set_option(option_name, option_value)

# Read-only connection to the DuckDB file configured by the project; every
# query in this notebook goes through it.
con = duckdb.connect(database=DUCKDB_FILE, read_only=True)


In [2]:
# List the tables available in the database.
tables_relation = con.sql("SHOW TABLES")
tables_relation.show()

┌─────────────────────────┐
│          name           │
│         varchar         │
├─────────────────────────┤
│ cog_communes            │
│ edc_communes            │
│ edc_prelevements        │
│ edc_resultats           │
│ laposte_communes        │
│ mapping_categories      │
│ stg_communes__cog       │
│ stg_communes__laposte   │
│ stg_edc__communes       │
│ stg_edc__prevelevements │
│ stg_edc__resultats      │
├─────────────────────────┤
│         11 rows         │
└─────────────────────────┘



In [3]:
# Reference row count of edc_resultats: the join checks further down compare
# their own COUNT(*) against this one-cell DataFrame.
reference_count_relation = con.sql(" SELECT COUNT(*) FROM edc_resultats")
nb_result_ref = reference_count_relation.df()

In [4]:
# Build one row per sample (referenceprel) from edc_prelevements.
#
# Step 1 (prelevements_cdfirstreseauamont): SELECT DISTINCT on the sample
#   attributes, plus two derived columns:
#   - cdfirstreseauamont: cdreseauamont when it is set, otherwise cdreseau;
#   - datetimeprel: dateprel and heureprel (with 'h' replaced by ':') parsed as
#     a timestamp; TRY_STRPTIME is used, so presumably an unparsable value
#     gives NULL rather than an error -- confirm against the DuckDB docs.
# Step 2 (ranked): number the rows of each referenceprel, ordered by the raw
#   dateprel / heureprel columns (datetimeprel is not used for the ordering).
# Step 3: keep only the first row of each referenceprel and drop row_num.
#
# NOTE(review): if two rows of the same referenceprel share the same dateprel
# and heureprel, which one gets row_num = 1 is presumably arbitrary -- confirm
# whether that can happen in the data.
query_prelevement_unique = """
WITH
prelevements_cdfirstreseauamont AS (
    SELECT DISTINCT
        referenceprel,
        dateprel,
        heureprel,
        conclusionprel,
        plvconformitebacterio,
        plvconformitechimique,
        plvconformitereferencebact,
        plvconformitereferencechim,
        (CASE
            WHEN cdreseauamont IS NULL THEN cdreseau
            WHEN cdreseauamont IS NOT NULL THEN cdreseauamont
        END) AS cdfirstreseauamont,
        TRY_STRPTIME(
            dateprel || ' ' || REPLACE(heureprel, 'h', ':'), '%Y-%m-%d %H:%M'
        ) AS datetimeprel
    FROM
        edc_prelevements
),

ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY referenceprel
            ORDER BY
                dateprel,
                heureprel
        ) AS row_num
    FROM
        prelevements_cdfirstreseauamont
)

SELECT * EXCLUDE (row_num)
FROM
    ranked
WHERE
    row_num = 1
"""

# Materialise the result as a pandas DataFrame. The later queries of this
# notebook reference it by name (`prelevement_unique`) inside their SQL.
prelevement_unique = con.sql(query_prelevement_unique).df()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

# Check avant jointure par cdreseau

In [6]:
# Sanity check before adding the commune join: LEFT JOINing every result row to
# its sample (prelevement) and to its parameter category must leave the number
# of rows equal to the row count of edc_resultats alone.
#
# - `prelevement_unique` is the pandas DataFrame built in an earlier cell, not a
#   database table; DuckDB is expected to resolve it by variable name
#   (replacement scan).
# - mapping_categories_simple keeps only the parameters that map to exactly one
#   distinct category, grouped by cdparametresiseeaux, so that join adds at most
#   one row per result.
#
# The query is a raw string (r"""...""") because its regular expressions contain
# backslashes (\d, \.). In a regular string literal those are invalid escape
# sequences, for which Python emits a warning; the SQL text sent to DuckDB is
# unchanged.
sub_query = r"""
WITH
prelevement AS (
	SELECT
	  referenceprel,
	  cdfirstreseauamont AS cdreseau,
      dateprel,
	FROM   
      prelevement_unique
),

resultats AS (
    SELECT
      referenceprel,
      cdparametresiseeaux,
      valtraduite,
      limitequal,
      CAST(regexp_extract(REPLACE(limitequal, ',', '.'), '-?\d+(\.\d+)?') AS FLOAT) AS limitequal_float,
      regexp_extract(limitequal, '[a-zA-Zµg]+/?[a-zA-Z/L]+$') AS unite,
    FROM  
        edc_resultats 
    ),

mapping_categories_simple AS (
    SELECT
      cdparametresiseeaux  ,
      STRING_AGG(DISTINCT categorie) AS categorie,
    FROM 
      mapping_categories
    GROUP BY 
        cdparametresiseeaux
     HAVING 
        COUNT(DISTINCT categorie) = 1
),

mesures_cat AS (
    SELECT
        resultats.*,
        prelevement.dateprel,
        mapping_categories_simple.categorie
    FROM 
        resultats
    LEFT JOIN
    	prelevement
    ON
       resultats.referenceprel = prelevement.referenceprel
    LEFT JOIN 
        mapping_categories_simple
    ON
        resultats.cdparametresiseeaux = mapping_categories_simple.cdparametresiseeaux
    )
    
SELECT 
 COUNT(*)
FROM
 mesures_cat
"""

nb_result_sub_query = con.sql(sub_query).df()

# Both sides are one-element NumPy arrays (the single COUNT(*) cell of each
# frame), so the comparison yields a single boolean.
if nb_result_sub_query.iloc[0].values == nb_result_ref.iloc[0].values:
    print("ok :)")
else:
    print("KO")
    print("diff: ", nb_result_sub_query.iloc[0].values - nb_result_ref.iloc[0].values)

  sub_query = """


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

ok :)


# Avec Join sur edc_communes et cog_communes

In [5]:
# Same row-count check as the cell without the commune join, with one extra
# LEFT JOIN on udi_cog (edc_communes enriched with the department / region
# codes of cog_communes), matched on cdreseau.
#
# udi_cog holds at least one row per edc_communes row, and several of them can
# share a cdreseau. Each result row is then repeated once per matching udi_cog
# row, so COUNT(*) can exceed the reference count of edc_resultats: the recorded
# run of this cell prints "KO" with a positive diff. Note that mesures_cat does
# not select any udi_cog column; the join only affects the number of rows.
#
# `prelevement_unique` is the pandas DataFrame built in an earlier cell; DuckDB
# is expected to resolve it by variable name (replacement scan).
#
# The query is a raw string (r"""...""") because its regular expressions contain
# backslashes (\d, \.). In a regular string literal those are invalid escape
# sequences, for which Python emits a warning; the SQL text sent to DuckDB is
# unchanged.
query = r"""
WITH
udi AS (
    SELECT
      inseecommune AS commune_code_insee,
      cdreseau,
    FROM
      edc_communes
),

cog AS (
    SELECT
      DEP AS code_departement,
      REG AS code_region,
      COM AS commune_code_insee,
    FROM 
      cog_communes
),

udi_cog AS (
    SELECT
      udi.commune_code_insee ,
      udi.cdreseau,
      cog.code_departement,
      cog.code_region
    FROM
      udi
    LEFT JOIN 
      cog
    ON 
      udi.commune_code_insee = cog.commune_code_insee
),

prelevement AS (
	SELECT
	  referenceprel,
	  cdfirstreseauamont AS cdreseau,
      dateprel,
	FROM   
      prelevement_unique
),

resultats AS (
    SELECT
      referenceprel,
      cdparametresiseeaux,
      valtraduite,
      limitequal,
      CAST(regexp_extract(REPLACE(limitequal, ',', '.'), '-?\d+(\.\d+)?') AS FLOAT) AS limitequal_float,
      regexp_extract(limitequal, '[a-zA-Zµg]+/?[a-zA-Z/L]+$') AS unite,
    FROM  
        edc_resultats 
    ),

mapping_categories_simple AS (
    SELECT
      cdparametresiseeaux  ,
      STRING_AGG(DISTINCT categorie) AS categorie,
    FROM 
      mapping_categories
    GROUP BY 
        cdparametresiseeaux
     HAVING 
        COUNT(DISTINCT categorie) = 1
),

mesures_cat AS (
    SELECT
        resultats.*,
        prelevement.dateprel,
        mapping_categories_simple.categorie
    FROM 
        resultats
    LEFT JOIN
    	prelevement
    ON
       resultats.referenceprel = prelevement.referenceprel
    LEFT JOIN
        udi_cog
    ON
    	udi_cog.cdreseau = prelevement.cdreseau
    LEFT JOIN 
        mapping_categories_simple
    ON
        resultats.cdparametresiseeaux = mapping_categories_simple.cdparametresiseeaux
    )
    
SELECT 
 COUNT(*)
FROM
 mesures_cat
"""

nb_result_1 = con.sql(query).df()

# Both sides are one-element NumPy arrays (the single COUNT(*) cell of each
# frame), so the comparison yields a single boolean.
if nb_result_1.iloc[0].values == nb_result_ref.iloc[0].values:
    print("ok :)")
else:
    print("KO")
    print("diff: ", nb_result_1.iloc[0].values - nb_result_ref.iloc[0].values)
    

  query = """


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

KO
diff:  [581458710]


# ZOOM udi/cog

In [9]:
# Zoom on the udi / cog join on its own: count how many udi_cog rows exist for
# each cdreseau. udi_cog is edc_communes (commune code + cdreseau) LEFT JOINed
# with cog_communes (department / region codes) on the commune INSEE code.
# Any cdreseau with a count above 1 multiplies the rows of a query that joins
# on cdreseau alone.
query_udi_cog = """
WITH
udi AS (
    SELECT
      inseecommune AS commune_code_insee,
      cdreseau,
    FROM
      edc_communes
),

cog AS (
    SELECT
      DEP AS code_departement,
      REG AS code_region,
      COM AS commune_code_insee,
    FROM 
      cog_communes
),

udi_cog AS (
    SELECT
      udi.commune_code_insee ,
      udi.cdreseau,
      cog.code_departement,
      cog.code_region
    FROM
      udi
    LEFT JOIN 
      cog
    ON 
      udi.commune_code_insee = cog.commune_code_insee
)
     
SELECT 
 cdreseau,
 COUNT(*)
FROM
 udi_cog
GROUP BY 1
"""

# One row per cdreseau with its row count; displayed as the cell output.
udi_cog_df = con.sql(query_udi_cog).df()
udi_cog_df   

Unnamed: 0,cdreseau,count_star()
0,088001572,15
1,088001427,5
2,088001577,30
3,088001605,95
4,088001432,5
...,...,...
23762,073008257,1
23763,073008249,1
23764,073000247,1
23765,074008002,1


In [11]:
# Inspect the raw edc_communes rows of a single network (cdreseau '088001572')
# to see which (commune, cdreseau) pairs make up its row count.
# NOTE(review): the recorded output shows the same pairs several times;
# presumably another column of edc_communes (not selected here) differs between
# those rows -- confirm before deduplicating.
query_test = """
SELECT
      inseecommune AS commune_code_insee,
      cdreseau,
    FROM
      edc_communes
   WHERE 
      cdreseau = '088001572'
"""
con.sql(query_test).df()

Unnamed: 0,commune_code_insee,cdreseau
0,88026,88001572
1,88273,88001572
2,88495,88001572
3,88026,88001572
4,88273,88001572
5,88495,88001572
6,88026,88001572
7,88273,88001572
8,88495,88001572
9,88026,88001572
