# YouTube API Connector

First of all, let's import the libraries to connect to the YouTube API and manipulate the data:

In [1]:
# Importing libraries
import os
import pandas as pd
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import google_auth_oauthlib.flow
import googleapiclient.discovery
import googleapiclient.errors
import googleapiclient.errors

Then, let's add the credentials to connect to the YouTube Data v3 API. This will provide us a generic information about the channel, including the ID of the playlist where all the videos are updated.

In [2]:
# Defining the API Key
api_key = 'api_key'

# Defining the API name and version before connection
youtube_api_service_name = "youtube"
youtube_api_version = "v3"

youtube = build(youtube_api_service_name, youtube_api_version, developerKey = api_key)

# Gathering the channel content details
request = youtube.channels().list(part='ContentDetails', id = 'UCzQTrA_c1BgRNLewVyt2UFw')

response = request.execute()

print(response)

{'kind': 'youtube#channelListResponse', 'etag': 'xm0qCYcUUWM0F3XMlRFDWwej_n8', 'pageInfo': {'totalResults': 1, 'resultsPerPage': 5}, 'items': [{'kind': 'youtube#channel', 'etag': 'eyLQNm6fwyqFzcXjoobAzVEH-Cw', 'id': 'UCzQTrA_c1BgRNLewVyt2UFw', 'contentDetails': {'relatedPlaylists': {'likes': '', 'uploads': 'UUzQTrA_c1BgRNLewVyt2UFw'}}}]}


In the next step, this playlist ID will allow to provide us to get all the video IDs, the title, the description, and the publish date. I have created a function to gather all this data for this step.

As it is only possible to get the data from only 50 videos, it is necessary to use the 'nextPageToken' to extract the data from the rest of the videos.

In [3]:
# Adding read permissions to the API
scopes = ["https://www.googleapis.com/auth/youtube.readonly"]

# Creating a function to gather all the video information from the previous playlistId
def gather_youtube_videos(playlistId):

    youtube = build(youtube_api_service_name, youtube_api_version, developerKey = api_key)
    res = youtube.playlistItems().list(part="snippet", playlistId='UUzQTrA_c1BgRNLewVyt2UFw', maxResults="50").execute()

    nextPageToken = res.get('nextPageToken')
    
    # Creating a while structure to gather the video from the rest of the pages. 
    #It will stop when there hare no more 'nextPageToken' available
    while ('nextPageToken' in res):
        nextPage = youtube.playlistItems().list(
        part="snippet",
        playlistId=playlistId,
        maxResults="50",
        pageToken=nextPageToken
        ).execute()
        
        res['items'] = res['items'] + nextPage['items']

        if 'nextPageToken' not in nextPage:
            res.pop('nextPageToken', None)
        else:
            nextPageToken = nextPage['nextPageToken']

    return res

In [4]:
# Apply the function over the PlayListId to gather the basic data from the videos
info_videos = gather_youtube_videos('UUzQTrA_c1BgRNLewVyt2UFw')
info_videos

