# Github Experiments
This notebook sets up a local environment for simulating queries on user data. It imports GitHub event data from GH Archive (https://www.gharchive.org/) and launches a `tiresias` client for each user in a sample drawn from the dataset, allowing us to run queries across those users' data and simulate a distributed setting.

### Data Collection
Let's download and process the GitHub dataset. We'll parse out the common event types and load them into pandas dataframes.

In [1]:
import os
import requests
import gzip, json
import pandas as pd

In [4]:
# Download the GitHub dataset
def path(x):
    """Return the location of file ``x`` inside the /tmp/ scratch directory."""
    return os.path.join("/tmp/", x)

url = "https://data.gharchive.org/2015-01-01-1.json.gz"
r = requests.get(url, allow_redirects=True)
# Fail here on an HTTP error instead of saving an error page as github.json.gz,
# which would only surface later as a confusing gzip/JSON failure.
r.raise_for_status()
# "wb" truncates any archive left over from an earlier run, so no explicit
# delete-then-append is needed.
with open(path("github.json.gz"), "wb") as fout:
    fout.write(r.content)

In [5]:
# Set up the export tables
# Maps a table name to the list of row dicts collected for that table.
tables = {}

def export():
    """Write each collected table to ``<table_name>.csv`` (located via ``path``).

    The row index is not written to the CSV files.
    """
    for table_name in tables:
        frame = pd.DataFrame(tables[table_name])
        destination = path("%s.csv" % table_name)
        frame.to_csv(destination, index=False)

def CreateEvent(obj):
    """Append one row to the ``create_event`` table.

    Args:
        obj: Decoded event dict. Reads ``id``, ``actor.login``, ``repo.name``,
            ``created_at`` and, from ``payload``, ``master_branch``,
            ``description``, ``ref`` and ``ref_type``.
    """
    rows = tables.setdefault("create_event", [])
    row = {
        "id": obj["id"],
        "user_id": obj["actor"]["login"],
        "repo_name": obj["repo"]["name"],
        "timestamp": obj["created_at"],
    }
    payload = obj["payload"]
    # The payload's "master_branch" is stored under the shorter column "master".
    row["master"] = payload["master_branch"]
    row["description"] = payload["description"]
    row["ref"] = payload["ref"]
    row["ref_type"] = payload["ref_type"]
    rows.append(row)

def PushEvent(obj):
    """Append one row to the ``push_event`` table.

    Args:
        obj: Decoded event dict. ``payload.ref`` is stored as ``branch`` and
            ``payload.size`` as ``nb_commits``.
    """
    rows = tables.setdefault("push_event", [])
    actor, repo = obj["actor"]["login"], obj["repo"]["name"]
    row = {"id": obj["id"], "user_id": actor, "repo_name": repo}
    row["timestamp"] = obj["created_at"]
    row["branch"] = obj["payload"]["ref"]
    row["nb_commits"] = obj["payload"]["size"]
    rows.append(row)

def WatchEvent(obj):
    """Append one row to the ``watch_event`` table.

    Args:
        obj: Decoded event dict. Only the event id, actor login, repository
            name and creation time are kept; the payload is not read.
    """
    row = {"id": obj["id"]}
    row["user_id"] = obj["actor"]["login"]
    row["repo_name"] = obj["repo"]["name"]
    row["timestamp"] = obj["created_at"]
    tables.setdefault("watch_event", []).append(row)

def ReleaseEvent(obj):
    """Append one row to the ``release_event`` table.

    Args:
        obj: Decoded event dict. Besides the common id/actor/repo/time fields,
            reads ``payload.action`` and ``payload.release.tag_name``.
    """
    rows = tables.setdefault("release_event", [])
    row = {
        "id": obj["id"],
        "user_id": obj["actor"]["login"],
        "repo_name": obj["repo"]["name"],
        "timestamp": obj["created_at"],
    }
    payload = obj["payload"]
    row["action"] = payload["action"]
    row["tag_name"] = payload["release"]["tag_name"]
    rows.append(row)

def PullRequestEvent(obj):
    """Append one row to the ``pull_request_event`` table.

    Args:
        obj: Decoded event dict. Besides the common id/actor/repo/time fields,
            reads ``payload.action`` and, from ``payload.pull_request``, the
            body, state and the commit/addition/deletion/changed-file counts.
    """
    rows = tables.setdefault("pull_request_event", [])
    row = {
        "id": obj["id"],
        "user_id": obj["actor"]["login"],
        "repo_name": obj["repo"]["name"],
        "timestamp": obj["created_at"],
    }
    payload = obj["payload"]
    row["action"] = payload["action"]
    pull_request = payload["pull_request"]
    # These columns carry the same name as the pull_request key they come from.
    for field in ("body", "state", "commits", "additions", "deletions", "changed_files"):
        row[field] = pull_request[field]
    rows.append(row)

def IssuesEvent(obj):
    """Append one row to the ``issues_event`` table.

    Args:
        obj: Decoded event dict. Besides the common id/actor/repo/time fields,
            reads ``payload.action`` and, from ``payload.issue``, the title,
            body, comments value and state.
    """
    rows = tables.setdefault("issues_event", [])
    row = {
        "id": obj["id"],
        "user_id": obj["actor"]["login"],
        "repo_name": obj["repo"]["name"],
        "timestamp": obj["created_at"],
    }
    payload = obj["payload"]
    row["action"] = payload["action"]
    issue = payload["issue"]
    row.update({field: issue[field] for field in ("title", "body", "comments", "state")})
    rows.append(row)
    
def ForkEvent(obj):
    """Append one row to the ``fork_event`` table.

    Args:
        obj: Decoded event dict. Besides the common id/actor/repo/time fields,
            a selection of ``payload.forkee`` attributes is copied into
            ``forkee_``-prefixed columns.
    """
    rows = tables.setdefault("fork_event", [])
    row = {
        "id": obj["id"],
        "user_id": obj["actor"]["login"],
        "repo_name": obj["repo"]["name"],
        "timestamp": obj["created_at"],
    }
    forkee = obj["payload"]["forkee"]
    # (output column, forkee key) pairs, in the column order of the table.
    column_sources = (
        ("forkee_name", "full_name"),
        ("forkee_description", "description"),
        ("forkee_size", "size"),
        ("forkee_stargazers", "stargazers_count"),
        ("forkee_watchers", "watchers_count"),
        ("forkee_has_issues", "has_issues"),
        ("forkee_has_downloads", "has_downloads"),
        ("forkee_has_wiki", "has_wiki"),
        ("forkee_has_pages", "has_pages"),
    )
    for column, key in column_sources:
        row[column] = forkee[key]
    rows.append(row)
        
def DeleteEvent(obj):
    """Append one row to the ``delete_event`` table.

    Args:
        obj: Decoded event dict. Reads ``id``, ``actor.login``, ``repo.name``,
            ``created_at`` and, from ``payload``, ``ref`` and ``ref_type``.
    """
    # Every other event table records the event time; include it here as well so
    # that delete events can be queried by time like the rest.
    tables.setdefault("delete_event", []).append({
        "id": obj["id"],
        "user_id": obj["actor"]["login"],
        "repo_name": obj["repo"]["name"],
        "timestamp": obj["created_at"],
        "ref": obj["payload"]["ref"],
        "ref_type": obj["payload"]["ref_type"],
    })

In [6]:
# Iterate over the dataset and generate the tables
# Start from empty tables so that re-running this cell does not append the same
# events a second time. clear() empties the dict in place, so the handlers
# (which look up the global `tables`) keep filling the same object.
tables.clear()

# Event type -> handler; event types not listed here are ignored.
handlers = {
    "CreateEvent": CreateEvent,
    "PushEvent": PushEvent,
    "WatchEvent": WatchEvent,
    "ReleaseEvent": ReleaseEvent,
    "PullRequestEvent": PullRequestEvent,
    "IssuesEvent": IssuesEvent,
    "ForkEvent": ForkEvent,
    "DeleteEvent": DeleteEvent,
}

# The archive is read as text with one JSON document per line.
with gzip.open(path("github.json.gz"), "rt") as fin:
    for line in fin:
        event = json.loads(line)
        handler = handlers.get(event["type"])
        if handler is not None:
            handler(event)

# Convert the tables into dataframes
# `dataframes` maps table name -> DataFrame; `userids` collects every distinct
# user_id seen in any table.
dataframes = {}
userids = set()
for table_name, rows in tables.items():
    df = dataframes[table_name] = pd.DataFrame(rows)
    userids |= set(df["user_id"].values.tolist())
    print("Table %s has %s rows." % (table_name, len(df)))
print("Identified %s unique users." % len(userids))

Table push_event has 4150 rows.
Table pull_request_event has 293 rows.
Table watch_event has 649 rows.
Table issues_event has 330 rows.
Table create_event has 727 rows.
Table fork_event has 222 rows.
Table delete_event has 81 rows.
Table release_event has 43 rows.
Identified 3100 unique users.


### Tiresias Server
Let's set up the tiresias server. We'll launch it in the background and tell it to listen on port 3000.

In [65]:
import subprocess
# Start tiresias-server as a child process on port 3000. Popen returns
# immediately; the handle is kept in `server` so the process can be stopped later.
server_command = ["tiresias-server", "--port", "3000"]
server = subprocess.Popen(server_command)

### Tiresias Clients
Now let's launch a client for each user in a sample of the dataset (the first 100 users), configure the github app schema, and load each user's data into their client.

In [66]:
from tqdm import tqdm
from time import sleep
from json import dumps, loads

# Build the app schema: one entry per table, with an (empty) description and one
# column spec per dataframe column.
# NOTE(review): every column is declared as "float", including string-valued
# ones such as user_id and repo_name -- confirm which type names tiresias accepts.
schema = {}
for table_name, df in dataframes.items():
    schema[table_name] = {
        "description": "",
        "columns": {c: {"type": "float", "description": ""} for c in df.columns}
    }

# Sort before slicing: `userids` may still be a set here, and the iteration order
# of a set of strings changes between interpreter sessions, which would select a
# different group of users on every fresh run.
userids = sorted(userids)[:100] # Let's just use the first few users

# Launch one tiresias client process per selected user. Client i gets port
# 8000 + i and its own database directory named after that port.
clients = []
# Wrap the sized range (not an enumerate generator) in tqdm so that it knows the
# total and can draw a real progress bar.
for i in tqdm(range(len(userids))):
    port = 8000 + i
    client = subprocess.Popen([
        "tiresias", 
        "--db_port", str(port), 
        "--db_dir", path("tiresias/%s" % port)
    ])
    clients.append(client)
sleep(10) # Wait a few secs for them to launch

# Register the github schema with each user's client and insert only the rows
# that belong to that user.
schema_json = dumps(schema)  # identical for every client, so serialize it once
# enumerate(tqdm(...)) rather than tqdm(enumerate(...)) so tqdm can see the length.
for i, userid in enumerate(tqdm(userids)):
    port = 8000 + i
    payload = {}
    for table_name, df in dataframes.items():
        payload[table_name] = df[df["user_id"] == userid].to_dict('records')
    # Stop on the first failed request: a client that silently missed its schema
    # or its data would skew every later query result.
    register = requests.get("http://localhost:%s/app/github/register" % port, params={"schema": schema_json})
    register.raise_for_status()
    insert = requests.get("http://localhost:%s/app/github/insert" % port, params={"payload": dumps(payload)})
    insert.raise_for_status()

10it [00:00, 28.22it/s]
10it [00:01,  7.66it/s]


### Experiments

In [67]:
# Submit a "basic" query to the server. The featurizer is a SQL statement over the
# github app's create_event table and the aggregator is "mean".
query = {
    "type": "basic",
    "epsilon": 10.0,
    "featurizer": "SELECT 1.0*COUNT(*) FROM github.create_event",
    "aggregator": "mean",
}
response = requests.get("http://127.0.0.1:3000/query", params={"query": dumps(query)})
# Stop on an HTTP error so that an error body is never mistaken for a query id.
response.raise_for_status()
# The response body is used as the id for looking the query up afterwards.
qid = response.text

In [74]:
# Look the submitted query up by id and show the decoded JSON reply as the
# cell's output.
query_reply = requests.get("http://127.0.0.1:3000/query/%s" % qid)
loads(query_reply.text)

{'type': 'basic',
 'epsilon': 10.0,
 'featurizer': 'SELECT 1.0*COUNT(*) FROM github.create_event',
 'aggregator': 'mean',
 'id': '031499a8-8803-4f3f-93df-76d99968de88',
 'status': 'COMPLETE',
 'count': 10,
 'result': 1.999997929993876}

### Cleanup
Let's kill all the processes we launched.

In [57]:
import shutil
# Stop every client and the server.
for client in clients:
    client.kill()
server.kill()
# kill() only sends the signal; wait() reaps each child so none is left behind as
# a zombie and all have exited before their data directory is removed.
for proc in [*clients, server]:
    proc.wait()
# Only remove the directory if it is there, so the cell can be run again safely.
data_dir = path("tiresias")
if os.path.isdir(data_dir):
    shutil.rmtree(data_dir)