# Real historical backfill aka agencies.yml v1 to post-GTFSDownloadConfig v2

[The actual PR](https://github.com/cal-itp/data-infra/pull/2033)

I created a new bucket (`gs://gtfs-schedule-backfill-test`) and used [data transfer](https://console.cloud.google.com/transfer/jobs/transferJobs%2F5240403197777129047/runs?project=cal-itp-data-infra) to copy only the production schedule data (`gs://gtfs-data/schedule`) into it. I then identified the date range to process by finding our first `status.csv` and the oldest data in `gs://calitp-gtfs-schedule-raw-v2`, which gives us a range of 2021-04-15 to 2022-09-14, inclusive.

The outcome and result classes defined below come from https://github.com/cal-itp/data-infra/blob/8dadfb31c9546af1f2ed20d19fd1d2d06c66282d/airflow/dags/download_gtfs_schedule_v2/download_schedule_feeds.py

In [None]:
# Installing this pinned release may be necessary, depending on the version of the
# Jupyter singleuser image you are using: we need the config type that has the
# `computed` field.
!pip install calitp==2022.12.8

In [None]:
import os

import pendulum
from google.cloud import storage
from tqdm.auto import tqdm

# Destination bucket for everything this notebook writes; DownloadFeedsResult reads
# this variable when its class body is evaluated.
os.environ["CALITP_BUCKET__GTFS_SCHEDULE_RAW"] = "gs://test-calitp-gtfs-schedule-raw-v2"
# Copy of the production v1 schedule data that is being backfilled from.
SOURCE_BUCKET_PATH = "gs://gtfs-schedule-backfill-test/schedule/"
# NOTE(review): not referenced anywhere else in the visible notebook — confirm whether it is still needed.
PARTITIONED_ARTIFACT_METADATA_KEY = "PARTITIONED_ARTIFACT_METADATA"
# Inclusive lower and upper bounds of the dates to backfill (see the folder filter below).
first_date = pendulum.parse("2021-04-15", exact=True)
first_date_v2 = pendulum.parse("2022-09-14", exact=True)

# import this after environ change
# (presumably the module reads bucket settings at import time — TODO confirm)
from calitp_data.storage import get_fs

# Shared filesystem handle and GCS client; the trailing expression displays both in the notebook.
fs = get_fs()
client = storage.Client(project="cal-itp-data-infra")
fs, client

In [None]:
# Select the daily snapshot folders to backfill.
# Skip some old ones that don't have midnight execution times; we probably
# shouldn't trust them.
midnight_folders = (
    path for path in fs.ls(SOURCE_BUCKET_PATH) if "T00:00:00" in path
)
# The last path component is the snapshot timestamp; keep folders whose date lies
# inside the backfill window (both bounds inclusive).
folders = [
    path
    for path in midnight_folders
    if first_date <= pendulum.parse(path.split("/")[-1]).date() <= first_date_v2
]
len(folders), folders[0], folders[-1]

In [None]:
import base64
from typing import ClassVar, List, Optional

import pendulum
from calitp_data_infra.storage import (
    GTFSDownloadConfig,
    GTFSFeedType,
    GTFSScheduleFeedExtract,
    PartitionedGCSArtifact,
    ProcessingOutcome,
)
from google.cloud import storage
from pydantic import HttpUrl, parse_obj_as, validator


class GTFSDownloadOutcome(ProcessingOutcome):
    """Outcome of one feed download attempt: its config plus the extract, if any."""

    # Download configuration describing the feed that was attempted.
    config: GTFSDownloadConfig
    # The extract for this attempt; this notebook passes None when status.csv
    # did not report "success" for the feed.
    extract: Optional[GTFSScheduleFeedExtract]
    # Flags outcomes produced by a backfill; every outcome built in this notebook sets it to True.
    backfilled: bool = False


class DownloadFeedsResult(PartitionedGCSArtifact):
    """Per-run results artifact listing the outcome of every feed download.

    The artifact is partitioned by `dt` and `ts`; its content is one JSON
    document per outcome, newline-separated.
    """

    bucket: ClassVar[str] = os.environ["CALITP_BUCKET__GTFS_SCHEDULE_RAW"]
    table: ClassVar[str] = "download_schedule_feed_results"
    partition_names: ClassVar[List[str]] = ["dt", "ts"]
    ts: pendulum.DateTime
    end: pendulum.DateTime
    outcomes: List[GTFSDownloadOutcome] = []
    backfilled: bool = False

    # Disabled check that `backfilled` agrees with every outcome's own flag:
    # @validator("backfilled", allow_reuse=True)
    # def everything_backfilled(cls, v, values):
    #    outcomes_backfilled = set(outcome.backfilled for outcome in values["outcomes"])
    #    assert {v} == outcomes_backfilled
    #    return v

    @property
    def dt(self) -> pendulum.Date:
        """Date partition value, derived from `ts`."""
        run_ts = self.ts
        return run_ts.date()

    def save(self, fs):
        """Write the outcomes as newline-delimited JSON via `save_content` and return self.

        `outcomes` is excluded in the `save_content` call because the outcomes
        are already the content being written.
        """
        jsonl_lines = [outcome.json() for outcome in self.outcomes]
        payload = "\n".join(jsonl_lines).encode()
        self.save_content(fs=fs, content=payload, exclude={"outcomes"})
        return self

In [None]:
import csv
import io
import json
import re
from collections import Counter
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from pprint import pprint
from zipfile import ZipFile, ZipInfo
from typing import Dict
from pydantic import BaseModel, ValidationError, parse_obj_as

# Regular expression (despite the name, not a Jinja template) for removing the auth
# query parameter if it exists. It matches `api_key=...` or `token=...`, plus an
# optional trailing `&`, but only when the parameter directly follows the `?`.
jinja_pattern = r"(?<=\?)(?:api_key|token)=[\w-]+&?"


class SkipUrl(Exception):
    """Raised for a feed whose URL fails validation in a known way, so the feed is skipped and counted instead of aborting the folder."""

    pass


class ZipTask(BaseModel):
    """A download outcome together with what is needed to rebuild the feed's zip file."""

    # Outcome to be recorded for this feed in the results file.
    outcome: GTFSDownloadOutcome
    # GCS directory holding the feed's unzipped files; not provided for failed downloads.
    gcs_dir: Optional[str]
    # Maps each file name (relative to gcs_dir) to the timestamp written into the
    # zip entry; not provided for failed downloads.
    files: Optional[Dict[str, pendulum.DateTime]]


def feed_folder_to_zip_task(
    folder, feed, zip_one_feed_fs=None, pbar=None
) -> ZipTask:
    """Describe how to reconstruct one feed's zip file from one daily snapshot folder.

    Args:
        folder: GCS path of the daily snapshot folder containing the feed's subfolder.
        feed: One status.csv row as a dict. The keys read are ``itp_id``,
            ``url_number``, ``gtfs_schedule_url``, ``agency_name`` and ``status``.
        zip_one_feed_fs: Filesystem object to use; one is created with
            ``get_fs()`` when omitted.
        pbar: Accepted so callers can pass their progress bar; not used here.

    Returns:
        A ZipTask. When ``status`` is not ``"success"`` it carries only a failed
        outcome. Otherwise it carries a successful outcome with a reconstructed
        extract, the feed's GCS directory, and a mapping of file name to timestamp.

    Raises:
        SkipUrl: The URL fails validation and starts with ``http://.232``.
        ValidationError: The URL fails validation for any other reason.
        AssertionError: Auth parameters remain in the cleaned URL, or the extract
            path does not have the expected shape.
        RuntimeError: The feed reported success but no files were found.
    """
    zip_one_feed_fs = get_fs() if not zip_one_feed_fs else zip_one_feed_fs
    feed_key = f"{feed['itp_id']}_{feed['url_number']}"
    # Strip a leading auth query parameter, then a dangling "?" if nothing follows it.
    url = re.sub(jinja_pattern, "", feed["gtfs_schedule_url"]).rstrip("?")

    # The pattern only removes an auth parameter that directly follows "?", so
    # make sure nothing sensitive or malformed is left behind.
    assert (
        url
        and "token" not in url
        and "api_key" not in url
        and not url.endswith("?")
        and not url.endswith("&")
        and "?&" not in url
    )

    try:
        validated_url = parse_obj_as(HttpUrl, url)
    except ValidationError:
        # Known-bad URL in the historical data: skip it rather than fail the folder.
        if url.startswith("http://.232"):
            raise SkipUrl
        raise

    config = GTFSDownloadConfig(
        extracted_at=None,
        name=feed["agency_name"],
        url=validated_url,
        feed_type=GTFSFeedType.schedule,
        schedule_url_for_validation=None,
        auth_query_params={},
        auth_headers={},
        computed=True,
    )

    if feed["status"] != "success":
        # Failed downloads have nothing to zip; record the status text as the exception.
        return ZipTask(
            outcome=GTFSDownloadOutcome(
                success=False,
                exception=Exception(feed["status"]),
                config=config,
                extract=None,
                backfilled=True,
            ),
        )

    feed_folder = f"{folder}/{feed_key}"
    files_to_timestamps = {}

    for current_dir, sub_dirs, files in zip_one_feed_fs.walk(feed_folder):
        # skip our processed subdir
        # skip situations where we have a weird duplicate subdir
        if (
            current_dir.endswith("processed")
            or "__MACOSX" in current_dir
            or f"{feed_key}/{feed_key}" in current_dir
        ):
            continue

        for file in files:
            if file.endswith("validation.json"):
                continue

            # Files directly in the feed folder keep their bare name; files in a
            # subfolder are prefixed with that subfolder's name.
            filename = (
                file
                if current_dir.endswith(feed_key)
                else f"{os.path.basename(current_dir)}/{file}"
            )
            # Use the object's customTime, converted to UTC and truncated to whole seconds.
            files_to_timestamps[filename] = (
                pendulum.parse(
                    zip_one_feed_fs.stat(f"gs://{current_dir}/{file}")["customTime"],
                    exact=True,
                )
                .in_tz("Etc/UTC")
                .replace(microsecond=0)
            )

    if not files_to_timestamps:
        # Previously referenced an undefined name (`to_walk`), which raised
        # NameError before the intended RuntimeError could be reached.
        print(feed_key, feed_folder, list(zip_one_feed_fs.walk(feed_folder)))
        raise RuntimeError(f"no files found for {feed_key} in {feed_folder}")

    first_ts = min(files_to_timestamps.values())
    last_ts = max(files_to_timestamps.values())

    if (last_ts - first_ts).total_seconds() > 600:
        print("got weirdly long extract: ", (last_ts - first_ts), feed_folder)

    extract = GTFSScheduleFeedExtract(
        ts=first_ts,
        config=config,
        response_code=200,  # this is somewhat assumed
        filename="reconstructed.zip",
        reconstructed=True,
    )

    assert "+00:00/base64_url" in extract.path

    return ZipTask(
        outcome=GTFSDownloadOutcome(
            success=True,
            exception=None,
            config=config,
            extract=extract,
            backfilled=True,
        ),
        gcs_dir=feed_folder,
        files=files_to_timestamps,
    )

In [None]:
# Quick test of a single feed in a single folder, to verify that we get a correct output task.
# We want to check that the file names and timestamps are appropriate, and that the config
# has the correct URL etc.

# Sample status.csv row for this agency:
# eastern-sierra-transit-authority,Eastern Sierra Transit Authority,99,http://data.trilliumtransit.com/gtfs/easternsierra-ca-us/easternsierra-ca-us.zip,0,success
# 232 has a subfolder
# NOTE(review): the sample row above shows 99 but the call below passes itp_id "324" —
# confirm which feed this test is meant to exercise.
zt = feed_folder_to_zip_task(
    folder=f"{SOURCE_BUCKET_PATH}2021-05-21T00:00:00+00:00",
    feed={
        "itp_id": "324",
        "url_number": "0",
        "status": "success",
        "agency_name": "Eastern Sierra Transit Authority",
        "gtfs_schedule_url": "http://data.trilliumtransit.com/gtfs/easternsierra-ca-us/easternsierra-ca-us.zip",
    },
)
zt.dict()

In [None]:
from typing import Tuple
def get_zip_tasks_from_folder(
    folder, handle_one_folder_fs=None, pool=None, top_pbar=None, i=None,
) -> Tuple[DownloadFeedsResult, List[ZipTask]]:
    """Build the results artifact and the per-feed zip tasks for one snapshot folder.

    Reads the folder's status.csv, keeps one row per ``gtfs_schedule_url`` (the
    last one wins), and converts each row to a ZipTask on the thread pool. Every
    extract in the folder is then given the same timestamp: the earliest
    extract timestamp found.

    Args:
        folder: Snapshot folder path without the ``gs://`` prefix.
        handle_one_folder_fs: Filesystem object to use; one is created with
            ``get_fs()`` when omitted.
        pool: Executor used to process feeds concurrently. When omitted, a
            temporary ThreadPoolExecutor is created for this call.
        top_pbar: Accepted for call compatibility; not used.
        i: Index of the folder, shown in the progress bar description.

    Returns:
        The DownloadFeedsResult for the folder and the list of ZipTasks,
        including the tasks for failed downloads.

    Raises:
        FileNotFoundError: Presumably raised by ``fs.open`` when the folder has
            no status.csv; callers in this notebook rely on it.
        ValueError: No feed in the folder produced an extract.
        AssertionError: A task or result path does not have the expected shape.
    """
    fs = handle_one_folder_fs if handle_one_folder_fs else get_fs()

    if pool is None:
        # The signature advertises `pool` as optional, so supply a short-lived
        # executor instead of failing on `None.submit`.
        with ThreadPoolExecutor() as own_pool:
            return get_zip_tasks_from_folder(
                folder,
                handle_one_folder_fs=fs,
                pool=own_pool,
                top_pbar=top_pbar,
                i=i,
            )

    with fs.open(f"gs://{folder}/status.csv", "r") as f:
        rows = list(csv.DictReader(f))

    # One row per URL; later rows replace earlier ones with the same URL.
    deduplicated = {feed["gtfs_schedule_url"]: feed for feed in rows}

    tasks = []
    skipped = 0

    # The context manager closes the bar even when a feed fails.
    with tqdm(total=len(deduplicated), desc=f"{i} {folder}", leave=False) as pbar:
        futures = {
            pool.submit(
                feed_folder_to_zip_task,
                folder=folder,
                feed=feed,
                pbar=pbar,
                zip_one_feed_fs=fs,
            ): feed
            for feed in deduplicated.values()
        }

        for future in as_completed(futures):
            feed = futures[future]
            pbar.update()
            try:
                tasks.append(future.result())
            except SkipUrl:
                skipped += 1
            except Exception:
                # Show which status.csv row failed before propagating the error.
                print(feed)
                raise

    assert len(deduplicated) == (len(tasks) + skipped)
    outcomes = [task.outcome for task in tasks]
    extract_timestamps = [
        outcome.extract.ts for outcome in outcomes if outcome.extract
    ]
    if not extract_timestamps:
        # Same exception type min() would raise on an empty sequence, with a usable message.
        raise ValueError(f"no successful extracts found in {folder}")
    ts = min(extract_timestamps)
    end = max(extract_timestamps)

    # Align every extract in the folder on the earliest timestamp.
    for outcome in outcomes:
        if outcome.extract:
            outcome.extract.ts = ts

    if (end - ts).total_seconds() > 600:
        print("got weirdly long extract: ", (end - ts), folder)

    result = DownloadFeedsResult(
        ts=ts,
        # NOTE(review): `end` is set to `ts`, not to the `end` computed above —
        # confirm this is intentional for backfilled runs.
        end=ts,
        outcomes=outcomes,
        filename="results.jsonl",
        backfilled=True,
    )

    assert result.path.startswith(
        f'{os.environ["CALITP_BUCKET__GTFS_SCHEDULE_RAW"]}/download_schedule_feed_results'
    ) and result.path.endswith("+00:00/results.jsonl")
    assert all(
        task.outcome.extract.path.startswith(
            f'{os.environ["CALITP_BUCKET__GTFS_SCHEDULE_RAW"]}/schedule'
        )
        and "+00:00/base64_url" in task.outcome.extract.path
        for task in tasks
        if task.outcome.extract
    )
    return result, tasks


# result, outcomes_extracts_bytes = handle_one_folder("gtfs-schedule-backfill-test/schedule/2021-04-17T00:00:00+00:00", threads=12)
# result.path, extracts

In [None]:
# Smoke test on the first folder only. The optional arguments are left at their
# defaults, which are the same None values that were previously passed explicitly.
with ThreadPoolExecutor(max_workers=32) as pool:
    result, tasks = get_zip_tasks_from_folder(folder=folders[0], pool=pool)
# Display the results path alongside the distinct extract timestamps.
result.path, {task.outcome.extract.ts for task in tasks if task.outcome.extract}

In [None]:
# Run this to generate and save all tasks and results files
import pickle

fs = get_fs()
all_results = []
all_tasks = []
with ThreadPoolExecutor(max_workers=64) as pool:
    # If the kernel dies or you otherwise need to restart from a point, you can
    # re-run this cell with a portion of folders, for example:
    #   folders_pbar = tqdm(folders[340:])
    #   folders_pbar = tqdm(folders[410:])
    #   folders_pbar = tqdm(list(random.sample(folders[:10], 20)))
    folders_pbar = tqdm(folders)
    for i, folder in enumerate(folders_pbar):
        try:
            result, tasks = get_zip_tasks_from_folder(
                folder=folder,
                handle_one_folder_fs=fs,
                pool=pool,
                top_pbar=folders_pbar,
                i=i,
            )
        except FileNotFoundError:
            print(f"unable to find status in {folder}")
            continue

        # Only tasks with an extract have something to zip, and they must all
        # share the timestamp recorded on the result.
        with_extract = [task for task in tasks if task.outcome.extract]
        assert {result.ts} == {task.outcome.extract.ts for task in with_extract}
        all_results.append(result)
        all_tasks.extend(with_extract)

# Persist both lists so a later session can resume without recomputing them.
for pickle_path, payload in (
    ("results.pickle", all_results),
    ("tasks.pickle", all_tasks),
):
    with open(pickle_path, "wb") as f:
        pickle.dump(payload, f)
len(all_results), len(all_tasks)

In [None]:
# This can be run to load the pickled results and tasks written by the previous cell,
# instead of regenerating them.
# NOTE: pickle.load executes arbitrary code from the file; only load pickles that
# this notebook itself produced.
import pickle
with open("results.pickle", "rb") as f:
    all_results = pickle.load(f)
with open("tasks.pickle", "rb") as f:
    all_tasks = pickle.load(f)
# Display how many results and tasks were loaded.
len(all_results), len(all_tasks)

In [None]:
import humanize

def execute_zip_task(task: ZipTask, dry_run=True, execute_zip_task_fs=None):
    """Rebuild one feed's zip in memory and save it as the task's extract.

    Args:
        task: The ZipTask to execute; its ``files`` and ``gcs_dir`` must be set.
        dry_run: When true, only print the size and destination path.
        execute_zip_task_fs: Filesystem object to use; one is created with
            ``get_fs()`` when omitted.
    """
    fs = execute_zip_task_fs or get_fs()
    assert isinstance(task, ZipTask)

    buffer = io.BytesIO()
    with ZipFile(buffer, "w") as archive:
        for name, stamp in task.files.items():
            # Each entry carries the timestamp recorded for its source file.
            entry = ZipInfo(
                filename=name,
                date_time=(
                    stamp.year,
                    stamp.month,
                    stamp.day,
                    stamp.hour,
                    stamp.minute,
                    stamp.second,
                ),
            )
            archive.writestr(entry, fs.cat(f"{task.gcs_dir}/{name}"))

    # Read the bytes only after the archive is closed, so the zip is complete.
    payload = buffer.getvalue()
    if not dry_run:
        task.outcome.extract.save_content(fs=fs, content=payload)
        return
    print(
        f"DRY RUN: would be saving {humanize.naturalsize(len(payload))} to {task.outcome.extract.path}"
    )

In [None]:
# Spot-check the first task: its source directory, its file timestamps, and its destination path.
all_tasks[0].gcs_dir, all_tasks[0].files, all_tasks[0].outcome.extract.path

In [None]:
from itertools import zip_longest

# Every task must have a distinct extract path — presumably so that no two tasks
# write to the same destination.
assert len(all_tasks) == len(set(task.outcome.extract.path for task in all_tasks))

def grouper(n, iterable, fillvalue=None):
    """Yield consecutive n-tuples from iterable, padding the last with fillvalue.

    grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx
    """
    # Passing the same iterator n times makes zip_longest pull n consecutive
    # items for each tuple.
    shared = iter(iterable)
    return zip_longest(*(shared for _ in range(n)), fillvalue=fillvalue)

In [None]:
# Actually perform the zips; chunk so we can restart from a given chunk even though
# the underlying tasks are not executed in order

import random

# A small set of filesystem objects; each task is handed one at random.
fses = [get_fs() for _ in range(10)]
chunks = list(enumerate(grouper(1000, all_tasks)))

with ThreadPoolExecutor(max_workers=32) as pool:
    for i, chunk in tqdm(chunks):
        # grouper pads the final chunk with None; drop the padding here.
        futures = {
            pool.submit(
                execute_zip_task,
                task=task,
                dry_run=False,
                execute_zip_task_fs=random.choice(fses),
            ): task
            for task in chunk
            if task is not None
        }

        # Size the bar by the real number of tasks so the last chunk can reach
        # 100%; the context manager closes it even when a task fails.
        with tqdm(total=len(futures), desc=f"chunk {i}", leave=False) as pbar:
            for future in as_completed(futures):
                pbar.update()
                try:
                    future.result()
                except Exception:
                    # Show which task failed before propagating the error.
                    print(futures[future])
                    raise

In [None]:
# Finally, save every folder's results artifact using the notebook-level `fs`.
for result in tqdm(all_results):
    result.save(fs)