## Remote Files

In [None]:
from datetime import date, timedelta

import pandas as pd

# The URLs below follow GH Archive's hourly naming scheme,
# "<YYYY-MM-DD>-<H>.json.gz", where H is the hour 0-23 with no zero padding.
# Every hour from sdate (inclusive) up to edate (exclusive) is listed.
sdate = date(2023, 1, 1)  # first day fetched
edate = date(2024, 2, 1)  # exclusive bound: the last day fetched is the day before

dates = pd.date_range(start=sdate, end=edate - timedelta(days=1), freq="D")
hours = [
    f"{day.date().isoformat()}-{hour}.json.gz"
    for day in dates
    for hour in range(24)
]
filenames = [f"https://data.gharchive.org/{name}" for name in hours]
filenames[:5]

In [None]:
import coiled
# Launch a Coiled cluster of 200 single-CPU ARM workers using the
# "spot_with_fallback" spot policy, then obtain a Dask client connected to it.
cluster_kwargs = dict(
    n_workers=200,
    worker_cpu=1,
    arm=True,
    spot_policy="spot_with_fallback",
)
cluster = coiled.Cluster(**cluster_kwargs)
client = cluster.get_client()

In [None]:
# Switch to adaptive scaling: from here on the worker count floats between
# 5 and 500 instead of staying at the n_workers=200 requested at creation.
cluster.adapt(maximum=500, minimum=5)

In [None]:
import dask.bag as db
import json

def safe_json_loads(line):
    """Parse one line of JSON, returning ``None`` if it cannot be decoded.

    Used to skip malformed lines in the archive instead of failing the whole
    computation.  Note that the literal line ``null`` also parses to ``None``
    and is therefore indistinguishable from a decode failure.

    Only decoding failures are swallowed: ``json.JSONDecodeError`` (a
    ``ValueError`` subclass) and ``RecursionError`` for pathologically nested
    input.  Anything else, e.g. the ``TypeError`` raised for a non-string
    argument, still propagates so genuine bugs are not silently hidden.
    """
    try:
        return json.loads(line)
    except (ValueError, RecursionError):
        return None

# Parse every line; lines that are not valid JSON become None.
b = db.read_text(filenames).map(safe_json_loads)
# Failed parses, kept as their own bag so they can be counted.
bad = b.filter(lambda x: x is None)
# Keep every successfully parsed record.  Test against None explicitly:
# `filter(None)` uses truthiness and would also drop valid but falsy records
# (e.g. `{}`, `[]`, `0`), so good + bad would no longer add up to the input.
# NOTE: computing `bad` and `b` in separate .compute() calls re-reads and
# re-parses every file each time.
b = b.filter(lambda x: x is not None)

In [None]:
# Number of lines that failed to parse (safe_json_loads returned None).
bad.count().compute()

In [None]:
# Total number of hourly archive files requested (24 per day).
len(filenames)

In [None]:
# Peek at the first parsed record (taken from the first partition).
b.take(1)

## Local Files

In [None]:
from dask.distributed import LocalCluster
# Start a Dask cluster on this machine and get a client connected to it.
# NOTE(review): this rebinds `cluster` and `client`.  If the Coiled cluster
# from the "Remote Files" section was started in this session, consider
# calling `cluster.close()` on it first so it is not left running.
cluster = LocalCluster()
client = cluster.get_client()

In [None]:
import json
import dask.bag as db

# Read one local JSON-lines file in ~50 MiB blocks and decode each line.
# Unlike the remote pipeline, malformed lines are not skipped here:
# json.loads raises on them.
b = (
    db.read_text("2024-02-27-15.json", blocksize="50 MiB")
    .map(json.loads)
)
b.take(2)

## Computations

In [None]:
# How often each event type occurs, most frequent first.
(
    b.pluck("type")
    .frequencies(sort=True)
    .compute()
)

In [None]:
def handle_PushEvent(d):
    """Yield one flat record per entry in a PushEvent's ``payload.commits``.

    Each record holds the actor login, repository name, commit SHA, commit
    message, and the event's ``created_at``.  A push whose ``commits`` list
    is empty yields nothing.
    """
    yield from (
        dict(
            username=d["actor"]["login"],
            repo=d["repo"]["name"],
            sha=commit["sha"],
            message=commit["message"],
            created_at=d["created_at"],
        )
        for commit in d["payload"]["commits"]
    )
        
def handle_CreateEvent(d):
    """Flatten a CreateEvent into a single record.

    The record holds the actor login, repository name, ``payload.ref_type``
    (as ``type``), ``payload.ref`` (as ``name``), ``payload.description``,
    and the event's ``created_at``.
    """
    return dict(
        username=d["actor"]["login"],
        repo=d["repo"]["name"],
        type=d["payload"]["ref_type"],
        name=d["payload"]["ref"],
        description=d["payload"]["description"],
        created_at=d["created_at"],
    )

