282 Validate broken URLs in parallel #287

Merged: 16 commits, Oct 8, 2019
2 changes: 2 additions & 0 deletions pydatajson/constants.py
@@ -2,3 +2,5 @@
 DEFAULT_TIMEZONE = "America/Buenos_Aires"
 
 VALID_STATUS_CODES = [200, 203, 302]
+
+CANT_THREADS_BROKEN_URL_VALIDATOR = 10
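
A naming note (a reviewer's gloss, not part of the PR): CANT abbreviates the Spanish "cantidad", so the constant reads as "number of threads for the broken-URL validator". Both call sites pass it straight through to the pool helper; a minimal sketch of that wiring, assuming pydatajson is importable:

    from pydatajson import constants, threading_helper
    from pydatajson.helpers import is_working_url

    # Hypothetical input list; mirrors how the PR feeds URLs to the pool.
    urls = ["https://datos.gob.ar/data.json"]
    results = threading_helper.apply_threading(
        urls, is_working_url, constants.CANT_THREADS_BROKEN_URL_VALIDATOR)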
19 changes: 15 additions & 4 deletions pydatajson/status_indicators_generator.py
@@ -1,3 +1,5 @@
+from pydatajson import threading_helper
+from pydatajson import constants
 from pydatajson.helpers import is_working_url
 from pydatajson.readers import read_catalog
 from pydatajson.reporting import generate_datasets_summary
@@ -50,11 +52,20 @@ def distribuciones_download_url_ok_pct(self):
         round(float(self.distribuciones_download_url_ok_cant()) / total, 4)
 
     def _validate_download_urls(self):
-        result = 0
+        async_results = []
         for dataset in self.catalog.get('dataset', []):
-            for distribution in dataset.get('distribution', []):
-                valid, _ = is_working_url(distribution.get('downloadURL', ''))
-                result += valid
+            distribution_urls = \
+                [distribution.get('downloadURL', '')
+                 for distribution in dataset.get('distribution', [])]
+            async_results += threading_helper\
+                .apply_threading(distribution_urls,
+                                 is_working_url,
+                                 constants.CANT_THREADS_BROKEN_URL_VALIDATOR)
+
+        result = 0
+        for res, _ in async_results:
+            result += res
 
         # Cache the result once it has been computed
         self.download_url_ok = result
         return result
11 changes: 11 additions & 0 deletions pydatajson/threading_helper.py
@@ -0,0 +1,11 @@
+from multiprocessing.pool import ThreadPool
+
+
+def apply_threading(l, function, cant_threads):
+    if cant_threads == 1:
+        return [function(x) for x in l]
+    pool = ThreadPool(processes=cant_threads)
+    results = pool.map(function, l)
+    pool.close()
+    pool.join()
+    return results
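
How the helper behaves (a reviewer's sketch, not part of the diff): with cant_threads == 1 it degrades to a plain list comprehension, which is exactly what the tests below rely on for determinism; otherwise ThreadPool.map fans the calls out over the pool, returns results in input order, and re-raises any worker exception in the caller.

    from pydatajson.threading_helper import apply_threading

    # check() is a hypothetical stand-in for is_working_url,
    # returning (is_valid, status_code).
    def check(url):
        return (url.startswith("https"), 200)

    urls = ["https://datos.gob.ar", "http://example.com"]

    # 10 worker threads; results come back in input order.
    parallel = apply_threading(urls, check, 10)

    # cant_threads == 1 short-circuits to a sequential comprehension.
    sequential = apply_threading(urls, check, 1)
    assert parallel == sequential == [(True, 200), (False, 200)]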
84 changes: 62 additions & 22 deletions pydatajson/validation.py
@@ -15,9 +15,8 @@
 import platform
 from collections import Counter
 
-import requests
-
-from pydatajson.constants import VALID_STATUS_CODES
+from pydatajson import threading_helper
+from pydatajson import constants
 from pydatajson.helpers import is_working_url
 
 try:
@@ -223,18 +222,38 @@ def _validate_landing_pages(self, catalog):
         datasets = catalog.get('dataset')
         datasets = filter(lambda x: x.get('landingPage'), datasets)
 
+        metadata = []
+        urls = []
+
         for dataset_idx, dataset in enumerate(datasets):
-            dataset_title = dataset.get('title')
-            landing_page = dataset.get('landingPage')
+            metadata.append({
+                "dataset_idx": dataset_idx,
+                "dataset_title": dataset.get('title'),
+                "landing_page": dataset.get('landingPage'),
+            })
+            urls.append(dataset.get('landingPage'))
+
+        sync_res = threading_helper\
+            .apply_threading(urls,
+                             is_working_url,
+                             constants.CANT_THREADS_BROKEN_URL_VALIDATOR)
+
+        for i in range(len(sync_res)):
+            valid, status_code = sync_res[i]
+            act_metadata = metadata[i]
+            dataset_idx = act_metadata["dataset_idx"]
+            dataset_title = act_metadata["dataset_title"]
+            landing_page = act_metadata["landing_page"]
 
-            valid, status_code = is_working_url(landing_page)
             if not valid:
                 yield ce.BrokenLandingPageError(dataset_idx, dataset_title,
                                                 landing_page, status_code)
 
     def _validate_distributions_urls(self, catalog):
         datasets = catalog.get('dataset')
 
+        metadata = []
+        urls = []
         for dataset_idx, dataset in enumerate(datasets):
             distributions = dataset.get('distribution')
 
@@ -243,22 +262,43 @@ def _validate_distributions_urls(self, catalog):
                 access_url = distribution.get('accessURL')
                 download_url = distribution.get('downloadURL')
 
-                access_url_is_valid, access_url_status_code = \
-                    is_working_url(access_url)
-                download_url_is_valid, download_url_status_code = \
-                    is_working_url(download_url)
-                if not access_url_is_valid:
-                    yield ce.BrokenAccessUrlError(dataset_idx,
-                                                  distribution_idx,
-                                                  distribution_title,
-                                                  access_url,
-                                                  access_url_status_code)
-                if not download_url_is_valid:
-                    yield ce.BrokenDownloadUrlError(dataset_idx,
-                                                    distribution_idx,
-                                                    distribution_title,
-                                                    download_url,
-                                                    download_url_status_code)
+                metadata.append({
+                    "dataset_idx": dataset_idx,
+                    "dist_idx": distribution_idx,
+                    "dist_title": distribution_title
+                })
+                urls += [access_url, download_url]
+
+        sync_res = threading_helper\
+            .apply_threading(urls,
+                             is_working_url,
+                             constants.CANT_THREADS_BROKEN_URL_VALIDATOR)
+
+        for i in range(len(metadata)):
+            actual_metadata = metadata[i]
+            dataset_idx = actual_metadata["dataset_idx"]
+            distribution_idx = actual_metadata["dist_idx"]
+            distribution_title = actual_metadata["dist_title"]
+
+            k = i*2
+            access_url = urls[k]
+            download_url = urls[k+1]
+
+            access_url_is_valid, access_url_status_code = sync_res[k]
+            download_url_is_valid, download_url_status_code = sync_res[k+1]
+
+            if not access_url_is_valid:
+                yield ce.BrokenAccessUrlError(dataset_idx,
+                                              distribution_idx,
+                                              distribution_title,
+                                              access_url,
+                                              access_url_status_code)
+            if not download_url_is_valid:
+                yield ce.BrokenDownloadUrlError(dataset_idx,
+                                                distribution_idx,
+                                                distribution_title,
+                                                download_url,
+                                                download_url_status_code)
 
 
 def is_valid_catalog(catalog, validator=None):
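
A note on the bookkeeping above (a reviewer's gloss): URLs are appended in access/download pairs, so for metadata entry i the flat urls list holds the accessURL at index 2*i and the downloadURL at 2*i + 1, and apply_threading preserves that order in sync_res. A tiny self-contained sketch of the pairing, with hypothetical data:

    metadata = [{"dist_title": "A"}, {"dist_title": "B"}]
    urls = ["http://a/access", "http://a/download",
            "http://b/access", "http://b/download"]

    for i, entry in enumerate(metadata):
        k = i * 2
        # urls[k] is the accessURL and urls[k + 1] the downloadURL for entry i
        assert urls[k].endswith("access")
        assert urls[k + 1].endswith("download")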
3 changes: 3 additions & 0 deletions tests/test_catalog_readme.py
@@ -20,6 +20,9 @@
 from pydatajson.catalog_readme import generate_readme
 from tests.support.decorators import RESULTS_DIR
 
+import pydatajson.constants
+pydatajson.constants.CANT_THREADS_BROKEN_URL_VALIDATOR = 1
+
 my_vcr = vcr.VCR(path_transformer=vcr.VCR.ensure_suffix('.yaml'),
                  cassette_library_dir=os.path.join("tests", "cassetes"),
                  record_mode='once')
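
Why the tests pin the thread count to 1 (a reviewer's reading, not stated in the PR): VCR replays recorded cassettes, and concurrent requests make the record/replay order nondeterministic, so forcing CANT_THREADS_BROKEN_URL_VALIDATOR = 1 sends apply_threading down its sequential path. Since the constant is read at call time, a per-test override would also work; a hypothetical sketch:

    import unittest
    import pydatajson.constants

    class DeterministicThreadsTestCase(unittest.TestCase):
        def setUp(self):
            # Force the sequential path while the test runs.
            self._old_cant = pydatajson.constants.CANT_THREADS_BROKEN_URL_VALIDATOR
            pydatajson.constants.CANT_THREADS_BROKEN_URL_VALIDATOR = 1

        def tearDown(self):
            # Restore the module-level default.
            pydatajson.constants.CANT_THREADS_BROKEN_URL_VALIDATOR = self._old_cant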
18 changes: 12 additions & 6 deletions tests/test_readers_and_writers.py
@@ -7,11 +7,12 @@
 
 import os.path
 import unittest
+from tempfile import NamedTemporaryFile
 
 import nose
 import vcr
 
-from tempfile import NamedTemporaryFile
-
+from pydatajson import constants
 from tests.support.factories.xlsx import CSV_TABLE, WRITE_XLSX_TABLE
 from tests.support.factories.xlsx import READ_XLSX_TABLE

@@ -27,10 +28,15 @@
 from tests import xl_methods
 import openpyxl as pyxl
 
-my_vcr = vcr.VCR(path_transformer=vcr.VCR.ensure_suffix('.yaml'),
-                 cassette_library_dir=os.path.join(
-                     "tests", "cassetes", "readers_and_writers"),
-                 record_mode='once')
+import pydatajson.constants
+pydatajson.constants.CANT_THREADS_BROKEN_URL_VALIDATOR = 1
+
+my_vcr = vcr.VCR(
+    path_transformer=vcr.VCR.ensure_suffix('.yaml'),
+    cassette_library_dir=os.path.join(
+        "tests", "cassetes", "readers_and_writers"),
+    record_mode='once'
+)
 
 
 class ReadersAndWritersTestCase(unittest.TestCase):
26 changes: 26 additions & 0 deletions tests/test_threading.py
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+from unittest import TestCase
+
+
+from pydatajson.threading_helper import apply_threading
+
+
+class ThreadingTests(TestCase):
+
+    def test_threading(self):
+        elements = [1, 2, 3, 4]
+
+        def function(x):
+            return x ** 2
+
+        result = apply_threading(elements, function, 3)
+
+        self.assertEqual(result, [1, 4, 9, 16])
+
+    def test_broken_function(self):
+        elements = [1, 2, 3, 0]
+
+        def divide(x):
+            return 6 / x
+        with self.assertRaises(ZeroDivisionError):  # it is "synchronous"!
+            apply_threading(elements, divide, 3)
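
The inline comment is the behavioral point of test_broken_function: even though the work runs on a pool, ThreadPool.map collects every result before returning and re-raises a worker's exception in the calling thread. A standalone sketch (stdlib only, independent of pydatajson):

    from multiprocessing.pool import ThreadPool

    pool = ThreadPool(processes=3)
    try:
        # map() blocks until every element is processed; if any call
        # raises, the exception surfaces here, in the caller.
        pool.map(lambda x: 6 / x, [1, 2, 3, 0])
    except ZeroDivisionError:
        print("raised synchronously in the caller")
    finally:
        pool.close()
        pool.join()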
3 changes: 3 additions & 0 deletions tests/test_validation.py
@@ -24,6 +24,9 @@
 from .context import pydatajson
 from .support.decorators import RESULTS_DIR
 
+import pydatajson.constants
+pydatajson.constants.CANT_THREADS_BROKEN_URL_VALIDATOR = 1
+
 my_vcr = vcr.VCR(path_transformer=vcr.VCR.ensure_suffix('.yaml'),
                  cassette_library_dir=os.path.join("tests", "cassetes"),
                  record_mode='once')