In [None]:
import datetime
import re
import sys
import itertools as it
import functools as ft
import dataclasses
from dataclasses import dataclass
from pathlib import Path
from typing import List, Any, Optional
import scipy as sp
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
# Select seaborn's "talk" plotting context once, up front, for the figures in this notebook.
sns.set_context("talk")

In [None]:
def parse_date(s):
    """Turn a ``YYYY-MM-DD`` string into a ``datetime.date``.

    ``strptime`` raises ``ValueError`` when ``s`` does not fit the format.
    """
    parsed = datetime.datetime.strptime(s, "%Y-%m-%d")
    return parsed.date()

def parse_time(s):
    """Turn a 24-hour ``HHMM`` string (e.g. ``"0930"``) into a ``datetime.time``.

    ``strptime`` raises ``ValueError`` when ``s`` does not fit the format.
    """
    parsed = datetime.datetime.strptime(s, "%H%M")
    return parsed.time()

# Grammar: "<amount> [g|j] [@<rate> ml/hr]", e.g. "120", "120 g" or "120 j @60 ml/hr".
AMOUNT_AND_RATE_REGEX = re.compile(r"^(\d+)(?:\s*([gj])\s*)?(?:\s*@(\d+)\s*ml/hr\s*)?$")
def parse_tail_pump(amount_and_rate, kind=None, default_kind=None):
    """Parse the amount / port / rate field of a pump entry.

    Args:
        amount_and_rate: text matching ``AMOUNT_AND_RATE_REGEX``.
        kind: explicit kind given on the log line, if any.
        default_kind: kind to use when ``kind`` is missing or empty.

    Returns:
        A dict with keys ``amount`` (int), ``port`` (``"g"`` or ``"j"``,
        defaulting to ``"j"``), ``rate`` (int, ml/hr, defaulting to 58) and
        ``kind``.

    Raises:
        ValueError: if ``amount_and_rate`` does not match the grammar.
    """
    m = AMOUNT_AND_RATE_REGEX.search(amount_and_rate)
    if m is None:
        # Without this check the failure surfaces as an opaque
        # "'NoneType' object has no attribute 'group'".
        raise ValueError(f"cannot parse amount and rate: {amount_and_rate!r}")
    amount, port, rate = m.groups()
    return dict(
        amount=int(amount),
        port=port or "j",
        rate=int(rate) if rate else 58,
        kind=kind or default_kind,
    )

@dataclass
class Event:
    """One parsed entry of the log file."""
    # Start of the event; entries that carry no time of day are stamped with
    # midnight of the current date by parse().
    time: datetime.datetime
    # Length in seconds for "HHMM-HHMM" range entries, otherwise None.
    duration: Optional[float]
    # First word of the entry (or the keyword matched by parse()).
    kind: str
    # Remaining free text of the entry; may be empty.
    description: str
    # Kind-specific payload (a dict produced by a tail parser or by parse()),
    # or None when the entry has no extra fields.
    detail: Any
    # Line number supplied by the caller alongside the source line.
    line_number: int

def parse_event(n, date, start, end, row, tail_parsers):
    """Build an ``Event`` from the text that follows the time prefix of a log line.

    ``row`` is comma separated. Its first field is ``"<kind> [description ...]"``;
    any further fields are passed positionally to ``tail_parsers[kind]`` and the
    result is stored as the event's ``detail``.

    Args:
        n: line number recorded on the event.
        date: ``datetime.date`` the line belongs to.
        start: ``datetime.time`` at which the event starts.
        end: ``datetime.time`` at which it ends, or ``None`` for a point event.
        row: text after the time prefix.
        tail_parsers: mapping of kind -> callable that parses the extra fields.

    Returns:
        The parsed ``Event``; ``duration`` is in seconds, or ``None`` when
        ``end`` is ``None``.

    Raises:
        ValueError: if ``row`` has extra fields but ``tail_parsers`` has no
            parser for its kind.
    """
    head, *rest = [field.strip() for field in row.split(",")]
    # split() with no argument collapses runs of whitespace, so repeated
    # spaces cannot leak empty words into the description.
    words = head.split()
    kind = words[0] if words else ""
    description = " ".join(words[1:])

    detail = None
    if rest:
        # Raise explicitly rather than assert: asserts vanish under ``python -O``.
        if kind not in tail_parsers:
            raise ValueError(f"cannot parse kind: {kind} (rest: {rest})")
        detail = tail_parsers[kind](*rest)

    start_date = datetime.datetime.combine(date, start)
    duration = None
    if end is not None:
        # Taken modulo one day: an end time earlier than the start time means
        # the range ran past midnight. The result is therefore never negative.
        elapsed = datetime.datetime.combine(date, end) - start_date
        duration = elapsed.total_seconds() % (24 * 60 * 60)
    return Event(start_date, duration, kind, description, detail, line_number=n)

