# Consolidation IRVE

- récupération et consolidation de tous les CSV `irve` sur data.gouv.fr
- publication du fichier consolidé sur la plateforme

In [None]:
import os
import requests
import agate

from datetime import datetime
from pathlib import Path
import cchardet as chardet

# Base directory for data/ and output/: $WORKING_DIR when it is set to a
# non-empty value, the absolute current directory otherwise.
current_path = (
    Path().absolute()
    if not os.environ.get('WORKING_DIR')
    else Path(os.environ['WORKING_DIR'])
)
current_path

## chargement de la configuration générale

In [None]:
# data.gouv.fr instance to read from and publish to; override with NB_IRVE_DOMAIN
domain = os.environ.get('NB_IRVE_DOMAIN', 'next.data.gouv.fr')
domain

## récupération des fichiers sources

- liste de tous les datasets tagués "irve"
- téléchargement de toutes les ressources CSV associées (dans un dossier horodaté, un par jour)

In [None]:
# get the list of datasets tagged irve on data.gouv.fr
# the API paginates its results: follow `next_page` until it is empty so that
# datasets beyond the first page are not silently dropped
url = 'https://%s/api/1/datasets/?tag=irve&page_size=1000' % domain
datasets = []
page_url = url
while page_url:
    r = requests.get(page_url)
    # fail loudly instead of hitting a KeyError on an error payload
    r.raise_for_status()
    page = r.json()
    datasets.extend(page['data'])
    # presumably null/absent on the last page (udata pagination) — TODO confirm;
    # when absent, this stops after the first page as before
    page_url = page.get('next_page')

len(datasets)

In [None]:
# Download all tabular files in data/ directory, as best as we can

# make a new directory each DAY the download phase is done
data_path = current_path / 'data' / datetime.now().strftime('%Y%m%d')
data_path.mkdir(parents=True, exist_ok=True)

total_resources = 0
dl_resources = 0

our_dataset_id = os.environ['NB_IRVE_DATASET_ID']

downloaded = []
for d in datasets:
    if not d['organization'] and not d['owner']:
        print('❌', 'orphan dataset', d['slug'])
        continue
    # ignore our consolidated dataset, otherwise its output would be re-ingested
    if d['id'] == our_dataset_id:
        print('⚠️ ignored our own dataset')
        continue
    slug = d['slug']
    # `resource` (not `r`) so the HTTP response below does not shadow it
    for resource in d['resources']:
        total_resources += 1
        rurl = resource['url']
        rid = resource['id']
        # ODS style NB: won't work more than once for CKAN
        if 'format=csv' in rurl:
            filename = rurl.split('/')[-3] + '.csv'
        else:
            filename = rurl.split('/')[-1]
        ext = filename.split('.')[-1]
        if ext != 'csv':
            print('⚠️ ignored file %s' % rurl)
            continue
        try:
            response = requests.get(rurl, allow_redirects=True)
            # do not save 404/500 error pages as if they were CSV files
            response.raise_for_status()
        except requests.RequestException as e:
            # best effort: one unreachable resource must not stop the whole download
            print('❌ download failed for %s (%s)' % (rurl, e))
            continue
        # one sub-directory per dataset slug, one file per resource id
        p = data_path / slug
        p.mkdir(exist_ok=True)
        written_filename = '%s.%s' % (rid, ext)
        (p / written_filename).write_bytes(response.content)
        dl_resources += 1
        downloaded.append(filename)
        print('✅ downloaded file [%s] %s' % (filename, rurl))
print('✅✅✅ Done', total_resources, dl_resources)

## création d'une liste de bornes "en vrac"

- récupération d'un maximum de lignes depuis les fichiers CSV téléchargés
- utilisation d'un mapping de colonnes pour exploiter des fichiers mal structurés ou mal encodés

In [None]:
import warnings
from agate.warns import UnnamedColumnWarning

