# CNCF and Apache Ecosystem (culled from gharchive.org)

## imports

In [1]:
from datetime import datetime
from time import time
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import gzip

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

from IPython.display import display

import pandas as pd
pd.set_option('display.max_rows', 1024)
pd.set_option('display.max_columns', 512)
pd.set_option('display.width', 1024)

import pyarrow as pa
import pyarrow.parquet as pq
import adbc_driver_postgresql.dbapi

from tqdm.notebook import tqdm

import panel as pn
import altair as alt    # https://altair-viz.github.io/
import vegafusion as vf # https://vegafusion.io/

import simdjson

from notebook_utils import *
#from arrow_utils import *

## Load Data from Parquet

_gharchive.org data has been processed to datasets, one Apache Arrow [Dataset](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html) object per [GitHub Event Type](https://docs.github.com/en/rest/using-the-rest-api/github-event-types?apiVersion=2022-11-28)_

### JSON Dataset (per day)

In [2]:
DATASETS_ROOT = Path("~/gharchive-cncf/cncf-parquet-consolidated").expanduser()
DATASET_PATHS = {}

for file in DATASETS_ROOT.glob("*.parquet"):
    DATASET_PATHS[file.stem] = file
display(DATASET_PATHS)

{'PullRequestEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parquet-consolidated/PullRequestEvent.parquet'),
 'PushEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parquet-consolidated/PushEvent.parquet'),
 'WatchEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parquet-consolidated/WatchEvent.parquet'),
 'IssuesEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parquet-consolidated/IssuesEvent.parquet'),
 'ForkEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parquet-consolidated/ForkEvent.parquet'),
 'CreateEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parquet-consolidated/CreateEvent.parquet'),
 'PublicEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parquet-consolidated/PublicEvent.parquet'),
 'GollumEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parquet-consolidated/GollumEvent.parquet'),
 'ReleaseEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parquet-consolidated/ReleaseEvent.parquet'),
 'CommitCommentEvent': PosixPath('/Users/matt/gharchive-cncf/cncf-parqu

In [3]:
def load_parquet_dataset(name: str, path: str) -> pq.ParquetDataset:
    #print(f'Loading dataset: {name} from {path}')
    pqds = pq.ParquetDataset(path, memory_map=True)
    return pqds

In [4]:
DATASETS = {}
DATASET_SCHEMAS = {}

for name, path in DATASET_PATHS.items():
    DATASETS[name] = load_parquet_dataset(name, path)

for name, dataset in DATASETS.items():
    DATASET_SCHEMAS[name] = dataset.schema

display(DATASETS)

{'PullRequestEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x10411fad0>,
 'PushEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c576ed0>,
 'WatchEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c5763d0>,
 'IssuesEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c577710>,
 'ForkEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c577390>,
 'CreateEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c577790>,
 'PublicEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c5777d0>,
 'GollumEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c577750>,
 'ReleaseEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c576650>,
 'CommitCommentEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c577810>,
 'PullRequestReviewCommentEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c577590>,
 'PullRequestReviewEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c577650>,
 'IssueCommentEvent': <pyarrow.parquet.core._ParquetDatasetV2 at 0x16c577850>,
 'Delete

### Persist Event Schema to files (.schema)

In [5]:
schema_summaries = {}

def dataset_schema_summary(dataset_path: str) -> pd.DataFrame:
    """
    Creates a summary DataFrame for all fragments in a dataset, including schema details
    and partition information.

    Parameters:
    - dataset_path: The file system path to the dataset.

    Returns:
    A pandas DataFrame with columns for each schema field, including fragment and partition keys.
    """
    dataset = ds.dataset(dataset_path, format="parquet")  # Adjust format as needed
    summary = []

    for fragment in dataset.get_fragments():
        schema = fragment.physical_schema
        
        for field in schema:
            summary.append({
                "Fragment": fragment.path,
                "Field Name": field.name,
                "Type": str(field.type),
                "Nullable": field.nullable
            })

    # Create a DataFrame from the summary list
    df = pd.DataFrame(summary)
    print(f'{df.shape}')
    df.drop_duplicates(inplace=True, subset=["Field Name", "Type", "Nullable"])
    print(f'{df.shape}')
    return df


def replace_table(conn: adbc_driver_postgresql.dbapi.Connection, table_name: str, table: pa.Table):
    """
    Replace the contents of a specified table with new data from an Arrow Table, preserving the table's schema and indexes.

    1. create: temporary table with the new data
    2. rename: existing table --> a temporary name
    3. rename: new table      --> original table name
    4. drop the old table

    Parameters:
    - conn (adbc_driver_postgresql.dbapi.Connection): A connection to the PostgreSQL database.
    - table_name (str): The name of the table to be replaced.
    - table (pa.Table): An Arrow Table containing the new data to replace the existing table's contents.

    TODO: Validate / tests needed:
    
    - If an error occurs during the process, transaction rolled back.
    - The renaming of the table does not change the table's schema or indexes. The indexes will retain their
      original names and continue to reference the renamed table.
    """
    temp_table_name = f"{table_name}_temp"
    old_table_name = f"{table_name}_old"
    
    try:
        with conn.cursor() as cur:
            cur.adbc_ingest(temp_table_name, table, mode="create", temporary=True)
            cur.execute(f"ALTER TABLE {table_name} RENAME TO {old_table_name}")
            cur.execute(f"ALTER TABLE {temp_table_name} RENAME TO {table_name}")
            cur.execute(f"DROP TABLE {old_table_name}")
    except adbc_driver_postgresql.dbapi.Error as e:
        # The transaction is automatically rolled back by the ADBC framework if an error occurs
        # TODO: Need to validate here...adbc docs are a mixture of spec and "use the source luke" :)
        print(f"An error occurred: {e}")

In [6]:
def generate_markdown_documentation(schema: pa.Schema, title: str = "GitHub Archive (https://gharchive.org) Schema Documentation") -> str:
    """
    Generates Markdown documentation for a PyArrow Schema object, formatted for GitHub with improved table formatting.

    Parameters:
    - schema: The PyArrow Schema object to document.
    - title: The title of the documentation. Default is "Schema Documentation".

    Returns:
    - A string containing the Markdown documentation.
    """
    markdown = f"# {title}\n\n"
    markdown += "## Table of Contents\n\n"
    toc = []

    # Function to calculate padding for each column
    def calculate_padding(fields):
        max_name_length = max((len(field.name) for field in fields), default=0)
        max_type_length = max((len(str(field.type)) for field in fields), default=0)
        return max_name_length, max_type_length

    # Calculate padding for the top-level fields
    max_name_length, max_type_length = calculate_padding(schema)

    # Start the table
    markdown += f"| {'Field Name'.ljust(max_name_length)} | {'Type'.ljust(max_type_length)} |\n"
    markdown += f"| {'-'*max_name_length} | {'-'*max_type_length} |\n"

    for field in schema:
        field_name = field.name.ljust(max_name_length)
        field_type = str(field.type).ljust(max_type_length)
        markdown += f"| {field_name} | {field_type} |\n"
        if isinstance(field.type, pa.StructType):
            # Calculate padding for nested fields
            max_nested_name_length, max_nested_type_length = calculate_padding(field.type)
            for sub_field in field.type:
                nested_name = sub_field.name.ljust(max_nested_name_length)
                nested_type = str(sub_field.type).ljust(max_nested_type_length)
                markdown += f"| {' '*max_name_length} | {nested_name} | {nested_type} |\n"
        toc.append(f"- [{field.name}](#{field.name.lower().replace(' ', '-')})\n")

    markdown += "\n## Table of Contents\n\n" + "".join(toc) + "\n\n"
    return markdown

# note: pyarrow.schema() is a factory function that returns a pyarrow.Schema object
schema = pa.schema([
    ('some_int', pa.int32()),
    ('some_string', pa.string()),
    ('some_struct', pa.struct([
        ('f1', pa.int32()),
        ('f2', pa.string())
    ]))
])

print(generate_markdown_documentation(schema))

# GitHub Archive (https://gharchive.org) Schema Documentation

## Table of Contents

| Field Name  | Type                          |
| ----------- | ----------------------------- |
| some_int    | int32                         |
| some_string | string                        |
| some_struct | struct<f1: int32, f2: string> |
|             | f1 | int32  |
|             | f2 | string |

## Table of Contents

- [some_int](#some_int)
- [some_string](#some_string)
- [some_struct](#some_struct)





In [7]:
def strip_schema_prefix(name: str) -> str:
    """
    if prefix found, remove it from the name used for schema file

    names cam be {event_type}.schema, or '{prefix}-{event_type}.schema'
    """
    parts = name.split('-', 1)
    if len(parts) > 1:
        return parts[1]
    else:
        return name

In [8]:
def create_and_save_markdown_docs(schemas: Dict[str, pa.Schema], output_dir: str):
    """
    Generates Markdown documentation for each schema in the provided dictionary and saves them to files.

    Parameters:
    - schemas: A dictionary mapping event types to PyArrow schema objects.
    - output_dir: The directory where the Markdown files will be saved.
    """
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Iterate over the schemas and generate Markdown documentation
    for event_type, schema in schemas.items():
        # Generate the Markdown documentation
        markdown_doc = generate_markdown_documentation(schema, title=event_type)

        # Save the Markdown documentation to a file
        filename = f"{strip_schema_prefix(event_type)}.md"
        with open(os.path.join(output_dir, filename), 'w') as f:
            f.write(markdown_doc)

schemas = {
    'IssuesEvent': pa.schema([
        ('actor', pa.struct([
            ('avatar_url', pa.string()),
            ('display_login', pa.string()),
            ('gravatar_id', pa.string()),
            ('id', pa.int64()),
            ('login', pa.string()),
            ('url', pa.string())
        ])),
        # ... other fields ...
    ]),
    # ... other event types ...
}

# example
create_and_save_markdown_docs(schemas, output_dir='./test-docs')

In [9]:
!cat ./test-docs/IssuesEvent.md

# IssuesEvent

## Table of Contents

| Field Name | Type                                                                                                          |
| ----- | ------------------------------------------------------------------------------------------------------------- |
| actor | struct<avatar_url: string, display_login: string, gravatar_id: string, id: int64, login: string, url: string> |
|       | avatar_url    | string |
|       | display_login | string |
|       | gravatar_id   | string |
|       | id            | int64  |
|       | login         | string |
|       | url           | string |

## Table of Contents

- [actor](#actor)




### Create .schema files and documentation

Regen the schema files from the parquet dataset.  As they are commited to [./docs/schemas](./docs/schemas), it's easy to see any deviation and/or changes to the schema are found in the future.

In [10]:
print(f'{"dataset":<29} | {"schema_file":<51} | {"nbytes":>5}')
print(f'{"":-<29} | {"":-<51} | {"":->5}')
  
for name, dataset in DATASETS.items():
    schema_file = f'./docs/schemas/{name}.schema'
    with open(schema_file, 'w') as f:
        nbytes = f.write(str(dataset.schema))
    print(f'{name:<29} | {schema_file:<51} | {nbytes:>5}')

dataset                       | schema_file                                         | nbytes
----------------------------- | --------------------------------------------------- | -----
PullRequestEvent              | ./docs/schemas/PullRequestEvent.schema              | 26755
PushEvent                     | ./docs/schemas/PushEvent.schema                     |  1107
WatchEvent                    | ./docs/schemas/WatchEvent.schema                    |   728
IssuesEvent                   | ./docs/schemas/IssuesEvent.schema                   |  9883
ForkEvent                     | ./docs/schemas/ForkEvent.schema                     |  4280
CreateEvent                   | ./docs/schemas/CreateEvent.schema                   |   565
PublicEvent                   | ./docs/schemas/PublicEvent.schema                   |   433
GollumEvent                   | ./docs/schemas/GollumEvent.schema                   |   869
ReleaseEvent                  | ./docs/schemas/ReleaseEvent.schema             

### Generate Documentation

**Help Wanted!** Make this better :)

In [12]:
print(DATASET_SCHEMAS.keys())

doc_dir = f'./docs/github-event-types'
create_and_save_markdown_docs(DATASET_SCHEMAS, doc_dir)
!ls -w $doc_dir/*

dict_keys(['PullRequestEvent', 'PushEvent', 'WatchEvent', 'IssuesEvent', 'ForkEvent', 'CreateEvent', 'PublicEvent', 'GollumEvent', 'ReleaseEvent', 'CommitCommentEvent', 'PullRequestReviewCommentEvent', 'PullRequestReviewEvent', 'IssueCommentEvent', 'DeleteEvent', 'MemberEvent'])
./docs/github-event-types/CommitCommentEvent.md
./docs/github-event-types/CreateEvent.md
./docs/github-event-types/DeleteEvent.md
./docs/github-event-types/ForkEvent.md
./docs/github-event-types/GollumEvent.md
./docs/github-event-types/IssueCommentEvent.md
./docs/github-event-types/IssuesEvent.md
./docs/github-event-types/MemberEvent.md
./docs/github-event-types/PublicEvent.md
./docs/github-event-types/PullRequestEvent.md
./docs/github-event-types/PullRequestReviewCommentEvent.md
./docs/github-event-types/PullRequestReviewEvent.md
./docs/github-event-types/PushEvent.md
./docs/github-event-types/ReleaseEvent.md
./docs/github-event-types/WatchEvent.md


In [13]:
@dataclass
class BatchResult:
    table_name: str
    hostname_db: str
    ingest_mode: str
    rows: int
    duration: float
    rows_per_sec: float

@dataclass
class IngestionResult:
    results: List[BatchResult]
    failed_rows: List[Dict]

# TODO: remove and just use divide_and_conquor_ingest
def ingest_batch(conn: adbc_driver_postgresql.dbapi.Connection, table_name: str, batch, ingest_mode: str) -> bool:
    """
    Attempts to ingest a batch of data into the specified table.

    Parameters:
    - conn: An ADBC database connection object.
    - table_name (str): The name of the table to ingest data into.
    - batch: The batch of data to be ingested.

    Returns:
    - bool: True if the batch is successfully ingested, False otherwise.
    """
    with conn.cursor() as cur:
        try:
            nrows = cur.adbc_ingest(table_name, [batch], mode=ingest_mode)
            print(f'{nrows} rows --> {table_name}')
            
        except Exception as e:
            print(f"Failed to ingest batch: {e}")
            return False

    conn.commit()
    return True

def divide_and_conquer_ingest(conn: adbc_driver_postgresql.dbapi.Connection, 
                              table_name: str, 
                              batch: List[Any], 
                              ingest_mode: str, 
                              verbose: bool = True) -> List[Any]:
    """
    Efficient divide and conquer strategy to minimize the number of ingest_batch calls by trying
    to ingest larger chunks of the batch and dividing it only upon failure.

    Parameters:
    - conn: An ADBC database connection object.
    - table_name (str): The name of the table to ingest data into.
    - batch: The batch of data to be processed.
    - verbose (bool): Default is True.

    Returns:
    - list: A list of problematic rows that failed to ingest.
    """
    if not batch:
        return []

    if verbose:
        print(f"Attempting to ingest batch into {table_name} with {len(batch)} rows.")

    if len(batch) == 1:
        # If batch size is 1, attempt to ingest it. If it fails, it's a problematic row.
        if not ingest_batch(conn, table_name, batch, ingest_mode):
            return batch
        return []

    if ingest_batch(conn, table_name, batch, ingest_mode):
        # Entire batch is successfully ingested, take the win and return an empty list.
        return []

    # this batch has failed!
    mid = len(batch) // 2
    left_half = batch[:mid]
    right_half = batch[mid:]

    if verbose:
        print(f"Batch failed to ingest, dividing into {len(left_half)} and {len(right_half)} rows.")

    # attempt each half
    left_failures = divide_and_conquer_ingest(conn, table_name, left_half, verbose)
    right_failures = divide_and_conquer_ingest(conn, table_name, right_half, verbose)

    return left_failures + right_failures

def handle_batches(conn: adbc_driver_postgresql.dbapi.Connection, table_name: str, batches, ingest_mode: str, verbose: bool = True):
    """
    Processes a list of batches, attempting to ingest each one into the specified table.

    Parameters:
    - conn: An ADBC database connection object.
    - table_name (str): The name of the table to ingest data into.
    - batches: A list of batches to be processed.
    - verbose (bool): If True, prints detailed information about the ingestion process. Default is True.

    Returns:
    - tuple: A tuple containing an IngestionResult object and a pandas DataFrame summarizing the ingestion process.
    """

    all_failing_rows = []
    results = []

    for batch in tqdm(batches, desc="Ingesting batches", disable=not verbose):

        failing_rows = divide_and_conquer_ingest(conn=conn, table_name=table_name, batch=batch, ingest_mode=ingest_mode)
        if(len(failing_rows) > 0):
            all_failing_rows.extend(failing_rows)

        results.append(BatchResult(table_name, "hostname_db", ingest_mode, len(batch), 666666, 666)) # TODO: last 2 fields are [rows, rows/sec]

    # Log all failing rows to a JSON lines file (compressed as gz)
    with gzip.open(f"{table_name}_failing_rows.jsonl.gz", "wt") as f:
        for row in all_failing_rows:
            f.write(json.dumps(row) + "\n")

    df_results = pd.DataFrame([result.__dict__ for result in results])
    ir = IngestionResult(results, all_failing_rows)
    return ir, df_results

#
##
### pqfile_to_db() - Parquest --> Postgres
##
#
def pqfile_to_db(conn: adbc_driver_postgresql.dbapi.Connection, table_name: str, pqfile: str, 
                   columns_to_jsonb: Optional[List[str]] = None, ingest_mode: str = "create_append", 
                   table_prefix: Optional[str] = "dev", verbose: bool = True, show_progress: bool = True, 
                   batch_size: int = 10000, replace: bool = False) -> List[BatchResult]:
    """
    Ingests a Parquet file into a PostgreSQL database table.

    Parameters:
    - conn: A connection to the PostgreSQL database.
    - table_name: The name of the table to write.
    - pqfile: The path to the Parquet file.
    - columns_to_jsonb: A list of columns to convert to JSONB.
    - ingest_mode: The ingest mode ("create" or "append").
    - table_prefix: An optional prefix for the table name.
    - verbose: Whether to print verbose output.
    - show_progress: Whether to show a progress bar.
    - batch_size: The number of rows per batch.
    - replace: Whether to replace the table if it exists.

    Returns:
    - A list of BatchResult objects detailing each batch's ingestion process.
    """
    if columns_to_jsonb is None:
        columns_to_jsonb = []

    if table_prefix is not None:
        table_name = f"{table_prefix}_{table_name}"

    if verbose:
        print(f"{datetime.now()} Ingesting {table_name} to {conn}, ingest_mode={ingest_mode}")

    # read the parquet file into a pyarrow Table object
    table = pq.read_table(pqfile)

    # TODO: convert Struct and List types to flat JSON strings for import --> PG via ADBC
    # for now simply drop them
    col_names = set(table.column_names)
    cols_to_drop = col_names.intersection(columns_to_jsonb)
    table = table.drop_columns(cols_to_drop)

    #table = convert_to_jsonb(table, columns_to_jsonb)

    # create a list of batches of data to ingest. 
    batches = table.to_batches(batch_size)

    # results_df = pd.DataFrame(columns=['table_name', 'status', 'rows', 'duration', 'rows_per_sec'])

    ingestion_result, df_results = handle_batches(conn, table_name, batches=batches, ingest_mode=ingest_mode, verbose=verbose)

    display(df_results.head(20))
    display(ingestion_result)
    
    return 

## Data Load Begins Here!

In [None]:
CONN_STR='postgres://username:password@yourhost:5432/gharchive'


#
# These Columns require flattening to JSON strings (not objects) because ADBC doesn't yet support comnplex types

###
# ForkEvent                      : payload.forkee.topics: list<item: null>
# GollumEvent                    : payload.pages: list<item: struct<action: string, html_url: string, page_name: string, sha: string, title: string>>
# IssueCommentEvent              : payload.issue.assignees: list<item: struct<avatar_url: string, events_url: string, followers_url: string, following_url: string, gists_url: string, gravatar_id: string, html_url: string, id: int64, login: string, node_id: string, organizations_url: string, received_events_url: string, repos_url: string, site_admin: bool, starred_url: string, subscriptions_url: string, type: string, url: string>>
# IssueCommentEvent              : payload.issue.labels: list<item: struct<color: string, default: bool, id: int64, name: string, node_id: string, url: string, description: string>>
# IssueCommentEvent              : payload.comment.performed_via_github_app.events: list<item: string>
# IssuesEvent                    : payload.issue.assignees: list<item: struct<avatar_url: string, events_url: string, followers_url: string, following_url: string, gists_url: string, gravatar_id: string, html_url: string, id: int64, login: string, node_id: string, organizations_url: string, received_events_url: string, repos_url: string, site_admin: bool, starred_url: string, subscriptions_url: string, type: string, url: string>>
# IssuesEvent                    : payload.issue.labels: list<item: struct<color: string, default: bool, id: int64, name: string, node_id: string, url: string, description: string>>
# PullRequestEvent               : payload.pull_request.assignees: list<item: struct<avatar_url: string, events_url: string, followers_url: string, following_url: string, gists_url: string, gravatar_id: string, html_url: string, id: int64, login: string, node_id: string, organizations_url: string, received_events_url: string, repos_url: string, site_admin: bool, starred_url: string, subscriptions_url: string, type: string, url: string>>
# PullRequestEvent               : payload.pull_request.labels: list<item: struct<color: string, default: bool, id: int64, name: string, node_id: string, url: string, description: string>>
# PullRequestEvent               : payload.pull_request.requested_reviewers: list<item: struct<avatar_url: string, events_url: string, followers_url: string, following_url: string, gists_url: string, gravatar_id: string, html_url: string, id: int64, login: string, node_id: string, organizations_url: string, received_events_url: string, repos_url: string, site_admin: bool, starred_url: string, subscriptions_url: string, type: string, url: string>>
# PullRequestEvent               : payload.pull_request.requested_teams: list<item: struct<description: string, html_url: string, id: int64, members_url: string, name: string, node_id: string, notification_setting: string, permission: string, privacy: string, repositories_url: string, slug: string, url: string>>
# PullRequestEvent               : payload.pull_request.base.repo.topics: list<item: string>
# PullRequestEvent               : payload.pull_request.head.repo.topics: list<item: string>
# PullRequestReviewCommentEvent  : payload.pull_request.assignees: list<item: struct<avatar_url: string, events_url: string, followers_url: string, following_url: string, gists_url: string, gravatar_id: string, html_url: string, id: int64, login: string, node_id: string, organizations_url: string, received_events_url: string, repos_url: string, site_admin: bool, starred_url: string, subscriptions_url: string, type: string, url: string>>
# PullRequestReviewCommentEvent  : payload.pull_request.labels: list<item: struct<color: string, default: bool, id: int64, name: string, node_id: string, url: string, description: string>>
# PullRequestReviewCommentEvent  : payload.pull_request.requested_reviewers: list<item: struct<avatar_url: string, events_url: string, followers_url: string, following_url: string, gists_url: string, gravatar_id: string, html_url: string, id: int64, login: string, node_id: string, organizations_url: string, received_events_url: string, repos_url: string, site_admin: bool, starred_url: string, subscriptions_url: string, type: string, url: string>>
# PullRequestReviewCommentEvent  : payload.pull_request.requested_teams: list<item: struct<description: string, id: int64, members_url: string, name: string, node_id: string, permission: string, privacy: string, repositories_url: string, slug: string, url: string, html_url: string, notification_setting: string>>
# PullRequestReviewCommentEvent  : payload.pull_request.base.repo.topics: list<item: string>
# PullRequestReviewCommentEvent  : payload.pull_request.head.repo.topics: list<item: string>
# PullRequestReviewEvent         : payload.pull_request.assignees: list<item: struct<avatar_url: string, events_url: string, followers_url: string, following_url: string, gists_url: string, gravatar_id: string, html_url: string, id: int64, login: string, node_id: string, organizations_url: string, received_events_url: string, repos_url: string, site_admin: bool, starred_url: string, subscriptions_url: string, type: string, url: string>>
# PullRequestReviewEvent         : payload.pull_request.labels: list<item: struct<color: string, default: bool, description: string, id: int64, name: string, node_id: string, url: string>>
# PullRequestReviewEvent         : payload.pull_request.requested_reviewers: list<item: struct<avatar_url: string, events_url: string, followers_url: string, following_url: string, gists_url: string, gravatar_id: string, html_url: string, id: int64, login: string, node_id: string, organizations_url: string, received_events_url: string, repos_url: string, site_admin: bool, starred_url: string, subscriptions_url: string, type: string, url: string>>
# PullRequestReviewEvent         : payload.pull_request.requested_teams: list<item: struct<description: string, html_url: string, id: int64, members_url: string, name: string, node_id: string, permission: string, privacy: string, repositories_url: string, slug: string, url: string, notification_setting: string>>
# PullRequestReviewEvent         : payload.pull_request.base.repo.topics: list<item: string>
# PullRequestReviewEvent         : payload.pull_request.head.repo.topics: list<item: string>
# PushEvent                      : payload.commits: list<item: struct<author: struct<email: string, name: string>, distinct: bool, message: string, sha: string, url: string>>
# ReleaseEvent                   : payload.release.assets: list<item: struct<browser_download_url: string, content_type: string, created_at: string, download_count: int64, id: int64, label: string, name: string, node_id: string, size: int64, state: string, updated_at: string, uploader: struct<avatar_url: string, events_url: string, followers_url: string, following_url: string, gists_url: string, gravatar_id: string, html_url: string, id: int64, login: string, node_id: string, organizations_url: string, received_events_url: string, repos_url: string, site_admin: bool, starred_url: string, subscriptions_url: string, type: string, url: string>, url: string>>
# ReleaseEvent                   : payload.release.mentions: list<item: struct<avatar_url: string, avatar_user_actor: bool, login: string, profile_url: string, profile_name: string>>
###

###
# https://arrow.apache.org/adbc/0.3.0/driver/cpp/postgresql.html#supported-features
#
# - Bulk ingestion is supported. The mapping from Arrow types to PostgreSQL types is the same as below.
# - Partitioned result sets are not supported.
# - The driver makes use of COPY and the binary format to speed up result set reading. Formal benchmarking is forthcoming.
# - Transactions are supported.
#
# PostgreSQL allows defining new types at runtime, so the driver must build a mapping of available types. 
# This is currently done once at startup. 
# Type support is currently limited. 
# 
# Parameter binding and bulk ingestion support: int16, int32, int64, and string. 
# Reading result sets is limited to:            int32, int64, float, double, and string.
###

columns_to_jsonb={
        'payload.commits', 
        'payload.pages', 
        'payload.release.mentions', 
        'payload.release.assets',
        'payload.pull_request.labels',
        'payload.pull_request.requested_reviewers',
        'payload.pull_request.requested_teams',
        'payload.pull_request.assignees',
        'payload.issue.assignees',
        'payload.issue.labels',
        'payload.issue.body',
        'payload.pages',
        'payload.forkee.topics',
        'payload.pull_request.base.repo.topics',
        'payload.comment.performed_via_github_app.events',
        'payload.pull_request.base.repo.topics',
        'payload.pull_request.head.repo.topics'
        }

for name, dspath in DATASET_PATHS.items():
    print(f"Processing dataset: {name} : {dspath}")

    with adbc_driver_postgresql.dbapi.connect(CONN_STR) as adbc_conn:
        display(adbc_conn)

        try:
            # use Apache Arrow Database Connector (ADBC) to read Parquet files --> postgres table
            num_rows = pqfile_to_db(adbc_conn, name, dspath, table_prefix=f'mattdbg', columns_to_jsonb=columns_to_jsonb)

        # TODO: use a more specific exception type
        except Exception as e:
            print(f"[FAIL] Failed to ingest {name} @ {dspath} : {e}")
            continue
        
        print(f"[INGEST] num_rows={num_rows} ")

## 

## (WIP) Visualizations

In [None]:
df_results.head()

In [None]:
summary_df = df.groupby(['partition_key1', 'partition_key2']).agg('count').reset_index()

In [None]:
import plotly.express as px

def generate_sunburst_chart(summary_df, filename="sunburst_chart.png"):
    fig = px.sunburst(summary_df, 
                      path=['partition_key1', 'partition_key2'], 
                      values='count', 
                      color='count',
                      title='Data Distribution Across Partitions',
                      color_continuous_scale='RdBu')
    fig.update_layout(margin=dict(t=0, l=0, r=0, b=0))
    #save_plotly_chart_as_png(fig, filename)



In [None]:
import altair as alt

def generate_timeline_chart(summary_df, filename="timeline_chart.png"):
    chart = alt.Chart(summary_df).mark_line(point=True).encode(
        x='time:T',  # Adjust for your time-related partition key
        y='count:Q',
        tooltip=['partition_key1', 'partition_key2', 'count']  # Adjust tooltips as needed
    ).properties(
        width=800,
        height=400,
        title='Data Counts Over Time'
    )
    # save_altair_chart_as_png(chart, filename)



In [None]:
def generate_scatterplot_over_time(summary_df, filename="scatterplot_over_time.png"):
    chart = alt.Chart(summary_df).mark_point().encode(
        x='time:T',  # Adjust for your time-related partition key
        y='count:Q',
        tooltip=['partition_key1', 'partition_key2', 'count']  # Adjust tooltips as needed
    ).properties(
        title='Scatterplot of Data Over Time',
        width=800,
        height=400
    ).interactive()  # Enables panning and zooming
    # save_altair_chart_as_png(chart, filename)