-
Notifications
You must be signed in to change notification settings - Fork 8
/
indicators.py
604 lines (502 loc) · 22.7 KB
/
indicators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Módulo 'indicators' de Pydatajson
Contiene los métodos para monitorear y generar indicadores de un catálogo o de
una red de catálogos.
"""
from __future__ import print_function, absolute_import
from __future__ import unicode_literals, with_statement
import json
import logging
import os
from collections import Counter
from datetime import datetime
from six import string_types
from pydatajson.helpers import fields_to_uppercase
from pydatajson.status_indicators_generator import StatusIndicatorsGenerator
from . import helpers
from . import readers
from .federation_indicators_generator import FederationIndicatorsGenerator
from .search import get_datasets, get_distributions
CENTRAL_CATALOG = "http://datos.gob.ar/data.json"
ABSOLUTE_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
CATALOG_FIELDS_PATH = os.path.join(ABSOLUTE_PROJECT_DIR, "fields")
logger = logging.getLogger('pydatajson')
def generate_indicators(catalog, validator=None, only_numeric=False):
return _generate_indicators(catalog, validator=validator,
only_numeric=only_numeric)[1]
def generate_numeric_indicators(catalog, validator=None):
return _generate_indicators(catalog, validator=validator,
only_numeric=True)[1]
def generate_catalogs_indicators(catalogs, central_catalog=None,
identifier_search=False,
broken_links=False,
validator=None,
verify_ssl=True,
url_check_timeout=1,
broken_links_threads=1):
"""Genera una lista de diccionarios con varios indicadores sobre
los catálogos provistos, tales como la cantidad de datasets válidos,
días desde su última fecha actualizada, entre otros.
Args:
catalogs (str o list): uno o más catalogos sobre los que se quiera
obtener indicadores
central_catalog (str): catálogo central sobre el cual comparar los
datasets subidos en la lista anterior. De no pasarse no se
generarán indicadores de federación de datasets.
Returns:
tuple: 2 elementos, el primero una lista de diccionarios con los
indicadores esperados, uno por catálogo pasado, y el segundo
un diccionario con indicadores a nivel global,
datos sobre la lista entera en general.
"""
assert isinstance(catalogs, string_types + (dict, list))
# Si se pasa un único catálogo, genero una lista que lo contenga
if isinstance(catalogs, string_types + (dict,)):
catalogs = [catalogs]
indicators_list = []
# Cuenta la cantidad de campos usados/recomendados a nivel global
fields = {}
catalogs_cant = 0
for catalog in catalogs:
try:
catalog = readers.read_catalog(catalog)
catalogs_cant += 1
except Exception as e:
msg = u'Error leyendo catálogo de la lista: {}'.format(str(e))
logger.warning(msg)
continue
fields_count, result = _generate_indicators(
catalog, validator=validator,
broken_links=broken_links, verify_ssl=verify_ssl,
url_check_timeout=url_check_timeout,
broken_links_threads=broken_links_threads)
if central_catalog:
result.update(_federation_indicators(
catalog, central_catalog, identifier_search=identifier_search))
if not indicators_list:
# La primera iteracion solo copio el primer resultado
network_indicators = result.copy()
else:
network_indicators = helpers.add_dicts(network_indicators,
result)
# Sumo a la cuenta total de campos usados/totales
fields = helpers.add_dicts(fields_count, fields)
result['title'] = catalog.get('title', 'no-title')
result['identifier'] = catalog.get('identifier', 'no-id')
indicators_list.append(result)
if not indicators_list:
# No se pudo leer ningún catálogo
return [], {}
# Indicadores de la red entera
network_indicators['catalogos_cant'] = catalogs_cant
# Genero los indicadores de la red entera,
_network_indicator_percentages(fields, network_indicators)
return indicators_list, network_indicators
def _generate_indicators(catalog, validator=None, only_numeric=False,
broken_links=False, verify_ssl=True,
url_check_timeout=1, broken_links_threads=1):
"""Genera los indicadores de un catálogo individual.
Args:
catalog (dict): diccionario de un data.json parseado
Returns:
dict: diccionario con los indicadores del catálogo provisto
"""
result = {}
# Obtengo summary para los indicadores del estado de los metadatos
result.update(_generate_status_indicators(catalog, validator=validator,
verify_ssl=verify_ssl))
# Genero indicadores relacionados con validacion de urls
if broken_links:
result.update(_generate_valid_urls_indicators(
catalog, validator=validator, verify_ssl=verify_ssl,
url_check_timeout=url_check_timeout,
threads_count=broken_links_threads))
# Genero los indicadores relacionados con fechas, y los agrego
result.update(
_generate_date_indicators(catalog, only_numeric=only_numeric))
# Agrego la cuenta de los formatos de las distribuciones
if not only_numeric:
if 'dataset' in catalog:
format_count = count_fields(get_distributions(catalog), 'format')
format_count = fields_to_uppercase(format_count)
type_count = count_fields(get_distributions(catalog), 'type')
license_count = count_fields(get_datasets(catalog), 'license')
else:
format_count = type_count = license_count = {}
result.update({
'distribuciones_formatos_cant': format_count,
'distribuciones_tipos_cant': type_count,
'datasets_licencias_cant': license_count,
})
# Agrego porcentaje de campos recomendados/optativos usados
fields_count = _count_required_and_optional_fields(catalog)
recomendados_pct = float(fields_count['recomendado']) / \
fields_count['total_recomendado']
optativos_pct = float(fields_count['optativo']) / \
fields_count['total_optativo']
result.update({
'campos_recomendados_pct': round(recomendados_pct, 4),
'campos_optativos_pct': round(optativos_pct, 4)
})
return fields_count, result
def _federation_indicators(catalog, central_catalog,
identifier_search=False):
"""Cuenta la cantidad de datasets incluídos tanto en la lista
'catalogs' como en el catálogo central, y genera indicadores a partir
de esa información.
Args:
catalog (dict): catálogo ya parseado
central_catalog (str o dict): ruta a catálogo central, o un dict
con el catálogo ya parseado
"""
result = {
'datasets_federados_cant': None,
'datasets_federados_pct': None,
'datasets_no_federados_cant': None,
'datasets_federados_eliminados_cant': None,
'distribuciones_federadas_cant': None,
'datasets_federados_eliminados': [],
'datasets_no_federados': [],
'datasets_federados': [],
}
try:
central_catalog = readers.read_catalog(central_catalog)
except Exception as e:
msg = u'Error leyendo el catálogo central: {}'.format(str(e))
logger.warning(msg)
return result
generator = FederationIndicatorsGenerator(central_catalog, catalog,
id_based=identifier_search)
result.update({
'datasets_federados_cant':
generator.datasets_federados_cant(),
'datasets_no_federados_cant':
generator.datasets_no_federados_cant(),
'datasets_federados_eliminados_cant':
generator.datasets_federados_eliminados_cant(),
'datasets_federados_eliminados':
generator.datasets_federados_eliminados(),
'datasets_no_federados':
generator.datasets_no_federados(),
'datasets_federados':
generator.datasets_federados(),
'datasets_federados_pct':
generator.datasets_federados_pct(),
'distribuciones_federadas_cant':
generator.distribuciones_federadas_cant()
})
return result
def _network_indicator_percentages(fields, network_indicators):
"""Encapsula el cálculo de indicadores de porcentaje (de errores,
de campos recomendados/optativos utilizados, de datasets actualizados)
sobre la red de nodos entera.
Args:
fields (dict): Diccionario con claves 'recomendado', 'optativo',
'total_recomendado', 'total_optativo', cada uno con valores
que representan la cantidad de c/u en la red de nodos entera.
network_indicators (dict): Diccionario de la red de nodos, con
las cantidades de datasets_meta_ok y datasets_(des)actualizados
calculados previamente. Se modificará este argumento con los
nuevos indicadores.
"""
# Los porcentuales no se pueden sumar, tienen que ser recalculados
percentages = {
'datasets_meta_ok_pct':
(network_indicators.get('datasets_meta_ok_cant'),
network_indicators.get('datasets_meta_error_cant')),
'datasets_actualizados_pct':
(network_indicators.get('datasets_actualizados_cant'),
network_indicators.get('datasets_desactualizados_cant')),
'datasets_federados_pct':
(network_indicators.get('datasets_federados_cant'),
network_indicators.get('datasets_no_federados_cant')),
'datasets_con_datos_pct':
(network_indicators.get('datasets_con_datos_cant'),
network_indicators.get('datasets_sin_datos_cant')),
'distribuciones_download_url_ok_pct':
(network_indicators.get('distribuciones_download_url_ok_cant'),
network_indicators.get('distribuciones_download_url_error_cant')),
}
for indicator in percentages:
pct = 0.00
partial = percentages[indicator][0] or 0
total = partial + (percentages[indicator][1] or 0)
# Evita division por 0
if total:
pct = float(partial) / total
network_indicators[indicator] = round(pct, 4)
# % de campos recomendados y optativos utilizados en el catálogo entero
if fields: # 'fields' puede estar vacío si ningún campo es válido
rec_pct = float(fields['recomendado']) / \
fields['total_recomendado']
opt_pct = float(fields['optativo']) / fields['total_optativo']
network_indicators.update({
'campos_recomendados_pct': round(rec_pct, 4),
'campos_optativos_pct': round(opt_pct, 4)
})
def _generate_status_indicators(catalog, validator=None, verify_ssl=True):
"""Genera indicadores básicos sobre el estado de un catálogo
Args:
catalog (dict): diccionario de un data.json parseado
Returns:
dict: indicadores básicos sobre el catálogo, tal como la cantidad
de datasets, distribuciones y número de errores
"""
result = {
'datasets_cant': None,
'distribuciones_cant': None,
'datasets_meta_ok_cant': None,
'datasets_meta_error_cant': None,
'datasets_meta_ok_pct': None,
'datasets_con_datos_cant': None,
'datasets_sin_datos_cant': None,
'datasets_con_datos_pct': None
}
try:
generator = StatusIndicatorsGenerator(catalog,
validator=validator,
verify_ssl=verify_ssl)
except Exception as e:
msg = u'Error generando resumen del catálogo {}: {}'.format(
catalog['title'], str(e))
logger.warning(msg)
return result
result.update({
'datasets_cant': generator.datasets_cant(),
'distribuciones_cant': generator.distribuciones_cant(),
'datasets_meta_ok_cant': generator.datasets_meta_ok_cant(),
'datasets_meta_error_cant': generator.datasets_meta_error_cant(),
'datasets_meta_ok_pct': generator.datasets_meta_ok_pct(),
'datasets_con_datos_cant': generator.datasets_con_datos_cant(),
'datasets_sin_datos_cant': generator.datasets_sin_datos_cant(),
'datasets_con_datos_pct': generator.datasets_con_datos_pct(),
})
return result
def _generate_date_indicators(catalog, tolerance=0.2, only_numeric=False):
"""Genera indicadores relacionados a las fechas de publicación
y actualización del catálogo pasado por parámetro. La evaluación de si
un catálogo se encuentra actualizado o no tiene un porcentaje de
tolerancia hasta que se lo considere como tal, dado por el parámetro
tolerance.
Args:
catalog (dict o str): path de un catálogo en formatos aceptados,
o un diccionario de python
tolerance (float): porcentaje de tolerancia hasta que se considere
un catálogo como desactualizado, por ejemplo un catálogo con
período de actualización de 10 días se lo considera como
desactualizado a partir de los 12 con una tolerancia del 20%.
También acepta valores negativos.
Returns:
dict: diccionario con indicadores
"""
result = {
'datasets_desactualizados_cant': None,
'datasets_actualizados_cant': None,
'datasets_actualizados_pct': None,
'catalogo_ultima_actualizacion_dias': None
}
if not only_numeric:
result.update({
'datasets_frecuencia_cant': {}
})
try:
dias_ultima_actualizacion =\
_days_from_last_update(catalog, "modified")
if not dias_ultima_actualizacion:
dias_ultima_actualizacion =\
_days_from_last_update(catalog, "issued")
result['catalogo_ultima_actualizacion_dias'] = \
dias_ultima_actualizacion
except Exception as e:
msg = u'Error generando indicadores de fecha del catálogo {}: {}'\
.format(catalog['title'], str(e))
logger.warning(msg)
return result
actualizados = 0
desactualizados = 0
periodicity_amount = {}
for dataset in catalog.get('dataset', []):
# Parseo la fecha de publicación, y la frecuencia de actualización
periodicity = dataset.get('accrualPeriodicity')
if not periodicity:
continue
# Si la periodicity es eventual, se considera como actualizado
if _eventual_periodicity(periodicity):
actualizados += 1
prev_periodicity = periodicity_amount.get('EVENTUAL', 0)
periodicity_amount['EVENTUAL'] = prev_periodicity + 1
continue
# dataset sin fecha de última actualización es desactualizado
if "modified" not in dataset:
desactualizados += 1
else:
# Calculo el período de días que puede pasar sin actualizarse
# Se parsea el período especificado por accrualPeriodicity,
# cumple con el estándar ISO 8601 para tiempos con repetición
try:
date = helpers.parse_date_string(dataset['modified'])
days_diff = float((datetime.now() - date).days)
interval = helpers.parse_repeating_time_interval(
periodicity) * \
(1 + tolerance)
except Exception as e:
msg = u'Error generando indicadores'\
u'de fecha del dataset {} en {}: {}'
msg.format(dataset['identifier'], catalog['title'], str(e))
logger.warning(msg)
# Asumo desactualizado
desactualizados += 1
continue
if days_diff < interval:
actualizados += 1
else:
desactualizados += 1
prev_periodicity = periodicity_amount.get(periodicity, 0)
periodicity_amount[periodicity] = prev_periodicity + 1
datasets_total = len(catalog.get('dataset', []))
actualizados_pct = 0
if datasets_total:
actualizados_pct = float(actualizados) / datasets_total
result.update({
'datasets_desactualizados_cant': desactualizados,
'datasets_actualizados_cant': actualizados,
'datasets_actualizados_pct': round(actualizados_pct, 4)
})
if not only_numeric:
result.update({
'datasets_frecuencia_cant': periodicity_amount
})
return result
def _days_from_last_update(catalog, date_field="modified"):
"""Calcula días desde la última actualización del catálogo.
Args:
catalog (dict): Un catálogo.
date_field (str): Campo de metadatos a utilizar para considerar
los días desde la última actualización del catálogo.
Returns:
int or None: Cantidad de días desde la última actualización del
catálogo o None, si no pudo ser calculada.
"""
# el "date_field" se busca primero a nivel catálogo, luego a nivel
# de cada dataset, y nos quedamos con el que sea más reciente
date_modified = catalog.get(date_field, None)
dias_ultima_actualizacion = None
# "date_field" a nivel de catálogo puede no ser obligatorio,
# si no está pasamos
if isinstance(date_modified, string_types):
date = helpers.parse_date_string(date_modified)
dias_ultima_actualizacion = (
datetime.now() - date).days if date else None
for dataset in catalog.get('dataset', []):
date = helpers.parse_date_string(dataset.get(date_field, ""))
days_diff = float((datetime.now() - date).days) if date else None
# Actualizo el indicador de días de actualización si corresponde
if not dias_ultima_actualizacion or \
(days_diff and days_diff < dias_ultima_actualizacion):
dias_ultima_actualizacion = days_diff
if dias_ultima_actualizacion:
return int(dias_ultima_actualizacion)
else:
return None
def _count_required_and_optional_fields(catalog):
"""Cuenta los campos obligatorios/recomendados/requeridos usados en
'catalog', junto con la cantidad máxima de dichos campos.
Args:
catalog (str o dict): path a un catálogo, o un dict de python que
contenga a un catálogo ya leído
Returns:
dict: diccionario con las claves 'recomendado', 'optativo',
'requerido', 'recomendado_total', 'optativo_total',
'requerido_total', con la cantidad como valores.
"""
catalog = readers.read_catalog(catalog)
# Archivo .json con el uso de cada campo. Lo cargamos a un dict
catalog_fields_path = os.path.join(CATALOG_FIELDS_PATH,
'fields.json')
with open(catalog_fields_path) as f:
catalog_fields = json.load(f)
# Armado recursivo del resultado
return _count_fields_recursive(catalog, catalog_fields)
def _count_fields_recursive(dataset, fields):
"""Cuenta la información de campos optativos/recomendados/requeridos
desde 'fields', y cuenta la ocurrencia de los mismos en 'dataset'.
Args:
dataset (dict): diccionario con claves a ser verificadas.
fields (dict): diccionario con los campos a verificar en dataset
como claves, y 'optativo', 'recomendado', o 'requerido' como
valores. Puede tener objetios anidados pero no arrays.
Returns:
dict: diccionario con las claves 'recomendado', 'optativo',
'requerido', 'recomendado_total', 'optativo_total',
'requerido_total', con la cantidad como valores.
"""
key_count = {
'recomendado': 0,
'optativo': 0,
'requerido': 0,
'total_optativo': 0,
'total_recomendado': 0,
'total_requerido': 0
}
for k, v in fields.items():
# Si la clave es un diccionario se implementa recursivamente el
# mismo algoritmo
if isinstance(v, dict):
# dataset[k] puede ser o un dict o una lista, ej 'dataset' es
# list, 'publisher' no. Si no es lista, lo metemos en una.
# Si no es ninguno de los dos, dataset[k] es inválido
# y se pasa un diccionario vacío para poder comparar
elements = dataset.get(k)
if not isinstance(elements, (list, dict)):
elements = [{}]
if isinstance(elements, dict):
elements = [dataset[k].copy()]
for element in elements:
# Llamada recursiva y suma del resultado al nuestro
result = _count_fields_recursive(element, v)
for key in result:
key_count[key] += result[key]
# Es un elemento normal (no iterable), se verifica si está en
# dataset o no. Se suma 1 siempre al total de su tipo
else:
# total_requerido, total_recomendado, o total_optativo
key_count['total_' + v] += 1
if k in dataset:
key_count[v] += 1
return key_count
def count_fields(targets, field):
"""Cuenta la cantidad de values en el key
especificado de una lista de diccionarios"""
return Counter([target.get(field) or 'None' for target in targets])
def _eventual_periodicity(periodicity):
return periodicity in ('eventual', 'EVENTUAL')
def _generate_valid_urls_indicators(catalog, validator=None, verify_ssl=True,
url_check_timeout=1, threads_count=1):
"""Genera indicadores sobre el estado de las urls de distribuciones
Args:
catalog (dict): diccionario de un data.json parseado
Returns:
dict: indicadores sobre las urls de las distribuciones del catálogo
"""
result = {}
try:
generator = \
StatusIndicatorsGenerator(
catalog, validator=validator, verify_ssl=verify_ssl,
url_check_timeout=url_check_timeout,
threads_count=threads_count)
except Exception as e:
msg = u'Error generando resumen del catálogo {}: {}'.format(
catalog['title'], str(e))
logger.warning(msg)
return result
result.update({
'distribuciones_download_url_ok_cant':
generator.distribuciones_download_url_ok_cant(),
'distribuciones_download_url_error_cant':
generator.distribuciones_download_url_error_cant(),
'distribuciones_download_url_ok_pct':
generator.distribuciones_download_url_ok_pct(),
})
return result