def parse(lines):
    """Lazily parse ``(line_number, text)`` pairs into ``Event`` objects.

    Each line is lower-cased and stripped; blank lines are skipped. The
    recognised forms are:

    * ``# YYYY-MM-DD``            - sets the date for the lines that follow
    * ``HHMM, <row>``             - point event, delegated to ``parse_event``
    * ``HHMM-HHMM, <row>``        - ranged event, delegated to ``parse_event``
    * ``curl, <count>[, HHMM...]``- count for the date, with optional times
    * ``water <g|j>, <amount>``   - water entry for the date
    * ``country|note, <text>``    - free-text entry for the date

    Entries without a time of day are stamped with midnight of the current
    date. A line that cannot be parsed is reported on stderr and skipped, so
    one bad line does not abort the whole run.

    Args:
        lines: iterable of ``(line_number, text)`` pairs.

    Yields:
        One ``Event`` per successfully parsed entry line.
    """
    date_regex = re.compile(r"^#\s*(\d{4}-\d{2}-\d{2})$")
    time_regex = re.compile(r"^(\d{4})\s*,(.+)$")
    range_regex = re.compile(r"^(\d{4})-(\d{4})\s*,(.+)$")
    curl_regex = re.compile(r"^curl\s*,\s*(\d+)\s*(?:,\s*(.*))?$")
    water_regex = re.compile(r"^water\s+([gj])\s*,\s*(\d+)\b.*$")
    other_regex = re.compile(r"^(country|note)\s*,\s*(.+)$")
    tail_parsers = dict(
        feed=ft.partial(parse_tail_pump, default_kind="milk"),
        water=ft.partial(parse_tail_pump, default_kind="water"),
        dioralyte=ft.partial(parse_tail_pump, default_kind="dioralyte"),
        temperature=lambda s: dict(value=float(s)),
    )
    midnight = datetime.time(0)

    date = None

    def untimed_event(n, kind, description, detail):
        # Entries with no time of day are pinned to midnight of the current date.
        return Event(datetime.datetime.combine(date, midnight),
                     duration=None, kind=kind, description=description,
                     detail=detail, line_number=n)

    def parse_line(n, line):
        nonlocal date
        if m := date_regex.search(line):
            date = parse_date(m.group(1))
            return None
        # Explicit raises rather than asserts: under ``python -O`` an assert
        # is removed and the bad line would be dropped without any report.
        if date is None:
            raise ValueError("first line should contain a date")

        if m := time_regex.search(line):
            time = parse_time(m.group(1))
            return parse_event(n, date, time, None, m.group(2), tail_parsers)
        if m := range_regex.search(line):
            start, end = parse_time(m.group(1)), parse_time(m.group(2))
            return parse_event(n, date, start, end, m.group(3), tail_parsers)
        if m := curl_regex.search(line):
            count, time_list = m.groups()
            times = [parse_time(s.strip())
                     for s in (time_list or "").split(",") if s.strip()]
            return untimed_event(n, "curl", "", dict(count=int(count), times=times))
        if m := water_regex.search(line):
            return untimed_event(n, "water", "",
                                 dict(port=m.group(1), amount=int(m.group(2))))
        if m := other_regex.search(line):
            return untimed_event(n, m.group(1), m.group(2), {})
        raise ValueError("line not matched - didn't match any patterns")

    for n, line in lines:
        line = line.lower().strip()
        if not line:
            continue
        try:
            event = parse_line(n, line)
        except Exception as e:
            # Best effort by design: report the line and carry on.
            print(f"Failed to parse line [{n}] {line!r} -- {e}", file=sys.stderr)
            continue
        # Yield outside the try so only parsing errors are swallowed.
        if event is not None:
            yield event

# Maps every kind that may appear in the log to its canonical name. Three
# synonyms are folded together; all other known kinds map to themselves, so
# a lookup doubles as a check that the kind is known.
KIND_MAP = dict(
    asleep="sleep",
    nap="sleep",
    awake="wake",
    **{kind: kind for kind in [
        "cannula",
        "clonidine",
        "coamoxiclav",
        "country",
        "curl",
        "down",
        "dioralyte",
        "feed",
        "glycerin",
        "fluxy",
        "ibuprofen",
        "liquefaction",
        "note",
        "paracetamol",
        "poops",
        "replaced",
        "sicks",
        "sleep",
        "temperature",
        "tight",
        "wake",
        "water",
        "weight",
    ]}
)
def normalize(event):
    """Return a copy of ``event`` whose ``kind`` is replaced by its canonical name.

    Args:
        event: a dataclass instance with a ``kind`` field (an ``Event``).

    Returns:
        A new instance of the same dataclass; ``event`` itself is not modified.

    Raises:
        ValueError: if ``event.kind`` is not a key of ``KIND_MAP``. The
            original ``KeyError`` is attached as ``__cause__``.
    """
    try:
        canonical_kind = KIND_MAP[event.kind]
    except KeyError as e:
        raise ValueError(f"Bad event {event}") from e
    # replace() copies every other field, so new fields need no change here.
    return dataclasses.replace(event, kind=canonical_kind)
        
