Skip to content

Commit

Permalink
Merge pull request #287 from datosgobar/282-validar-borken-url-paralelizando

Browse files Browse the repository at this point in the history

282 validar borken url paralelizando
  • Loading branch information
AWolfsdorf committed Oct 8, 2019
2 parents b7abcc4 + 224e657 commit a4a7034
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 32 deletions.
2 changes: 2 additions & 0 deletions pydatajson/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
DEFAULT_TIMEZONE = "America/Buenos_Aires"

VALID_STATUS_CODES = [200, 203, 302]

CANT_THREADS_BROKEN_URL_VALIDATOR = 10
19 changes: 15 additions & 4 deletions pydatajson/status_indicators_generator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from pydatajson import threading_helper
from pydatajson import constants
from pydatajson.helpers import is_working_url
from pydatajson.readers import read_catalog
from pydatajson.reporting import generate_datasets_summary
Expand Down Expand Up @@ -50,11 +52,20 @@ def distribuciones_download_url_ok_pct(self):
round(float(self.distribuciones_download_url_ok_cant()) / total, 4)

def _validate_download_urls(self):
result = 0
async_results = []
for dataset in self.catalog.get('dataset', []):
for distribution in dataset.get('distribution', []):
valid, _ = is_working_url(distribution.get('downloadURL', ''))
result += valid
distribution_urls = \
[distribution.get('downloadURL', '')
for distribution in dataset.get('distribution', [])]
async_results += threading_helper\
.apply_threading(distribution_urls,
is_working_url,
constants.CANT_THREADS_BROKEN_URL_VALIDATOR)

result = 0
for res, _ in async_results:
result += res

# Guardo el resultado una vez calculado
self.download_url_ok = result
return result
Expand Down
11 changes: 11 additions & 0 deletions pydatajson/threading_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from multiprocessing.pool import ThreadPool


def apply_threading(l, function, cant_threads):
    """Map ``function`` over ``l``, optionally using a thread pool.

    Results are returned in the same order as the input. The call is
    synchronous: it only returns once every element has been processed,
    and any exception raised by ``function`` propagates to the caller.

    Args:
        l: Iterable of elements to process.
        function: Callable applied to each element.
        cant_threads (int): Number of worker threads. With one thread
            (or fewer) the pool is skipped and the mapping runs inline.

    Returns:
        list: ``[function(x) for x in l]``.
    """
    # <= 1 (not == 1) so a zero/negative thread count degrades to the
    # inline path instead of ThreadPool(processes=0) raising ValueError.
    if cant_threads <= 1:
        return [function(x) for x in l]
    pool = ThreadPool(processes=cant_threads)
    try:
        return pool.map(function, l)
    finally:
        # Always tear the pool down, even if ``function`` raised inside
        # ``map`` — otherwise worker threads are leaked.
        pool.close()
        pool.join()
84 changes: 62 additions & 22 deletions pydatajson/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
import platform
from collections import Counter

import requests

from pydatajson.constants import VALID_STATUS_CODES
from pydatajson import threading_helper
from pydatajson import constants
from pydatajson.helpers import is_working_url

try:
Expand Down Expand Up @@ -223,18 +222,38 @@ def _validate_landing_pages(self, catalog):
datasets = catalog.get('dataset')
datasets = filter(lambda x: x.get('landingPage'), datasets)

metadata = []
urls = []

for dataset_idx, dataset in enumerate(datasets):
dataset_title = dataset.get('title')
landing_page = dataset.get('landingPage')
metadata.append({
"dataset_idx": dataset_idx,
"dataset_title": dataset.get('title'),
"landing_page": dataset.get('landingPage'),
})
urls.append(dataset.get('landingPage'))

sync_res = threading_helper\
.apply_threading(urls,
is_working_url,
constants.CANT_THREADS_BROKEN_URL_VALIDATOR)

for i in range(len(sync_res)):
valid, status_code = sync_res[i]
act_metadata = metadata[i]
dataset_idx = act_metadata["dataset_idx"]
dataset_title = act_metadata["dataset_title"]
landing_page = act_metadata["landing_page"]

valid, status_code = is_working_url(landing_page)
if not valid:
yield ce.BrokenLandingPageError(dataset_idx, dataset_title,
landing_page, status_code)

def _validate_distributions_urls(self, catalog):
datasets = catalog.get('dataset')

metadata = []
urls = []
for dataset_idx, dataset in enumerate(datasets):
distributions = dataset.get('distribution')

Expand All @@ -243,22 +262,43 @@ def _validate_distributions_urls(self, catalog):
access_url = distribution.get('accessURL')
download_url = distribution.get('downloadURL')

