# User Search
Use this notebook to:
1. Try to find an account from whatever partial information is known (name, email address, or login)
2. List all orgs the account belongs to (from a subset)
  - You will need org owner permissions to perform these searches

# Boilerplate
Skip/hide this. Common usage is below.

If you see this text (and you're not in VSCode), you may want to enable the nbextension "Collapsible Headings", so you can hide this in common usage.

## Tune as needed

Several functions use `lru_cache`, and many of them are called `len(orgs_to_check)` times. If a cache is undersized, run times get quite long. (Only the first query should be slow; after that, all data should come from the cache.)

See the "cache reporting" cell below.
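
As a rough illustration of what an undersized cache looks like, the sketch below uses a hypothetical `lookup` function (not part of this notebook) with a deliberately small `maxsize`. `cache_info()` is the standard `functools` call for reading hits, misses, and current size.

```python
from functools import lru_cache


# hypothetical stand-in for one of the cached helpers; maxsize is deliberately too small
@lru_cache(maxsize=2)
def lookup(org_name):
    return org_name.lower()


orgs = ["mozilla", "mozilla-services", "mdn"]

# two passes over three orgs: with maxsize=2, each call evicts an entry needed later
for _ in range(2):
    for org in orgs:
        lookup(org)

info = lookup.cache_info()
print(info)
```

If misses keep growing on repeat queries while `currsize` equals `maxsize`, the cache is probably too small for `len(orgs_to_check)`.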

#### Configuration

In [None]:
# Set some initial values

# Support running in vs-code by emulating "init-cell behavior"
code_initialization_completed = False

# for now, ACL lookup is quite time consuming, so assume it will be done in a different run
defer_acl_lookup = True

# for now, assume person doing GitHub offboard is not doing Heroku offboard
defer_heroku_lookup = True

# There are 3 ways to set the orgs to examine:
#   1. use a canned list
#   2. use just 2 for testing
#   3. get a list of accessible orgs from GitHub
#
# #3 is preferred - the others are fallbacks

use_github_org_list = True
use_test_org_list = False
use_canned_org_list = False

# hacks for typing
from typing import Any, List

gh: Any = None

# bogus line to catch bad type hint
fred= 3

## Code

### General Debug

In [None]:
# Support Javascript Alerts on failures
def display_javascript_alert(text):
    import json

    from IPython.display import Javascript, display

    # the inbound python string may contain newlines or quotes;
    # json.dumps turns it into a valid JS string literal with both escaped
    simpjs = Javascript(f"alert({json.dumps(text)})")
    display(simpjs)


def failed_initialization(msg):
    display_javascript_alert(f"Initialization failure: '{msg}'")
    raise SystemExit(3, msg)

# failed_initialization("does it work?")

### main code (CIS/IAM)

Not every operator will have a valid token for the CIS system, so fail gently if not
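
A minimal sketch of the "fail gently" idea, assuming only that the credentials come from the `CIS_CLIENT_ID` and `CIS_CLIENT_SECRET` environment variables used in this notebook. The helper names `has_credentials` and `describe_cis_check` are hypothetical, chosen for illustration.

```python
import os


def has_credentials(env=None):
    """Return True only when both CIS variables are set and non-empty."""
    env = os.environ if env is None else env
    return bool(env.get("CIS_CLIENT_ID") and env.get("CIS_CLIENT_SECRET"))


def describe_cis_check(env=None):
    # hypothetical helper: report what would happen instead of raising
    if has_credentials(env):
        return "CIS lookup enabled"
    return "Skipping CIS check, no token available."


print(describe_cis_check({"CIS_CLIENT_ID": "only-half-configured"}))
```

Passing a plain dict instead of `os.environ` makes the check easy to exercise without touching the real environment.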

In [None]:
def check_CIS(email):
    login = first_name = None
    if _has_cis_access():
        login, first_name = _get_cis_info(email)
        display(f"CIS info for {email} reports '{login}', first name of '{first_name}'")
    else:
        display("Skipping CIS check, no token available.")
    return login, first_name

In [None]:
import sys
print(sys.executable)

In [None]:
def _has_cis_access():
    import os

    # return a real bool rather than the secret string itself
    return bool(
        os.environ.get("CIS_CLIENT_ID", "") and os.environ.get("CIS_CLIENT_SECRET", "")
    )

In [None]:
import os

import requests

_cis_bearer_token = None


def _get_cis_bearer_token():
    global _cis_bearer_token
    if _cis_bearer_token:
        return _cis_bearer_token
    else:
        url = "https://auth.mozilla.auth0.com/oauth/token"
        headers = {"Content-Type": "application/json"}
        payload = {
            "client_id": os.environ["CIS_CLIENT_ID"],
            "client_secret": os.environ["CIS_CLIENT_SECRET"],
            "audience": "api.sso.mozilla.com",
            "grant_type": "client_credentials",
        }
        resp = requests.post(url, json=payload, headers=headers)
        data = resp.json()
        _cis_bearer_token = data["access_token"]
        return _cis_bearer_token


def _get_cis_info(email):
    import urllib.request, urllib.parse, urllib.error

    bearer_token = _get_cis_bearer_token()
    # first get the v4 id
    url = (
        "https://person.api.sso.mozilla.com/v2/user/primary_email/{}?active=any".format(
            urllib.parse.quote(email)
        )
    )
    headers = {"Authorization": f"Bearer {bearer_token}"}
    resp = requests.get(url, headers=headers)
    data = resp.json()
    login = v4id = None
    try:
        first_name = data["first_name"]["value"].lower()
    except KeyError:
        try:
            print(f"DEBUG: {data=}")
            print(f"DEBUG: {data['first_name']=}")
            print(f"DEBUG: {data['first_name']['value']=}")
        except KeyError:
            pass
        first_name = None
    try:
        v4id = data["identities"]["github_id_v4"]["value"]
    except KeyError:
        pass
    if v4id:
        # if there was a v4 id, map it to a login, via graphQL
        query = """
            query id_lookup($id_to_check: ID!) {
              node(id: $id_to_check) {
                ... on User {
                  login
                  id
                  databaseId
                }
              }
            }
            """
        # pass variables as a mapping; requests serializes it with the rest of the payload
        variables = {"id_to_check": str(v4id)}
        url = "https://api.github.com/graphql"
        headers = {"Authorization": f"Token {api_key}"}
        payload = {
            "query": query,
            "variables": variables,
        }
        resp = requests.post(url, headers=headers, json=payload)
        try:
            data = resp.json()
            login = data["data"]["node"]["login"].lower()
        except (TypeError, KeyError):
            login = None
    return login, first_name

### Debug CIS

In [None]:
if code_initialization_completed:
    print()
    _get_cis_info(
        """ 
    hwine@mozilla.com
    """
    )
    print("done")

### main code (GitHub)

#### helpers GitHub3.py

In [None]:
# print some debug information
import github3

print(github3.__version__)
print(github3.__file__)

In [None]:
import sys
sys.executable


In [None]:
# set values here - you can also override below
import os

api_key = os.environ.get("GITHUB_PAT", "")
if not api_key:
    display_javascript_alert("no GitHub PAT found (continuing assuming debug)")

In [None]:
import time
import ipywidgets, IPython
print(ipywidgets.__file__)
print(IPython.__file__)
print(IPython.display.__file__)

In [None]:
def try_login():
    global gh
    gh = None
    # handle debugging case, where api_key is empty (no GITHUB_PAT set)
    if not api_key:
        return
    try:
        gh = github3.login(token=api_key)
        print(f"You are authenticated as {gh.me().login}")
    except (github3.exceptions.ForbiddenError) as e:
        failed_initialization("Invalid credentials")
        raise
    except (github3.exceptions.ConnectionError) as e:
        failed_initialization(f"Exception while opening connection (type {type(e)}):\n{str(e)}")
        raise
    except Exception as e:
        failed_initialization(f"Unexpected exception while opening connection (type {type(e)}):\n{str(e)}")
        raise

try_login()
from pprint import pprint
pprint(gh)
if gh is None and api_key is None:
    # give up if we can't login at script start
    failed_initialization("Couldn't log in to GitHub")

From here on, use ``gh`` to access all GitHub data.

In [None]:
# set the local timezone (the container is UTC)
if not os.environ.get("TZ", ""):
    os.environ["TZ"] = "UTC"
print(f'local timezone is {os.environ["TZ"]}.')
time.tzset()


def print_limits(e=None, verbose=False):
    if e:
        #         display("API limit reached, try again in 5 minutes.\n")
        display(str(e))

    # support debug mode in vscode
    if not gh:
        print("Debug mode - no real gh connection")
        return
    reset_max = reset_min = 0
    limits = gh.rate_limit()
    resources = limits["resources"]
    #     print("{:3d} keys: ".format(len(resources.keys())), resources.keys())
    #     print(resources)
    for reset in list(resources.keys()):
        reset_at = resources[reset]["reset"]
        reset_max = max(reset_at, reset_max)
        if not resources[reset]["remaining"]:
            reset_min = min(reset_at, reset_min if reset_min else reset_at)
            if verbose:
                print("EXPIRED for {} {}".format(reset, resources[reset]["remaining"]))
        else:
            if verbose or reset == "search":
                print(
                    "remaining for {} {}".format(reset, resources[reset]["remaining"])
                )

    if not reset_min:
        print("No limits reached currently.")
    else:
        print(
            "Minimum reset at {} UTC ({})".format(
                time.asctime(time.gmtime(reset_min)),
                time.asctime(time.localtime(reset_min)),
            )
        )
    print(
        "All reset at {} UTC ({})".format(
            time.asctime(time.gmtime(reset_max)),
            time.asctime(time.localtime(reset_max)),
        )
    )

def wait_for_quota(which: str) -> None:
    """Wait for the named rate-limit resource (e.g. "search") to reset."""
    limits = gh.rate_limit()
    the_limit = limits["resources"][which]
    if the_limit["remaining"] < 2:
        # clamp at zero: the reset time may already have passed
        wait_seconds = max(0, the_limit["reset"] - int(time.time()) + 1)
        print(
            f"waiting for {which} limit to reset at {time.asctime(time.localtime(the_limit['reset']))}"
            f", {wait_seconds} seconds from now"
        )
        time.sleep(wait_seconds)



In [None]:
from functools import lru_cache

print_limits()

In [None]:
try:
    failure_reason = None
    orgs_to_check = set()
    # support vs-code debug
    if not gh:
        orgs_to_check.add("no-such-org")
    elif use_github_org_list:
        my_login = gh.me().login
        # while a comprehension looks nice here, it doesn't let us recover from permission errors in the loops,
        # which can happen when some orgs change parameters from "expected"
        # orgs_to_check = set([o.login for o in gh.organizations() if my_login in [x.login for x in o.members(role="admin")]])
        for o in gh.organizations():
            try:
                if my_login in [x.login for x in o.members(role="admin")]:
                    orgs_to_check.add(o.login)
            except github3.exceptions.ForbiddenError as e:
                display_javascript_alert(f"Skipping org {o.login}, as your token doesn't work with it. ({str(e)})")
        ...
        if not len(orgs_to_check):
            failure_reason = "User isn't an admin for any orgs"

    elif use_test_org_list:
        orgs_to_check = set(
            """
            mozilla-services
            mozilla
        """.split()
        )
    elif use_canned_org_list:  # old school
        orgs_to_check = set(
            """
        Mozilla-Commons
        Mozilla-Games
        Mozilla-TWQA
        MozillaDPX
        MozillaDataScience
        MozillaFoundation
        MozillaReality
        MozillaSecurity
        MozillaWiki
        Pocket
        Thunderbird-client
        devtools-html
        firefox-devtools
        fxos
        fxos-eng
        iodide-project
        mdn
        moz-pkg-testing
        mozilla
        mozilla-applied-ml
        mozilla-archive
        mozilla-b2g
        mozilla-bteam
        mozilla-conduit
        mozilla-extensions
        mozilla-frontend-infra
        mozilla-iam
        mozilla-it
        mozilla-jetpack
        mozilla-l10n
        mozilla-lockbox
        mozilla-lockwise
        mozilla-metrics
        mozilla-mobile
        mozilla-partners
        mozilla-platform-ops
        mozilla-private
        mozilla-rally
        mozilla-releng
        mozilla-services
        mozilla-spidermonkey
        mozilla-standards
        mozilla-svcops
        mozilla-tw
        mozmeao
        nss-dev
        nubisproject
        projectfluent
        taskcluster
        """.split()
        )
    else:
        failure_reason = "No org list method enabled!"

except Exception as e:
    failure_reason = f"exception: {str(e)}"

if failure_reason:
    failed_initialization(failure_reason)

print(f"{len(orgs_to_check):3d} orgs to check.")

In [None]:
from IPython import get_ipython
get_ipython()

#### helpers (GitHub searches)
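
The search helpers in this section space out calls to stay under the search rate limit. The sketch below shows that idea in isolation, assuming a minimum gap measured with `time.monotonic`. The class `MinIntervalThrottle` and its injectable `clock` and `sleep` parameters are hypothetical and exist only for illustration.

```python
import time


class MinIntervalThrottle:
    """Hypothetical helper: enforce a minimum gap between calls."""

    def __init__(self, min_seconds, clock=time.monotonic, sleep=time.sleep):
        self.min_seconds = min_seconds
        self._clock = clock
        self._sleep = sleep
        self._last_call = None  # no call made yet

    def wait(self):
        """Sleep just long enough to honor the gap; return the seconds slept."""
        slept = 0.0
        if self._last_call is not None:
            remaining = (self._last_call + self.min_seconds) - self._clock()
            if remaining > 0:
                self._sleep(remaining)
                slept = remaining
        # record when this call was allowed through
        self._last_call = self._clock()
        return slept


throttle = MinIntervalThrottle(min_seconds=0.01)
throttle.wait()  # first call never sleeps
throttle.wait()  # second call sleeps for whatever remains of the gap
```

Injecting the clock and sleep functions keeps the timing logic testable without real delays.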

In [None]:
# rate limiting hack
_last_query_interval: float = 0.0
_add_seconds_heuristic: float = 1
_min_seconds_between_calls: float = (30.0/60) + _add_seconds_heuristic

_max_usable_users: int = 10

# ToDo if rate limiting still an issue, could terminate after returning
# _max_usable_users

def _search_for_user(user: str):
    # we can only rate limit between calls
    global _last_query_interval, _min_seconds_between_calls
    seconds_to_wait = (_last_query_interval + _min_seconds_between_calls) - time.monotonic()
    if seconds_to_wait > 0:
        # print(f"_search_for_user: waiting {seconds_to_wait} seconds.")
        time.sleep(seconds_to_wait)
        wait_for_quota("search")
    try:
        for i, user in enumerate(gh.search_users(query="type:user " + user)):
            yield user
            if i > _max_usable_users:
                break
        _last_query_interval = time.monotonic()
    except Exception as e:
        print_limits(e, verbose=True)
        raise


@lru_cache(maxsize=512)
def _search_for_org(user):
    l = list(gh.search_users(query="type:org " + user))
    display(f"found {len(l)} potentials for {user}")
    return l

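@lru_cache(maxsize=512)
def get_users(user):
    # display(u"SEARCH '{}'".format(user))
    l = list(_search_for_user(user))
    display(f"found {len(l)} potentials for {user}")
    # return the list, not a generator: lru_cache would otherwise hand back
    # the same already-exhausted generator on every repeat call
    return l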

In [None]:
displayed_users = set()  # cache to avoid duplicate output


def show_users(user_list, search_term):
    global displayed_users, _max_usable_users
    unique_users = set(user_list)
    count = len(unique_users)
    
    if count > _max_usable_users:
        # Even if there are too many, we still want to check the 'root' term, if it matched
        try:
            seed_user = gh.user(search_term)
            if ' ' not in seed_user.login:
                displayed_users.add(seed_user)
                display(
                    "... too many to be useful, still trying '{}' ...".format(
                        seed_user.login
                    )
                )
            else:
                display(
                    "... too many to be useful, skipping phrase '{}' ...".format(
                        seed_user.login
                    )
                )
        except github3.exceptions.NotFoundError as e:
            display(f"... too many to be useful, '{search_term}' is not a user")
    else:
        for u in [x for x in unique_users if x not in displayed_users]:
            displayed_users.add(u)
            user = u.user.refresh()
    if 0 < count <= _max_usable_users:
        return [u.login for u in unique_users]
    else:
        return []


from itertools import permutations


def _permute_seeds(seeds):
    if len(seeds) == 1:
        yield seeds[0]
    else:
        for x, y in permutations(seeds, 2):
            permutation = " ".join([x, y])
            display(f"   trying phrase permutation {permutation}")
            yield permutation
            permutation = "".join([x, y])
            display(f"   trying permutation {permutation}")
            yield permutation


def gather_possibles(seeds):
    found = set()
    # sometimes get a phrase coming in - e.g. "First Last"
    for seed in _permute_seeds(seeds.split()):
        maybes = show_users(get_users(seed), seed)
        found.update(maybes)
        # if it was an email addr, try again with the mailbox name
        if "@" in seed:
            seed2 = seed.split("@")[0]
            # fix https://github.com/mozilla/github-org-scripts/issues/84
            if len(seed2) >= 4:
                display(f"Searching for mailbox name '{seed2}' (gather_possibles)")
                maybes = show_users(get_users(seed2), seed2)
                found.update(maybes)
            else:
                display(f"Skipping search for '{seed2}' -- too short")
    return found

In [None]:
class OutsideCollaboratorIterator(github3.structs.GitHubIterator):
    def __init__(self, org):
        super().__init__(
            count=-1,  # get all
            url=org.url + "/outside_collaborators",
            cls=github3.users.ShortUser,
            session=org.session,
        )


@lru_cache(maxsize=512)
def get_collaborators(org):
    collabs = [x.login.lower() for x in OutsideCollaboratorIterator(org)]
    return collabs


def is_collaborator(org, login):
    return bool(login.lower() in get_collaborators(org))


# provide same interface for members -- but the iterator is free :D
@lru_cache(maxsize=512)
def get_members(org):
    collabs = [x.login.lower() for x in org.members()]
    return collabs


def is_member(org, login):
    return bool(login.lower() in get_members(org))

In [None]:
@lru_cache(maxsize=64)
def get_org_owners(org):
    owners = org.members(role="admin")
    logins = [x.login for x in owners]
    return logins


@lru_cache(maxsize=128)
def get_inspectable_org_object(org_name):
    try:
        o = gh.organization(org_name)
        # make sure we have enough chops to inspect it
        get_org_owners(o)
        is_member(o, "qzu" * 3)
        is_collaborator(o, "qzu" * 3)
    except github3.exceptions.NotFoundError:
        o = None
        display(f"No such organization: '{org_name}'")
    except github3.exceptions.ForbiddenError as e:
        o = None
        display(f"\n\nWARNING: Not enough permissions for org '{org_name}'\n\n")
    except Exception as e:
        o = None
        display(f"didn't expect to get here: get_inspectable_org_object({org_name})")
        display(f"  exception: {type(e)=}")
        from pprint import pformat
        display(f"{pformat(e)}")
    return o


def check_login_perms(logins, headers=None, ldap=None):
    any_perms = []
    any_perms.append("=" * 30)
    if headers:
        any_perms.extend(headers)
    if not len(logins):
        any_perms.append("\nFound no valid usernames")
    else:
        any_perms.append(
            "\nChecking {} usernames for membership in {} orgs".format(
                len(logins), len(orgs_to_check)
            )
        )
        for login in logins:
            start_msg_count = len(any_perms)
            for org in orgs_to_check:
                o = get_inspectable_org_object(org)
                if o is None:
                    continue
                if is_member(o, login):
                    url = "https://github.com/orgs/{}/people?utf8=%E2%9C%93&query={}".format(
                        o.login, login
                    )
                    phonebook_url = f"https://people.mozilla.org/a/ghe_{o.login}_users/"
                    msg = f"FOUND! {o.login} has {login} as a member: {url}"
                    msg += f"\n\tRemove from phonebook group if needed: {phonebook_url}"
                    owner_logins = get_org_owners(o)
                    is_owner = login in owner_logins
                    if is_owner:
                        msg += f"\n  NOTE: {login} is an OWNER of {org}"
                    any_perms.append(msg)
                if is_collaborator(o, login):
                    url = "https://github.com/orgs/{}/outside-collaborators?utf8=%E2%9C%93&query={}".format(
                        o.login, login
                    )
                    any_perms.append(
                        "FOUND! {} has {} as a collaborator: {}".format(
                            o.login, login, url
                        )
                    )
            else:
                end_msg_count = len(any_perms)
                if end_msg_count > start_msg_count:
                    # some found, put a header on it, then add a blank line
                    any_perms.insert(
                        start_msg_count,
                        "\nFound {:d} orgs for {}:".format(
                            end_msg_count - start_msg_count, login
                        ),
                    )
                    any_perms.append("")
                else:
                    any_perms.append(f"No permissions found for {login}")
    return any_perms

In [None]:
def extract_addresses(text):
    """Get email addresses from text."""
    # ASSUME that text is a list of email addresses (possibly empty)
    if not text:
        return []
    #     print("before: %s" % text)
    text = text.replace("[", "").replace("]", "").replace("b'", "").replace("'", "")
    #     print("after: %s" % text)
    #     print(" split: %s" % text.split())
    return text.split()
    # raise ValueError("couldn't parse '{}'".format(text))

#### ACL Routines

##### Filtering Routines

Filtering is split into 3 cells:
1. a test to ensure it's not broken when changes are made
2. the exception lists, which are the most common update
3. the filter code

Any change to #2 or #3 will re-run the tests
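
The filters all work on the pieces of a search-hit URL. As a minimal sketch of that decomposition, the hypothetical `split_hit_url` below (not the notebook's own function) assumes the hit URL has the form `/<org>/<repo>/blob/<sha>/<dir...>/<filename>`, as in the test URLs.

```python
from urllib.parse import urlparse


def split_hit_url(url):
    """Split a code-search hit URL into (org, repo, directory, filename)."""
    # path looks like /<org>/<repo>/blob/<sha>/<dir...>/<filename>
    parts = urlparse(url).path.split("/")
    org, repo = parts[1], parts[2]
    rest = parts[3:]
    if rest[:1] == ["blob"]:
        rest = rest[2:]  # drop "blob" and the commit sha
    return org, repo, "/".join(rest[:-1]), rest[-1]


hit = (
    "https://github.com/mozilla-services/cloudops-docs/blob/"
    "0ff6ea92e394784aef55abd4b9f8b5d26306fe4b/TeamDiagrams/service_registry.csv"
)
print(split_hit_url(hit))
```

Each filter can then look at one piece: the org, the `org/repo` pair, the directory elements, or the filename and its extension.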

In [None]:
# tests for every load of the filtering code
# NB these urls are NOT the search URL, they are the URL from the hit
test_url = [
    """https://github.com/mozilla-services/foxsec-results/blob/47f31f014cf21dc6e7e774ddc28e51a6f9eeba54/bucketlister/README.md""",
    """https://github.com/mozilla-services/product-delivery-tools/blob/47f31f014cf21dc6e7e774ddc28e51a6f9eeba54/bucketlister/README.markdown""",
    """https://github.com/mozilla-services/cloudops-docs/blob/0ff6ea92e394784aef55abd4b9f8b5d26306fe4b/TeamDiagrams/service_registry.csv""",
    """https://github.com/mozilla/participation-metrics-identities/blob/12b64498e12fe035cd5f7e081fa0e38888e0be8d/Community%20Analytics%20-%20Reps%20Organization%20-%20Test%201.csv""",
    # should skip - org
    """https://github.com/fxos/participation-metrics-identities/blob/12b64498e12fe035cd5f7e081fa0e38888e0be8d/Community%20Analytics%20-%20Reps%20Organization%20-%20Test%201.csv""",
    # should skip - regexp repo
    """https://github.com/mozilla-releng/take-home-assignment-no-such-repo/blob/12b64498e12fe035cd5f7e081fa0e38888e0be8d/Community%20Analytics%20-%20Reps%20Organization%20-%20Test%201.csv""",
    # should skip - below 'tests' or 'test' directory
    """https://github.com/mozilla/participation-metrics-identities/blob/12b64498e12fe035cd5f7e081fa0e38888e0be8d/tests/should-be-skipped.csv""",
    """https://github.com/Pocket/Android/blob/54abeefaa252a4b1c279f7c5bf571082f2283dc1/sync-pocket-android/src/test/resources/mock/getNotifications.json""", 
    """https://github.com/mozilla-services/addons-code-corpus/blob/5b5646ad2390ba1b22f64ca62953ec3b260c0da7/train/bad/872325/chat/index.html""",
    """https://github.com/mozilla-services/addons-code-corpus/blob/5b5646ad2390ba1b22f64ca62953ec3b260c0da7/training/bad/872325/chat/index.html""",
    """https://github.com/Pocket/iOS/blob/d69247be8b8987038b45406e168853d59e72e1ef/Listen/Listen/Sample/Definitions/PKTListenAppTheme.m""",
    """https://github.com/Pocket/particle/blob/8e5e3b45766fb3f93fbf28d8170f522aa8ba0e4f/convert/samples/input/html/audiemega2/1685443536.html""",
    # should skip based on filename
    """https://github.com/Pocket/particle/blob/8e5e3b45766fb3f93fbf28d8170f522aa8ba0e4f/convert/samples/input/html/audiemega2/Makefile""",
    """https://github.com/Pocket/particle/blob/8e5e3b45766fb3f93fbf28d8170f522aa8ba0e4f/convert/samples/input/html/audiemega2/pyproject.toml""",
    """https://github.com/mozilla-it/cloudalerts/blob/c3721d1d17f5e987cdc60f3d3d0c161a0b04b5ac/Dockerfile""",
    # should pass (makefile != Makefile)
    """https://github.com/mozilla-services/cloudops-docs/blob/0ff6ea92e394784aef55abd4b9f8b5d26306fe4b/TeamDiagrams/makefile""",
]
test_good = [
    None,  # should be skipped file extension
    None,  # should be skipped file extension
    ("""https://github.com/search?type=Code&ref=advsearch&q=repo%3Amozilla-services/cloudops-docs+path%3A"TeamDiagrams/service_registry.csv"+oremj""",
    "mozilla-services/cloudops-docs", "TeamDiagrams", "service_registry.csv"),
    ("""https://github.com/search?type=Code&ref=advsearch&q=repo%3Amozilla/participation-metrics-identities+path%3A"Community%20Analytics%20-%20Reps%20Organization%20-%20Test%201.csv"+oremj""",
    "mozilla/participation-metrics-identities", "", "Community%20Analytics%20-%20Reps%20Organization%20-%20Test%201.csv"),
    None,  # should be skipped org
    None,  # should be skipped matching repo regex
    None,  # skipped 'tests' in path
    None,  # skipped 'test' in path
    None,  # skipped 'train' in path
    None,  # skipped 'training' in path (matches substring)
    None,  # skipped 'sample' in path
    None,  # skipped 'samples' in path (matches substring)
    None,  # skipped: Makefile
    None,  # skipped: pyproject.toml
    None,  # skipped: Dockerfile
    ("""https://github.com/search?type=Code&ref=advsearch&q=repo%3Amozilla-services/cloudops-docs+path%3A"TeamDiagrams/makefile"+oremj""",
    "mozilla-services/cloudops-docs", "TeamDiagrams", "makefile"),
]
test_login = "oremj"
_test_ignore_filters_invocation_count = 0

_debug_test_case_num = 0  # leave at 0 for no debug, -1 to find which test case is failing

def _test_ignore_filters():
    global _test_ignore_filters_invocation_count
    _test_ignore_filters_invocation_count += 1
    if _test_ignore_filters_invocation_count == 1:
        # first invocation means both cells needed for test haven't been loaded yet
        return "skipped awaiting code load"
    test_case_num = 0
    for test, success in zip(test_url, test_good):
        test_case_num += 1
        do_debug = (_debug_test_case_num == test_case_num)
        if _debug_test_case_num:
            print(f"case {test_case_num}; debug {do_debug}")
        actual = search_hit_to_url(test, test_login, debug=do_debug)
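        if actual != success:
            print(f"test case {test_case_num}: {test}")
            print(f" received: {actual}")
            print(f" expected: {success}")
            # element-wise comparison only makes sense when neither side is None
            if actual is not None and success is not None:
                for a, e in zip(actual, success):
                    print(f"{(a == e)!r:5}: '{a}'\n       '{e}'")
            raise SystemExit("unit test failed")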
    return "ignore filter tests pass"

In [None]:
# items to ignore -- all heuristically derived

# only add extensions or repos that could NEVER contain an ACL definition
filenames_to_skip = {"setup.py", "pyproject.toml", "requirements.txt", "Makefile", "Dockerfile"}
extensions_to_skip = (".ics", ".md", ".markdown", ".rst", ".der", ".pem", ".crt", ".html", ".htm", ".svg", ".bib", ".po")

# some orgs have patterns for repo names, take advantage of that
repos_to_skip_regexp = set((
    "mozilla-services/foxsec-results",
    "mozilla-services/cloudops-jenkins",
    "mozilla-services/cloudqa-jenkins",
    "mozilla/gecko-dev",
    "mozilla/eu2019-ad-transparency-report",
    "mdn/archived-content",
    "mozilla-releng/take-home-assignment.*",  # interview tests
    "mozilla-it/www-archive.mozilla.org",
    "mdn/retired.*content",  # history
    "mozilla-it/sumo-l10n.*",
    "mozmeao/sumo-l10n.*",
    "mozmeao/www-l10n",
    "mozilla-services/ms-language-packs",
    "mozilladatascience/search-terms-sanitization",
    "Pocket/AndroidHiring",
    "Pocket/Localization",
    "Pocket/data-explorations",
    "mdn/translated-content",
    "Pocket/parser-benchmark",
    "mozilla/releases_insights",
))
# convert to single regexp & compile
# based on https://stackoverflow.com/a/3040797/5128493
import re
RE_REPO_TO_SKIP = re.compile("(?:" + ")|(?:".join(repos_to_skip_regexp) + ")", re.IGNORECASE)

# These orgs are guaranteed not to have any current ACLs in them
# - could be expanded for any parked or archived org
orgs_to_skip = (
    "fxos",
    "fxos-eng",
    "mozilla-b2g",
    "moco-ghe-admin",
    "mozilla-l10n",  # only translations, no apps or services
    "common-voice",  # not supported by IT
)
# skip any hit with a path element containing one of these substrings
# (e.g. "test" also matches a `tests` directory)
# ToDo: consider making regexp, would allow path /data/ & /changelog.*/ to be added
paths_to_skip = (
    "test",
    "train",  # all the AI these days
    "sample",
    "locales",  # l10n stuff
    "translations",
    "template",
    "resources",
)

# test on any change
print(_test_ignore_filters())

In [None]:
# we get some insane counts sometimes, along with 404s as all of these
# results are based on an index GitHub created some time ago, and include sha1
# references. E.g. 404 link:
#    https://github.com/mozilla-services/foxsec-results/blob/70a5b7841edcdb967beddbce75309efa0bc2b687/aws-pytest/cloudservices-aws-stage/one-offs/cloudservices-aws-stage-2018-01-31-secgroup-service-report.md/search?q=oremj&type=code
# valid version
#    https://github.com/mozilla-services/foxsec-results/blob/master/aws-pytest/cloudservices-aws-stage/one-offs/cloudservices-aws-stage-2018-01-31-secgroup-service-report.md
# However, that URL won't support a search endpoint, so what we _really_ want is
#    https://github.com/search?q=oremj+repo%3Amozilla-services%2Ffoxsec-results++path%3Aaws-pytest%2Fcloudservices-aws-stage%2Fone-offs%2F+filename%3Acloudservices-aws-stage-2018-01-31-secgroup-service-report.md&type=Code&ref=advsearch
#    https://github.com/search?q=repo%3Amozilla-services/foxsec-results%20path%3Aaws-pytest/cloudservices-aws-stage/one-offs%20filename%3Acloudservices-aws-stage-2018-01-31-secgroup-service-report.md%20fred&type=code&ref=advsearch
#
# rebuild the file hit url into what we want

from urllib.parse import urlparse, urlunparse, quote_plus
#        from IPython.core.debugger import set_trace; set_trace()


def ignore_path(url_list, debug=False):
    for ignorable in paths_to_skip:
        # we want to do partial matching, so can't use list comprehension
        for element in [x.lower() for x in url_list[1:-1]]:
            if ignorable in element:
                if debug:
                    print(f"Ignoring {'/'.join(url_list)}")
                return True
    return False

def search_hit_to_url(url, login=None, debug=False):
    # split into components
    parts = urlparse(url)
    # break down the path
    path_parts = parts.path.split('/')
    if path_parts[1] in orgs_to_skip:
        # ignore orgs with no relevant ACL
        if debug:
            print(f"ignoring based on org '{path_parts[1]}'")
        return
    repo = '/'.join(path_parts[1:3])
#     if repo in repos_to_skip:
    if RE_REPO_TO_SKIP.match(repo):
        if debug:
            print(f"ignoring based on repo '{repo}'")
            print(f"pattern: r'{RE_REPO_TO_SKIP.pattern}'")
        return
    if ignore_path(path_parts, debug=debug):
        if debug:
            print(f"ignoring based on path: {'/'.join(path_parts)}")
        return
    filename = path_parts[-1]
#     from pprint import pprint
#     pprint(path_parts)
#     print(f"{repo}; {filename}")
    try:
#         from IPython.core.debugger import Pdb; Pdb().set_trace()
        if filename[filename.rindex('.'):] in extensions_to_skip:
            if debug:
                print(f"ignoring due to extension '{filename[filename.rindex('.'):]}'")
            return
    except ValueError:
        # file didn't have extension, so process it
        pass
    if filename in filenames_to_skip:
        if debug:
            print(f"ignoring due to filename '{filename}'")
        return
    basepath = path_parts[3:-1]
    # guard against an empty basepath before checking for the 'blob' prefix
    if basepath and basepath[0] == "blob":
        # get rid of 'blob' and sha1
        basepath = basepath[2:]
        
    # build the new query string
    # With the newer syntax, everything can be in one "path" filter
    basepath = '/'.join(basepath)
    filename_filter = 'path:"'
    if basepath:
        filename_filter += f"{basepath}/"
    filename_filter += f'{filename}"'
    # the actual query needs to be form encoded, so use quote_plus. 
    # We make unusual characters safe to get the correct result when working with files with embedded spaces.
    # Note - won't currently work with paths with spaces
    query = quote_plus(f"""repo:{repo} {filename_filter} {login}""", safe='/%"')
    query_string = f"type=Code&ref=advsearch&q={query}"
        
    # now rebuild the url
    new_url = urlunparse((
        parts.scheme,
        parts.netloc,
        "search",
        None, # params
        query_string,
        None, # fragment
    ))
    return new_url, repo, basepath, filename

# test on any change
print(_test_ignore_filters())

##### ACL reporting code

In [None]:
import csv, io
def check_for_acls(logins):
    """Check for these logins in code; a hit could be an ACL entry to be removed.

    Note that we haven't pruned logins to just the orgs we found hits on -- we're using all GitHub logins. May want to modify in the future.
    """
    possibles = set(logins)

    # we're now outputting in CSV format, so put in a header line
    csvfile = io.StringIO()
    writer = csv.writer(csvfile)
    writer.writerow(["Action Taken", "Comment", "", "File", "Search URL"])
    # add formula to use for copy down in R2C3 (cell C2) - still requires manual intervention
    #  1. select cell C2, edit, and press enter to make it a real formula
    #  2. fill down for all rows in sheet
    writer.writerow(["", "", '=if(ISBLANK(E2),"", HYPERLINK(E2,"?"))', "", ""])
    writer.writerow([""] * 4)
    writer.writerow([f"Checking for possible ACLs for: {', '.join(possibles)}", "", "",])
    writer.writerow([""] * 4)
#     import pdb ; pdb.set_trace()
#     from IPython.core.debugger import set_trace; set_trace()


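    # needed for exception handling below; import the submodule explicitly,
    # since "import http" alone does not guarantee http.client is loaded
    import http.client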

    for org in orgs_to_check:
#         print(f" {org}..", end='')
        for l in possibles:
            full_list = []
            try:
                full_list = list(gh.search_code(query=f"org:{org} {l}"))
            except Exception as e:
                if isinstance(e, http.client.RemoteDisconnected):
                    # This is "fun" to run into - doesn't happen very often
                    # so this recovery is an educated guess (the time I
                    # did see it, it was after a 'resumed' message from
                    # the clause below)
                    for i in range(3):
                        try_login()
                        if gh:
                            # re-established connection
                            print(f"re-established connection on try {i+1}")
                            break
                        else:
                            time.sleep(60)
                    else:
                        print(f"failed to re-establish connection after {i+1} tries")
                        raise SystemExit
                elif getattr(e, "code", None) not in [403, 422]:
                    print(f"org={org} l={l} exception={str(e)}")
                elif e.code in [403]:
                    print("\n\nOut of API calls, waiting a minute ..", end='')
                    print_limits(verbose=False)
                    # we can hit this a lot, so just wait a minute
                    time.sleep(60)
                    print("... resumed.")
                # we've reported on everything of interest, no need for else clause
#                 else:
#                     print(f"Got code {e.code} for org {org}, search {l}")
            # remove vulnerability repos (*-ghsa-*) and archived repos (archive status
            # requires a refresh of the repository object)
            hit_list = [r for r in full_list if ("-ghsa-" not in r.repository.name)
                                            and (not r.repository.refresh().archived)]
            num_search_results = len(hit_list)

            search_urls = []
            for search_hit in hit_list:
                new_url = search_hit_to_url(search_hit.html_url, l)
#                 print(f"before: {search_hit.html_url}\n after: {new_url}")
                if new_url:
                    search_urls.append(new_url)
            num_raw_search_urls = len(search_urls)
            search_urls = set(search_urls)
            num_search_urls = len(search_urls)
#             print(f"search results: {num_search_results}; after translation: {num_raw_search_urls}; after dedupe: {num_search_urls}")
            if num_search_urls > 0:
                writer.writerow(['', f"{num_search_urls} files with possible ACLs in {org} for {l}:", "", ""])
                for url, repo, path, filename in sorted(search_urls):
                    # output in csv format
                    writer.writerow(["", "", "", f"{repo}/{path}/{filename}", f"{url}"])
            # import pdb ; pdb.set_trace()
    csvfile.seek(0)
    hits = [l.strip() for l in csvfile.readlines()]
    return hits

In [None]:
def check_github_acls(logins):
    logins_to_check = set(logins.split())
    # import pdb; pdb.set_trace()
    msgs = check_for_acls(logins_to_check)
    print("=" * 35)
    display(*msgs)

#### Heroku support

In [None]:
def check_heroku_logins(logins):
    logins_to_check = set(logins.split())
    # import pdb; pdb.set_trace()
    for login in logins_to_check:
        print("\nworking on %s:" % login)
        show_heroku_data(login)

In [None]:
# let user start manual work before we do all the GitHub calls
def show_heroku_data(primary_email:str) -> None:
    display("Check these URLs for Heroku activity:")
    display(
        "  Heroku Access: https://people.mozilla.org/a/heroku-members/edit?section=members"
    )
    display(f"     copy/paste for ^^ query:  :{primary_email}:  ")
    display(
        "  People: https://people.mozilla.org/s?who=all&query={}".format(
            primary_email.replace("@", "%40")
        )
    )
    display(
        "  Heroku: https://dashboard.heroku.com/teams/mozillacorporation/access?filter={}".format(
            primary_email.replace("@", "%40")
        )
    )

#### main driver
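
The driver seeds its searches with several spellings of the person's full name: without spaces and with hyphens, each in forward and reversed word order. A minimal sketch of that expansion; the function name `name_variations` and the sample name are illustrative only:

```python
def name_variations(full_name):
    """Return the full name plus joined and hyphenated forms, forward and reversed."""
    words = full_name.split()
    reversed_words = words[::-1]
    return {
        full_name,
        "".join(words),
        "".join(reversed_words),
        "-".join(words),
        "-".join(reversed_words),
    }


# Hypothetical name, for illustration only
variations = name_variations("Jane Doe")
print(sorted(variations))
```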

In [None]:
import re
import os

re_flags = re.MULTILINE | re.IGNORECASE


def process_from_email(email_body):
    # get rid of white space
    email_body = os.linesep.join(
        [s.strip() for s in email_body.splitlines() if s.strip()]
    )
    if not email_body:
        return

    user = set()

    # Extract data from internal email format
    match = re.search(r"^Full Name: (?P<full_name>\S.*)$", email_body, re_flags)
    if match:
        # add base and some variations
        full_name = match.group("full_name")
        user.add(full_name)
        # remove spaces, forward & reversed
        user.add(full_name.replace(" ", ""))
        user.add("".join(full_name.split()[::-1]))
        # use hyphens, forward & reversed
        user.add(full_name.replace(" ", "-"))
        user.add("-".join(full_name.split()[::-1]))

    match = re.search(r"^Email: (?P<primary_email>.*)$", email_body, re_flags)
    primary_email = match.group("primary_email") if match else None
    user.add(primary_email)
    default_login = primary_email.split("@")[0].lower() if primary_email else None
    if default_login:
        # add some common variations that may get discarded for "too many" matches
        user.update(
            [
                f"moz{default_login}",
                f"moz-{default_login}",
                f"mozilla{default_login}",
                f"mozilla-{default_login}",
                f"{default_login}moz",
                f"{default_login}-moz",
            ]
        )

    if not defer_heroku_lookup:
        show_heroku_data(str(primary_email))
    display(email_body)

    match = re.search(r"^Github Profile: (?P<github_profile>.*)$", email_body, re_flags)
    declared_github = match.group("github_profile") if match else None
    user.add(declared_github)
    display(f"Declared GitHub {declared_github}")

    # check CIS for verified login (not all users will have creds)
    verified_github_login, first_name = check_CIS(primary_email)
    if verified_github_login:
        user.add(verified_github_login)
        display(f"Verified GitHub {verified_github_login}")

    match = re.search(r"^Zimbra Alias: (?P<other_email>.*)$", email_body, re_flags)
    possible_aliases = extract_addresses(match.group("other_email") if match else None)
    user.update(possible_aliases)

    # new field: Email Alias -- list syntax (brackets)
    match = re.search(r"^Email Alias: \s*\[(?P<alias_email>.*)\]", email_body, re_flags)
    user.add(match.group("alias_email") if match else None)

    # we consider each token in the IM line as a possible GitHub login
    match = re.search(r"^IM:\s*(.*)$", email_body, re_flags)
    if match:
        im_line = match.groups()[0]
        matches = re.finditer(r"\W*((\w+)(?:\s+\w+)*)", im_line)
        # finditer always returns an iterator, so no None fallback is needed
        user.update(x.group(1) for x in matches)

    match = re.search(r"^Bugzilla Email: (?P<bz_email>.*)$", email_body, re_flags)
    user.add(match.group("bz_email") if match else None)

    # grab the department name, for a heuristic on whether we expect to find perms
    expect_github_login = False
    match = re.search(r"^\s*Dept Name: (?P<dept_name>\S.*)$", email_body, re_flags)
    if match and not verified_github_login:
        department_name = match.groups()[0].lower()
        dept_keys_infering_github = ["firefox", "engineering", "qa", "operations"]
        for key in dept_keys_infering_github:
            if key in department_name:
                expect_github_login = True
                break

    # clean up some noise, case insensitively, "binary" markers
    user = {x.lower() for x in user if x and (len(x) > 2)}
    to_update = [x[2:-1] for x in user if (x.startswith("b'") and x.endswith("'"))]
    user.update(to_update)
    user = {x for x in user if not (x.startswith("b'") and x.endswith("'"))}

    # the tokens to ignore are added based on discovery,
    # they tend to cause the searches to get rate limited.
    user = user - {
        None,
        "irc",
        "slack",
        "skype",
        "b",
        "hotmail",
        "mozilla",
        "ro",
        "com",
        "softvision",
        "mail",
        "twitter",
        "blog",
        "https",
        "jabber",
        "net",
        "github",
        "gmail",
        "facebook",
        "guy",
        "pdx",
        "yahoo",
        "aim",
        "whatsapp",
        "gtalk",
        "google",
        "gpg",
        "telegram",
        "keybase",
        "zoom",
        "name",
    }
    global displayed_users
    displayed_users = set()
    try:
        headers = [
            "Search seeds: '{}'".format("', '".join(user)),
        ]
        display(*headers)
        guesses = set()
        for term in user:
            possibles = gather_possibles(term)
            guesses.update({x.lower() for x in possibles})
        # include declared_github if it exists
        if declared_github:
            guesses.add(declared_github.lower())
        guesses.update({x.login.lower() for x in displayed_users})
        display(f"Checking logins {guesses}")
        msgs = []
        msgs = check_login_perms(guesses, headers)
        found_perms = "FOUND!" in "".join(msgs)
        display(f"msgs {len(msgs)}; headers {len(headers)}")
        display(
            "found_perms {}; declared_github {} {}".format(
                found_perms, declared_github, bool(declared_github)
            )
        )

        if declared_github and not found_perms:
            msgs.append(f"Even for declared login '{declared_github}'.")
        if expect_github_login and not found_perms:
            msgs.append(
                "WARNING: expected GitHub permissions for dept '{}'".format(
                    department_name
                )
            )
        
        # check for GitHub login or ldap in a file (might be permissions)
        if default_login:
            guesses.add(default_login)
        print(f"before: guesses {guesses}; default {default_login}")
        if first_name and primary_email and not primary_email.endswith("mozilla.com"):
            # for non-MoCo emails, the primary email stem might be their first name,
            # which leads to an insane amount of false positives, so remove it.
            # The assumption is that they would not be in any ACL files anyway.
            guesses -= {first_name.lower()}
        print(f" after: guesses {guesses}; default {default_login}")
        if defer_acl_lookup:
            display("Deferring search for possible ACLs")
            msgs.append(f"Check for ACLs using the following values: {' '.join([str(x) for x in guesses])}")
        else:
            display("Looking for possible ACLs")
            new_msgs = check_for_acls(guesses)
            msgs.extend(new_msgs)
        msgs.append("Finished all reporting.")
        display(*msgs)
    except github3.exceptions.ForbiddenError as e:
        print_limits(e)
        raise

In [None]:
from ipywidgets import interact_manual, Layout, widgets
from IPython.display import display

text = widgets.Textarea(
    value="email: \nim: ",
    placeholder="Paste ticket description here!",
    description="Email body:",
    layout=Layout(width="95%"),
    disabled=False,
)

run_process = interact_manual.options(manual_name="Process")

In [None]:
print("before def of display")
def display(*args):
    # iPyWidgets don't like unicode - ensure everything we try to put there is ascii
    text = "\n".join(
        [str(x) for x in args]
    )  # deal with None values by casting to str
    # python 3 no longer requires us to play the convert-to-ascii game
    cleaned = text  #.encode("ascii", "replace")
    if cleaned.strip():
        print(str(cleaned))
print("after def of display")

In [None]:
def check_github_logins(logins):
    logins_to_check = set(logins.split())
    # import pdb; pdb.set_trace()
    for login in logins_to_check:
        print("\nworking on %s:" % login)
        msgs = check_login_perms([login])
        display(*msgs)

#### Cache Tuning & Clearing

Various functions use lru_cache -- this outputs the values to see if they are tuned appropriately.

Note that these have no meaning until after 1 or more queries have been run.
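
As a rough guide to reading those numbers: a cache that is too small shows many misses and few hits even when the same arguments repeat. The sketch below illustrates this with a stand-in function; `run_lookups` and the org names are hypothetical and do not call GitHub:

```python
from functools import lru_cache


def run_lookups(maxsize, orgs, passes=2):
    """Call a cached stand-in lookup for every org, several times, and report cache stats."""

    @lru_cache(maxsize=maxsize)
    def lookup(org):
        # Stand-in for an expensive API call
        return f"data for {org}"

    for _ in range(passes):
        for org in orgs:
            lookup(org)
    return lookup.cache_info()


orgs = ["org-a", "org-b", "org-c"]  # hypothetical org names
undersized = run_lookups(maxsize=2, orgs=orgs)
roomy = run_lookups(maxsize=8, orgs=orgs)
print("undersized:", undersized)
print("roomy:     ", roomy)
```

If `currsize` equals `maxsize` and `misses` keeps growing between queries, the cache is probably too small for the number of orgs being checked.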

In [None]:
print("get_users")
print(get_users.cache_info())
print("_search_for_org")
print(_search_for_org.cache_info())

print("get_collaborators")
print(get_collaborators.cache_info())
print("get_members")
print(get_members.cache_info())

print("get_org_owners")
print(get_org_owners.cache_info())
print("get_inspectable_org_object")
print(get_inspectable_org_object.cache_info())

In [None]:
print("clearing caches...")
get_users.cache_clear()
_search_for_org.cache_clear()
get_collaborators.cache_clear()
get_members.cache_clear()
get_org_owners.cache_clear()
get_inspectable_org_object.cache_clear()

#### EML file support

In [None]:
# read EML file support
import email
from ipywidgets import FileUpload
from pprint import pprint as pp
from IPython.display import display as display_widget

In [None]:


def extract_reply(body):
    extracted = []
    for l in body.splitlines():  # handles both "\r\n" and "\n" line endings
        if l.startswith("> --"):
            break
        elif l.startswith("> "):
            extracted.append(l[2:])
    return extracted


def process_from_file(uploader):
    # message = email.message_from_string()
    for file in list(uploader.value.keys()):
        print("checking %s" % file)
        pp(list(uploader.value[file].keys()))
        content = uploader.value[file]["content"]
        pp(type(content))
        pp(type(uploader.value[file]))
        #pp(uploader.value[file])
        message = email.message_from_bytes(content)
        #message = email.message_from_string(uploader.value[file]["content"])
        for part in message.walk():
            if part.get_content_maintype() == "multipart":
                continue
            else:
                mime = part.get_content_type()
                if "plain" in mime:
                    body = part.get_payload()
                    # this could be the original, or a reply
                    if re.search(r"""^Full Name:""", body, re_flags):
                        print("original email:")
                        process_from_email(body)
                    elif re.search(r"""^> Full Name:""", body, re_flags):
                        print("reply:")
                        process_from_email("\n".join(extract_reply(body)))
                    else:
                        print("no match!\n%s" % body)

## End of initialization

In [None]:
code_initialization_completed = True

# Start of common usage (How To)

Currently, there are three common use cases:
- processing an offboarding email (via downloaded EML file),
- processing an offboarding email (via message copy/paste), and
- adhoc lookup of GitHub login

For anything else, you're on your own!

All usage requires the following setup:
1. Supply your GitHub personal access token (PAT) via the environment variable `GITHUB_PAT` when starting the notebook server. (If you can't do that, read the code for another way.)
2. Supply your CIS credentials via the environment variables `CIS_CLIENT_ID` and `CIS_CLIENT_SECRET`.
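
Before running anything else, it can help to confirm that these variables are visible to the notebook kernel. A small sketch, assuming only the three variable names listed above; the helper `missing_credentials` is hypothetical and not part of this notebook:

```python
import os

REQUIRED_VARS = ("GITHUB_PAT", "CIS_CLIENT_ID", "CIS_CLIENT_SECRET")


def missing_credentials(environ=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


missing = missing_credentials()
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
else:
    print("All credentials found in the environment.")
```

The check only reports names, so secret values are never printed.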


## EML File parsing

Upload the file using the button below, then process that file by running the cell below the button. You can only process one file at a time, but the "file uploaded" count will continue to increase (a UI glitch).
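
For reference, an EML file is a plain RFC 822 message, so its text body can be pulled out with the standard `email` package. The sketch below is illustrative: the sample message and the helper `plain_text_parts` are made up, and unlike the notebook's own code it decodes the payload with `get_payload(decode=True)`:

```python
import email

# Hypothetical message, for illustration only
raw = (
    b"From: hr@example.com\r\n"
    b"Subject: offboarding\r\n"
    b"Content-Type: text/plain; charset=utf-8\r\n"
    b"\r\n"
    b"Full Name: Jane Doe\r\n"
    b"Email: jdoe@example.com\r\n"
)


def plain_text_parts(raw_bytes):
    """Return the decoded text of every text/plain part in the message."""
    message = email.message_from_bytes(raw_bytes)
    parts = []
    for part in message.walk():
        if part.get_content_type() == "text/plain":
            payload = part.get_payload(decode=True)  # bytes, transfer encoding undone
            charset = part.get_content_charset() or "utf-8"
            parts.append(payload.decode(charset, errors="replace"))
    return parts


bodies = plain_text_parts(raw)
print(bodies[0])
```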

In [None]:
_uploader = FileUpload(accept="*.eml", multiple=False)
display_widget(_uploader)
# check_file(_uploader)

In [None]:
def check_file(f):
    try:
        # display_widget(_uploader)
        process_from_file(f)
        print("completed")
    except Exception as e:
        print(repr(e))
        raise


check_file(_uploader)

## Process offboarding email body text (copy/paste)

Usage steps - for each user:

1. Run the cell below -- it should display a text entry area and a "Process" button.
2. Copy the entire text of the email.
3. Paste it into the text area.
4. Click the "Process" button.
5. Use the generated links to check for Heroku authorization.
6. After "Finished all reporting." is printed, copy/paste the final output into the email.
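
The pasted text is expected to contain labelled lines such as `Full Name:` and `Email:`, which are matched line by line, ignoring case. A minimal sketch of that kind of extraction; the sample text and the helper `extract_field` are illustrative and not the notebook's own function:

```python
import re

RE_FLAGS = re.MULTILINE | re.IGNORECASE

# Hypothetical ticket text, for illustration only
sample_body = """Full Name: Jane Doe
Email: jdoe@example.com
IM: irc: jdoe
Dept Name: Firefox Engineering
"""


def extract_field(label, body):
    """Return the value following 'label: ' at the start of a line, or None."""
    pattern = rf"^{re.escape(label)}: (?P<value>\S.*)$"
    match = re.search(pattern, body, RE_FLAGS)
    return match.group("value") if match else None


print(extract_field("Full Name", sample_body))
print(extract_field("Email", sample_body))
print(extract_field("Github Profile", sample_body))
```

A label that is missing from the text yields `None`, so the caller can skip that field.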

In [None]:
@run_process(t=text)
def show_matches(t):
    try:
        process_from_email(t)
    except Exception as e:
        print(repr(e))

## Adhoc Lookups (GitHub & Heroku)

Fill in the list of desired logins, separated by whitespace, in the cell below.
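
The lookup cells split the pasted text on any whitespace and drop duplicates, so logins can be given one per line or space separated. A tiny sketch of that parsing; `parse_logins` and the sample logins are hypothetical:

```python
def parse_logins(text):
    """Split on whitespace and de-duplicate; sorted only to make the output stable."""
    return sorted(set(text.split()))


# Hypothetical input, for illustration only
print(parse_logins(""" alice
 bob  alice
 """))
```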

In [None]:
check_github_logins(
    """ 
 """
)
print("done")

In [None]:
check_heroku_logins(
    """ 
 """
)
print("done")

## ACL search

Fill in the list of desired logins, separated by whitespace, in the cell below. Appropriate values may be in the GitHub report.
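
Each hit in the ACL report carries a GitHub code search URL that narrows the search to one repository, one file path, and one login. The sketch below shows how such a URL can be assembled with `urllib.parse`; the function `build_search_url`, the default host, and the sample values are assumptions for illustration:

```python
from urllib.parse import quote_plus, urlunparse


def build_search_url(repo, path, login, host="github.com"):
    """Build a code search URL limited to one repo, one path, and one search term."""
    # '/', '%', and '"' are left unescaped; spaces become '+'
    query = quote_plus(f'repo:{repo} path:"{path}" {login}', safe='/%"')
    query_string = f"type=Code&ref=advsearch&q={query}"
    return urlunparse(("https", host, "search", "", query_string, ""))


# Hypothetical repository, path, and login
url = build_search_url("example-org/example-repo", "config/owners.yml", "jdoe")
print(url)
```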

In [None]:
print()
check_github_acls(
    """ 

"""
)
print("done")

# To Do

- check invites as well, using manage_invitations.py
- code doesn't handle hyphenated github logins, e.g. 'marco-c' (gets split)
- github lookup should strip https... so can use link from people.m.o
- dpreston, aka fzzy, doesn't have any GitHub perms
- fix permutations of names
- preprocess to remove all (colon separated) :b':':[:]: (maybe not the :b: & :':)
- add link to Heroku service accounts to check
- hide connection failure tracebacks during check_github_tracebacks
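
To illustrate the hyphenated-login item above: the token pattern applied to the IM line treats `-` as a separator. The sketch below reproduces that behavior and shows one possible hyphen-aware pattern; `HYPHEN_AWARE` is a hypothetical alternative that has not been tried in this notebook, and it does not keep multi-word tokens together:

```python
import re

# Pattern applied to the IM line in process_from_email
IM_TOKEN = re.compile(r"\W*((\w+)(?:\s+\w+)*)")
# Hypothetical alternative that keeps hyphenated tokens whole
HYPHEN_AWARE = re.compile(r"\w+(?:-\w+)*")

im_line = "irc: marco-c"
current_tokens = [m.group(1) for m in IM_TOKEN.finditer(im_line)]
hyphen_tokens = HYPHEN_AWARE.findall(im_line)

print(current_tokens)
print(hyphen_tokens)
```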

<details><summary>Completed <small><em>(click to toggle)</em></small></summary>

- ~~GitHub login no longer part of email, but user id is available via CIS~~
- ~~add "clear cache" button to purge after long idle~~ _(in tuning section)_
- ~~add common login with 'moz{,illa}' tacked on, sometimes with a dash~~
- ~~update link to view access group on people.m.o~~
- ~~add "trying" info to copy/paste output~~
- ~~double check that "even for declared login" code still active~~
- ~~add formatted output summary for copy/paste~~
- ~~when a guess is multiple words, each word should be tried separately as well~~
- ~~code should always search for stated github, even if search is "too many" (e.g. "past")~~
- ~~does not call out owner status (reports as member)~~
- ~~add short ldap name as an "always check"~~
- ~~always check stem when search gives too many (i.e. go for the exact match)~~
- ~~treat Zimbra Aliases as a potential multi valued list (or empty)~~
- ~~"-" is a valid character in GitHub logins. Try as separator first-last and last-first~~
</details>