# Number the lines of the log from 1, then flip the list so that parse()
# receives the last line of the file first.
log_text = Path("data/2021-11-20.txt").read_text()
lines = list(enumerate(log_text.split("\n"), start=1))[::-1]
events = list(map(normalize, parse(lines)))

In [None]:
@dataclass
class Sleep:
    """Per-day sleep totals, filled in by count_sleep(); all totals are in seconds."""
    # Date of the event that opened this record.
    date: datetime.date
    # Time between an untimed "sleep" event and the following untimed "wake" event.
    sleep: float
    # Summed duration of ranged "sleep" events.
    nap: float
    # Summed duration of ranged "wake" events.
    awake: float
    
def count_sleep(event_source=None, rollover_hour=11):
    """Fold wake/sleep events into one ``Sleep`` record per day.

    Events before the first untimed ``"wake"`` (``duration is None``) are
    ignored; that event opens the first record. After it, only ``"wake"``
    and ``"sleep"`` events are considered:

    * an untimed ``"wake"`` following an untimed ``"sleep"`` adds the time
      between the two to ``sleep``;
    * a ranged ``"wake"`` adds its duration to ``awake``;
    * a ranged ``"sleep"`` adds its duration to ``nap``.

    A record stays open until ``rollover_hour`` o'clock on the day after its
    date; the first wake/sleep event later than that opens a new record dated
    with that event's date.

    Args:
        event_source: iterable of events in chronological order. Defaults to
            the notebook-level ``events`` list.
        rollover_hour: hour of the following day at which a record closes.

    Returns:
        List of ``Sleep`` records in order; empty if there is no untimed
        ``"wake"`` event.

    Raises:
        ValueError: if an untimed ``"sleep"`` is followed by another untimed
            ``"sleep"`` instead of a ``"wake"``.
    """
    event_iter = iter(events if event_source is None else event_source)
    sleeps = []
    previous = None

    # Phase 1: skip ahead to the first untimed wake, which anchors day one.
    for event in event_iter:
        if event.kind == "wake" and event.duration is None:
            previous = event
            sleeps.append(Sleep(event.time.date(), 0, 0, 0))
            break

    # Phase 2: continue on the same iterator. If phase 1 found nothing, the
    # iterator is exhausted and this loop does not run.
    for event in event_iter:
        if event.kind not in {"wake", "sleep"}:
            continue
        day_end = datetime.datetime.combine(
            sleeps[-1].date + datetime.timedelta(days=1),
            datetime.time(hour=rollover_hour))
        if event.time > day_end:
            sleeps.append(Sleep(event.time.date(), 0, 0, 0))
        today = sleeps[-1]
        if event.duration is None:
            if previous.kind == "sleep":
                if event.kind != "wake":
                    raise ValueError(f"{previous} -> {event}")
                today.sleep += (event.time - previous.time).total_seconds()
            previous = event
        elif event.kind == "wake":
            today.awake += event.duration
        else:
            today.nap += event.duration
    return sleeps

# One row per Sleep record; the last record is left out of the plot
# (presumably because its day is still incomplete - confirm).
sleep_records = [dataclasses.asdict(s) for s in count_sleep()[:-1]]
df = pd.DataFrame(sleep_records)
# Net sleep = sleep + naps - time awake, floored at zero.
df["net_sleep"] = np.maximum(df.sleep + df.nap - df.awake, 0)

SECONDS_PER_HOUR = 60 * 60
fig, ax = plt.subplots(figsize=(14, 8))
for c, k in zip(sns.color_palette(), ["net_sleep", "nap", "awake"]):
    hours_per_day = df[k] / SECONDS_PER_HOUR
    # Bold line: Gaussian-smoothed trend. Faint line: the raw daily values.
    ax.plot(df.date, sp.ndimage.gaussian_filter1d(hours_per_day, sigma=5), label=k, color=c)
    # The leading underscore in the label keeps the raw line out of the legend.
    ax.plot(df.date, hours_per_day, label="_nolabel", color=c, alpha=.2)
ax.legend()
plt.xticks(rotation=20)
ax.set_ylim((0, 14))
ax.set_ylabel("Hours")
ax.set_title("Sleep over time");