access_url_is_valid, access_url_status_code = \
is_working_url(access_url)
download_url_is_valid, download_url_status_code = \
is_working_url(download_url)
if not access_url_is_valid:
yield ce.BrokenAccessUrlError(dataset_idx,
distribution_idx,
distribution_title,
access_url,
access_url_status_code)
if not download_url_is_valid:
yield ce.BrokenDownloadUrlError(dataset_idx,
distribution_idx,
distribution_title,
download_url,
download_url_status_code)
metadata.append({
"dataset_idx": dataset_idx,
"dist_idx": distribution_idx,
"dist_title": distribution_title
})
urls += [access_url, download_url]

sync_res = threading_helper\
.apply_threading(urls,
is_working_url,
constants.CANT_THREADS_BROKEN_URL_VALIDATOR)

for i in range(len(metadata)):
actual_metadata = metadata[i]
dataset_idx = actual_metadata["dataset_idx"]
distribution_idx = actual_metadata["dist_idx"]
distribution_title = actual_metadata["dist_title"]

k = i*2
access_url = urls[k]
download_url = urls[k+1]

access_url_is_valid, access_url_status_code = sync_res[k]
download_url_is_valid, download_url_status_code = sync_res[k+1]

if not access_url_is_valid:
yield ce.BrokenAccessUrlError(dataset_idx,
distribution_idx,
distribution_title,
access_url,
access_url_status_code)
if not download_url_is_valid:
yield ce.BrokenDownloadUrlError(dataset_idx,
distribution_idx,
distribution_title,
download_url,
download_url_status_code)


def is_valid_catalog(catalog, validator=None):
Expand Down
3 changes: 3 additions & 0 deletions tests/test_catalog_readme.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from pydatajson.catalog_readme import generate_readme
from tests.support.decorators import RESULTS_DIR

import pydatajson.constants
pydatajson.constants.CANT_THREADS_BROKEN_URL_VALIDATOR = 1

my_vcr = vcr.VCR(path_transformer=vcr.VCR.ensure_suffix('.yaml'),
cassette_library_dir=os.path.join("tests", "cassetes"),
record_mode='once')
Expand Down
18 changes: 12 additions & 6 deletions tests/test_readers_and_writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

import os.path
import unittest
from tempfile import NamedTemporaryFile

import nose
import vcr

from tempfile import NamedTemporaryFile

from pydatajson import constants
from tests.support.factories.xlsx import CSV_TABLE, WRITE_XLSX_TABLE
from tests.support.factories.xlsx import READ_XLSX_TABLE

Expand All @@ -27,10 +28,15 @@
from tests import xl_methods
import openpyxl as pyxl

my_vcr = vcr.VCR(path_transformer=vcr.VCR.ensure_suffix('.yaml'),
cassette_library_dir=os.path.join(
"tests", "cassetes", "readers_and_writers"),
record_mode='once')
import pydatajson.constants
pydatajson.constants.CANT_THREADS_BROKEN_URL_VALIDATOR = 1

my_vcr = vcr.VCR(
path_transformer=vcr.VCR.ensure_suffix('.yaml'),
cassette_library_dir=os.path.join(
"tests", "cassetes", "readers_and_writers"),
record_mode='once'
)


class ReadersAndWritersTestCase(unittest.TestCase):
Expand Down
26 changes: 26 additions & 0 deletions tests/test_threading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
from unittest import TestCase


from pydatajson.threading_helper import apply_threading


class ThreadingTests(TestCase):
    """Tests for ``pydatajson.threading_helper.apply_threading``."""

    def test_threading(self):
        """Results are returned in the same order as the input."""
        def square(value):
            return value ** 2

        self.assertEqual(
            apply_threading([1, 2, 3, 4], square, 3),
            [1, 4, 9, 16],
        )

    def test_broken_function(self):
        """Exceptions raised by the mapped function propagate to the
        caller — the call is synchronous."""
        def divide(value):
            return 6 / value

        with self.assertRaises(ZeroDivisionError):
            apply_threading([1, 2, 3, 0], divide, 3)
3 changes: 3 additions & 0 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from .context import pydatajson
from .support.decorators import RESULTS_DIR

import pydatajson.constants
pydatajson.constants.CANT_THREADS_BROKEN_URL_VALIDATOR = 1

my_vcr = vcr.VCR(path_transformer=vcr.VCR.ensure_suffix('.yaml'),
cassette_library_dir=os.path.join("tests", "cassetes"),
record_mode='once')
Expand Down

0 comments on commit a4a7034

Please sign in to comment.