This repository has been archived by the owner on May 26, 2021. It is now read-only.

Merge branch 'master' into feature/pluggable-timeliness
georgiana-b committed Aug 8, 2016
2 parents b7c3d4a + 289a8e0 commit 33d2853
Showing 18 changed files with 392 additions and 2,225 deletions.
10 changes: 6 additions & 4 deletions data_quality/datapackage.default.json
@@ -135,7 +135,8 @@
{
"name": "created_at",
"title": "time of the source's creation.",
"type": "string",
"type": "date",
"format": "date",
"constraints": { "required": true }
},
{
@@ -217,9 +218,10 @@
"constraints": { "required": true, "unique": true }
},
{
"name": "created_at",
"title": "Time of the source's creation.",
"type": "string",
"name": "month_of_creation",
"title": "Month when the source was created",
"type": "date",
"format": "date",
"constraints": { "required": true }
},
{
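For illustration, a hedged sketch of a source row under the updated schema — the field values and the validation call are assumptions, not code from this repository; the point is only that `created_at` must now be an actual date rather than a free-form string:

```python
from datetime import datetime

# Hypothetical source entry under the updated datapackage schema.
source_row = {
    "id": "example-source",        # illustrative
    "created_at": "2016-08-01",    # type "date", format "date": an ISO date string
}

# Minimal check in the spirit of the new constraint (assuming ISO "YYYY-MM-DD").
created_at = datetime.strptime(source_row["created_at"], "%Y-%m-%d").date()
print(created_at)  # 2016-08-01
```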
3 changes: 3 additions & 0 deletions data_quality/dq.default.json
@@ -11,6 +11,9 @@
"branch": "master",
"assess_timeliness": false,
"timeliness":{},
"data_quality_spec": {
"data_quality_spec_web": "https://raw.githubusercontent.com/frictionlessdata/data-quality-spec/master/spec.json"
},
"goodtables": {
"goodtables_web": "http://goodtables.okfnlabs.org",
"arguments": {
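Read together with the scoring changes below, the configuration this branch expects might be loaded roughly like this — a sketch only; the file path and loading code are assumptions, while the key names come from the diff:

```python
import io
import json

# Illustrative path to the default config shipped with the package.
with io.open("data_quality/dq.default.json", encoding="utf-8") as config_file:
    config = json.load(config_file)

# Keys this branch adds or relies on (see aggregate.py below):
spec_url = config["data_quality_spec"]["data_quality_spec_web"]
assess_timeliness = config.get("assess_timeliness", False)
timeliness_period = config.get("timeliness", {}).get("timeliness_period", 1)  # months
scoring_algorithm = config.get("scoring_algorithm", 1)  # 1 = by occurrences, 2 = by affected rows

print(spec_url, assess_timeliness, timeliness_period, scoring_algorithm)
```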
185 changes: 145 additions & 40 deletions data_quality/tasks/aggregate.py
@@ -9,8 +9,9 @@
import csv
import uuid
import pytz
import datetime
import jsontableschema
from math import log
from datetime import datetime, timedelta
from data_quality import utilities, compat, exceptions
from .base_task import Task
from .check_datapackage import DataPackageChecker
@@ -36,10 +37,12 @@ def __init__(self, config, **kwargs):
self.initialize_file(self.result_file, self.result_schema.headers)
self.initialize_file(self.run_file, self.run_schema.headers)
self.run_id = compat.str(uuid.uuid4().hex)
self.timestamp = datetime.datetime.now(pytz.utc)
self.max_score = 10
self.timestamp = datetime.now(pytz.utc)
self.all_scores = []
self.assess_timeliness = self.config['assess_timeliness']
self.timeliness_period = self.config['timeliness'].get('timeliness_period', 1)
self.max_score = 100
self.scoring_algorithm = self.config.get('scoring_algorithm', 1)
required_resources = [self.result_file, self.source_file,
self.publisher_file, self.run_file]
datapackage_check.check_database_completeness(required_resources)
@@ -51,16 +54,19 @@ def run(self, pipeline):
with compat.UnicodeAppender(self.result_file, quoting=csv.QUOTE_MINIMAL) as result_file:
source = self.get_source(pipeline.data_source)
result_id = compat.str(uuid.uuid4().hex)
created_at = source['created_at']
score = self.get_pipeline_score(pipeline)
source['created_at'] = utilities.date_from_string(source['created_at'])
if source['created_at'] is None:
raise ValueError(('No date could be extracted from `created_at`'
' field in source: {0}.').format(source))
score = self.get_pipeline_score(pipeline, source)
data_source = pipeline.data_source
schema = ''
summary = '' # TODO: how/what should a summary be?
report = self.get_pipeline_report_url(pipeline)

result = [result_id, source['id'], source['publisher_id'],
created_at, data_source, schema, score, summary,
self.run_id, self.timestamp, report]
source['created_at'], data_source, schema, score,
summary, self.run_id, self.timestamp, report]
try:
result_file.writerow(list(self.result_schema.convert_row(*result)))
except jsontableschema.exceptions.MultipleInvalid as e:
@@ -94,6 +100,31 @@ def initialize_file(self, filepath, headers):
with compat.UnicodeWriter(filepath, quoting=csv.QUOTE_MINIMAL) as a_file:
a_file.writerow(headers)

def write_run(self):
"""Write this run in the run file."""

with compat.UnicodeAppender(self.run_file, quoting=csv.QUOTE_MINIMAL) as run_file:
entry = [self.run_id, self.timestamp, int(round(sum(self.all_scores) / len(self.lookup)))]
try:
run_file.writerow(list(self.run_schema.convert_row(*entry)))
except jsontableschema.exceptions.MultipleInvalid as e:
for error in e.errors:
raise error

return True

def fetch_data(self, data_stream, encoding, source):
"""Cache the data source in the /fetched directory"""

source_name = source.get('name', source[self.data_key].rsplit('/', 1)[-1])
source_name = source_name or source['id']
cached_file_name = os.path.join(self.cache_dir, source_name)
data_stream.seek(0)

with io.open(cached_file_name, mode='w+', encoding=encoding) as fetched_file:
for line in data_stream:
fetched_file.write(line)

def get_source(self, data_src):
"""Find the entry correspoding to data_src from sources file"""

@@ -111,50 +142,124 @@ def get_source(self, data_src):

return matches[0]

def get_pipeline_score(self, pipeline):
def get_pipeline_report_url(self, pipeline):
"""Return a URL to a report on this data."""

return self.config['goodtables']['goodtables_web']

def get_pipeline_score(self, pipeline, source):
"""Return a score for this pipeline run."""

score = self.max_score
results = pipeline.report.generate()['results']

if not results:
pass
report = pipeline.report.generate()
error_stats = self.get_error_stats(report)
total_no_rows = report['meta']['row_count']
base_errors = {err: stats for err, stats in error_stats.items()
if stats['processor'] == 'base'}
if base_errors:
score = 0
else:
result_count = len(results)

if result_count > score:
score = 0
if self.scoring_algorithm == 1:
score = self.score_by_error_occurences(error_stats)
elif self.scoring_algorithm == 2:
score = self.score_by_affected_rows(error_stats, total_no_rows+1)
else:
score = score - result_count

raise ValueError(('The only options for "scoring_algorithm" are'
' 1 and 2.'))

if self.assess_timeliness:
publication_delay = self.get_publication_delay(source)
score -= publication_delay
score = round(score)
if score < 0:
score = 0
self.all_scores.append(score)
return score

    def get_pipeline_report_url(self, pipeline):
        """Return a URL to a report on this data."""

        return self.config['goodtables']['goodtables_web']
def get_publication_delay(self, source):
"""Determine how long the data source publication was delayed"""

dates = {}
relevance_period = source['period_id'].split('/')
relevance_period = relevance_period + [None]*(2 - len(relevance_period))
dates['period_start'], dates['period_end'] = relevance_period
dates = {k: utilities.date_from_string(v) for k, v in dates.items()}
dates['period_end'] = dates['period_end'] or dates['period_start']
timely_until = dates['period_end'] + \
timedelta(days=(self.timeliness_period * 30))
if dates['period_start'] <= source['created_at'] <= timely_until:
delay = 0
else:
delay = source['created_at'] - timely_until
delay = delay.days
if delay < 0:
delay = 0
delay = delay / 30.00
return delay

def get_error_stats(self, report):
"""Return dict with stats on errors"""

results = report['results']
dq_spec = utilities.get_data_quality_spec()
error_stats = {}
for result in results:
if result['result_level'] == 'error':
error = error_stats.get(result['result_id'], None)
if not error:
if result['processor'] == 'base':
error_spec = {}
else:
error_number = result['result_id'].split('_')[-1]
error_number = str(int(error_number) - 1)
error_spec = dq_spec[result['processor']][error_number]
new_stats = {'occurrences': 1, 'rows': [result['row_index']],
'weight': error_spec.get('weight', 1),
'processor': result['processor']}
error_stats[result['result_id']] = new_stats
else:
error['occurrences'] += 1
error['rows'].append(result['row_index'])
return error_stats

def score_by_affected_rows(self, error_stats, total_no_rows):
"""Score data source based on percent of rows affected by each error.
        Algorithm: `total score - Σ (error_weight * percent_of_affected_rows)`
Args:
error_stats: dict with stats on each error
total_no_rows: number of rows the data source has
"""

score = self.max_score
for error, stats in error_stats.items():
affected_rows = len(set(stats['rows']))
affected_rows_percent = (affected_rows * 100) / total_no_rows
error_impact = stats['weight'] * affected_rows_percent
score -= error_impact
return score

    def write_run(self):
        """Write this run in the run file."""

        with compat.UnicodeAppender(self.run_file, quoting=csv.QUOTE_MINIMAL) as run_file:
            entry = [self.run_id, self.timestamp, int(round(sum(self.all_scores) / len(self.lookup)))]
            try:
                run_file.writerow(list(self.run_schema.convert_row(*entry)))
            except jsontableschema.exceptions.MultipleInvalid as e:
                for error in e.errors:
                    raise error

        return True

    def score_by_error_occurences(self, error_stats):
        """Score data source based on the number of occurrences of each error
        Algorithm: `total score - Σ (error_weight * no_occurrences /
        harmonic_number(no_occurrences))`
        Args:
            error_stats: dict with stats on each error
        """

        score = self.max_score
        for error, stats in error_stats.items():
            no_occurrences = stats['occurrences']
            harmonic_mean_occ = no_occurrences / harmonic_number(no_occurrences)
            error_impact = stats['weight'] * harmonic_mean_occ
            score -= error_impact
        return score

    def fetch_data(self, data_stream, encoding, source):
        """Cache the data source in the /fetched directory"""

        source_name = source.get('name', source[self.data_key].rsplit('/', 1)[-1])
        cached_file_name = os.path.join(self.cache_dir, source_name)
        data_stream.seek(0)

        with io.open(cached_file_name, mode='w+', encoding=encoding) as fetched_file:
            for line in data_stream:
                fetched_file.write(line)

def harmonic_number(n):
    """Return an approximate value of the n-th harmonic number, based on the
    Euler-Mascheroni constant, by the formula: H(n) ≈ ln(n) + γ + 1/(2n) − 1/(12n²)
    """

    gamma = 0.57721566490153286
    return gamma + log(n) + 0.5/n - 1./(12*n**2)
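To make the two scoring strategies concrete, here is a hedged, self-contained sketch that mirrors the formulas above on a made-up `error_stats` dict; the weights, counts and row numbers are illustrative, and the real task additionally subtracts the timeliness delay, rounds, and clamps at zero:

```python
from math import log

def harmonic_number(n):
    """Approximate n-th harmonic number: H(n) ≈ ln(n) + γ + 1/(2n) − 1/(12n²)."""
    gamma = 0.57721566490153286
    return gamma + log(n) + 0.5/n - 1./(12*n**2)

MAX_SCORE = 100  # matches self.max_score in the diff

# Illustrative stats in the shape get_error_stats() builds:
error_stats = {
    "structure_001": {"occurrences": 4, "rows": [2, 3, 3, 7], "weight": 1, "processor": "structure"},
    "schema_002": {"occurrences": 1, "rows": [5], "weight": 3, "processor": "schema"},
}

def score_by_error_occurences(stats, max_score=MAX_SCORE):
    # Algorithm 1: each error costs weight * n / H(n), so repeats count less and less.
    score = max_score
    for _, s in stats.items():
        n = s["occurrences"]
        score -= s["weight"] * (n / harmonic_number(n))
    return score

def score_by_affected_rows(stats, total_no_rows, max_score=MAX_SCORE):
    # Algorithm 2: each error costs weight * percentage of distinct rows it touches.
    score = max_score
    for _, s in stats.items():
        affected = len(set(s["rows"]))
        score -= s["weight"] * (affected * 100) / total_no_rows
    return score

print(round(score_by_error_occurences(error_stats)))   # algorithm 1, ~95
print(round(score_by_affected_rows(error_stats, 11)))  # algorithm 2 (row_count + 1), ~45
```

When `assess_timeliness` is on, `get_publication_delay` then knocks off roughly one point per month the source was created after its relevance period plus the configured `timeliness_period`, before the final rounding.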
25 changes: 5 additions & 20 deletions data_quality/tasks/assess_performance.py
@@ -83,7 +83,7 @@ def get_sources(self, publisher_id):
source = {}
if row['publisher_id'] == publisher_id:
source['id'] = row['id']
source['created_at'] = self.get_period(row['created_at'])
source['created_at'] = utilities.date_from_string(row['created_at'])
source['score'] = self.get_source_score(source['id'])
sources.append(source)
return sources
@@ -104,25 +104,9 @@ def get_source_score(self, source_id):
timestamp = dateutil.parser.parse(row['timestamp'])
if timestamp > latest_timestamp:
latest_timestamp = timestamp
score = int(row['score']) * 10
score = int(row['score'])
return score

def get_period(self, period):
"""Return a valid period as date object
Args:
period: a string that might contain a date or range of dates
"""

if not period:
return ''
try:
date_object = dateutil.parser.parse(period).date()
return date_object
except ValueError:
return ''

def get_periods_data(self, publisher_id, periods, sources):
"""Return list of performances for a publisher, by period.
@@ -141,7 +125,7 @@ def get_periods_data(self, publisher_id, periods, sources):
period_sources_to_date += period_sources
performance = {}
performance['publisher_id'] = publisher_id
performance['created_at'] = compat.str(period)
performance['month_of_creation'] = compat.str(period)
performance['files_count'] = len(period_sources)
performance['score'] = self.get_period_score(period_sources)
performance['valid'] = self.get_period_valid(period_sources)
@@ -163,7 +147,7 @@ def get_period_sources(self, period, sources):
period_sources = []

for source in sources:
if period == source['created_at']:
if period == source['created_at'].replace(day=1):
period_sources.append(source)
return period_sources

@@ -223,6 +207,7 @@ def get_all_periods(self, periods):
"""

oldest_date = sorted(periods)[0]
oldest_date = oldest_date.replace(day=1)
current_date = datetime.date.today()
delta = dateutil.relativedelta.relativedelta(months=1)
relative_date = oldest_date
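A hedged sketch of the month bucketing this file now leans on — creation dates are grouped by the first day of their month and the reporting periods are enumerated monthly; the dates below are illustrative:

```python
import datetime
import dateutil.relativedelta

# Illustrative creation dates, as utilities.date_from_string() would return them.
created_dates = [datetime.date(2016, 5, 3), datetime.date(2016, 5, 28), datetime.date(2016, 7, 14)]

# Bucket by month, mirroring `period == source['created_at'].replace(day=1)`.
months = sorted({d.replace(day=1) for d in created_dates})
print(months)  # [datetime.date(2016, 5, 1), datetime.date(2016, 7, 1)]

# Enumerate every month from the oldest bucket to today, as get_all_periods() does.
delta = dateutil.relativedelta.relativedelta(months=1)
period, all_periods = months[0], []
while period <= datetime.date.today():
    all_periods.append(period)
    period += delta
print(all_periods[:3])  # first three monthly periods
```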
15 changes: 8 additions & 7 deletions data_quality/tasks/extract_relevance_period.py
@@ -55,13 +55,14 @@ def run(self):

for source in sources:
if source['period_id'] is None:
self.timeliness_strategy = ['created_at']
period_start, period_end = self.identify_period(source)
creation_date = utilities.date_from_string(source['created_at'])
dates = [creation_date, creation_date]
else:
period_start, period_end = source['period_id']
periods = [period_start.date(), period_end.date()]
periods = [period.strftime('%d-%m-%Y') for period in periods]
source['period_id'] = '/'.join(periods)
dates = [period_start.date(), period_end.date()]
dates = [date.strftime('%d-%m-%Y') if isinstance(date, datetime.date)
else '' for date in dates]
source['period_id'] = '/'.join(dates)
self.update_sources_period(sources)

def extract_period_from_sources(self):
@@ -228,8 +229,8 @@ def create_date_from_parts(self, date_parts=None):
potential_date = ' '.join(date_parts[index:])
try:
date = self.date_parser.get_date_data(potential_date)
except (ValueError, TypeError):
continue
except ValueError:
date = None
if date['date_obj'] is not None:
break
else:
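Finally, a small sketch of the `period_id` round trip this change settles on — a `start/end` pair rendered with `%d-%m-%Y`, which `get_publication_delay` later splits back apart; the dates are illustrative:

```python
import datetime

# Illustrative relevance period; in the task these come from the extraction step.
period_start = datetime.date(2016, 1, 1)
period_end = datetime.date(2016, 3, 31)

# Serialise as run() now does: 'DD-MM-YYYY/DD-MM-YYYY'.
period_id = '/'.join(d.strftime('%d-%m-%Y') for d in (period_start, period_end))
print(period_id)  # 01-01-2016/31-03-2016

# Later, aggregate.get_publication_delay() splits it and pads a missing end date.
parts = period_id.split('/')
parts = parts + [None] * (2 - len(parts))
print(parts)  # ['01-01-2016', '31-03-2016']
```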