{'kind': 'youtube#playlistItemListResponse',
 'etag': 'bo-wyk_P4uUoAgZTOXLK6R-FbGg',
 'items': [{'kind': 'youtube#playlistItem',
   'etag': 'PUy1qG56eHOltN0f9Ixc244-0jk',
   'id': 'VVV6UVRyQV9jMUJnUk5MZXdWeXQyVUZ3LkRRdm1BMlJBY21j',
   'snippet': {'publishedAt': '2022-06-28T16:00:14Z',
    'channelId': 'UCzQTrA_c1BgRNLewVyt2UFw',
    'title': '¿Cómo pueden las moléculas pueden tener RESONANCIA?.  - Agregados moleculares - [ESO/Bachillerato]',
    'description': 'Algunas moléculas pueden tener ELECTRONES en movimiento, los cuales generan la RESONANCIA.\n\nSupera tus Exámenes de Química con NOTA 👇\n\n►Temario de AGREGADOS MOLECULARES: https://www.youtube.com/playlist?list=PL4-4wRXS1MiksTL1US8NPJkf5qUWqBqi_\n\n►Temario de ESO y 1º Bachillerato: https://www.youtube.com/playlist?list=PL4-4wRXS1Mil4byLKh2WN6xnx8v8jZ_Ur\n\n★¿TIENES DUDAS?★\nDéjame el ejercicio en los comentarios y haré un vídeo para resolverlo.',
    'thumbnails': {'default': {'url': 'https://i.ytimg.com/vi/DQvmA2RAcmc/default

Let's extract the basic information for each video from the previous dictionary:

In [5]:
# Defining the variables to store the data from the dictionary
video_id = []
title = []
description = []
publish_date = []
video_url = []

# Gather all the video IDs from the dictionary
for item in info_videos["items"]:
    video_id.append(item['snippet']['resourceId']['videoId'])

# Gather all the titles from the dictionary
for item in info_videos["items"]:
    title.append(item['snippet']['title'])

# Gather all the descriptions from the dictionary
for item in info_videos["items"]:
    description.append(item['snippet']['description'])

# Gather all the publish dates from the dictionary
for item in info_videos["items"]:
    publish_date.append(item['snippet']['publishedAt'])
      
# Create the video URLs with the video IDs
for item in range(len(info_videos['items'])):
    video_url.append(f'https://youtu.be/{video_id[item]}')

Let's save all those lists in a new DataFrame:

In [6]:
# Creating the dataframe
df = pd.DataFrame({'publish_date' : publish_date, 'video_id' : video_id, 'video_url': video_url, 
                   'title' : title, 'description' : description})

# Modifying the publish_date as datetime object
df['publish_date'] = df['publish_date'].astype('datetime64[ns]')

Now, I will create a copy of this DataFrame and add the 'Grade' variable to introduce if it's E.S.O. (from 7th to 10th grade) or Bachillerato (11th & 12th grades).

In [7]:
# Creating a copy of the DataFrame
df_content = df.copy()

# Assining grades to each video
df_content['Grade'] = '2 Bachillerato'
df_content.loc[df_content['title'].str.contains('ESO'), 'Grade'] = 'ESO/1 Bachillerato'

In the following step, I will gather different metrics (views, likes, shares...) from all the videos on a daily basis and store it on a dictionary.

In [9]:
# Creating a dictionary to store the data
dic = {}

# Creating a function to gather the data
def main():

    api_service_name = "youtubeAnalytics"
    api_version = "v2"
    client_secrets_file = "file_name.json"

    # Get credentials and create API client
    flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
        client_secrets_file, scopes)
    credentials = flow.run_console()
    youtube_analytics = googleapiclient.discovery.build(
        api_service_name, api_version, credentials=credentials)

    # Get the metrics for each video
    for i in range(len(video_id)):
        request1 = youtube_analytics.reports().query(
            dimensions="day",
            endDate="2022-10-10",
            ids="channel==MINE",
            maxResults=150,
            metrics="views,likes,comments,dislikes,shares,subscribersLost,subscribersGained,estimatedMinutesWatched,averageViewDuration,averageViewPercentage,annotationImpressions,annotationClicks,annotationClickThroughRate,cardImpressions,cardClicks,cardClickRate",
            filters = f'video=={video_id[i]}',
            startDate="2021-03-01"
    )
        response2 = request1.execute()
        
        dic[video_id[i]] = response2['rows']

if __name__ == "__main__":
    main()

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=852632511285-09e8025vv77tqnorhjgp8lof8j6j67cm.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fyoutube.readonly&state=8vrGKDStuzwEImr0kRluYCGxsZkw4p&prompt=consent&access_type=offline
Enter the authorization code: 4/1ARtbsJoCAZoSUFuL383fBGLz0oa9bGQ-QQ7105dJHwfaPt6LoZ06-2ntXEA


Now, all the data gathered has been stored in a dictionary. Let's transform that dictionary into a DataFrame.

In [10]:
# Create a list with the variables extracted from each video
video_metrics = ['date', 'views','likes','comments','dislikes','shares','subscribersLost','subscribersGained',
           'estimatedMinutesWatched','averageViewDuration','averageViewPercentage','annotationImpressions',
           'annotationClicks','annotationClickThroughRate','cardImpressions','cardClicks','cardClickRate']

# Use the .items() method to store the key and the values separately in the DataFrame
df_metrics = pd.DataFrame(dic.items())

# Renaming the columns
df_metrics.rename(columns={0: 'video_id', 1: 'metrics'}, inplace=True, errors='raise')

I will use the function .explode() in the next step to transform each element of a list-like to a row, replicating the index values. That's why I will a reset_index() function at the end of the chain.

In [11]:
# Use .explode() function to desaggregate the lists on the metrics columns
df_stacked = df_metrics.explode('metrics').reset_index(drop=True)
df_stacked

Unnamed: 0,video_id,metrics
0,DQvmA2RAcmc,"[2022-06-28, 2, 0, 0, 0, 0, 0, 0, 0, 19, 2.73,..."
1,DQvmA2RAcmc,"[2022-06-29, 2, 2, 1, 0, 0, 0, 0, 7, 229, 31.9..."
2,DQvmA2RAcmc,"[2022-06-30, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
3,DQvmA2RAcmc,"[2022-07-01, 1, 0, 0, 0, 0, 0, 0, 1, 106, 14.8..."
4,DQvmA2RAcmc,"[2022-07-02, 2, 0, 0, 0, 0, 0, 0, 1, 35, 4.89,..."
...,...,...
10866,oD04M7jB75s,"[2021-10-07, 9, 0, 0, 0, 0, 0, 0, 0, 2, 0.89, ..."
10867,oD04M7jB75s,"[2021-10-08, 4, 0, 0, 0, 0, 0, 0, 2, 41, 13.45..."
10868,oD04M7jB75s,"[2021-10-09, 8, 0, 0, 0, 0, 0, 0, 4, 31, 10.19..."
10869,oD04M7jB75s,"[2021-10-10, 11, 0, 0, 0, 0, 0, 0, 12, 70, 22...."


Now, let's do some final transformations before exporting this DataFrame to .csv

In [12]:
# Changing the metrics variable as string to do some replacements
df_stacked['metrics'] = df_stacked['metrics'].astype(str)

# Replace the brackers by nothing
df_stacked['metrics'] = df_stacked['metrics'].str.replace('[', '')
df_stacked['metrics'] = df_stacked['metrics'].str.replace(']', '')

  df_stacked['metrics'] = df_stacked['metrics'].str.replace('[', '')
  df_stacked['metrics'] = df_stacked['metrics'].str.replace(']', '')


In [None]:
# Split the metrics columns into several columns
df_stacked[video_metrics] = df_stacked['metrics'].str.split(',', expand = True)
df_stacked.drop('metrics', axis = 1, inplace = True)
df_stacked['date'] = df_stacked['date'].str.replace("'", '')

In [None]:
df_stacked.to_csv('video_performance.csv', index = False)

In the following steps, I will connect to Google BigQuery with two different goals:

1. Create a table to store the data recently extracted
2. Pull the output file got in the previous step to that Google BigQuery table

In [None]:
# Importing libraries to connect with Google BigQuery
from google.cloud import bigquery
from google.oauth2 import service_account

# Adding the credentials from the JSON file to connect with the platform
credentials = service_account.Credentials.from_service_account_file(
'file_name.json')
project_id = 'project_name'
client = bigquery.Client(credentials= credentials, project=project_id)

In [None]:
# Set table_id to the ID of the table to create
table_id = 'table_id'

# Add the schema of the table
schema = [
    bigquery.SchemaField("video_id", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("date", "DATE", mode="NULLABLE"),
    bigquery.SchemaField("views", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("comments", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("dislikes", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("shares", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("subscribersLost", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("subscribersGained", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("estimatedMinutesWatched", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("averageViewDuration", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("averageViewPercentage", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("annotationImpressions", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("annotationClicks", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("annotationClickThroughRate", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("cardImpressions", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("cardClicks", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("cardClickRate", "FLOAT", mode="NULLABLE"),
]

# Make an API request to create the table
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))

Finally, let's push the .csv() file created to Google BigQuery

In [None]:
# Specify the name of the file
file_path = 'video_performance.csv'

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV, skip_leading_rows=1, autodetect=True,
)

# Open and read the file
with open(file_path, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_id, job_config=job_config)
job.result() 

# Make an API request to upload the table
table = client.get_table(table_id)  
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)