In [1]:
import boto3
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
import json

# Ask for the AWS region and the secret name, then either load the stored
# credentials from AWS Secrets Manager or create a new, empty secret.
region_name = input("Please enter your AWS region:")

client = boto3.client('secretsmanager', region_name=region_name)

# list_secrets returns its results page by page, so walk every page;
# reading only the first response could miss an existing secret.
secrets_list = [
    secret["Name"]
    for page in client.get_paginator('list_secrets').paginate()
    for secret in page["SecretList"]
]

secret_name = input("Please enter the desired secret name for storage in AWS Secrets Manager:")

if secret_name in secrets_list:
    print("The secret you specified already exists in the defined region. I will retrieve the credentials from there.")

    # Retrieve the secret value
    try:
        response = client.get_secret_value(SecretId=secret_name)
        credentials_dict = json.loads(response['SecretString'])
    except client.exceptions.ResourceNotFoundException:
        # A secret that was created without a value (as the else-branch below
        # does) has no current version to read yet.
        print("The secret exists but does not contain a value yet.")
        credentials_dict = {}
else:
    print(f"I will now create the new secret {secret_name}.")
    # boto3 keyword arguments are case-sensitive: the API expects `Name`.
    client.create_secret(Name=secret_name)
    # The new secret holds no credentials yet; define an empty dict so the
    # following cell can check for them instead of failing with a NameError.
    credentials_dict = {}

The secret you specified already exists in the defined region. I will retrieve the credentials from there.


In [2]:
# Build OAuth credentials from the secret loaded in the first cell, refresh
# them if they have expired, and create the YouTube Reporting API client.
credentials = None

# Check if the credentials are available in the Secrets Manager
if 'token' in credentials_dict and 'refresh_token' in credentials_dict:
    credentials = Credentials.from_authorized_user_info(credentials_dict)

    # Check if the credentials are expired
    if credentials.expired:
        # Refresh the credentials
        credentials.refresh(Request())

        # Write the refreshed token back so the next run starts with it
        client = boto3.client('secretsmanager', region_name=region_name)
        client.put_secret_value(SecretId=secret_name, SecretString=credentials.to_json())
else:
    # Stop here: continuing would call build() with credentials=None and the
    # cells below would fail later with a much less helpful error.
    raise RuntimeError("Please store your YouTube API OAuth credentials in Secrets Manager first.")

# Test if you can establish a connection to the YouTube API
youtubeReporting = build('youtubereporting', 'v1', credentials=credentials)

First, we will ask the YouTube Reporting API which types of reports can be generated.

In [3]:
# Prepare the reportTypes.list request, then execute it against the API
report_types_request = youtubeReporting.reportTypes().list()
report_types = report_types_request.execute()

In [4]:
# Show the response stored in report_types as this cell's output
report_types

{'reportTypes': [{'id': 'channel_annotations_a1', 'name': 'Annotations'},
  {'id': 'channel_basic_a2', 'name': 'User activity'},
  {'id': 'channel_cards_a1', 'name': 'Cards'},
  {'id': 'channel_combined_a2', 'name': 'Combined'},
  {'id': 'channel_demographics_a1', 'name': 'Demographics'},
  {'id': 'channel_device_os_a2', 'name': 'Device and OS'},
  {'id': 'channel_end_screens_a1', 'name': 'End screens'},
  {'id': 'channel_playback_location_a2', 'name': 'Playback locations'},
  {'id': 'channel_province_a2', 'name': 'Province'},
  {'id': 'channel_sharing_service_a1', 'name': 'Sharing service'},
  {'id': 'channel_subtitles_a2', 'name': 'Subtitles'},
  {'id': 'channel_traffic_source_a2', 'name': 'Traffic sources'},
  {'id': 'playlist_basic_a1', 'name': 'Playlist user activity'},
  {'id': 'playlist_combined_a1', 'name': 'Playlist combined'},
  {'id': 'playlist_device_os_a1', 'name': 'Playlist device and OS'},
  {'id': 'playlist_playback_location_a1',
   'name': 'Playlist playback locations'

Now let's check what report jobs have already been scheduled.

In [5]:
# Prepare the jobs.list request, then execute it against the API
jobs_request = youtubeReporting.jobs().list()
jobs = jobs_request.execute()

In [6]:
# Show the response stored in jobs as this cell's output
jobs

{'jobs': [{'id': '4a7e6f19-e49f-4418-9800-f0ba979a8437',
   'reportTypeId': 'channel_demographics_a1',
   'name': 'channel_demographics_a1_20240322',
   'createTime': '2024-03-22T14:01:43Z'},
  {'id': '82bc9b78-afbf-470e-8c1f-e9a7d2fe280d',
   'reportTypeId': 'channel_combined_a2',
   'name': 'channel_combined_a2_20240322',
   'createTime': '2024-03-22T14:01:42Z'},
  {'id': 'a7f41b3e-2b81-488d-8ed0-1f8c236e1a54',
   'reportTypeId': 'channel_basic_a2',
   'name': 'channel_basic_a2_20240322',
   'createTime': '2024-03-22T14:01:41Z'},
  {'id': 'bff80780-0f0f-4caa-af3e-de968ec64e9e',
   'reportTypeId': 'channel_sharing_service_a1',
   'name': 'channel_sharing_service_a1_20240322',
   'createTime': '2024-03-22T14:01:44Z'}]}

If this list is empty for you, you can now create these jobs. You need to provide the necessary ID from the reportTypes API call and define a name. Again, if you have multiple YouTube channels for which you want to get data, you need to repeat this task for each channel.

In [None]:
# Fill in both values before running this cell:
#   report_type - an 'id' from the reportTypes response, e.g. 'channel_basic_a2'
#   report_name - a name of your choice for the new reporting job
report_type = ''
report_name = ''

# Fail fast with a clear message instead of sending empty placeholders to the API
if not report_type or not report_name:
    raise ValueError("Set report_type and report_name before creating a reporting job.")

youtubeReporting.jobs().create(
    body={"reportTypeId": report_type, "name": report_name}
).execute()

You can now check again if this was registered by the system.

In [None]:
# Fetch the job list again and show it as this cell's output
jobs_request = youtubeReporting.jobs().list()
jobs = jobs_request.execute()
jobs

Final note: my implementation of the Lambda function is based on the four report types I found most interesting. For these, only the conversion of decimal columns was needed. If you want other reports, you should first download some report files locally and analyse the data to see what adjustments might be needed in the function code.