In [None]:
import dlt

# Local pipeline used by the rest of the notebook: pipeline "elvis" with a
# "duckdb" destination and the dataset name "small_data".
local_pipeline = dlt.pipeline(
    pipeline_name="elvis",
    destination="duckdb",
    dataset_name="small_data",
)

Goal: Express data movement as code
- version control
- reproducibility
- observability and resistance to knowledge silos
    - workarounds and vendor nuances are embedded in the code

In [None]:
# Optional: skip this cell when the data has already been loaded locally.
from dlt.sources.sql_database import sql_database

# sql_database() is called without arguments, so nothing about the connection
# is visible in this cell.
source = sql_database()
info = local_pipeline.run(
    source,  # loading from MotherDuck; connection info is defined in config + secrets.toml
    write_disposition="replace",
)
print(info)

In [None]:
# (Re)build the `repos` table: every row of small_data.stars, left-joined with the
# repository metadata from small_data.perplexity (whose `filename` has '·' replaced
# by '/' so it can be compared with stars.repo_name).
#
# CREATE OR REPLACE keeps the cell idempotent: re-running it against a database
# that already contains `repos` rebuilds the table instead of raising
# "table already exists". The subquery alias is `repo_meta` so that it does not
# share a name with the table being created.
with local_pipeline.sql_client() as client:
    client.execute_sql(
        """
        CREATE OR REPLACE TABLE repos AS
        SELECT
            stars.repo_name,
            stars.star_count,
            stars.year,
            stars.month,
            repo_meta.programming_language,
            repo_meta.license,
            repo_meta.description
        FROM
            small_data.stars
        LEFT JOIN
            (
                SELECT
                    REPLACE(filename, '·', '/') AS repo_name,
                    programming_language,
                    license,
                    description
                FROM
                    small_data.perplexity
            ) AS repo_meta
        ON
            stars.repo_name = repo_meta.repo_name;
        """
    )
    print("Loaded from Motherduck into local duckdb")

In [None]:
# Dataset handle of the local pipeline; the trailing expression displays its
# `tables` attribute.
dataset = local_pipeline.dataset()
dataset.tables

In [None]:
# Relation for the "stars" table; the trailing expression displays its columns.
ds_stars = dataset.table("stars")
ds_stars.columns

In [None]:
import pandas as pd

In [None]:
# Select four columns of the stars relation and materialise the result with .df().
# NOTE(review): the trailing expression displays the whole frame; consider
# df_stars.head() if the table is large.
df_stars = ds_stars.select(
    "repo_name",
    "star_count",
    "year",
    "month",
).df()  # also supports arrow, df-iterator
df_stars

In [None]:
# Number of distinct repo_name values in the materialised frame.
df_stars["repo_name"].nunique()

In [None]:
# max() over the star_count column of the stars relation, converted with .to_ibis().
ds_stars["star_count"].max().to_ibis()

In [None]:
# The same max() expression with no .to_ibis()/.df() conversion: the cell
# displays the relation object itself.
ds_stars["star_count"].max()

In [None]:
# max() over the star_count column, materialised with .df().
ds_stars["star_count"].max().df()

In [None]:
# Build an ibis expression on the stars table: group by (year, month) and compute
# the max and the mean of star_count per group, named `max` and `average`.
# The trailing `query` displays the expression.
ibis_stars = dataset.table("stars").to_ibis()
query = ibis_stars.group_by("year", "month").aggregate(
    max=ibis_stars.star_count.max(),
    average=ibis_stars.star_count.mean(),
)
query

In [None]:
# Hand the ibis expression to the dataset by calling it, then materialise the
# resulting object with .df().
relation = dataset(query)
relation.df()

In [None]:
import typing
from ibis import ir


@dlt.hub.transformation
def monthly_stars(dataset: dlt.Dataset) -> typing.Iterator[ir.Table]:
    """Yield an ibis expression with per-month star statistics.

    The ``stars`` table of ``dataset`` is grouped by ``year`` and ``month``; for
    every group the maximum (``max``) and the mean (``average``) of
    ``star_count`` are computed.

    Args:
        dataset: Dataset that provides a ``stars`` table.

    Yields:
        The grouped aggregation as an ibis table expression.
    """
    stars = dataset.table("stars").to_ibis()
    # Build the aggregates from the local `stars` table so the transformation
    # depends only on its `dataset` argument and not on notebook-level state.
    yield stars.group_by("year", "month").aggregate(
        max=stars.star_count.max(),
        average=stars.star_count.mean(),
    )

In [None]:
# Iterate the transformation for `dataset` and keep the first item it produces;
# the trailing expression displays it.
relation_from_function = list(monthly_stars(dataset))[0]
relation_from_function

In [None]:
# Display the `schema` attribute of the object obtained from the transformation.
relation_from_function.schema

In [None]:
# Materialise the object obtained from the transformation with .df().
relation_from_function.df()

In [None]:
@dlt.hub.transformation
def enrich_with_repo_data(dataset: dlt.Dataset) -> typing.Iterator[ir.Table]:
    """Yield the star rows enriched with repository metadata.

    Every row of the ``stars`` table is kept (left join) and extended with
    ``programming_language``, ``license`` and ``description`` from the
    ``perplexity`` table.

    Args:
        dataset: Dataset that provides the ``stars`` and ``perplexity`` tables.

    Yields:
        The joined and projected ibis table expression.
    """
    star_rows = dataset.table("stars").to_ibis()
    raw_meta = dataset.table("perplexity").to_ibis()

    # Derive the join key: `filename` with "·" replaced by "/" so that it can be
    # compared with stars.repo_name.
    repo_meta = raw_meta.select(
        repo_name=raw_meta.filename.replace("·", "/"),
        programming_language=raw_meta.programming_language,
        license=raw_meta.license,
        description=raw_meta.description,
    )

    joined = star_rows.left_join(
        repo_meta, star_rows.repo_name == repo_meta.repo_name
    )
    yield joined.select(
        star_rows.repo_name,
        star_rows.star_count,
        star_rows.year,
        star_rows.month,
        repo_meta.programming_language,
        repo_meta.license,
        repo_meta.description,
    )

