In [None]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf
from pyspark.sql.streaming import StreamingQuery

In [None]:
# Storage locations: source files are loaded from the "processed" prefix of
# the scraping bucket.
BASE_S3_URI = "s3://book-scraping-data"
PROCESSED_S3_URI = f"{BASE_S3_URI}/processed"

# Catalog and schema that receive the ingested tables.
CATALOG = "catalog"
SCHEMA = "bronze"

# One ingestion per (source file name, destination table) pair. Each entry is
# unpacked as keyword arguments for `ingest_to_table`.
DATA_CONFIG = [
    {"source_filename": source_filename, "dest_table": dest_table}
    for source_filename, dest_table in (
        ("popular_lists.parquet.gz", "raw_popular_list"),
        ("books.parquet.gz", "raw_book"),
        ("book_details.parquet.gz", "raw_book_details"),
    )
]

In [None]:
def create_checkpoint_path(*, base_s3_uri: str, dest_table: str) -> str:
    """Build the per-table checkpoint location under a base S3 URI.

    In this notebook the same location is passed to Auto Loader as
    ``cloudFiles.schemaLocation`` and to the Delta stream writer as
    ``checkpointLocation``.

    A trailing ``/`` on ``base_s3_uri`` is ignored, so ``"s3://bucket"`` and
    ``"s3://bucket/"`` produce the same path instead of one containing
    ``//``.

    :param base_s3_uri: The base URI of the S3 bucket where the checkpoint
        should be located.
    :param dest_table: The destination table name.
    :return: Checkpoint path of the destination table, of the form
        ``<base_s3_uri>/_checkpoint/<dest_table>``.
    """
    return f"{base_s3_uri.rstrip('/')}/_checkpoint/{dest_table}"


def create_dest_path(*, catalog: str, schema: str, dest_table: str) -> str:
    """Return the dot-separated, three-part name of the destination table.

    The result (``<catalog>.<schema>.<dest_table>``) is a table identifier
    suitable for ``DataStreamWriter.toTable``, not a storage path.

    :param catalog: The catalog name.
    :param schema: The schema name.
    :param dest_table: The destination table name.
    :return: Fully qualified name of the destination table.
    """
    return f"{catalog}.{schema}.{dest_table}"


def add_collected_at(
    df: DataFrame,
    *,
    path_col: str = "source_path",
    date_segment_index: int = 4,
) -> DataFrame:
    """Add a ``collected_at`` date column derived from each row's source path.

    The value of ``path_col`` is split on ``/`` and the segment at the
    0-based ``date_segment_index`` is converted with ``to_date``. For example,
    for ``s3://book-scraping-data/processed/2024-01-01/books.parquet.gz`` the
    segments are ``["s3:", "", "book-scraping-data", "processed",
    "2024-01-01", "books.parquet.gz"]``, so the default index 4 selects
    ``2024-01-01``.

    NOTE(review): the default index assumes the date directory sits directly
    under the ``processed`` prefix of the bucket; confirm against the upload
    layout, and pass ``date_segment_index`` if that layout changes.

    :param df: The dataframe to which the column should be added.
    :param path_col: Name of the column holding the source file path.
    :param date_segment_index: 0-based index of the ``/``-separated path
        segment that holds the collection date.
    :return: Dataframe with the added ``collected_at`` column.
    """
    path_segments = sf.split(path_col, pattern="/")
    collected_at = sf.to_date(sf.get(path_segments, index=date_segment_index))

    return df.withColumn("collected_at", collected_at)


def ingest_to_table(
    *, source_filename: str, dest_table: str
) -> "StreamingQuery":
    """Incrementally ingest one kind of source file into a destination table.

    Reads parquet files under ``PROCESSED_S3_URI`` whose file name matches
    ``source_filename`` with Auto Loader (``cloudFiles``). It adds
    ``ingested_at``, ``source_path`` and ``collected_at`` columns and writes
    the result to ``CATALOG.SCHEMA.<dest_table>`` as a Delta stream. The
    stream uses an ``availableNow`` trigger.

    :param source_filename: File name (used as ``pathGlobFilter``) of the
        source files to pick up.
    :param dest_table: The destination table name.
    :return: The started streaming query. It runs asynchronously; call
        ``awaitTermination()`` on it to block until it stops and to re-raise
        any failure.
    """
    # The same per-table location is used as the Auto Loader schema location
    # and as the writer's checkpoint location.
    checkpoint_path = create_checkpoint_path(
        base_s3_uri=BASE_S3_URI, dest_table=dest_table
    )
    dest_path = create_dest_path(
        catalog=CATALOG, schema=SCHEMA, dest_table=dest_table
    )

    raw_df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("pathGlobFilter", source_filename)
        .load(PROCESSED_S3_URI)
    )

    df = raw_df.withColumn("ingested_at", sf.current_timestamp())
    # The hidden `_metadata` column exposes the path of the file each row
    # came from; `add_collected_at` parses the collection date out of it.
    df = df.withColumn("source_path", sf.col("_metadata.file_path"))
    df = add_collected_at(df=df)

    # `toTable` starts the query and returns immediately; hand the handle
    # back so callers can wait on it instead of losing track of it.
    query = (
        df.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .toTable(dest_path)
    )

    return query

In [None]:
for data_config in DATA_CONFIG:
    ingest_to_table(**data_config)

# The streaming queries started above run asynchronously. Without waiting,
# this cell would return while ingestion is still in progress, and a failing
# query would not raise here. `awaitTermination()` blocks until each query
# stops and re-raises its exception if it failed.
# NOTE: `spark.streams.active` only lists queries that are still running, so a
# query that already stopped (or failed) before this point is not awaited.
for query in spark.streams.active:
    query.awaitTermination()