# Pipeline de traitement des données (du DUMP à la base de données)

Permet d'extraire, transformer et charger les données issues du dump des fichiers json-ld (s3 MinIO) jusqu'à la base de donnée (postgres)

## Entrées
Il faut avoir accès au S3 avec les fichier et copier dans le notebook la ligne de connexion (ligne disponible dans votre profil SSP Cloud -> Profil -> Connexion au Stockage -> Python (S3FS)

Ce bucket s3 est disponible au sein du projet **projet-ecolab-action-qualite**.
## Sortie
Remplit une base de données POSTGRESQL dont les identifiants sont lu depuis un fichier .env

```
POSTGRES_URL=....
POSTGRES_DB=...
POSTGRES_USER=...
POSTGRES_PASSWORD=...
```
## Visualisation
La base de données est connectée à une instance de Superset. 

## Imports

In [1]:
import rdflib
import time
import pandas as pd
import os
import s3fs
import logging
import glob
import json

from dotenv import load_dotenv
from unidecode import unidecode
from utils.dcat_reader_ckan import CatalogReader, DatasetReader
from utils.utils import create_universe_pprn

load_dotenv()

True

## Lecture du dump

In [2]:
# Create filesystem object
S3_ENDPOINT_URL = 'https://minio.lab.sspcloud.fr'

# Enter S3 cmd copied form SSLab : 
fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': 'https://'+'minio.lab.sspcloud.fr'},key ='XSZR1R12PmSvxB7Ze0Me', secret ='fdZbItjxgmFwEtkfnsE9PRTdouRtz0MlooCiDhk8')
#######

datasets = []
catalogs = []
iteration = 0
BUCKET = "projet-ecolab-action-qualite"
FILE_KEY_S3 = "DUMP_INTEGRATION_JSON/integration_page_107_2023-10-14.json"
FILE_PATH_S3 = BUCKET + "/" + FILE_KEY_S3


print(len(fs.ls(BUCKET + '/DUMP_INTEGRATION_JSON')))
      
for graph in fs.ls(BUCKET + '/DUMP_INTEGRATION_JSON'):
    if '.json' in graph:
        with fs.open(graph, mode="rb") as file_in:
            read_data = file_in.read()
            page_datasets = DatasetReader(read_data).get_data()
            logging.debug("Fichier: ", graph, "- Nombre de datasets : ", len(datasets))
            datasets.append(
                page_datasets
            )
            iteration += 1

logging.debug("Nombre de pages traitées : ", iteration)
data = pd.concat(datasets)

display(data)

239


Unnamed: 0,dataset,title,description,modification,right_statement,themes,key_words,standard,creator,status,catalog
0,http://catalogue.geo-ide.developpement-durable...,Document d'urbanisme applicable par commune 20...,Ce jeu de données recense les documents d'urba...,2023-02-03,Aucun des articles de la loi ne peut être invo...,http://registre.data.developpement-durable.gou...,Aménagement Urbanisme/Zonages Planification pl...,,,,GéoIDE Catalogue
1,http://catalogue.geo-ide.developpement-durable...,Zone de bruit d’une carte de bruit stratégique...,Les zones de bruit sont des éléments géométriq...,2022-05-04,Pas de restriction d'accès public selon INSPIR...,http://registre.data.developpement-durable.gou...,bruit données ouvertes Nuisance/Bruit environn...,,,Completed,GéoIDE Catalogue
2,http://catalogue.geo-ide.developpement-durable...,Zonage réglementaire du Plan de Prévention des...,PPR approuvé le 10 août 2021,2021-12-17,Pas de restriction d'accès public selon INSPIR...,http://registre.data.developpement-durable.gou...,données ouvertes Risque/Zonages Risque naturel,,,,GéoIDE Catalogue
3,http://catalogue.geo-ide.developpement-durable...,Statut des numérisations des Documents d'Urban...,Pour chaque commune 2015 est précisé l'état de...,2023-02-03,Pas de restriction d'accès public selon INSPIR...,http://registre.data.developpement-durable.gou...,urbanisme données ouvertes plu planification A...,,,,GéoIDE Catalogue
4,http://catalogue.geo-ide.developpement-durable...,Zones multi-aléas dans les cas du Plan de Prév...,Table non géométrique à utiliser pour décrire ...,2018-11-13,Pas de restriction d'accès public selon INSPIR...,http://registre.data.developpement-durable.gou...,pprt risque technologique plan de prévention m...,,,,GéoIDE Catalogue
...,...,...,...,...,...,...,...,...,...,...,...
95,http://catalogue.geo-ide.developpement-durable...,Lot de données de PPRN inondation en Charente ...,Lot de données du plan de prévention des risqu...,,Pas de restriction d'accès public selon INSPIR...,http://registre.data.developpement-durable.gou...,ppr inondations Risque/Zonages Risque technolo...,,,,GéoIDE Catalogue
96,http://catalogue.geo-ide.developpement-durable...,Zone exposée à un ou plusieurs aléas figurant ...,"Zone exposée à un ou plusieurs aléas, représen...",2021-02-19,Pas de restriction d'accès public selon INSPIR...,http://registre.data.developpement-durable.gou...,"données ouvertes Zones de gestion, de restrict...",,,On Going,GéoIDE Catalogue
97,http://catalogue.geo-ide.developpement-durable...,Périmètre des communes vosgiennes soumises à l...,Missionné par la DHUP (Bureau de la législatio...,2022-02-08,Aucun des articles de la loi ne peut être invo...,http://registre.data.developpement-durable.gou...,"Zones de gestion, de restriction ou de régleme...",,,On Going,GéoIDE Catalogue
98,http://catalogue.geo-ide.developpement-durable...,Zones réglementées (points) du PPRi MOSELLE CE...,Les plans de prévention des risques (PPR) cons...,2017-03-17,Aucun des articles de la loi ne peut être invo...,http://registre.data.developpement-durable.gou...,"moselle données ouvertes Zones de gestion, de ...",,,On Going,GéoIDE Catalogue


## Attribution des univers

In [3]:
def create_universe_pprn(row):
    """
    For a specific row, return if the element is considered to be a part of the 'PPRN' universe
    """
    for elem in ['title', 'description']:
        if 'pprn' in row[elem].lower():
            return 'PPRN'
        elif 'prevention des risques' in unidecode(row[elem]).lower():
            return 'PPRN'
    return None

In [4]:
data['univers'] = data.apply(create_universe_pprn, axis=1)

2357 PPRN avec filtre title = PPRN 

3985 PPRN en ajoutant le filtre title = 'prevention des risques' (independament des accents et majuscules) 

5122 PPRN en appliquant également ces filtre au champ 'description'

## Export vers POSTGRES

In [5]:
from sqlalchemy import create_engine
engine = create_engine(f"postgresql+psycopg2://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}@{os.environ['POSTGRES_URL']}:{os.environ['POSTGRES_PORT']}/{os.environ['POSTGRES_DB']}")

with engine.begin() as connection:
    data.to_sql(name='datasets', con=connection, if_exists='replace', index=False)

## Création du describe() en ligne

In [None]:
import sweetviz as sv

my_report = sv.analyze(data)
my_report.show_html() # Default arguments will generate to "SWEETVIZ_REPORT.html"

## Dump in s3 as parquet (beta)

In [11]:
data.to_parquet(f's3://{BUCKET}/DUMP_INTEGRATION_JSON/dump.parquet')

import boto3
s3 = boto3.client("s3",endpoint_url = 'https://'+'minio.lab.sspcloud.fr',
                  aws_access_key_id= 'CZXRRD5MEOM6KQJRZQL4', 
                  aws_secret_access_key= 'ZxPppMmnYMBR+u31RtFnBa3DtWy+y1s80zclUpQz', 
                  aws_session_token = 'eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3NLZXkiOiJDWlhSUkQ1TUVPTTZLUUpSWlFMNCIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sImF1ZCI6WyJtaW5pby1kYXRhbm9kZSIsIm9ueXhpYSIsImFjY291bnQiXSwiYXV0aF90aW1lIjoxNjk5NDMxMTcxLCJhenAiOiJvbnl4aWEiLCJlbWFpbCI6InF1ZW50aW4ubG9yaWRhbnRAbXVsdGkuY29vcCIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE3MDAyMTIzNDUsImZhbWlseV9uYW1lIjoiTG9yaWRhbnQiLCJnaXZlbl9uYW1lIjoiUXVlbnRpbiIsImdyb3VwcyI6WyJlY29sYWItYWN0aW9uLXF1YWxpdGUiXSwiaWF0IjoxNzAwMTI1OTQ1LCJpc3MiOiJodHRwczovL2F1dGgubGFiLnNzcGNsb3VkLmZyL2F1dGgvcmVhbG1zL3NzcGNsb3VkIiwianRpIjoiMjkxZGJhODItOWYyMi00ZjhjLTgyNzItMzUzODEyZmJiMGUwIiwibmFtZSI6IlF1ZW50aW4gTG9yaWRhbnQiLCJwb2xpY3kiOiJzdHNvbmx5IiwicHJlZmVycmVkX3VzZXJuYW1lIjoicXVlbnRpbmxvcmlkYW50IiwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iLCJkZWZhdWx0LXJvbGVzLXNzcGNsb3VkIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgcHJvZmlsZSBncm91cHMgZW1haWwiLCJzZXNzaW9uUG9saWN5IjoiZXlKV1pYSnphVzl1SWpvaU1qQXhNaTB4TUMweE55SXNJbE4wWVhSbGJXVnVkQ0k2VzNzaVJXWm1aV04wSWpvaVFXeHNiM2NpTENKQlkzUnBiMjRpT2xzaWN6TTZLaUpkTENKU1pYTnZkWEpqWlNJNld5SmhjbTQ2WVhkek9uTXpPam82Y0hKdmFtVjBMV1ZqYjJ4aFlpMWhZM1JwYjI0dGNYVmhiR2wwWlNJc0ltRnlianBoZDNNNmN6TTZPanB3Y205cVpYUXRaV052YkdGaUxXRmpkR2x2YmkxeGRXRnNhWFJsTHlvaVhYMHNleUpGWm1abFkzUWlPaUpCYkd4dmR5SXNJa0ZqZEdsdmJpSTZXeUp6TXpwTWFYTjBRblZqYTJWMElsMHNJbEpsYzI5MWNtTmxJanBiSW1GeWJqcGhkM002Y3pNNk9qb3FJbDBzSWtOdmJtUnBkR2x2YmlJNmV5SlRkSEpwYm1kTWFXdGxJanA3SW5Nek9uQnlaV1pwZUNJNkltUnBabVoxYzJsdmJpOHFJbjE5ZlN4N0lrVm1abVZqZENJNklrRnNiRzkzSWl3aVFXTjBhVzl1SWpwYkluTXpPa2RsZEU5aWFtVmpkQ0pkTENKU1pYTnZkWEpqWlNJNld5SmhjbTQ2WVhkek9uTXpPam82S2k5a2FXWm1kWE5wYjI0dktpSmRmVjE5Iiwic2Vzc2lvbl9zdGF0ZSI6ImNlZTBhOGYxLTA4NjMtNGQ5OC1hZDNiLWQ1MWQ4N2I4Zjg4YSIsInNpZCI6ImNlZTBhOGYxLTA4NjMtNGQ5OC1hZDNiLWQ1MWQ4N2I4Zjg4YSIsInN1YiI6Ijg1NjQzNjFmLWQ3MTctNDhhYy04YTA2LTdlZjY5ZTlmZTRiMCIsInR5cCI6IkJlYXJlciJ9.KV-DgYdDBOA6WYZuIj5S_772qIGXKJKz_qyOz_YWcpGRrbp2BVEcQ2EDUZvoxDHbWHQQGo3F8bxDf4nOADJhdg')
with open("dump.parquet", "rb") as f:
    s3.upload_fileobj(f, BUCKET, "dump.parquet")