def parse_csv(file_path):
    """Parse a downloaded CSV file into an agate Table, as leniently as possible.

    The encoding is guessed with chardet from the raw bytes, and every column
    is read as text (no type inference).

    :param file_path: pathlib.Path of the CSV file to parse.
    :returns: the parsed agate.Table, or None if agate could not parse the
        file (the error is printed).
    """
    # deactivate type testing, this puts too much constraint on parsing
    # especially for lat/lon columns with commas
    tester = agate.TypeTester(types=(agate.Text, ), limit=0)
    with file_path.open('rb') as f:
        encoding = chardet.detect(f.read()).get('encoding')
    # scope the warning filter to this call only: the former
    # filterwarnings() + resetwarnings() pair wiped *every* warning filter
    # of the process, not just the one added here
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', category=UnnamedColumnWarning)
        try:
            return agate.Table.from_csv(file_path, encoding=encoding, sniff_limit=None, column_types=tester)
        except Exception as e:
            # best effort: a broken source file must not stop the consolidation
            print('❌ CSV parse error for %s (%s)' % (file_path, e))
            return None

In [None]:
# One entry per output column: the first item is the standard column name
# written to the output, the following ones are alternative headers (including
# mis-encoded spellings) accepted from the source files, matched case-insensitively.
columns_mapping = [
    ('n_amenageur', 'nom_amenageur', 'n_amenageu'),
    ('n_operateur', 'n_operateu'),
    ('n_enseigne', ),
    ('id_station', ),
    ('n_station', 'nom_station'),
    ('ad_station', 'adresse_station'),
    ('code_insee', ),
    ('Xlongitude', 'longitude_wsg84'),
    ('Ylatitude', 'latitude_wsg84'),
    ('nbre_pdc', 'nbre_borne'),
    ('id_pdc', 'n° borne'),
    ('puiss_max', ),
    ('type_prise', 'type_connecteur', 'typ_charge'),
    ('acces_recharge', 'modalité d\'accès à la borne', 'acces_rech'),
    ('accessibilité', 'accessibilitã©', 'accessibilite', 'accessibilit�', 'accessibilit‚'),
    ('observations', ),
    ('date_maj', ),
]
# output column order, derived from columns_mapping so the two cannot drift apart
columns = [aliases[0] for aliases in columns_mapping]
columns_low = [x.lower() for x in columns]

In [None]:
# collect raw rows ("bornes") from every downloaded CSV, with columns renamed
# through columns_mapping; deduplication on (id_pdc, max(date_maj)) comes later
bornes = []

def find_by_pivot(row, lines):
    """Unused placeholder: does nothing and returns None."""
    pass

def ifind_in_row_by_col(col, row):
    """Return the value of column ``col`` in ``row``, matching names case-insensitively.

    Surrounding whitespace in header names is ignored too, so a header such as
    ``' ID_PDC '`` matches ``'id_pdc'``.

    :param col: wanted column name.
    :param row: mapping of column name -> value (e.g. an agate Row).
    :returns: the value of the first matching column, or None if none matches.
    """
    wanted = col.lower().strip()
    for key in row.keys():
        if key.lower().strip() == wanted:
            return row[key]
    return None

# `source` column: dataset page anchored on the resource the row comes from
upatt = 'https://www.data.gouv.fr/fr/datasets/%s/#resource-%s'

for child in [x for x in data_path.iterdir() if x.is_dir()]:
    # `csv_path`, not `csv`, to avoid shadowing the csv module used later
    for csv_path in child.glob('*.csv'):
        table = parse_csv(csv_path)
        if table is None:
            continue
        # normalised header -> actual header name (first occurrence wins); the
        # same normalisation is used for the pivot check and the value lookup
        actual_names = {}
        for name in table.column_names:
            actual_names.setdefault(name.lower().strip(), name)
        # skip files lacking any column needed for deduplication
        if any(pivot not in actual_names for pivot in ('id_station', 'id_pdc', 'date_maj')):
            continue
        # standard column name -> header of this file (first matching alias
        # wins), resolved once per table instead of once per row and column
        selected = {}
        for aliases in columns_mapping:
            for alias in aliases:
                if alias.lower() in actual_names:
                    selected[aliases[0]] = actual_names[alias.lower()]
                    break
        # parent directory is the dataset slug, file stem is the resource id
        source = upatt % (csv_path.parent.name, csv_path.stem)
        for row in table.rows:
            borne = {std_name: row[name] for std_name, name in selected.items()}
            borne['source'] = source
            bornes.append(borne)

