Copy the `airtable_to_gcs.py` script here wholesale, then annotate it for personal use so we can deconstruct what it does. We will build a similar one for the BlackCat API data.

In [None]:
import gzip
import os
from typing import Optional

import pandas as pd
import pendulum
from calitp_data_infra.auth import get_secret_by_name
from calitp_data_infra.storage import get_fs, make_name_bq_safe
from pyairtable import Table
from pydantic import BaseModel

from airflow.models import BaseOperator


def process_arrays_for_nulls(arr):
    """
    Replace ``None`` entries in a list with a placeholder BigQuery will accept.

    BigQuery doesn't allow arrays that contain null values --
    see: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#array_nulls

    The placeholder depends on the non-null entries:
      * ``-1`` when every non-null entry's exact type is ``int`` or ``float``
        (``bool`` is its own type here, so boolean lists do NOT count);
      * ``""`` for anything else (strings, bools, dicts, mixed types).
    A list with no non-null entries (including an empty list) becomes ``[]``.

    Args:
        arr: list of values, possibly containing ``None``.

    Returns:
        A new list with each ``None`` swapped for the placeholder.
    """
    present_types = {type(item) for item in arr if item is not None}
    if not present_types:
        return []

    # Numeric-only arrays get -1; everything else gets an empty string.
    # May need to expand this over time.
    numeric_only = present_types.issubset({int, float})
    placeholder = -1 if numeric_only else ""
    return [placeholder if item is None else item for item in arr]


def make_arrays_bq_safe(raw_data):
    """
    Return a copy of ``raw_data`` whose list values are BigQuery-safe.

    Walks the mapping recursively: nested dicts are processed the same way,
    list values are passed through ``process_arrays_for_nulls`` (which replaces
    ``None`` entries), and every other value is kept unchanged. Lists are not
    descended into, so dicts *inside* a list are left as-is.

    Args:
        raw_data: a dict, e.g. the ``fields`` mapping of one Airtable record.

    Returns:
        A new dict with the same keys, in the same order.
    """

    def _clean(value):
        # Dicts recurse, lists get their nulls filled, scalars pass through.
        if isinstance(value, dict):
            return make_arrays_bq_safe(value)
        if isinstance(value, list):
            return process_arrays_for_nulls(value)
        return value

    return {key: _clean(value) for key, value in raw_data.items()}


