Rework how publication date stuff is handled
Retractions are tricky. The previous approach did not consider that they
may easily refer to cases from the distant past, and we do not know
exactly from when:

1. Either a case has been introduced before the "historic data
   available" cutoff (i.e. before the point from which we have daily
   full case files): then it was recorded at the Meldedatum stored in
   the record.

2. Or a case has been introduced after the "historic data available"
   cutoff: in that case, it was recorded in the dataset at the exact
   date at which the dataset was published.

Unfortunately, to resolve the second case, we lack sufficient data: we
do not know the publication date of any given record. We have to guess,
working our way forward from the reported date until we find a time
slot where at least as many cases have been added as are being
retracted.
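
In essence, this is a first-fit search over the per-day bins. The
following is a minimal sketch of the idea; the `find_ge` helper and the
plain-slice series are illustrative assumptions, not the actual lookup
on the keyed `Counters` structures:

```rust
// Hedged sketch of the forward scan described above; `find_ge` is a
// simplified stand-in operating on a plain slice rather than the keyed
// `Counters` time series.
fn find_ge(series: &[u64], start_at: usize, needed: u64) -> Option<usize> {
    // Walk forward from the reported date's bin until we find one that
    // holds at least `needed` cases to subtract the retraction from.
    (start_at..series.len()).find(|&i| series[i] >= needed)
}

fn main() {
    // Newly added cases per publication-day bin.
    let mut bins = vec![0u64, 2, 5];
    // A retraction of 3 cases reported on day 0 lands in day 2's bin,
    // the first one holding at least 3 cases.
    if let Some(i) = find_ge(&bins, 0, 3) {
        bins[i] -= 3;
    }
    assert_eq!(bins, vec![0, 2, 2]);
}
```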

This is obviously not without potential flaws. For instance, suppose a
case group is reported with 4 new cases on day X and 3 cases on day
X+1, and later a retraction aimed at the day-X+1 group comes in and
retracts all three cases. We will then remove the cases on day X,
because that is the first bin with enough matching cases available. If
yet another retraction comes in and attempts to remove the case group
from day X, it will not find a matching bin: the one at day X only has
1 case left, and the one on day X+1 only had 3 cases to begin with.
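
Reduced to plain per-day bins, that failure mode can be reproduced with
the same first-fit scan (illustrative sketch only, not the actual data
structures):

```rust
fn main() {
    // Newly added cases per publication day: [day X, day X+1].
    let mut bins = vec![4u64, 3];
    // Retraction of 3 cases aimed at day X+1: the forward scan starts
    // at day X and already finds enough cases there (4 >= 3), so it
    // subtracts from the wrong bin.
    let hit = (0..bins.len()).find(|&i| bins[i] >= 3).unwrap();
    assert_eq!(hit, 0); // day X, not day X+1
    bins[hit] -= 3; // bins are now [1, 3]
    // A later retraction of the full 4-case group from day X then finds
    // no bin with >= 4 cases and has to be dropped with a warning.
    assert_eq!((0..bins.len()).find(|&i| bins[i] >= 4), None);
}
```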

In such cases, we now log a warning; originally, I wanted to make this
a panic, but it appears that at least one dataset retracts a case
*which had never been reported* [1]! Hence, we cannot be strict about
this and have to hope that we will not run into such situations too
often.

(We can still detect it at a later point, because we'll see too many
cases in {cases,deaths,recovered}_pub_cum compared to the respective ref
series.)

   [1]: robert-koch-institut/SARS-CoV-2-Infektionen_in_Deutschland_Archiv#11
horazont committed Nov 22, 2021
1 parent 2367c49 commit a0c8a9c
Showing 5 changed files with 222 additions and 279 deletions.
203 changes: 165 additions & 38 deletions src/bin/rki_diff.rs
@@ -4,20 +4,20 @@ use std::fs::File;

use chrono::NaiveDate;

use covid::{DistrictId, MaybeAgeGroup, Sex, Counters, ReportFlag, InfectionRecord, global_start_date, naive_today, StepMeter, CountMeter, ProgressSink, DiffRecord, Diff};
use covid::{DistrictId, MaybeAgeGroup, Sex, Counters, ReportFlag, InfectionRecord, global_start_date, naive_today, StepMeter, CountMeter, ProgressSink, DiffRecord};


type PartialCaseKey = (DistrictId, MaybeAgeGroup, Sex);

const DELAY_CUTOFF: i64 = 28;

struct PartialDiffData {
pub cases_by_pub: Diff<PartialCaseKey>,
pub cases_by_pub: Counters<PartialCaseKey>,
pub cases_delayed: Counters<PartialCaseKey>,
pub case_delay_total: Counters<PartialCaseKey>,
pub late_cases: Counters<PartialCaseKey>,
pub deaths_by_pub: Diff<PartialCaseKey>,
pub recovered_by_pub: Diff<PartialCaseKey>,
pub deaths_by_pub: Counters<PartialCaseKey>,
pub recovered_by_pub: Counters<PartialCaseKey>,
}

fn saturating_add_u64_i32(reg: &mut u64, v: i32) {
@@ -29,44 +29,154 @@ fn saturating_add_u64_i32(reg: &mut u64, v: i32) {
}
}

fn checked_add_u64_i64(a: u64, b: i64) -> Option<u64> {
if b < 0 {
let b = (-b) as u64;
a.checked_sub(b)
} else {
let b = b as u64;
a.checked_add(b)
}
}

impl PartialDiffData {
fn new(start: NaiveDate, end: NaiveDate) -> Self {
Self{
cases_by_pub: Diff::new(start, end),
cases_by_pub: Counters::new(start, end),
cases_delayed: Counters::new(start, end),
case_delay_total: Counters::new(start, end),
late_cases: Counters::new(start, end),
deaths_by_pub: Diff::new(start, end),
recovered_by_pub: Diff::new(start, end),
deaths_by_pub: Counters::new(start, end),
recovered_by_pub: Counters::new(start, end),
}
}

fn submit(&mut self, date: NaiveDate, rec: &InfectionRecord)
{
let index = self.cases_by_pub.date_index(date).expect("date out of range");
fn submit_initial(&mut self, rec: &InfectionRecord) {
// For the priming of the database, we fill the d1 column with the data based on the report date (which is generally closer to the publication date than the reference date).
// We need the full d1 columns, even for the priming data, to be able to process later retractions correctly.
let index = self.cases_by_pub.date_index(rec.report_date).expect("date out of range");
let k = (rec.district_id, rec.age_group, rec.sex);

let (case_index, case_diff) = match rec.case {
ReportFlag::NewlyReported => (index, rec.case_count),
// If we see retractions in this data sample, we count them positively, because we will **also** call .submit() for all entries in the first sample to process them to gain extra data.
// That means that the retracted cases will be subtracted from their report dates (if possible), hence, we insert them here positively.
let case_diff = match rec.case {
ReportFlag::Consistent => rec.case_count,
// Note: the data is negative in the source already.
ReportFlag::Retracted => (index - 1, rec.case_count),
_ => (0, 0),
ReportFlag::Retracted => -rec.case_count,
_ => 0,
};
let (death_index, death_diff) = match rec.death {
ReportFlag::NewlyReported => (index, rec.death_count),
assert!(case_diff >= 0);

// For recovered/death, the report date is grossly incorrect, but we cannot do anything about that in the historic dataset. At some point, we might want to change the heuristic for those.
let death_diff = match rec.death {
ReportFlag::Consistent => rec.death_count,
// Note: the data is negative in the source already.
ReportFlag::Retracted => (index - 1, rec.death_count),
_ => (0, 0),
ReportFlag::Retracted => -rec.death_count,
_ => 0,
};
assert!(death_diff >= 0);

let recovered_diff = match rec.recovered {
ReportFlag::Consistent => rec.recovered_count,
// Note: the data is negative in the source already.
ReportFlag::Retracted => -rec.recovered_count,
_ => 0,
};
let (recovered_index, recovered_diff) = match rec.recovered {
ReportFlag::NewlyReported => (index, rec.recovered_count),
assert!(recovered_diff >= 0);

self.cases_by_pub.get_or_create(k)[index] += case_diff as u64;
self.deaths_by_pub.get_or_create(k)[index] += death_diff as u64;
self.recovered_by_pub.get_or_create(k)[index] += recovered_diff as u64;
}

fn prepare_diff(
k: &PartialCaseKey,
target_index: usize,
target_ts: &Counters<PartialCaseKey>,
report_date: NaiveDate,
flag: ReportFlag,
count: i32,
) -> (usize, i64) {
// Find a location to place a diff based on a given case count + report flag.
// This will try to find the best possible location for a retraction, but drop retractions (with a warning message) if no such location can be found.

// TODO: logging
let count = count as i64;
match flag {
ReportFlag::NewlyReported => (target_index, count),
// Note: the data is negative in the source already.
ReportFlag::Retracted => (index - 1, rec.recovered_count),
ReportFlag::Retracted => {
let start_at = target_ts.date_index(report_date).expect("date out of range");
assert!(count <= 0);
let retract_index = match target_ts.find_ge(k, start_at, (-count) as u64) {
Some(i) => i,
None => {
// TODO: use logging
eprintln!("warn: retraction found, but no matching case in dataset: k={:?}, count={}, report_date={}", k, count, report_date);
return (0, 0)
},
};
(retract_index, count)
},
_ => (0, 0),
};
}
}

if case_diff == 0 && death_diff == 0 && recovered_diff == 0 {
fn apply_diff(
k: &PartialCaseKey,
target_index: usize,
target_ts: &mut Counters<PartialCaseKey>,
report_date: NaiveDate,
flag: ReportFlag,
count: i32,
) {
let (index, diff) = Self::prepare_diff(
k,
target_index,
target_ts,
report_date,
flag,
count,
);
if diff == 0 {
return
}
let ts = target_ts.get_or_create(*k);
ts[index] = match checked_add_u64_i64(ts[index], diff) {
Some(v) => v,
None => panic!("attempt to decrease diff below zero!"),
}
}

fn submit(&mut self, date: NaiveDate, rec: &InfectionRecord)
{
let index = self.cases_by_pub.date_index(date).expect("date out of range");
let k = (rec.district_id, rec.age_group, rec.sex);

Self::apply_diff(
&k,
index,
&mut self.cases_by_pub,
rec.report_date,
rec.case,
rec.case_count,
);
Self::apply_diff(
&k,
index,
&mut self.deaths_by_pub,
rec.report_date,
rec.death,
rec.death_count,
);
Self::apply_diff(
&k,
index,
&mut self.recovered_by_pub,
rec.report_date,
rec.recovered,
rec.recovered_count,
);

let (case_delay, case_delay_count, late_case_count) = match rec.case {
ReportFlag::NewlyReported => {
@@ -82,22 +192,17 @@ impl PartialDiffData {
_ => (0, 0, 0),
};

let k = (rec.district_id, rec.age_group, rec.sex);
self.cases_by_pub.get_or_create(k)[case_index] += case_diff as i64;
saturating_add_u64_i32(&mut self.cases_delayed.get_or_create(k)[case_index], case_delay_count);
saturating_add_u64_i32(&mut self.case_delay_total.get_or_create(k)[case_index], case_delay * case_delay_count);
saturating_add_u64_i32(&mut self.late_cases.get_or_create(k)[case_index], late_case_count);
self.deaths_by_pub.get_or_create(k)[death_index] += death_diff as i64;
self.recovered_by_pub.get_or_create(k)[recovered_index] += recovered_diff as i64;
saturating_add_u64_i32(&mut self.cases_delayed.get_or_create(k)[index], case_delay_count);
saturating_add_u64_i32(&mut self.case_delay_total.get_or_create(k)[index], case_delay * case_delay_count);
saturating_add_u64_i32(&mut self.late_cases.get_or_create(k)[index], late_case_count);
}

fn write_all<W: io::Write, S: ProgressSink + ?Sized>(&self, s: &mut S, w: &mut W) -> io::Result<()> {
let start = self.cases_by_pub.start();
let len = self.cases_by_pub.len();
let mut pm = StepMeter::new(s, len);
let mut w = csv::Writer::from_writer(w);
for i in 0..len {
let date = start + chrono::Duration::days(i as i64);
let date = self.cases_by_pub.index_date(i as i64).unwrap();
for k in self.cases_by_pub.keys() {
let cases = self.cases_by_pub.get_value(k, i).unwrap_or(0);
let cases_delayed = self.cases_delayed.get_value(k, i).unwrap_or(0);
@@ -131,7 +236,7 @@ impl PartialDiffData {
}
}

fn load_existing<R: io::Read, S: ProgressSink + ?Sized>(s: &mut S, r: &mut R, d: &mut PartialDiffData) -> io::Result<()> {
fn load_existing<R: io::Read, S: ProgressSink + ?Sized>(s: &mut S, r: &mut R, d: &mut PartialDiffData) -> io::Result<usize> {
let mut r = csv::Reader::from_reader(r);
let mut pm = CountMeter::new(s);
let mut n = 0;
@@ -151,18 +256,35 @@ fn load_existing<R: io::Read, S: ProgressSink + ?Sized>(s: &mut S, r: &mut R, d:
n = i+1;
}
pm.finish(n);
Ok(())
Ok(n)
}

fn try_load_existing<P: AsRef<Path>, S: ProgressSink + ?Sized>(s: &mut S, path: P, d: &mut PartialDiffData) -> io::Result<()> {
fn try_load_existing<P: AsRef<Path>, S: ProgressSink + ?Sized>(s: &mut S, path: P, d: &mut PartialDiffData) -> io::Result<bool> {
let r = match File::open(path) {
Ok(f) => f,
// ignore missing files here
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(false),
Err(other) => return Err(other),
};
let mut r = flate2::read::GzDecoder::new(r);
load_existing(s, &mut r, d)
Ok(load_existing(s, &mut r, d)? > 0)
}

fn prime<P: AsRef<Path>, S: ProgressSink + ?Sized>(s: &mut S, path: P, d: &mut PartialDiffData) -> io::Result<()> {
let r = covid::magic_open(path)?;
let mut r = csv::Reader::from_reader(r);
let mut pm = CountMeter::new(s);
let mut n = 0;
for (i, row) in r.deserialize().enumerate() {
let rec: InfectionRecord = row?;
d.submit_initial(&rec);
if i % 500000 == 499999 {
pm.update(i+1);
}
n = i+1;
}
pm.finish(n);
Ok(())
}

fn merge_new<P: AsRef<Path>, S: ProgressSink + ?Sized>(s: &mut S, path: P, date: NaiveDate, d: &mut PartialDiffData) -> io::Result<()> {
@@ -199,10 +321,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut counters = PartialDiffData::new(start, end);

println!("loading existing records ...");
try_load_existing(&mut *covid::default_output(), datafile, &mut counters)?;
let mut found_anything = try_load_existing(&mut *covid::default_output(), datafile, &mut counters)?;

for pair in argv[2..].chunks(2) {
let newfile = &pair[0];
if !found_anything {
println!("priming dataset using {}...", newfile);
prime(&mut *covid::default_output(), newfile, &mut counters)?;
found_anything = true;
}
// subtract one because the publication refers to the day before
let date = pair[1].parse::<NaiveDate>()? - chrono::Duration::days(1);
println!("merging new records ({} -> {}) ...", newfile, date);