print('✅ ✅ ✅')

In [None]:
# number of raw rows collected above, before deduplication on id_pdc
len(bornes)

## consolidation

- dédoublonnage des données brutes
- "data hacks" : réparation de quelques erreurs communes

In [None]:
# deduplicate based on date_maj and id_pdc
from dateutil.parser import parse

unique_bornes = []

def parse_date(date):
    """Parse ``date`` leniently with dateutil, falling back to 1970-01-01.

    The value is stringified first, so None or numbers are accepted; anything
    unparseable gets the fallback date and therefore loses against any real
    date when picking the most recent row.

    :param date: raw date_maj value (any type).
    :returns: a datetime.
    """
    date = str(date)
    try:
        return parse(date)
    except (ValueError, OverflowError):
        # dateutil raises ParserError (a ValueError) for unknown formats and
        # OverflowError for values too large to build a date from
        return parse('1970-1-1')

# filter out id_pdc==None (and blank ids) while grouping raw rows by id_pdc
# TODO maybe use (id_station, id_pdc) instead of only id_pdc
# one pass into a dict instead of rescanning `bornes` for every id (quadratic);
# dict order is first-seen order, so the result no longer depends on set ordering
bornes_by_id = {}
for b in bornes:
    if b['id_pdc'] and str(b['id_pdc']).strip():
        bornes_by_id.setdefault(b['id_pdc'], []).append(b)

for bs in bornes_by_id.values():
    # keep the row with the most recent date_maj; max() returns the first of
    # equally recent rows. Work on a copy so the cleaning below leaves the raw
    # `bornes` rows untouched (re-running this cell gives the same result)
    unique = dict(max(bs, key=lambda b: parse_date(b['date_maj'])))
    ## Data hacks
    # replace `date_maj` original value with parsed date
    unique['date_maj'] = parse_date(unique['date_maj']).strftime('%Y/%m/%d')
    # the columns below may be absent from a source file: .get() avoids a KeyError
    # clean `puiss_max`: dot instead of comma, remove spaces and unit
    puiss_max = unique.get('puiss_max')
    unique['puiss_max'] = puiss_max.replace(',', '.').replace(' ', '').replace('kW', '') if puiss_max else ''
    # clean `code_insee`: remove spaces, zero pad
    code_insee = unique.get('code_insee')
    unique['code_insee'] = code_insee.replace(' ', '').zfill(5) if code_insee else ''
    # replace `Xlongitude` and `Ylatitude` commas with points
    for coord in ('Xlongitude', 'Ylatitude'):
        value = unique.get(coord)
        unique[coord] = value.replace(',', '.') if value else ''
    unique_bornes.append(unique)

len(unique_bornes)

In [None]:
# raw rows, raw rows with a truthy id_pdc, rows kept after deduplication
(len(bornes), sum(1 for b in bornes if b['id_pdc']), len(unique_bornes))

## écriture des résultats dans un CSV

In [None]:
# write the results to CSV
import csv

# sort by n_amenageur; empty cells come back as None (or the column may be
# missing), so map them to '' instead of breaking the str/None comparison
unique_bornes.sort(key=lambda k: k.get('n_amenageur') or '')

csvfile_name = 'bornes-irve-%s.csv' % datetime.now().strftime('%Y%m%d')
csvfile_path = current_path / 'output'
csvfile_path.mkdir(exist_ok=True)
csvfile_path = csvfile_path / csvfile_name