In [None]:
# Dates to mark on the plot: one "<ISO date>,<description>" record per line.
dates = []
for line in Path("data/dates.csv").read_text().splitlines():
    if not line.strip():
        continue  # skip blank lines instead of failing to unpack them
    # Split on the first comma only, so a description may itself contain commas.
    date, description = line.split(",", 1)
    dates.append([datetime.date.fromisoformat(date.strip()), description.strip()])

# One row per "curl" entry: its timestamp and the count recorded for it.
df = pd.DataFrame.from_dict([dict(date=e.time, count=e.detail["count"])
                             for e in events if e.kind == "curl"])
# Name every out-of-order pair first; the assertion below then stops the cell.
for d0, d1 in zip(df.date, df.date[1:]):
    if d1 < d0:
        print(f"Dates out of order: {d0} then {d1}")
np.testing.assert_array_equal(df.date.sort_values().array, df.date.array)
df["smooth_count"] = sp.ndimage.gaussian_filter1d(df["count"].map(float), sigma=3)

plt.figure(figsize=(14, 8))
# Solid line: smoothed trend. Faint line: the raw counts.
df.plot(x='date', y='smooth_count', style='k', legend=False, ax=plt.gca())
df.plot(x='date', y='count', style='k', alpha=.2, legend=False, ax=plt.gca())
plt.ylim((0, 20))
for date, description in dates:
    # Full-height dashed marker with its description just below the top edge.
    plt.vlines(date, *plt.ylim(), linestyles="dashed", color="k")
    plt.annotate(description, [date, plt.ylim()[1]], fontsize=12, xytext=(5, -20), textcoords="offset pixels")
plt.xlabel("Date")
plt.ylabel('Curl /day')
plt.title('Curl events over time');

In [None]:
# Hour of day of every individually timed curl.
hours = [time.hour for e in events if e.kind == "curl" for time in e.detail["times"]]
# Denominator for the per-day rate: the number of "curl" entries.
curl_entries = sum(1 for e in events if e.kind == "curl")
plt.figure(figsize=(14, 8))
bins = np.arange(0, 24, 1)
# Without minlength, bincount returns only max(hours) + 1 buckets, so the
# bar heights would not line up with the 24 bins unless a 23:00 curl exists.
curls_per_hour = np.bincount(np.asarray(hours, dtype=int), minlength=len(bins))
plt.bar(bins, curls_per_hour / curl_entries, align="edge", width=.9)
plt.xticks(bins)
plt.xlim((0, 24))
plt.gca().xaxis.set_major_formatter(matplotlib.ticker.StrMethodFormatter("{x:02d}"))
plt.ylabel('Curl /day')
plt.title('Curls by hour-of-day')
plt.xlabel("Hour");

In [None]:
# One row per "sicks" event, with its calendar date and hour of day.
sick_records = [dict(time=e.time) for e in events if e.kind == "sicks"]
df = pd.DataFrame(sick_records)
df["date"] = pd.to_datetime(df["time"].apply(lambda d: d.date().isoformat()))
df["hour"] = df.time.apply(lambda t: t.hour)
df["count"] = 1

# Daily totals, with days lacking any "sicks" event filled in as zero.
daily_totals = df.groupby('date')['count'].sum()
every_day = pd.date_range(daily_totals.index.min(), daily_totals.index.max())
f = (
    daily_totals
    .reindex(every_day, fill_value=0)
    .reset_index()
    .rename(columns=dict(index='date'))
)
f["smooth_count"] = sp.ndimage.gaussian_filter1d(f["count"].map(float), sigma=3)

fig, ax = plt.subplots(figsize=(14, 8))
# Solid line: smoothed trend. Faint line: the raw daily counts.
f.plot(x="date", y="smooth_count", color="k", legend=False, ax=ax)
f.plot(x='date', y='count', style='k', alpha=.2, legend=False, ax=ax)
ax.set_xlabel("Date")
ax.set_ylabel('Vomit /day')
ax.set_title('Vomits over time')
ax.set_ylim(f['count'].min(), f['count'].max());

In [None]:
# Uses `df` from the previous cell: one row per "sicks" event.
hours = list(df.hour)
plt.figure(figsize=(14, 8))
bins = np.arange(0, 24, 1)
# Without minlength, bincount returns only max(hours) + 1 buckets, so the
# bar heights would not line up with the 24 bins unless a 23:00 event exists.
vomits_per_hour = np.bincount(np.asarray(hours, dtype=int), minlength=len(bins))
# NOTE(review): the denominator counts only dates that have at least one
# event, not every day in the range - confirm this is the intended rate.
plt.bar(bins, vomits_per_hour / len(df.date.unique()), align="edge", width=.9)
plt.xticks(bins)
plt.xlim((0, 24))
plt.gca().xaxis.set_major_formatter(matplotlib.ticker.StrMethodFormatter("{x:02d}"))
plt.ylabel('Vomit /day')
plt.title('Vomit by hour-of-day')
plt.xlabel("Hour");