
# GitHub Metrics: Traffic History Cloud Function For Incremental Updates

The traffic history notebooks in steps 1 and 2 created an initial setup of tables in the BigQuery datasets `github_metrics` and `reporting`. The `reporting` dataset uses the same table names with a `github_` prefix (for example, `reporting.github_traffic_clones`). The logic for incrementally updating these tables was also developed and tested in those notebooks.

**Tables**
- `traffic_clones`
- `traffic_popular_paths`
- `traffic_popular_referrers`
- `traffic_views`
- `stargazers`
- `forks`
- `subscribers`


This notebook creates a Cloud Function to run the code that incrementally updates the tables in the datasets and schedules it to run daily using Pub/Sub and Cloud Scheduler.


**Notes**
- The Pub/Sub topic is shared between all step 3 notebooks.
- The Cloud Scheduler job is shared between all step 3 notebooks.

---
## Colab Setup

To run this notebook in Colab click [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/statmike/vertex-ai-mlops/blob/main/architectures/tracking/setup/github/GitHub%20Metrics%20-%203%20-%20Traffic%20-%20Incremental%20Update%20Cloud%20Function.ipynb) and run the cells in this section.  Otherwise, skip this section.

This cell will authenticate to GCP (follow prompts in the popup).

In [1]:
PROJECT_ID = 'vertex-ai-mlops-369716' # replace with project ID

In [2]:
try:
    import google.colab
    from google.colab import auth
    auth.authenticate_user()
    !gcloud config set project {PROJECT_ID}
except Exception:
    pass

Updated property [core/project].


---
## Package Installs (if needed)

This notebook uses the Python Clients for
- Google Service Usage
    - to enable APIs
- Cloud Pub/Sub
- Cloud Functions
- Cloud Scheduler

The cell below checks whether each required Python library is installed. For any that are missing, it prints a message and installs the package with `pip`. These installs must complete before you continue with this notebook.
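The four `try`/`except` blocks in the next cell repeat one pattern. A minimal sketch of the same check as a loop, using the standard library's `importlib.util.find_spec`, which locates a module without importing it (only its parent packages are imported). `missing_packages` and `REQUIREMENTS` are illustrative names, not part of the notebook:

```python
import importlib.util


def missing_packages(requirements):
    """Return the pip package names whose import path cannot be found.

    `requirements` maps an importable module path to the pip package
    that provides it.
    """
    missing = []
    for module_name, pip_name in requirements.items():
        try:
            found = importlib.util.find_spec(module_name) is not None
        except ImportError:
            # find_spec imports parent packages first; a missing parent
            # (for example `google.cloud`) raises instead of returning None
            found = False
        if not found:
            missing.append(pip_name)
    return missing


# hypothetical mapping mirroring the packages checked in this notebook
REQUIREMENTS = {
    'google.cloud.service_usage_v1': 'google-cloud-service-usage',
    'google.cloud.pubsub': 'google-cloud-pubsub',
    'google.cloud.functions': 'google-cloud-functions',
    'google.cloud.scheduler': 'google-cloud-scheduler',
}

print(missing_packages(REQUIREMENTS))
```

In a notebook, the returned list could then be passed to a single `!pip install` command.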

In [3]:
try:
    import google.cloud.service_usage_v1
except ImportError:
    print('You need to pip install google-cloud-service-usage')
    !pip install google-cloud-service-usage -q

try:
    import google.cloud.pubsub
except ImportError:
    print('You need to pip install google-cloud-pubsub')
    !pip install google-cloud-pubsub -q

try:
    import google.cloud.functions
except ImportError:
    print('You need to pip install google-cloud-functions')
    !pip install google-cloud-functions -q

try:
    import google.cloud.scheduler
except ImportError:
    print('You need to pip install google-cloud-scheduler')
    !pip install google-cloud-scheduler -q

You need to pip install google-cloud-service-usage
You need to pip install google-cloud-pubsub
You need to pip install google-cloud-functions
You need to pip install google-cloud-scheduler

---
## Setup

In [4]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

'vertex-ai-mlops-369716'

In [5]:
REGION = 'us-central1'

github_user = 'statmike'
github_repo = 'vertex-ai-mlops'

BQ_PROJECT = PROJECT_ID
BQ_DATASET = 'github_metrics'

In [6]:
import requests
import json
import time
from datetime import datetime
import pandas as pd
from io import StringIO
import os, shutil

from google.cloud import bigquery
from google.cloud import storage

from google.cloud import service_usage_v1
from google.cloud import pubsub_v1
from google.cloud import functions_v1
from google.cloud import scheduler_v1

In [7]:
bq = bigquery.Client(project = PROJECT_ID)
gcs = storage.Client(project = PROJECT_ID)

su_client = service_usage_v1.ServiceUsageClient()
pubsub_pubclient = pubsub_v1.PublisherClient() 
functions_client = functions_v1.CloudFunctionsServiceClient()
scheduler_client = scheduler_v1.CloudSchedulerClient()

In [8]:
DIR = 'temp'
!rm -rf {DIR}
!mkdir -p {DIR}

---
## Enable APIs

Using Cloud Functions, Cloud Pub/Sub, and Cloud Scheduler requires enabling their APIs in the Google Cloud project. Cloud Functions also uses Cloud Build, which needs to be enabled as well.

Options for enabling these APIs (this notebook uses option 2):
 1. Use the APIs & Services page in the console: https://console.cloud.google.com/apis
     - `+ Enable APIs and Services`
     - Search for Cloud Build and Enable
     - Search for Artifact Registry and Enable
 2. Use [Google Service Usage](https://cloud.google.com/service-usage/docs) API from Python
     - [Python Client For Service Usage](https://github.com/googleapis/python-service-usage)
     - [Python Client Library Documentation](https://cloud.google.com/python/docs/reference/serviceusage/latest)
     
The following code cells use the Service Usage client to:
- get the state of the service
- if `DISABLED`:
    - try to enable the service and report the resulting state
- otherwise, print a confirmation that the service is already enabled
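The five cells below repeat one check-then-enable pattern. A minimal sketch of that pattern as a single helper, assuming a client that behaves like the `su_client` (`service_usage_v1.ServiceUsageClient`) created in Setup. `ensure_service_enabled` is a hypothetical name, not part of the notebook or the client library. GAPIC clients accept the request as a plain dict, so this version does not construct `GetServiceRequest`/`EnableServiceRequest` objects:

```python
def ensure_service_enabled(client, project_id, service):
    """Enable a Google Cloud API if it is disabled and return its final state.

    `client` is assumed to behave like service_usage_v1.ServiceUsageClient;
    `service` is an API name such as 'pubsub.googleapis.com'.
    """
    name = f'projects/{project_id}/services/{service}'
    # get_service returns a Service whose state enum exposes .name ('ENABLED', 'DISABLED', ...)
    state = client.get_service(request = {'name': name}).state.name
    if state != 'DISABLED':
        print(f'{service} already enabled for project: {project_id}')
        return state
    print(f'{service} is currently {state} for project: {project_id}')
    print('Trying to Enable...')
    # enable_service starts a long-running operation; result() blocks until it finishes
    response = client.enable_service(request = {'name': name}).result()
    state = response.service.state.name
    print(f'{service} is now {state} for project: {project_id}')
    return state
```

With the notebook's client, `ensure_service_enabled(su_client, PROJECT_ID, 'pubsub.googleapis.com')` would cover the Pub/Sub cell, and looping over the `iam`, `pubsub`, `cloudfunctions`, `cloudscheduler`, and `cloudbuild` service names (each with the `.googleapis.com` suffix) would cover all five.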

### IAM
The IAM API may be needed to create a service account that runs the Cloud Function.

In [9]:
iam = su_client.get_service(
    request = service_usage_v1.GetServiceRequest(
        name = f'projects/{PROJECT_ID}/services/iam.googleapis.com'
    )
).state.name


if iam == 'DISABLED':
    print(f'IAM is currently {iam} for project: {PROJECT_ID}')
    print(f'Trying to Enable...')
    operation = su_client.enable_service(
        request = service_usage_v1.EnableServiceRequest(
            name = f'projects/{PROJECT_ID}/services/iam.googleapis.com'
        )
    )
    response = operation.result()
    if response.service.state.name == 'ENABLED':
        print(f'IAM is now enabled for project: {PROJECT_ID}')
    else:
        print(response)
else:
    print(f'IAM already enabled for project: {PROJECT_ID}')

IAM already enabled for project: vertex-ai-mlops-369716


### Cloud Pub/Sub

In [10]:
pubsub = su_client.get_service(
    request = service_usage_v1.GetServiceRequest(
        name = f'projects/{PROJECT_ID}/services/pubsub.googleapis.com'
    )
).state.name


if pubsub == 'DISABLED':
    print(f'Cloud Pub/Sub is currently {pubsub} for project: {PROJECT_ID}')
    print(f'Trying to Enable...')
    operation = su_client.enable_service(
        request = service_usage_v1.EnableServiceRequest(
            name = f'projects/{PROJECT_ID}/services/pubsub.googleapis.com'
        )
    )
    response = operation.result()
    if response.service.state.name == 'ENABLED':
        print(f'Cloud Pub/Sub is now enabled for project: {PROJECT_ID}')
    else:
        print(response)
else:
    print(f'Cloud Pub/Sub already enabled for project: {PROJECT_ID}')

Cloud Pub/Sub already enabled for project: vertex-ai-mlops-369716


### Cloud Functions

In [11]:
cloudfunctions = su_client.get_service(
    request = service_usage_v1.GetServiceRequest(
        name = f'projects/{PROJECT_ID}/services/cloudfunctions.googleapis.com'
    )
).state.name


if cloudfunctions == 'DISABLED':
    print(f'Cloud Functions is currently {cloudfunctions} for project: {PROJECT_ID}')
    print(f'Trying to Enable...')
    operation = su_client.enable_service(
        request = service_usage_v1.EnableServiceRequest(
            name = f'projects/{PROJECT_ID}/services/cloudfunctions.googleapis.com'
        )
    )
    response = operation.result()
    if response.service.state.name == 'ENABLED':
        print(f'Cloud Functions is now enabled for project: {PROJECT_ID}')
    else:
        print(response)
else:
    print(f'Cloud Functions already enabled for project: {PROJECT_ID}')

Cloud Functions already enabled for project: vertex-ai-mlops-369716


### Cloud Scheduler

In [12]:
cloudscheduler = su_client.get_service(
    request = service_usage_v1.GetServiceRequest(
        name = f'projects/{PROJECT_ID}/services/cloudscheduler.googleapis.com'
    )
).state.name


if cloudscheduler == 'DISABLED':
    print(f'Cloud Scheduler is currently {cloudscheduler} for project: {PROJECT_ID}')
    print(f'Trying to Enable...')
    operation = su_client.enable_service(
        request = service_usage_v1.EnableServiceRequest(
            name = f'projects/{PROJECT_ID}/services/cloudscheduler.googleapis.com'
        )
    )
    response = operation.result()
    if response.service.state.name == 'ENABLED':
        print(f'Cloud Scheduler is now enabled for project: {PROJECT_ID}')
    else:
        print(response)
else:
    print(f'Cloud Scheduler already enabled for project: {PROJECT_ID}')

Cloud Scheduler already enabled for project: vertex-ai-mlops-369716


### Cloud Build 
Cloud Functions uses Cloud Build to build functions from source.

In [13]:
cloudbuild = su_client.get_service(
    request = service_usage_v1.GetServiceRequest(
        name = f'projects/{PROJECT_ID}/services/cloudbuild.googleapis.com'
    )
).state.name


if cloudbuild == 'DISABLED':
    print(f'Cloud Build is currently {cloudbuild} for project: {PROJECT_ID}')
    print(f'Trying to Enable...')
    operation = su_client.enable_service(
        request = service_usage_v1.EnableServiceRequest(
            name = f'projects/{PROJECT_ID}/services/cloudbuild.googleapis.com'
        )
    )
    response = operation.result()
    if response.service.state.name == 'ENABLED':
        print(f'Cloud Build is now enabled for project: {PROJECT_ID}')
    else:
        print(response)
else:
    print(f'Cloud Build already enabled for project: {PROJECT_ID}')

Cloud Build already enabled for project: vertex-ai-mlops-369716


---
## Pub/Sub
Use a Pub/Sub topic to trigger the Cloud Function. The topic can receive messages manually or on a schedule from Cloud Scheduler.

The main concepts:
- Topic - a feed of messages
     - Publish - send a new message to a topic
     - Subscription - receive messages that arrive on topic
          - Push - the subscriber has new messages pushed to it
          - Pull - the subscriber requests new messages by pulling them
          
In this example, a topic is set up for daily runs of metric functions. Publishing a new message to this topic triggers one or more Cloud Functions, such as the one set up below, to run. The Cloud Function has a push subscription to the topic.
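The `collect` function defined later decodes `event['data']` as base64-encoded JSON and reads `PROJECT_ID` from it, so a message published to this topic needs that shape. A sketch under that assumption: `publish_trigger` and `decode_event` are hypothetical helpers, `publisher` is expected to behave like `pubsub_v1.PublisherClient` (whose `publish(topic, data)` returns a future), and `decode_event` mirrors the first line of `collect`. The client sends raw bytes, and Pub/Sub delivers them base64-encoded to a background (1st gen) function:

```python
import base64
import json


def publish_trigger(publisher, project_id, topic_id):
    """Publish a JSON trigger message and return the Pub/Sub message ID."""
    topic_path = f'projects/{project_id}/topics/{topic_id}'
    # the Cloud Function reads PROJECT_ID from this payload
    data = json.dumps({'PROJECT_ID': project_id}).encode('utf-8')
    # publish() returns a future; result() blocks until the server assigns an ID
    return publisher.publish(topic_path, data = data).result()


def decode_event(event):
    """Decode a Pub/Sub background-function event the same way collect() does."""
    return json.loads(base64.b64decode(event['data']).decode('utf-8'))
```

Once the function is deployed, `publish_trigger(pubsub_pubclient, PROJECT_ID, PUBSUB_TOPIC)` would start a manual run.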

In [14]:
PUBSUB_TOPIC = 'daily_metrics_triggers'

In [15]:
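topic = ''
topic_path = pubsub_pubclient.topic_path(PROJECT_ID, PUBSUB_TOPIC)
for existing_topic in pubsub_pubclient.list_topics(project = f'projects/{PROJECT_ID}'):
    # compare the full resource name; endswith() could also match a topic such as 'old_daily_metrics_triggers'
    if existing_topic.name == topic_path:
        topic = existing_topic
        break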

In [16]:
if topic:
    print(topic)
else:
    topic = pubsub_pubclient.create_topic(
        name = pubsub_pubclient.topic_path(PROJECT_ID, PUBSUB_TOPIC)
    )
    print(topic)

name: "projects/vertex-ai-mlops-369716/topics/daily_metrics_triggers"



In [17]:
print(f'Review The Pub/Sub Topic In The Console:\nhttps://console.cloud.google.com/cloudpubsub/topic/list?project={PROJECT_ID}')

Review The Pub/Sub Topic In The Console:
https://console.cloud.google.com/cloudpubsub/topic/list?project=vertex-ai-mlops-369716


---
## Cloud Function

Create a Cloud Function that runs the incremental update code for the tables in the `github_metrics` dataset and then refreshes the corresponding `reporting` tables. The cells below write the code files and zip them so they can be stored in Cloud Storage as the source for the Cloud Function.

### Create Files

In [67]:
if os.path.exists(f'{DIR}/function'): shutil.rmtree(f'{DIR}/function')
os.makedirs(f'{DIR}/function')

In [68]:
%%writefile {DIR}/function/requirements.txt
pandas
db-dtypes
google-cloud-bigquery

Writing temp/function/requirements.txt


In [69]:
%%writefile {DIR}/function/main.py

# packages
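import base64
import requests
import json
import time # used by metric_get when GitHub returns 202
from datetime import datetime
import pandas as pd
import numpy as np
import os
import urllib.parse # import the submodule explicitly for urllib.parse.unquote
from google.cloud import bigquery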


# clients
bq = bigquery.Client()


# parameters
github_user = 'statmike'
github_repo = 'vertex-ai-mlops'
BQ_DATASET = 'github_metrics'
github_api_url = f'https://api.github.com/repos/{github_user}/{github_repo}'
pat = os.getenv('GITHUB_PAT')


# helper function
def metric_get(metric_type, query_parameters = ''):
  response = requests.get(f'{github_api_url}/{metric_type}{query_parameters}', headers = {'Authorization': f'Bearer {pat}', 'Accept': 'application/vnd.github+json'})
  while response.status_code == 202:
      time.sleep(10)
      response = requests.get(f'{github_api_url}/{metric_type}{query_parameters}', headers = {'Authorization': f'Bearer {pat}', 'Accept': 'application/vnd.github+json'})
  return response


# the function
def collect(event, context):

    # print inputs to Cloud Function
    function_inputs = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    print(function_inputs)
    PROJECT_ID = function_inputs['PROJECT_ID']
    BQ_PROJECT = PROJECT_ID

    # START: Content from notebook: GitHub Metrics - 1 - Traffic

    # traffic_clones
    query = f"""
    SELECT t.*
    FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_clones` t
    WHERE 1=1 QUALIFY row_number() OVER(ORDER BY timestamp DESC) = 1
    """
    prior_traffic_clones = bq.query(query = query).to_dataframe()
    metric_type = 'traffic/clones'
    response = metric_get(metric_type)
    new_traffic_clones = pd.DataFrame(json.loads(response.text)['clones'])
    if new_traffic_clones['timestamp'].iloc[-1] != datetime.now().strftime("%Y-%m-%dT00:00:00Z"):
      # gap, likely due to no clones today: insert today with zero counts
      # (DataFrame.append was removed in pandas 2.0, so pd.concat is used instead)
      new_traffic_clones = pd.concat([new_traffic_clones, pd.DataFrame([{'timestamp': datetime.now().strftime("%Y-%m-%dT00:00:00Z"), 'count': 0, 'uniques': 0}])], ignore_index = True).sort_values(by = ['timestamp'])
    new_traffic_clones['uniques_last14days'] = np.nan
    # set the 14-day unique count on the last row with .loc to avoid chained assignment
    new_traffic_clones.loc[new_traffic_clones.index[-1], 'uniques_last14days'] = json.loads(response.text)['uniques']
    new_traffic_clones['repo'] = github_user + '/' + github_repo
    overlap_record = new_traffic_clones[new_traffic_clones['timestamp'] == prior_traffic_clones['timestamp'].iloc[0]]
    new_records = new_traffic_clones[new_traffic_clones['timestamp'] > prior_traffic_clones['timestamp'].iloc[0]]
    if overlap_record.shape[0] == 1:
      if overlap_record[['timestamp', 'count', 'uniques']].values.tolist() != prior_traffic_clones[['timestamp', 'count', 'uniques']].values.tolist():
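        # copy so the assignment below does not write to a slice of new_traffic_clones
        updated_record = overlap_record.copy()
        updated_record.loc[updated_record.index[0], 'uniques_last14days'] = prior_traffic_clones['uniques_last14days'].iloc[0]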
        new_records = pd.concat([updated_record, new_records], ignore_index = True, axis = 0)
        job = bq.query(query = f"DELETE FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_clones` WHERE timestamp = '{updated_record['timestamp'].iloc[0]}'")
        job.result()
    if new_records.shape[0] >=1:
      append_job = bq.load_table_from_dataframe(
            dataframe = new_records,
            destination = bigquery.TableReference.from_string(f"{BQ_PROJECT}.{BQ_DATASET}.traffic_clones"),
            job_config = bigquery.LoadJobConfig(
                write_disposition = 'WRITE_APPEND',
                autodetect = True, # detect schema
            ) 
      )
      append_job.result()

    # traffic_popular_paths
    metric_type = 'traffic/popular/paths'
    response = metric_get(metric_type)
    traffic_popular_paths = pd.DataFrame(json.loads(response.text))
    def parse_path(p):
        p = urllib.parse.unquote(p).replace('blob/main/', '')
        p = urllib.parse.unquote(p).replace('tree/main/', '')
        if p.rfind('.') == -1 or (p.rfind('.') < p.rfind('/')):
            p += '/readme.md'
        return p
    traffic_popular_paths['file'] = traffic_popular_paths.apply(lambda x: parse_path(x['path']), axis = 1)
    traffic_popular_paths = traffic_popular_paths.drop(['title', 'path'], axis = 1)
    traffic_popular_paths['timestamp'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z") #strftime("%Y-%m-%dT%H:%M:%SZ") 
    traffic_popular_paths['repo'] = github_user + '/' + github_repo
    query = f"""
    SELECT *
    FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_popular_paths`
    WHERE timestamp = '{traffic_popular_paths['timestamp'].max()}'
    ORDER BY count DESC
    """
    prior = bq.query(query = query).to_dataframe()
    if prior.shape[0] == 0:
      append_job = bq.load_table_from_dataframe(
              dataframe = traffic_popular_paths,
              destination = bigquery.TableReference.from_string(f"{BQ_PROJECT}.{BQ_DATASET}.traffic_popular_paths"),
              job_config = bigquery.LoadJobConfig(
                  write_disposition = 'WRITE_APPEND',
                  autodetect = True, # detect schema
              ) 
        )
      append_job.result()

    # traffic_popular_referrers
    metric_type = 'traffic/popular/referrers'
    response = metric_get(metric_type)
    traffic_popular_referrers = pd.DataFrame(json.loads(response.text))
    traffic_popular_referrers['timestamp'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z") #strftime("%Y-%m-%dT%H:%M:%SZ")
    traffic_popular_referrers['repo'] = github_user + '/' + github_repo
    query = f"""
    SELECT *
    FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_popular_referrers`
    WHERE timestamp = '{traffic_popular_referrers['timestamp'].max()}'
    ORDER BY count
    """
    prior = bq.query(query = query).to_dataframe()
    if prior.shape[0] == 0:
      append_job = bq.load_table_from_dataframe(
              dataframe = traffic_popular_referrers,
              destination = bigquery.TableReference.from_string(f"{BQ_PROJECT}.{BQ_DATASET}.traffic_popular_referrers"),
              job_config = bigquery.LoadJobConfig(
                  write_disposition = 'WRITE_APPEND',
                  autodetect = True, # detect schema
              ) 
        )
      append_job.result()

    # traffic_views
    query = f"""
    SELECT t.*
    FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_views` t
    WHERE 1=1 QUALIFY row_number() OVER(ORDER BY timestamp DESC) = 1
    """
    prior_traffic_views = bq.query(query = query).to_dataframe()
    metric_type = 'traffic/views'
    response = metric_get(metric_type)
    new_traffic_views = pd.DataFrame(json.loads(response.text)['views'])
    if new_traffic_views['timestamp'].iloc[-1] != datetime.now().strftime("%Y-%m-%dT00:00:00Z"):
      # gap, likely due to no views today: insert today with zero counts
      # (DataFrame.append was removed in pandas 2.0, so pd.concat is used instead)
      new_traffic_views = pd.concat([new_traffic_views, pd.DataFrame([{'timestamp': datetime.now().strftime("%Y-%m-%dT00:00:00Z"), 'count': 0, 'uniques': 0}])], ignore_index = True).sort_values(by = ['timestamp'])
    new_traffic_views['uniques_last14days'] = np.nan
    # set the 14-day unique count on the last row with .loc to avoid chained assignment
    new_traffic_views.loc[new_traffic_views.index[-1], 'uniques_last14days'] = json.loads(response.text)['uniques']
    new_traffic_views['repo'] = github_user + '/' + github_repo
    overlap_record = new_traffic_views[new_traffic_views['timestamp'] == prior_traffic_views['timestamp'].iloc[0]]
    new_records = new_traffic_views[new_traffic_views['timestamp'] > prior_traffic_views['timestamp'].iloc[0]]
    if overlap_record.shape[0] == 1:
      if overlap_record[['timestamp', 'count', 'uniques']].values.tolist() != prior_traffic_views[['timestamp', 'count', 'uniques']].values.tolist():
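        # copy so the assignment below does not write to a slice of new_traffic_views
        updated_record = overlap_record.copy()
        updated_record.loc[updated_record.index[0], 'uniques_last14days'] = prior_traffic_views['uniques_last14days'].iloc[0]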
        new_records = pd.concat([updated_record, new_records], ignore_index = True, axis = 0)
        job = bq.query(query = f"DELETE FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_views` WHERE timestamp = '{updated_record['timestamp'].iloc[0]}'")
        job.result()
    if new_records.shape[0] >=1:
      append_job = bq.load_table_from_dataframe(
            dataframe = new_records,
            destination = bigquery.TableReference.from_string(f"{BQ_PROJECT}.{BQ_DATASET}.traffic_views"),
            job_config = bigquery.LoadJobConfig(
                write_disposition = 'WRITE_APPEND',
                autodetect = True, # detect schema
            ) 
      )
      append_job.result()

    # stargazers
    query = f"""
    SELECT *
    FROM `{BQ_PROJECT}.{BQ_DATASET}.stargazers`
    """
    known = bq.query(query = query).to_dataframe()
    # list of expected active stargazers (> covers added and re-added, = covers added and never dropped)
    known_active = known[known['added'] >= known['dropped']]
    # list of known users in current state of dropped
    known_inactive = known[known['dropped'] > known['added']]
    metric_type = 'stargazers'
    page_size = 100
    page = 1
    raw = []
    while page_size == 100:
        response = metric_get(metric_type, f'?per_page={page_size}&page={page}')
        raw_new = json.loads(response.text)
        raw += raw_new
        page_size = len(raw_new)
        page += 1
    stargazers = pd.DataFrame(raw)[['login']]
    current = stargazers
    # newly added: in current but not in known
    newly_added = pd.DataFrame([x for x in current['login'].values.tolist() if x not in known['login'].values.tolist()], columns = ['login'])
    newly_added['added'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z")
    newly_added['dropped'] = ''
    newly_added['count'] = 1
    newly_added['repo'] = github_user + '/' + github_repo
    # newly dropped: in known_active but not in current
    newly_dropped = pd.DataFrame([x for x in known_active['login'].values.tolist() if x not in current['login'].values.tolist()], columns = ['login'])
    newly_dropped = pd.merge(known_active, newly_dropped, how = 'inner', on = ['login'])
    newly_dropped['dropped'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z")
    # newly readded: in current and in known_inactive
    newly_readded = pd.merge(current['login'], known_inactive['login'], how = 'inner', on = ['login'])
    newly_readded = pd.merge(newly_readded, known_inactive, how = 'inner', on = ['login'])
    newly_readded['added'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z")
    newly_readded['count'] = newly_readded['count'] + 1
    # newly combo
    new_records = pd.concat([newly_added, newly_dropped, newly_readded], ignore_index = True, axis = 0)
    stargazers_update = False
    if new_records.shape[0] >= 1:
      job = bq.query(query = f"""DELETE FROM `{BQ_PROJECT}.{BQ_DATASET}.stargazers` WHERE login in ({', '.join([f"'{x}'" for x in new_records['login'].values.tolist()])})""")
      job.result()
      append_job = bq.load_table_from_dataframe(
            dataframe = new_records,
            destination = bigquery.TableReference.from_string(f"{BQ_PROJECT}.{BQ_DATASET}.stargazers"),
            job_config = bigquery.LoadJobConfig(
                write_disposition = 'WRITE_APPEND',
                autodetect = True, # detect schema
            ) 
      )
      append_job.result()
      stargazers_update = True

    # forks
    query = f"""
    SELECT *
    FROM `{BQ_PROJECT}.{BQ_DATASET}.forks`
    """
    known = bq.query(query = query).to_dataframe()
    # list of expected active forks (> covers added and re-added, = covers added and never dropped)
    known_active = known[known['added'] >= known['dropped']]
    # list of known forks currently in the dropped state
    known_inactive = known[known['dropped'] > known['added']]
    metric_type = 'forks'
    page_size = 100
    page = 1
    raw = []
    while page_size == 100:
        response = metric_get(metric_type, f'?per_page={page_size}&page={page}')
        raw_new = json.loads(response.text)
        raw += raw_new
        page_size = len(raw_new)
        page += 1
    forks = []
    for f in raw:
        forks += [{
            'name': f['name'],
            'full_name': f['full_name'],
            'owner': f['owner']['login'],
            'stars': f['stargazers_count'],
            'watchers': f['watchers_count'],
            'forks': f['forks_count']
        }]
    forks = pd.DataFrame(forks)
    current = forks
    # newly added: in current but not in known
    newly_added = pd.DataFrame([x for x in current['full_name'].values.tolist() if x not in known['full_name'].values.tolist()], columns = ['full_name'])
    newly_added = pd.merge(newly_added, current, how = 'inner', on = ['full_name'])
    newly_added['added'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z")
    newly_added['dropped'] = ''
    newly_added['count'] = 1
    newly_added['repo'] = github_user + '/' + github_repo
    # newly dropped: in known_active but not in current
    newly_dropped = pd.DataFrame([x for x in known_active['full_name'].values.tolist() if x not in current['full_name'].values.tolist()], columns = ['full_name'])
    newly_dropped = pd.merge(known_active, newly_dropped, how = 'inner', on = ['full_name'])
    newly_dropped['dropped'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z")
    # newly readded: in current and in known_inactive
    newly_readded = pd.merge(current['full_name'], known_inactive['full_name'], how = 'inner', on = ['full_name'])
    newly_readded = pd.merge(newly_readded, known_inactive, how = 'inner', on = ['full_name'])
    newly_readded['added'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z")
    newly_readded['count'] = newly_readded['count'] + 1
    # newly combo
    new_records = pd.concat([newly_added, newly_dropped, newly_readded], ignore_index = True, axis = 0)
    # start with outer merge on all columns in current
    non_match = pd.merge(known_active, current, how = 'outer', indicator = True, left_on = ['full_name', 'stars', 'watchers', 'forks'], right_on = ['full_name', 'stars', 'watchers', 'forks'])
    # make list of full_name that did not have an exact match in current - these need updating
    non_match = non_match[non_match._merge == 'left_only']
    non_match = non_match[['full_name']]
    # now get current records for the non_match
    non_match = pd.merge(non_match, current, how = 'inner', on = ['full_name'])
    # now get updated records
    updated_records = pd.merge(known_active[['name', 'full_name', 'owner', 'added', 'dropped', 'count', 'repo']], non_match[['full_name', 'stars', 'watchers', 'forks']], how = 'inner', on = 'full_name')
    # stack updated records with the new_records before updating
    new_records = pd.concat([updated_records, new_records], ignore_index = True, axis = 0)
    forks_update = False
    if new_records.shape[0] >= 1:
      job = bq.query(query = f"""DELETE FROM `{BQ_PROJECT}.{BQ_DATASET}.forks` WHERE full_name in ({', '.join([f"'{x}'" for x in new_records['full_name'].values.tolist()])})""")
      job.result()
      append_job = bq.load_table_from_dataframe(
            dataframe = new_records,
            destination = bigquery.TableReference.from_string(f"{BQ_PROJECT}.{BQ_DATASET}.forks"),
            job_config = bigquery.LoadJobConfig(
                write_disposition = 'WRITE_APPEND',
                autodetect = True, # detect schema
            ) 
      )
      append_job.result()
      forks_update = True

    # subscribers
    query = f"""
    SELECT *
    FROM `{BQ_PROJECT}.{BQ_DATASET}.subscribers`
    """
    known = bq.query(query = query).to_dataframe()
    # list of expected active subscribers (> covers added and re-added, = covers added and never dropped)
    known_active = known[known['added'] >= known['dropped']]
    # list of known users currently in the dropped state
    known_inactive = known[known['dropped'] > known['added']]
    metric_type = 'subscribers'
    page_size = 100
    page = 1
    raw = []
    while page_size == 100:
        response = metric_get(metric_type, f'?per_page={page_size}&page={page}')
        raw_new = json.loads(response.text)
        raw += raw_new
        page_size = len(raw_new)
        page += 1
    subscribers = pd.DataFrame(raw)[['login']]
    current = subscribers
    # newly added: in current but not in known
    newly_added = pd.DataFrame([x for x in current['login'].values.tolist() if x not in known['login'].values.tolist()], columns = ['login'])
    newly_added['added'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z")
    newly_added['dropped'] = ''
    newly_added['count'] = 1
    newly_added['repo'] = github_user + '/' + github_repo
    # newly dropped: in known_active but not in current
    newly_dropped = pd.DataFrame([x for x in known_active['login'].values.tolist() if x not in current['login'].values.tolist()], columns = ['login'])
    newly_dropped = pd.merge(known_active, newly_dropped, how = 'inner', on = ['login'])
    newly_dropped['dropped'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z")
    # newly readded: in current and in known_inactive
    newly_readded = pd.merge(current['login'], known_inactive['login'], how = 'inner', on = ['login'])
    newly_readded = pd.merge(newly_readded, known_inactive, how = 'inner', on = ['login'])
    newly_readded['added'] = datetime.now().strftime("%Y-%m-%dT00:00:00Z")
    newly_readded['count'] = newly_readded['count'] + 1
    # newly combo
    new_records = pd.concat([newly_added, newly_dropped, newly_readded], ignore_index = True, axis = 0)
    subscribers_update = False
    if new_records.shape[0] >= 1:
      job = bq.query(query = f"""DELETE FROM `{BQ_PROJECT}.{BQ_DATASET}.subscribers` WHERE login in ({', '.join([f"'{x}'" for x in new_records['login'].values.tolist()])})""")
      job.result()
      append_job = bq.load_table_from_dataframe(
            dataframe = new_records,
            destination = bigquery.TableReference.from_string(f"{BQ_PROJECT}.{BQ_DATASET}.subscribers"),
            job_config = bigquery.LoadJobConfig(
                write_disposition = 'WRITE_APPEND',
                autodetect = True, # detect schema
            ) 
      )
      append_job.result()
      subscribers_update = True
      

    # END: Content from notebook: GitHub Metrics - 1 - Traffic
    # START: Content from notebook: GitHub Metrics - 2 - Traffic

    query = f"""
      DELETE FROM `{BQ_PROJECT}.reporting.github_traffic_clones` WHERE timestamp >= DATETIME(TIMESTAMP('2023-02-27T00:00:00Z'));
      INSERT INTO `{BQ_PROJECT}.reporting.github_traffic_clones`
      SELECT * EXCEPT(timestamp),
          DATETIME(TIMESTAMP(timestamp)) AS timestamp
      FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_clones`
      WHERE timestamp >= '2023-02-27T00:00:00Z'
      ORDER BY timestamp;

      DELETE FROM `{BQ_PROJECT}.reporting.github_traffic_popular_paths` WHERE timestamp >= DATETIME(TIMESTAMP('2023-02-27T00:00:00Z'));
      INSERT INTO `{BQ_PROJECT}.reporting.github_traffic_popular_paths`
      SELECT * EXCEPT(timestamp),
          DATETIME(TIMESTAMP(timestamp)) AS timestamp
      FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_popular_paths`
      WHERE timestamp >= '2023-02-27T00:00:00Z'
      ORDER BY timestamp, count DESC;

      DELETE FROM `{BQ_PROJECT}.reporting.github_traffic_popular_referrers` WHERE timestamp >= DATETIME(TIMESTAMP('2023-02-27T00:00:00Z'));
      INSERT INTO `{BQ_PROJECT}.reporting.github_traffic_popular_referrers`
      SELECT * EXCEPT(timestamp),
          DATETIME(TIMESTAMP(timestamp)) AS timestamp
      FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_popular_referrers`
      WHERE timestamp >= '2023-02-27T00:00:00Z'
      ORDER BY timestamp, count DESC;

      DELETE FROM `{BQ_PROJECT}.reporting.github_traffic_views` WHERE timestamp >= DATETIME(TIMESTAMP('2023-02-27T00:00:00Z'));
      INSERT INTO `{BQ_PROJECT}.reporting.github_traffic_views`
      SELECT * EXCEPT(timestamp),
          DATETIME(TIMESTAMP(timestamp)) AS timestamp
      FROM `{BQ_PROJECT}.{BQ_DATASET}.traffic_views`
      WHERE timestamp >= '2023-02-27T00:00:00Z'
      ORDER BY timestamp;
    """
    job = bq.query(query = query)
    job.result()
    print(job.state)

    if stargazers_update:
      query = f"""
        CREATE OR REPLACE TABLE `vertex-ai-mlops-369716.reporting.github_stargazers` AS
        SELECT * EXCEPT(added, dropped),
            CASE WHEN added = '' THEN NULL ELSE DATETIME(TIMESTAMP(added)) END AS added,
            CASE WHEN dropped = '' THEN NULL ELSE DATETIME(TIMESTAMP(dropped)) END AS dropped
        FROM `vertex-ai-mlops-369716.github_metrics.stargazers`
        WHERE added >= dropped
        ORDER BY login;
      """
      job = bq.query(query = query)
      job.result()
      print(job.state)
    
    if forks_update:
      query = f"""
        CREATE OR REPLACE TABLE `vertex-ai-mlops-369716.reporting.github_forks` AS
        SELECT * EXCEPT(added, dropped),
            CASE WHEN added = '' THEN NULL ELSE DATETIME(TIMESTAMP(added)) END AS added,
            CASE WHEN dropped = '' THEN NULL ELSE DATETIME(TIMESTAMP(dropped)) END AS dropped
        FROM `vertex-ai-mlops-369716.github_metrics.forks`
        WHERE added >= dropped
        ORDER BY full_name;
      """
      job = bq.query(query = query)
      job.result()
      print(job.state)

    if subscribers_update:
      query = f"""
        CREATE OR REPLACE TABLE `vertex-ai-mlops-369716.reporting.github_subscribers` AS
        SELECT * EXCEPT(added, dropped),
            CASE WHEN added = '' THEN NULL ELSE DATETIME(TIMESTAMP(added)) END AS added,
            CASE WHEN dropped = '' THEN NULL ELSE DATETIME(TIMESTAMP(dropped)) END AS dropped
        FROM `vertex-ai-mlops-369716.github_metrics.subscribers`
        WHERE added >= dropped
        ORDER BY login;
      """
      job = bq.query(query = query)
      job.result()
      print(job.state)

    # END: Content from notebook: GitHub Metrics - 2 - Traffic

Writing temp/function/main.py
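The four traffic-table statements in `main.py` follow one pattern:
1. Delete the `reporting` rows on or after a cutoff.
2. Re-insert those rows from `github_metrics`, converting the timestamp to `DATETIME`.

The raw `timestamp` column is compared against a string literal, which suggests it is stored as an ISO 8601 string. Below is a minimal sketch of generating the same script with the cutoff passed in instead of hard-coded as `'2023-02-27T00:00:00Z'`. A few assumptions apply:
- `TRAFFIC_TABLES` and `build_incremental_sql` are hypothetical names.
- The table names and `ORDER BY` clauses mirror the queries above.
- `my-project` is a placeholder project ID.

```python
from datetime import datetime, timezone

# Hypothetical mapping: table name -> ORDER BY clause, mirroring the queries above
TRAFFIC_TABLES = {
    'traffic_clones': 'timestamp',
    'traffic_popular_paths': 'timestamp, count DESC',
    'traffic_popular_referrers': 'timestamp, count DESC',
    'traffic_views': 'timestamp',
}

def build_incremental_sql(project: str, cutoff: datetime) -> str:
    """Build a multi-statement script that replaces reporting rows on/after cutoff."""
    # Format the cutoff like the string literal used in the original queries
    cutoff_str = cutoff.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    statements = []
    for table, order_by in TRAFFIC_TABLES.items():
        target = f'`{project}.reporting.github_{table}`'
        source = f'`{project}.github_metrics.{table}`'
        statements.append(
            f"DELETE FROM {target} WHERE timestamp >= DATETIME(TIMESTAMP('{cutoff_str}'));\n"
            f"INSERT INTO {target}\n"
            "SELECT * EXCEPT(timestamp),\n"
            "    DATETIME(TIMESTAMP(timestamp)) AS timestamp\n"
            f"FROM {source}\n"
            f"WHERE timestamp >= '{cutoff_str}'\n"
            f"ORDER BY {order_by};"
        )
    return '\n\n'.join(statements)

sql = build_incremental_sql('my-project', datetime(2023, 2, 27, tzinfo=timezone.utc))
print(sql.count('DELETE FROM'))  # 4
```

You could pass the resulting string to `bq.query(query = sql)` in the same way as the hard-coded script. The cutoff comes from a `datetime`, so no free-form text is interpolated into the SQL.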


In [70]:
!ls {DIR}/function

main.py  requirements.txt


### Zip Files

In [71]:
import zipfile
with zipfile.ZipFile(f'{DIR}/function/function_traffic.zip', mode = 'w') as archive:
    archive.write(f'{DIR}/function/main.py', 'main.py')
    archive.write(f'{DIR}/function/requirements.txt', 'requirements.txt')

In [72]:
!ls {DIR}/function

function_traffic.zip  main.py  requirements.txt


In [73]:
with zipfile.ZipFile(f'{DIR}/function/function_traffic.zip', mode = 'r') as archive:
    archive.printdir()

File Name                                             Modified             Size
main.py                                        2023-02-28 18:21:40        21728
requirements.txt                               2023-02-28 18:21:40           39
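A Python Cloud Function is built from an archive with `main.py` (containing the entry point, here `collect`) at its root, plus a `requirements.txt` listing dependencies. The sketch below checks the archive layout before upload. It assumes:
- Those two files are the only ones to check.
- `missing_root_files` is a hypothetical helper.
- The demo archives are built in a temporary directory rather than at `{DIR}/function/function_traffic.zip`.

```python
import os
import tempfile
import zipfile

REQUIRED_ROOT_FILES = {'main.py', 'requirements.txt'}

def missing_root_files(archive_path: str) -> set:
    """Return required files that are not at the top level of the zip archive."""
    with zipfile.ZipFile(archive_path, mode = 'r') as archive:
        # Entries without a '/' sit at the archive root
        root_names = {name for name in archive.namelist() if '/' not in name}
    return REQUIRED_ROOT_FILES - root_names

# Demo: one archive with the expected layout, one with main.py nested in a folder
with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, 'good.zip')
    with zipfile.ZipFile(good, mode = 'w') as archive:
        archive.writestr('main.py', 'def collect(event, context): pass\n')
        archive.writestr('requirements.txt', 'google-cloud-bigquery\n')
    bad = os.path.join(tmp, 'bad.zip')
    with zipfile.ZipFile(bad, mode = 'w') as archive:
        archive.writestr('function/main.py', 'def collect(event, context): pass\n')
        archive.writestr('requirements.txt', 'google-cloud-bigquery\n')
    good_missing = missing_root_files(good)
    bad_missing = missing_root_files(bad)

print(good_missing)  # set()
print(bad_missing)   # {'main.py'}
```

The archive written above passes this check because `archive.write(..., 'main.py')` stores each file under a bare name with no directory prefix.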


### Move Files to GCS

Expects a bucket with the same name as the project:

In [74]:
bucket = gcs.bucket(PROJECT_ID)

In [75]:
SOURCEPATH = f'architectures/tracking/setup/github'
blob = bucket.blob(f'{SOURCEPATH}/function_traffic.zip')
blob.upload_from_filename(f'{DIR}/function/function_traffic.zip')

In [76]:
list(bucket.list_blobs(prefix = f'{SOURCEPATH}'))

[<Blob: vertex-ai-mlops-369716, architectures/tracking/setup/github/function_commit.zip, 1677586594050541>,
 <Blob: vertex-ai-mlops-369716, architectures/tracking/setup/github/function_traffic.zip, 1677608503169177>]

In [77]:
print(f"View the bucket directly here:\nhttps://console.cloud.google.com/storage/browser/{PROJECT_ID}/{SOURCEPATH};tab=objects&project={PROJECT_ID}")

View the bucket directly here:
https://console.cloud.google.com/storage/browser/vertex-ai-mlops-369716/architectures/tracking/setup/github;tab=objects&project=vertex-ai-mlops-369716


### Service Account
The Cloud Function runs as a service account. That account needs to read from and write to BigQuery and to read secrets from Secret Manager. By default, a 1st gen Cloud Function runs as the App Engine default service account; this notebook uses a dedicated service account instead.

I used the Console to create a service account for these jobs:
- Console > IAM & Admin > Service Accounts
- Create New: name = `metrics-runner`
- Roles = BigQuery Admin, Secret Manager Secret Accessor
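You can also script the same setup instead of using the Console. The sketch below only builds the `gcloud` commands as strings; it does not run them. It assumes:
- The predefined role IDs are `roles/bigquery.admin` (BigQuery Admin) and `roles/secretmanager.secretAccessor` (Secret Manager Secret Accessor).
- `iam_setup_commands` is a hypothetical helper.
- `my-project` is a placeholder project ID.

```python
def iam_setup_commands(project_id: str, account_name: str = 'metrics-runner') -> list:
    """Build gcloud commands that create the service account and grant its roles."""
    email = f'{account_name}@{project_id}.iam.gserviceaccount.com'
    roles = ['roles/bigquery.admin', 'roles/secretmanager.secretAccessor']
    commands = [f'gcloud iam service-accounts create {account_name} --project={project_id}']
    for role in roles:
        # One project-level policy binding per role
        commands.append(
            f'gcloud projects add-iam-policy-binding {project_id} '
            f'--member=serviceAccount:{email} --role={role}'
        )
    return commands

for command in iam_setup_commands('my-project'):
    print(command)
```

A narrower alternative is to grant the secret accessor role on the single `github_api` secret rather than on the whole project.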



In [78]:
print(f'Review Service Account Details in Console:\nhttps://console.cloud.google.com/iam-admin/serviceaccounts?project={PROJECT_ID}')

Review Service Account Details in Console:
https://console.cloud.google.com/iam-admin/serviceaccounts?project=vertex-ai-mlops-369716


### Create (or Update) Cloud Function

In [79]:
function_name = f'github_metrics_traffic'

In [80]:
function = ''
for candidate in functions_client.list_functions(request = functions_v1.ListFunctionsRequest(parent = f'projects/{PROJECT_ID}/locations/{REGION}')):
    if candidate.name.endswith(function_name):
        # Keep the existing function so the next steps update it instead of creating it
        function = candidate
        break

In [81]:
function

name: "projects/vertex-ai-mlops-369716/locations/us-central1/functions/github_metrics_traffic"
source_archive_url: "gs://vertex-ai-mlops-369716/architectures/tracking/setup/github/function_traffic.zip"
event_trigger {
  event_type: "providers/cloud.pubsub/eventTypes/topic.publish"
  resource: "projects/vertex-ai-mlops-369716/topics/daily_metrics_triggers"
  service: "pubsub.googleapis.com"
  failure_policy {
  }
}
status: ACTIVE
entry_point: "collect"
timeout {
  seconds: 420
}
available_memory_mb: 256
service_account_email: "metrics-runner@vertex-ai-mlops-369716.iam.gserviceaccount.com"
update_time {
  seconds: 1677608354
  nanos: 641000000
}
version_id: 2
runtime: "python310"
max_instances: 3000
ingress_settings: ALLOW_ALL
build_id: "0bb482d9-7c11-4847-9bae-bf2988cc8c70"
secret_environment_variables {
  key: "GITHUB_PAT"
  project_id: "807305962454"
  secret: "github_api"
  version: "latest"
}
build_name: "projects/807305962454/locations/us-central1/builds/0bb482d9-7c11-4847-9bae-bf2

In [82]:
from google.protobuf.duration_pb2 import Duration

functionDef = functions_v1.CloudFunction()
functionDef.name = f'projects/{PROJECT_ID}/locations/{REGION}/functions/{function_name}'
functionDef.source_archive_url = f"gs://{PROJECT_ID}/{SOURCEPATH}/function_traffic.zip"
functionDef.event_trigger = functions_v1.EventTrigger()
functionDef.event_trigger.event_type = 'providers/cloud.pubsub/eventTypes/topic.publish'
functionDef.event_trigger.resource = topic.name
functionDef.runtime = 'python310'
functionDef.entry_point = 'collect'
functionDef.timeout = Duration(seconds = 420)
functionDef.service_account_email = f"metrics-runner@{PROJECT_ID}.iam.gserviceaccount.com"
functionDef.secret_environment_variables = [functions_v1.SecretEnvVar(
    key = 'GITHUB_PAT',
    secret = 'github_api'
)]

In [83]:
functionDef

name: "projects/vertex-ai-mlops-369716/locations/us-central1/functions/github_metrics_traffic"
source_archive_url: "gs://vertex-ai-mlops-369716/architectures/tracking/setup/github/function_traffic.zip"
event_trigger {
  event_type: "providers/cloud.pubsub/eventTypes/topic.publish"
  resource: "projects/vertex-ai-mlops-369716/topics/daily_metrics_triggers"
}
entry_point: "collect"
timeout {
  seconds: 420
}
service_account_email: "metrics-runner@vertex-ai-mlops-369716.iam.gserviceaccount.com"
runtime: "python310"
secret_environment_variables {
  key: "GITHUB_PAT"
  secret: "github_api"
}
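With `secret_environment_variables` set as above, the running function sees the `github_api` secret as the environment variable `GITHUB_PAT`. The sketch below shows how `main.py` might turn that value into GitHub REST API request headers. It assumes:
- `github_headers` is a hypothetical helper, not the author's code.
- The demo sets a fake token locally to stand in for the secret.

```python
import os

def github_headers() -> dict:
    """Build GitHub REST API headers from the token injected as GITHUB_PAT."""
    token = os.environ.get('GITHUB_PAT')
    if not token:
        # Fail loudly instead of making unauthenticated, rate-limited calls
        raise RuntimeError('GITHUB_PAT is not set; check secret_environment_variables')
    return {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/vnd.github+json',
    }

# Local demo only: a fake token standing in for the Secret Manager value
os.environ['GITHUB_PAT'] = 'ghp_example'
headers = github_headers()
print(headers['Authorization'])  # Bearer ghp_example
```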

In [84]:
if function:
    request = functions_v1.UpdateFunctionRequest(
        function = functionDef
    )
    operation = functions_client.update_function(request = request)
else:
    request = functions_v1.CreateFunctionRequest(
        location = f"projects/{PROJECT_ID}/locations/{REGION}",
        function = functionDef
    )
    operation = functions_client.create_function(request = request)

In [85]:
response = operation.result()
print(response)

name: "projects/vertex-ai-mlops-369716/locations/us-central1/functions/github_metrics_traffic"
source_archive_url: "gs://vertex-ai-mlops-369716/architectures/tracking/setup/github/function_traffic.zip"
event_trigger {
  event_type: "providers/cloud.pubsub/eventTypes/topic.publish"
  resource: "projects/vertex-ai-mlops-369716/topics/daily_metrics_triggers"
  service: "pubsub.googleapis.com"
  failure_policy {
  }
}
status: ACTIVE
entry_point: "collect"
timeout {
  seconds: 420
}
available_memory_mb: 256
service_account_email: "metrics-runner@vertex-ai-mlops-369716.iam.gserviceaccount.com"
update_time {
  seconds: 1677608667
  nanos: 126000000
}
version_id: 3
runtime: "python310"
max_instances: 3000
ingress_settings: ALLOW_ALL
build_id: "c978ef09-74be-4ca3-b367-fb733d65133d"
secret_environment_variables {
  key: "GITHUB_PAT"
  project_id: "807305962454"
  secret: "github_api"
  version: "latest"
}
build_name: "projects/807305962454/locations/us-central1/builds/c978ef09-74be-4ca3-b367-fb7

In [86]:
print(f'Review the Cloud Function in the console here:\nhttps://console.cloud.google.com/functions/list?env=gen1&project={PROJECT_ID}')

Review the Cloud Function in the console here:
https://console.cloud.google.com/functions/list?env=gen1&project=vertex-ai-mlops-369716


### Manual Run of Cloud Function

Publish a message to the Pub/Sub topic to trigger the Cloud Function, which runs the incremental update. You can run the same publishing code from anywhere you want to trigger the update.

The function will receive the message as `event` in the format:
```
{
    '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
    'attributes': {'key' : 'value', ...},
    'data': <base64 encoded string>
}
```

To handle the `event` and retrieve the inputs of the message, three steps are needed:
1. reference the 'data' value as `event['data']`
2. decode the 'data' value with `base64.b64decode(<1>).decode('utf-8')`
3. convert the decoded string into a Python dictionary with `json.loads(<2>)`

This looks like:
```
function_inputs = json.loads(base64.b64decode(event['data']).decode('utf-8'))
```
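The sketch below combines the three steps with reading the `trigger` attribute, which both the manual publish and the scheduler set. It assumes:
- `parse_event` is a hypothetical helper that the real `collect(event, context)` entry point could call.
- The BigQuery and GitHub work in `main.py` is not shown.
- The test event is built locally to mimic the documented message shape.

```python
import base64
import json

def parse_event(event: dict):
    """Decode a Pub/Sub-triggered event into (function_inputs, trigger)."""
    # Steps 1-3: reference 'data', base64-decode it, then parse the JSON string
    function_inputs = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    # Attributes are optional on a Pub/Sub message, so fall back to an empty dict
    trigger = (event.get('attributes') or {}).get('trigger', 'unknown')
    return function_inputs, trigger

# Build an event shaped like the one described above
payload = json.dumps({'PROJECT_ID': 'my-project'}).encode('utf-8')
event = {
    '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
    'attributes': {'trigger': 'manual'},
    'data': base64.b64encode(payload).decode('utf-8'),
}
inputs, trigger = parse_event(event)
print(inputs['PROJECT_ID'], trigger)  # my-project manual
```

Returning the trigger alongside the inputs lets the function log whether a run was manual or scheduled.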

In [87]:
function_input = {
    'PROJECT_ID': PROJECT_ID
}

In [88]:
message = json.dumps(function_input)
message = message.encode('utf-8')

In [89]:
future = pubsub_pubclient.publish(topic.name, message, trigger = 'manual')

In [90]:
future.result()

'7008586715462781'

---
## Scheduled Run with Cloud Scheduler

Use Cloud Scheduler to publish a message to the topic on a defined schedule, which triggers the Cloud Function to run the incremental update.

Resources:
- List of Time zones - [TZ Database Names](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones)
- Job Frequency - [unix-cron format guide](https://man7.org/linux/man-pages/man5/crontab.5.html)
    - minute hour day_of_month month day_of_week
    - 0 23 * * tue = 11PM every Tuesday
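The sketch below labels the five fields so a schedule string can be read at a glance. It only splits on whitespace; it does not validate ranges or expand `*`, `-`, `,` or `/` syntax. `parse_cron` is a hypothetical helper.

```python
CRON_FIELDS = ['minute', 'hour', 'day_of_month', 'month', 'day_of_week']

def parse_cron(expression: str) -> dict:
    """Split a standard 5-field unix-cron expression into named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f'expected 5 fields, got {len(parts)}: {expression!r}')
    return dict(zip(CRON_FIELDS, parts))

print(parse_cron('0 23 * * tue'))
# {'minute': '0', 'hour': '23', 'day_of_month': '*', 'month': '*', 'day_of_week': 'tue'}
print(parse_cron('0 3 * * *')['hour'])  # 3
```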


In [91]:
schedule_name = 'daily_3am_est'

In [92]:
schedule = ''
for candidate in scheduler_client.list_jobs(parent = f'projects/{PROJECT_ID}/locations/{REGION}'):
    if candidate.name.endswith(schedule_name):
        # Reuse the existing (shared) job instead of creating a duplicate
        schedule = candidate
        break

In [93]:
if schedule:
    print(schedule)
else:
    request = scheduler_v1.CreateJobRequest(
        parent = f'projects/{PROJECT_ID}/locations/{REGION}',
        job = scheduler_v1.Job(
            name = f'projects/{PROJECT_ID}/locations/{REGION}/jobs/{schedule_name}',
            pubsub_target = scheduler_v1.PubsubTarget(
                topic_name = topic.name,
                data = message,
                attributes = {'trigger': 'scheduled'}
            ),
            schedule = '0 3 * * *',
            time_zone = 'America/New_York'
        )
    )
    schedule = scheduler_client.create_job(request = request)
    print(schedule)

name: "projects/vertex-ai-mlops-369716/locations/us-central1/jobs/daily_3am_est"
pubsub_target {
  topic_name: "projects/vertex-ai-mlops-369716/topics/daily_metrics_triggers"
  data: "{\"PROJECT_ID\": \"vertex-ai-mlops-369716\"}"
  attributes {
    key: "trigger"
    value: "scheduled"
  }
}
user_update_time {
  seconds: 1676847544
}
state: ENABLED
status {
}
schedule_time {
  seconds: 1677657600
  nanos: 855718000
}
last_attempt_time {
  seconds: 1677571200
  nanos: 689748000
}
schedule: "0 3 * * *"
time_zone: "America/New_York"
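The `schedule` is evaluated in `time_zone`. Despite the job name `daily_3am_est`, 3 AM `America/New_York` is:
- 08:00 UTC while Eastern Standard Time applies.
- 07:00 UTC during daylight saving time.

The sketch below uses the standard-library `zoneinfo` module. It assumes Python 3.9+ with the IANA time zone database available. It converts the `schedule_time` seconds shown above and checks both cases; `utc_hour_of_3am_local` is a hypothetical helper.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo('America/New_York')

# schedule_time.seconds from the job output above (next scheduled run)
next_run_utc = datetime.fromtimestamp(1677657600, tz=timezone.utc)
print(next_run_utc.isoformat())                       # 2023-03-01T08:00:00+00:00
print(next_run_utc.astimezone(NEW_YORK).isoformat())  # 2023-03-01T03:00:00-05:00

def utc_hour_of_3am_local(year: int, month: int, day: int) -> int:
    """Return the UTC hour at which 03:00 America/New_York falls on the given date."""
    local = datetime(year, month, day, 3, tzinfo=NEW_YORK)
    return local.astimezone(timezone.utc).hour

print(utc_hour_of_3am_local(2023, 3, 1))  # 8 (EST, UTC-5)
print(utc_hour_of_3am_local(2023, 7, 1))  # 7 (EDT, UTC-4)
```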



In [94]:
print(f'Review the schedule in the console:\nhttps://console.cloud.google.com/cloudscheduler?project={PROJECT_ID}')

Review the schedule in the console:
https://console.cloud.google.com/cloudscheduler?project=vertex-ai-mlops-369716