# newline='' is required by the csv module; explicit UTF-8 so accented values
# do not depend on the machine's locale encoding
with open(csvfile_path, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=columns + ['source'], delimiter=';')
    writer.writeheader()
    writer.writerows(unique_bornes)

## envoi du fichier sur (demo.)data.gouv.fr

- utilisation du CSV précédemment généré
- création d'une nouvelle ressource avec ce fichier
- mise à jour de la ressource "dernière version consolidée" avec la nouvelle URL
- tri des ressources pour avoir la "dernière version consolidée" en haut

### chargement de la configuration de l'upload

In [None]:
# upload settings, all mandatory: a missing variable raises KeyError
apikey, dataset_id, latest_resource_id = (
    os.environ[var]
    for var in ('NB_IRVE_APIKEY', 'NB_IRVE_DATASET_ID', 'NB_IRVE_LATEST_RESOURCE_ID')
)

### upload

In [None]:
# send the results to the portal
import requests
import locale

# month name in French for the resource title; only LC_TIME is needed, so the
# other locale categories (LC_CTYPE, LC_NUMERIC, ...) are left untouched
locale.setlocale(locale.LC_TIME, 'fr_FR.UTF-8')

url = 'https://%s/api/1/datasets/%s' % (domain, dataset_id)


def _expect_status(response, expected):
    """Raise RuntimeError with the server's answer unless ``response`` has HTTP status ``expected``.

    Used instead of bare ``assert``s, which are skipped under ``python -O`` and
    do not show the API error message.
    """
    if response.status_code != expected:
        raise RuntimeError('unexpected HTTP %s (expected %s) for %s: %s'
                           % (response.status_code, expected, response.url, response.text))


with open(csvfile_path, 'rb') as csvfile:
    files = {'file': (csvfile_name, csvfile.read())}

# upload file (the response describes the newly created resource)
r = requests.post('%s/upload/' % url, files=files, headers={'x-api-key': apikey})
_expect_status(r, 201)

# update resource's metadata
resource = r.json()
resource['title'] = 'Version consolidée en %s' % datetime.now().strftime('%B %Y')
resource['schema'] = 'etalab/schema-irve'
resource['description'] = "Contenu correspondant au format défini par [l’arrêté du 12 janvier 2017](https://www.legifrance.gouv.fr/affichTexte.do?cidTexte=JORFTEXT000033860733&categorieLien=id), avec une colonne supplémentaire indiquant la source de donnée."
r = requests.put('%s/resources/%s/' % (url, resource['id']), json=resource, headers={'x-api-key': apikey})
_expect_status(r, 200)
print(r.json())

# update latest resource URL: point the "latest" resource to the uploaded file
r = requests.get('%s/resources/%s/' % (url, latest_resource_id))
_expect_status(r, 200)
latest_resource = r.json()
latest_resource['url'] = resource['url']
latest_resource['filesize'] = resource['filesize']
r = requests.put('%s/resources/%s/' % (url, latest_resource_id), json=latest_resource, headers={'x-api-key': apikey})
_expect_status(r, 200)

print('✅ ✅ ✅')

In [None]:
# reorder with latest resource on top
# "reorder" endpoint won't work from requests, so we're sending the entire dataset w/ reordered resources
r = requests.get('%s/' % url)
r.raise_for_status()
dataset = r.json()
# same list object as dataset['resources']: reordering it in place updates `dataset`
resources = dataset['resources']
index = next((i for i, item in enumerate(resources) if item['id'] == latest_resource_id), None)
if index is None:
    # a -1 fallback would silently move the *last* resource to the top
    raise RuntimeError('latest resource %s not found in dataset %s' % (latest_resource_id, url))
resources.insert(0, resources.pop(index))
r = requests.put('%s/' % url, json=dataset, headers={'x-api-key': apikey})
print(url)
if r.status_code != 200:
    # explicit check instead of `assert` (skipped under -O), with the API error message
    raise RuntimeError('unexpected HTTP %s for %s: %s' % (r.status_code, r.url, r.text))

print('✅ ✅ ✅')