# TODO: this should use the new generic partitioned GCS artifact type once available
class AirtableExtract(BaseModel):
    """A single Airtable table to extract, plus the extracted data once fetched.

    The table identifiers are supplied at construction time; ``data`` and
    ``extract_time`` start out as ``None`` and are populated by
    :meth:`fetch_from_airtable`. :meth:`save_to_gcs` then writes ``data`` to the
    hive-partitioned path built by :meth:`make_hive_path`.
    """

    # Airtable base id passed to pyairtable's ``Table``.
    air_base_id: str
    # Human-readable base name; used as the prefix of the GCS folder name.
    air_base_name: str
    # Table name inside the base; fetched via pyairtable and, once sanitized,
    # used in the GCS folder and file names.
    air_table_name: str
    # Both are filled in by fetch_from_airtable. The explicit ``= None`` defaults
    # keep them optional under pydantic v2 as well, where ``Optional[...]`` alone
    # makes a field nullable but still required (pydantic v1 already defaulted
    # such fields to None, so v1 behavior is unchanged).
    data: Optional[pd.DataFrame] = None
    extract_time: Optional[pendulum.DateTime] = None

    # pydantic doesn't know the DataFrame type, so allow arbitrary types.
    # see https://stackoverflow.com/a/69200069
    class Config:
        arbitrary_types_allowed = True

    def fetch_from_airtable(self, api_key):
        """Download the whole Airtable table into ``self.data`` as a DataFrame.

        Airtable records are structured as follows::

            [{"id": ..., "fields": {colname: value, ...}}, ...]

        Each record becomes one row: an ``id`` column holding the record id,
        plus one column per entry of ``fields`` (list values made
        BigQuery-safe by ``make_arrays_bq_safe``). Every column name is then
        passed through ``make_name_bq_safe``.

        Also sets ``self.extract_time`` to ``pendulum.now()``; it is consumed
        later by :meth:`make_hive_path` to partition the output.

        Args:
            api_key: Airtable API key passed to ``pyairtable.Table``.
        """
        print(
            f"Downloading airtable data for {self.air_base_name}.{self.air_table_name}"
        )
        # All records of this one table; each is indexed below as a dict with
        # "id" and "fields" keys.
        all_rows = Table(api_key, self.air_base_id, self.air_table_name).all()

        # Captured after the download; only used by make_hive_path.
        self.extract_time = pendulum.now()

        # Flatten each record into a single row. Because the fields are
        # unpacked after "id", a field literally named "id" would override
        # the record id.
        raw_df = pd.DataFrame(
            [
                {"id": row["id"], **make_arrays_bq_safe(row["fields"])}
                for row in all_rows
            ]
        )

        self.data = raw_df.rename(make_name_bq_safe, axis="columns")

    def make_hive_path(self, bucket: str) -> str:
        """Build the hive-partitioned GCS object path for this extract.

        The path has the form::

            {bucket}/{air_base_name}__{table}/dt={date}/ts={timestamp}/{table}.jsonl.gz

        where ``table`` is ``air_table_name`` with spaces replaced by ``_``,
        lower-cased, ``-`` replaced by ``_`` and ``+`` replaced by ``and``;
        ``date`` and ``timestamp`` come from ``extract_time.to_date_string()``
        and ``extract_time.to_iso8601_string()``.

        Args:
            bucket: bucket / path prefix to place the object under.

        Returns:
            The full object path as a string.

        Raises:
            ValueError: if ``extract_time`` has not been set yet.
        """
        # GCS object paths always use "/", so join with posixpath rather than
        # os.path (which would insert backslashes if this runs on Windows).
        import posixpath

        if not self.extract_time:
            # extract_time is set by fetch_from_airtable when data is retrieved.
            raise ValueError(
                "An extract time must be set before a hive path can be generated."
            )
        safe_air_table_name = (
            self.air_table_name.replace(" ", "_")
            .lower()
            .replace("-", "_")
            .replace("+", "and")
        )

        # These folders and gzipped files are what show up in GCS.
        return posixpath.join(
            bucket,
            f"{self.air_base_name}__{safe_air_table_name}",
            f"dt={self.extract_time.to_date_string()}",
            f"ts={self.extract_time.to_iso8601_string()}",
            f"{safe_air_table_name}.jsonl.gz",
        )

    def save_to_gcs(self, fs, bucket):
        """Upload ``self.data`` to GCS as gzip-compressed JSON Lines.

        Args:
            fs: filesystem object exposing ``pipe(path, data_bytes)``; in this
                file it comes from ``calitp_data_infra.storage.get_fs()``.
            bucket: bucket / path prefix passed to :meth:`make_hive_path`.

        Returns:
            The hive-partitioned path that was written.

        Raises:
            ValueError: if no data has been fetched, the fetched table has no
                rows, or ``extract_time`` is unset.
        """
        # Validate before building the path or logging an upload. An explicit
        # check (not ``assert``) still runs under ``python -O`` and reports a
        # clear error when fetch_from_airtable was never called (data is None).
        if self.data is None or self.data.empty:
            raise ValueError("data does not exist, cannot save")
        hive_path = self.make_hive_path(bucket)
        print(f"Uploading to GCS at {hive_path}")
        fs.pipe(
            hive_path,
            gzip.compress(self.data.to_json(orient="records", lines=True).encode()),
        )
        return hive_path


### Uses all the classes and functions above. Its `execute` method ties them together.
# This is the operator used by the Airflow DAGs, for example:
# airflow/dags/airtable_loader_v2/california_transit_county_geography.yml
class AirtableToGCSOperator(BaseOperator):
    """Airflow operator that snapshots one Airtable table into GCS.

    At execute time it downloads the table (via ``AirtableExtract``) and
    writes it as gzipped JSON Lines under a hive-partitioned path; the path
    is returned from ``execute``.
    """

    # Only ``bucket`` is Jinja-templated by Airflow.
    template_fields = ("bucket",)

    def __init__(
        self,
        bucket,
        air_base_id,
        air_base_name,
        air_table_name,
        api_key=None,
        **kwargs,
    ):
        """An operator that downloads data from an Airtable base and saves it
            as a JSON file hive-partitioned by date and time in Google Cloud
            Storage (GCS).

        Args:
            bucket (str): GCS bucket where the scraped Airtable will be saved.
            air_base_id (str): The underlying id of the Airtable base being used.
            air_base_name (str): The string name of the Base.
            air_table_name (str): The table name that should be extracted from
                the Airtable Base.
            api_key (str, optional): The API key to use when downloading from
                Airtable. This can be someone's personal API key. If not
                provided, the key is looked up at execute time with
                ``get_secret_by_name("CALITP_AIRTABLE_API_KEY")``.
        """
        self.bucket = bucket

        # Only the table identifiers are known here; the extract's data and
        # extract_time get filled in when execute() runs the fetch.
        extract = AirtableExtract(
            air_base_id=air_base_id,
            air_base_name=air_base_name,
            air_table_name=air_table_name,
        )
        self.extract = extract
        self.api_key = api_key

        super().__init__(**kwargs)

    def execute(self, **kwargs):
        """Fetch the Airtable table, upload it to GCS, and return the path.

        The returned path is what ends up in XCom for downstream tasks.
        """
        # Prefer an explicitly supplied key; otherwise look it up by name.
        if self.api_key:
            api_key = self.api_key
        else:
            api_key = get_secret_by_name("CALITP_AIRTABLE_API_KEY")

        self.extract.fetch_from_airtable(api_key)
        fs = get_fs()
        uploaded_path = self.extract.save_to_gcs(fs, self.bucket)
        return uploaded_path