In [None]:
@dlt.source
def analytics_collection(a_dataset: dlt.Dataset) -> list:
    """Bundle the notebook's transformations into one dlt source.

    Args:
        a_dataset: Dataset handed to every transformation.

    Returns:
        A list holding ``monthly_stars(a_dataset)`` followed by
        ``enrich_with_repo_data(a_dataset)``.
    """
    transformations = (monthly_stars, enrich_with_repo_data)
    return [transform(a_dataset) for transform in transformations]

In [None]:
# Separate pipeline named "repos" whose destination is a duckdb destination
# created from "repos.duckdb" (presumably the database file path — confirm).
destination = dlt.destinations.duckdb("repos.duckdb")
transformation_pipeline = dlt.pipeline("repos", destination=destination)

In [None]:
# Run the analytics_collection source, built from `dataset`, through the
# transformation pipeline; the trailing expression displays the returned info.
ingest_load_info = transformation_pipeline.run(analytics_collection(dataset))
ingest_load_info

In [None]:
# Third pipeline of the notebook: "fraud_detector", with a "duckdb" destination
# and the dataset name "fraud_detection".
fraud_pipeline = dlt.pipeline(
    pipeline_name="fraud_detector",
    destination="duckdb",
    dataset_name="fraud_detection",
)

import pyarrow.compute as pc

In [None]:
@dlt.resource(table_name="fraud_detection_source")
def fraud_detection(filter_out_cheat_patterns: bool = False):
    """Yield cleaned Arrow chunks of the ``perplexity`` table with a GitHub ``url`` column.

    The table is read from the notebook-level ``local_pipeline`` dataset in
    chunks of 2048 rows. Each chunk is processed as follows:

    1. rows with a null ``description`` are removed;
    2. rows whose ``programming_language`` or ``license`` equals ``"Unknown"``
       are removed (each check is skipped when the column is absent);
    3. if requested, rows whose description contains a cheat pattern
       (case-insensitive substring match) are removed;
    4. the ``filename`` column is replaced by a ``url`` column holding
       ``"https://github.com/"`` followed by the filename with ``"·"`` turned
       into ``"/"``.

    Args:
        filter_out_cheat_patterns: When True, drop rows matching any cheat
            pattern. When False (the default) the patterns are not evaluated.

    Yields:
        One transformed Arrow table per input chunk.
    """
    cheat_patterns: list[str] = [
        "provides cheats",
    ]
    # Lower-case the patterns once; they are identical for every chunk.
    lowered_patterns = [pattern.lower() for pattern in cheat_patterns]
    users = local_pipeline.dataset().table("perplexity")

    for arrow_table in users.iter_arrow(chunk_size=2048):
        # Remove nulls from descriptions
        description_not_null = pc.is_valid(arrow_table["description"])
        arrow_table = arrow_table.filter(description_not_null)

        # Filter out "Unknown" values for programming_language and license.
        # NOTE(review): comparing a null value yields null, and filter() drops
        # null mask entries by default, so rows with a null value in these
        # columns are removed as well — confirm this is intended.
        if "programming_language" in arrow_table.schema.names:
            programming_language_not_unknown = pc.not_equal(
                arrow_table["programming_language"], "Unknown"
            )
            arrow_table = arrow_table.filter(programming_language_not_unknown)

        if "license" in arrow_table.schema.names:
            license_not_unknown = pc.not_equal(arrow_table["license"], "Unknown")
            arrow_table = arrow_table.filter(license_not_unknown)

        # Remove rows whose description contains any cheat pattern. The mask is
        # built only when it is going to be applied.
        if filter_out_cheat_patterns and lowered_patterns:
            lowered_descriptions = pc.utf8_lower(arrow_table["description"])
            contains_cheat_text = None

            for pattern in lowered_patterns:
                contains_pattern = pc.match_substring(lowered_descriptions, pattern)
                contains_cheat_text = (
                    contains_pattern
                    if contains_cheat_text is None
                    else pc.or_(contains_cheat_text, contains_pattern)
                )

            arrow_table = arrow_table.filter(pc.invert(contains_cheat_text))

        # Clean filename to repo path and construct GitHub URL
        cleaned_filename = pc.replace_substring(
            arrow_table["filename"], pattern="·", replacement="/"
        )
        # The last argument of binary_join_element_wise is the separator (empty here).
        github_url = pc.binary_join_element_wise(
            "https://github.com/", cleaned_filename, ""
        )
        # Overwrite the `filename` column in place with the new `url` column.
        arrow_table = arrow_table.set_column(
            arrow_table.schema.get_field_index("filename"), "url", github_url
        )

        yield arrow_table

In [None]:
# Run the fraud_detection resource with cheat-pattern filtering enabled through
# the fraud pipeline; the trailing expression displays the returned info.
transform_load_info = fraud_pipeline.run(
    fraud_detection(filter_out_cheat_patterns=True)
)
transform_load_info