def handle_PullRequestEvent(d):
    """Flatten a PullRequestEvent into a single record.

    Besides the actor login, repository name and event ``created_at``, the
    record copies ``payload.action`` and ``payload.number``, plus the pull
    request's title, author login, body and creation time (as
    ``pr_created_at``).
    """
    return dict(
        username=d["actor"]["login"],
        repo=d["repo"]["name"],
        action=d["payload"]["action"],
        number=d["payload"]["number"],
        title=d["payload"]["pull_request"]["title"],
        author=d["payload"]["pull_request"]["user"]["login"],
        body=d["payload"]["pull_request"]["body"],
        pr_created_at=d["payload"]["pull_request"]["created_at"],
        created_at=d["created_at"],
    )

def handle_IssueCommentEvent(d):
    """Flatten an IssueCommentEvent into a single record.

    Besides the actor login, repository name and event ``created_at``, the
    record copies the issue's number, title, author login and creation time
    (as ``issue_created_at``), and the comment's body (as ``comment``) and
    ``author_association`` (as ``association``).
    """
    return dict(
        username=d["actor"]["login"],
        repo=d["repo"]["name"],
        number=d["payload"]["issue"]["number"],
        title=d["payload"]["issue"]["title"],
        author=d["payload"]["issue"]["user"]["login"],
        issue_created_at=d["payload"]["issue"]["created_at"],
        comment=d["payload"]["comment"]["body"],
        association=d["payload"]["comment"]["author_association"],
        created_at=d["created_at"],
    )

In [None]:
def _events_of_type(event_type):
    """Return the records of the current bag ``b`` whose "type" is *event_type*."""
    return b.filter(lambda d: d["type"] == event_type)


# One bag per event type of interest, each mapped to flat records.
# PushEvents expand to one record per commit, hence the extra flatten().
commits = _events_of_type("PushEvent").map(handle_PushEvent).flatten()
creates = _events_of_type("CreateEvent").map(handle_CreateEvent)
prs = _events_of_type("PullRequestEvent").map(handle_PullRequestEvent)
comments = _events_of_type("IssueCommentEvent").map(handle_IssueCommentEvent)

In [None]:
# Preview up to five flattened commit records (from the first partition).
commits.take(5)

In [None]:
# Total number of commits across all PushEvents (a separate pass over the input).
commits.count().compute()

In [None]:
# Number of CreateEvents (a separate pass over the input).
creates.count().compute()

In [None]:
# Number of PullRequestEvents (a separate pass over the input).
prs.count().compute()

In [None]:
# Number of IssueCommentEvents (a separate pass over the input).
comments.count().compute()

In [None]:
# Preview up to five CreateEvent records (from the first partition).
creates.take(5)

In [None]:
# Preview up to five PullRequestEvent records (from the first partition).
prs.take(5)

In [None]:
# Preview up to five IssueCommentEvent records (from the first partition).
comments.take(5)

In [None]:
# Count all four derived bags in a single dask.compute call.  The shared
# upstream work (reading and parsing the input) then runs once, rather than
# once per separate .compute().
import dask

dask.compute(*(bag.count() for bag in (commits, creates, prs, comments)))

In [None]:
import re

# Find commit messages mentioning Dask: "dask" at the start of a word, any
# case.  A plain `" dask" in message` test would miss "Dask", messages that
# begin with "dask", and mentions after punctuation such as "(dask" or
# "`dask`".  Every match of that plain test is still matched here.
_DASK_MENTION = re.compile(r"\bdask", re.IGNORECASE)

commits.filter(lambda d: _DASK_MENTION.search(d["message"] or "") is not None).compute()

In [None]:
import re

# Find issue comments whose body or issue title mentions Dask: "dask" at the
# start of a word, any case.  A plain `" dask" in text` test would miss
# "Dask", text that begins with "dask", and mentions after punctuation.
# `or ""` guards against a missing (None) comment body or title.
_DASK_MENTION = re.compile(r"\bdask", re.IGNORECASE)

comments.filter(
    lambda d: any(
        _DASK_MENTION.search(text or "") for text in (d["comment"], d["title"])
    )
).compute()

In [None]:
# NOTE(review): design sketch for incremental processing; not runnable as-is.
#   - `def process_json_file(...)` has no trailing colon and `...` is not a
#     valid parameter list, so this cell raises SyntaxError.
#   - `task`, `deltalake` and `datetime` are not imported in the visible cells
#     (only `date`/`timedelta` are imported from the datetime module).
#   - `everything_since_last_read` is annotated but never assigned.
# Intent, inferred from names only: find the newest record already stored in
# a "commits" table, list the archive files published since then, and process
# each one remotely via `process_json_file.map`.
@task
@coiled.function
def process_json_file(...)
    ...
    
def process_recent_json_files(...):
    # NOTE(review): the deltalake package's table class is `DeltaTable`, not
    # `Table` — confirm the intended API and how the last timestamp is read.
    last_read = deltalake.Table("commits").tail()[-1].date
    now = datetime.now()
    
    # TODO: populate with the archive file URLs between `last_read` and `now`.
    everything_since_last_read: list[str]
    process_json_file.map(everything_since_last_read)

# Questions to ask

-  Which projects have issues or commits mentioning Dask? Show recent trends over a few time scales. Do the same for people using Dask.
    -  An app where people could replace "Dask" with another project name, and then we crunch the data for that project
-  Synthesize non-event views of repos, users, etc.
    -  Popular repositories based on recent watch and fork events
