# Example YouTube Data Pipeline

This notebook walks through a simple data pipeline that ingests, processes, validates, versions, and stores YouTube data, as discussed in [https://bradleyboehmke.github.io/uc-bana-7075/04-dataops-build.html#hands-on-example-a-youtube-data-pipeline](https://bradleyboehmke.github.io/uc-bana-7075/04-dataops-build.html#hands-on-example-a-youtube-data-pipeline).

## Requirements

In [1]:
import great_expectations as gx
import os
import numpy as np
import pandas as pd
import unicodedata
import warnings

from dataops_utils import (
    ingest_channel_video_ids,
    ingest_video_stats,
    ingest_video_transcript,
)
from dotenv import load_dotenv

In [2]:
# Silence warnings (this suppresses all of them, not only the noisy Great Expectations messages)
warnings.filterwarnings('ignore')

In [3]:
# I have my API key set as an environment variable
load_dotenv()
API_KEY = os.getenv('YOUTUBE_API_KEY')

# In your case you can add your API key here
if API_KEY is None:
    API_KEY = "INSERT_YOUR_YOUTUBE_API_KEY"

BASE_URL = "https://www.googleapis.com/youtube/v3"
CHANNEL_ID = 'UCgUueMmSpcl-aCTt5CuCKQw'
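
Because the fallback above substitutes a placeholder string, a request made with it would only fail once it reaches the API. A minimal sketch of a fail-fast check follows; `require_api_key` is a hypothetical helper that is not part of `dataops_utils`, and the placeholder value is the one used in the cell above.

```python
import os

# Placeholder string used in the configuration cell above.
PLACEHOLDER = "INSERT_YOUR_YOUTUBE_API_KEY"


def require_api_key(env=os.environ, name="YOUTUBE_API_KEY"):
    """Return the API key found in `env`, raising early if it is unusable.

    Hypothetical helper for illustration; not part of dataops_utils.
    """
    key = env.get(name)
    if not key or key == PLACEHOLDER:
        raise RuntimeError(
            f"Set {name} in your environment or .env file before running the pipeline."
        )
    return key
```

Calling `require_api_key()` right after `load_dotenv()` would surface a missing key before any ingestion step starts.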

## Data Ingestion

In [4]:
# Ingest YouTube video IDs
video_ids = ingest_channel_video_ids(API_KEY, CHANNEL_ID)

# Example of what the first record looks like
video_ids[0]

{'channel_id': 'UCgUueMmSpcl-aCTt5CuCKQw',
 'video_id': 'wzrIKGcOlsU',
 'datetime': '2025-01-13T17:00:24Z',
 'title': 'Rory McIlroy has another gear.'}
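
The record above has the shape one would get by flattening an item from a YouTube Data API v3 `search.list` response. The implementation of `ingest_channel_video_ids` is not shown in this notebook, so the following is only a sketch of that flattening step under that assumption; `flatten_search_items` and the sample response are hypothetical, and no network request is made.

```python
def flatten_search_items(response):
    """Flatten search.list items into records like the one shown above.

    Hypothetical helper; the real dataops_utils code may differ.
    """
    records = []
    for item in response.get("items", []):
        video_id = item.get("id", {}).get("videoId")
        if video_id is None:
            # Search results can also be channels or playlists; skip those.
            continue
        snippet = item.get("snippet", {})
        records.append(
            {
                "channel_id": snippet.get("channelId"),
                "video_id": video_id,
                "datetime": snippet.get("publishedAt"),
                "title": snippet.get("title"),
            }
        )
    return records


# Hand-written sample response that mirrors the first record in this notebook.
sample_response = {
    "items": [
        {
            "id": {"kind": "youtube#video", "videoId": "wzrIKGcOlsU"},
            "snippet": {
                "channelId": "UCgUueMmSpcl-aCTt5CuCKQw",
                "publishedAt": "2025-01-13T17:00:24Z",
                "title": "Rory McIlroy has another gear.",
            },
        },
        {"id": {"kind": "youtube#channel", "channelId": "UCgUueMmSpcl-aCTt5CuCKQw"}},
    ]
}

records = flatten_search_items(sample_response)
print(records[0])
```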

In [5]:
# Ingest YouTube video statistics
video_data = ingest_video_stats(video_ids, API_KEY)

# Example of the stats collected for the first video
video_data[0]

[00:43<00:00] 250/250 | 100%|██████████  5.77it/s


{'channel_id': 'UCgUueMmSpcl-aCTt5CuCKQw',
 'video_id': 'wzrIKGcOlsU',
 'datetime': '2025-01-13T17:00:24Z',
 'title': 'Rory McIlroy has another gear.',
 'views': '3142',
 'likes': '304',
 'comments': '16'}
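
Note that `views`, `likes`, and `comments` arrive as strings, which is why the processing step later converts them with `pd.to_numeric`. The sketch below shows one way a `videos.list` item could be merged into a record. `merge_video_statistics` is a hypothetical name, the real `ingest_video_stats` may work differently, and the assumption that a count can be absent from `statistics` (for example when it is hidden) should be checked against the API documentation.

```python
def merge_video_statistics(record, item):
    """Return a copy of `record` with view, like, and comment counts added.

    Hypothetical helper. Counts are kept as the strings found in the
    response; a missing count becomes None so a later dropna() can catch it.
    """
    stats = item.get("statistics", {})
    merged = dict(record)
    merged["views"] = stats.get("viewCount")
    merged["likes"] = stats.get("likeCount")
    merged["comments"] = stats.get("commentCount")
    return merged


record = {"video_id": "wzrIKGcOlsU", "title": "Rory McIlroy has another gear."}
item = {"statistics": {"viewCount": "3142", "likeCount": "304", "commentCount": "16"}}
print(merge_video_statistics(record, item))
```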

In [6]:
# Ingest YouTube video transcripts
video_data = ingest_video_transcript(video_data)

# Example of the final raw data that includes
# video ID, title, date, stats, and transcript
video_data[0]

[04:13<00:00] 250/250 | 100%|██████████  1.01s/it


{'channel_id': 'UCgUueMmSpcl-aCTt5CuCKQw',
 'video_id': 'wzrIKGcOlsU',
 'datetime': '2025-01-13T17:00:24Z',
 'title': 'Rory McIlroy has another gear.',
 'views': '3142',
 'likes': '304',
 'comments': '16',
 'transcript': "I've never seen you go after the ball like this there it is wow that was it that was so good 190 190 nice that was hit well good what was that 72 as well that was good that was nice that was FL that was nice 127 A2 191 that's gone 34 there you go"}

## Data Processing

In [7]:
raw_data = pd.DataFrame(video_data)
raw_data.head()

| | channel_id | video_id | datetime | title | views | likes | comments | transcript |
|---|---|---|---|---|---|---|---|---|
| 0 | UCgUueMmSpcl-aCTt5CuCKQw | wzrIKGcOlsU | 2025-01-13T17:00:24Z | Rory McIlroy has another gear. | 3142 | 304 | 16 | I've never seen you go after the ball like thi... |
| 1 | UCgUueMmSpcl-aCTt5CuCKQw | Pz42jFngEzM | 2025-01-13T03:25:25Z | Thank you. 1 Million ❤️ | 59474 | 6757 | 428 | [Music] so sick let's go [Music] |
| 2 | UCgUueMmSpcl-aCTt5CuCKQw | fSHh01YT0-Q | 2025-01-07T18:50:32Z | Tiger Woods hits the ball off the heel. | 64116 | 2055 | 30 | over the course of my career I've always hit t... |
| 3 | UCgUueMmSpcl-aCTt5CuCKQw | erzLT7fy2r0 | 2025-01-07T17:56:07Z | Tiger Woods liked my golf swing! | 122584 | 4486 | 76 | what's wrong with that yeah that came off you ... |
| 4 | UCgUueMmSpcl-aCTt5CuCKQw | 3O08SnyZ88U | 2025-01-07T17:07:04Z | Tiger Woods teaches me how to hit it straight! | 555884 | 18286 | 142 | what did you do in your career when you had a ... |


In [8]:
# Remove rows with missing data
cleaned_data = raw_data.dropna()

# Remove duplicate rows
cleaned_data = cleaned_data.drop_duplicates()

# Convert the count columns to numeric; values that cannot be parsed become NaN
for col in ['views', 'likes', 'comments']:
    cleaned_data[col] = pd.to_numeric(cleaned_data[col], errors='coerce')

# Remove any observations that have invalid datetime values
cleaned_data['datetime'] = pd.to_datetime(cleaned_data['datetime'], errors='coerce')
cleaned_data = cleaned_data.dropna(subset=['datetime'])

# Remove any observations whose views value falls more than 3 standard deviations
# below the mean (rows with NaN views fail this comparison and are dropped too)
mean_views = cleaned_data['views'].mean()
std_views = cleaned_data['views'].std()
cleaned_data = cleaned_data[cleaned_data['views'] >= (mean_views - 3 * std_views)]

# Remove any observations whose transcript length falls more than 3 standard deviations
# below the mean transcript length
cleaned_data['transcript_length'] = cleaned_data['transcript'].apply(lambda x: len(x) if pd.notnull(x) else 0)
mean_transcript_length = cleaned_data['transcript_length'].mean()
std_transcript_length = cleaned_data['transcript_length'].std()
cleaned_data = cleaned_data[cleaned_data['transcript_length'] >= (mean_transcript_length - 3 * std_transcript_length)]

# Clean the title and transcript columns by decomposing Unicode characters (NFKD)
# and dropping anything that is not ASCII (e.g. emoji, accents)
def clean_text(text):
    if isinstance(text, str):
        return unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('ascii')
    return text

cleaned_data['title'] = cleaned_data['title'].apply(clean_text)
cleaned_data['transcript'] = cleaned_data['transcript'].apply(clean_text)

cleaned_data.head()

| | channel_id | video_id | datetime | title | views | likes | comments | transcript | transcript_length |
|---|---|---|---|---|---|---|---|---|---|
| 0 | UCgUueMmSpcl-aCTt5CuCKQw | wzrIKGcOlsU | 2025-01-13 17:00:24+00:00 | Rory McIlroy has another gear. | 3142 | 304 | 16 | I've never seen you go after the ball like thi... | 246 |
| 1 | UCgUueMmSpcl-aCTt5CuCKQw | Pz42jFngEzM | 2025-01-13 03:25:25+00:00 | Thank you. 1 Million | 59474 | 6757 | 428 | [Music] so sick let's go [Music] | 32 |
| 2 | UCgUueMmSpcl-aCTt5CuCKQw | fSHh01YT0-Q | 2025-01-07 18:50:32+00:00 | Tiger Woods hits the ball off the heel. | 64116 | 2055 | 30 | over the course of my career I've always hit t... | 483 |
| 3 | UCgUueMmSpcl-aCTt5CuCKQw | erzLT7fy2r0 | 2025-01-07 17:56:07+00:00 | Tiger Woods liked my golf swing! | 122584 | 4486 | 76 | what's wrong with that yeah that came off you ... | 208 |
| 4 | UCgUueMmSpcl-aCTt5CuCKQw | 3O08SnyZ88U | 2025-01-07 17:07:04+00:00 | Tiger Woods teaches me how to hit it straight! | 555884 | 18286 | 142 | what did you do in your career when you had a ... | 646 |
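
Two of the cleaning steps above can be examined in isolation. The following sketch uses only the standard library and the five `views` values shown in the table; `lower_three_sigma_bound` is a hypothetical helper, and `statistics.stdev` is the sample standard deviation, which matches the default of `Series.std()`. For these five heavily right-skewed counts the lower bound comes out negative, so the filter keeps every row, and stripping an emoji leaves the whitespace around it in place.

```python
import statistics
import unicodedata


def lower_three_sigma_bound(values):
    """Return mean - 3 * sample standard deviation (hypothetical helper)."""
    return statistics.mean(values) - 3 * statistics.stdev(values)


def clean_text(text):
    """Same normalization as the cell above: decompose, then drop non-ASCII."""
    if isinstance(text, str):
        return unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
    return text


# The five view counts shown in the table above.
views = [3142, 59474, 64116, 122584, 555884]
bound = lower_three_sigma_bound(views)
kept = [v for v in views if v >= bound]

print(bound < 0)      # whether the lower bound is below zero for these counts
print(kept == views)  # whether the filter left the sample unchanged

# Emoji (written here as escapes) are dropped; the space before them remains.
print(repr(clean_text("Thank you. 1 Million \u2764\ufe0f")))
# Accented letters fall back to their base letter after NFKD decomposition.
print(clean_text("caf\u00e9"))
```

If trailing whitespace matters downstream, a `.strip()` after `clean_text` would be one way to handle it.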


## Final Data Validation

In [9]:
# Create Data Context.
context = gx.get_context()

# Create Data Source, Data Asset, Batch Definition, and Batch.
data_source = context.data_sources.add_pandas("pandas")
data_asset = data_source.add_dataframe_asset(name="Youtube video data")
batch_definition = data_asset.add_batch_definition_whole_dataframe("batch definition")
batch = batch_definition.get_batch(batch_parameters={"dataframe": cleaned_data})

In [10]:
# Create an Expectation Suite
suite = gx.ExpectationSuite(name="Youtube video data expectations")

# Add the Expectation Suite to the Data Context
suite = context.suites.add(suite)

# Validate columns exist
suite.add_expectation(gx.expectations.ExpectColumnToExist(column='channel_id'))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column='video_id'))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column='datetime'))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column='title'))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column='views'))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column='likes'))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column='comments'))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column='transcript'))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column='transcript_length'))

# Validate data types
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column='channel_id', type_="object"
    ))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column='video_id', type_="object"
    ))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column='datetime', type_="Timestamp"
    ))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column='title', type_="object"
    ))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column='views', type_="int64"
    ))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column='likes', type_="int64"
    ))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column='comments', type_="int64"
    ))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column='transcript', type_="object"
    ))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column='transcript_length', type_="int64"
    ))

# Validate no empty values exist
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column='channel_id'))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column='video_id'))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column='datetime'))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column='title'))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column='views'))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column='likes'))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column='comments'))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column='transcript'))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column='transcript_length'))

# Validate results
validation_results = batch.validate(suite)
print(validation_results.success)

True
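
The suite above repeats the same three expectations for every column. As a sketch of how that repetition could be driven from a single mapping, the block below builds the list of expectation names and keyword arguments in plain Python; `COLUMN_TYPES` and `build_expectation_specs` are hypothetical names, and the expectation class names and `type_` values are the ones already used in this notebook.

```python
# Column name -> expected type string, copied from the suite above.
COLUMN_TYPES = {
    "channel_id": "object",
    "video_id": "object",
    "datetime": "Timestamp",
    "title": "object",
    "views": "int64",
    "likes": "int64",
    "comments": "int64",
    "transcript": "object",
    "transcript_length": "int64",
}


def build_expectation_specs(column_types):
    """Return (expectation class name, kwargs) pairs for each column."""
    specs = []
    for column, type_ in column_types.items():
        specs.append(("ExpectColumnToExist", {"column": column}))
        specs.append(("ExpectColumnValuesToBeOfType", {"column": column, "type_": type_}))
        specs.append(("ExpectColumnValuesToNotBeNull", {"column": column}))
    return specs


specs = build_expectation_specs(COLUMN_TYPES)
print(len(specs))

# With `gx` imported and `suite` created as in the cell above, the specs
# could then be applied like this:
# for name, kwargs in specs:
#     suite.add_expectation(getattr(gx.expectations, name)(**kwargs))
```

Keeping the column list in one place means adding a column to the pipeline only requires one new entry in the mapping.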


## Data Versioning & Storage

In [11]:
# Ensure the directory exists
os.makedirs('data', exist_ok=True)

# Write the cleaned data to a parquet file
cleaned_data.to_parquet('data/youtube_video_data.parquet', index=False)

Initialize DVC in the project:

```zsh
$ dvc init
```

This creates a few internal files that should be added to Git. If you are using Git for this project, they show up as staged changes that you can commit:

```zsh
$ git status
Changes to be committed:
        new file:   .dvc/.gitignore
        new file:   .dvc/config
        ...
$ git commit -m "Initialize DVC"
```

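Next, use `dvc add` to start tracking the dataset file. DVC writes a small `.dvc` metadata file next to the data and lists the data file in a `.gitignore`, so those two files are what you stage in Git. The paths are given relative to the repository root.

```zsh
dvc add DataOps/data/youtube_video_data.parquet
git add DataOps/data/youtube_video_data.parquet.dvc DataOps/data/.gitignore
```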
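
DVC identifies a tracked file by a hash of its contents and records that hash in the `.dvc` file, which is why only the small metadata file needs to go into Git. As a rough illustration of the idea (this is not DVC's own code, and it assumes the MD5 digest that DVC 3.x reports for local files), the hypothetical `file_md5` helper below computes a content hash for any file.

```python
import hashlib


def file_md5(path, chunk_size=1 << 20):
    """Return the hex MD5 digest of a file, reading it in chunks.

    Hypothetical helper for illustration; not part of DVC.
    """
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        # Read fixed-size chunks so large datasets do not need to fit in memory.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Example call, assuming the parquet file written earlier exists:
# file_md5("data/youtube_video_data.parquet")
```

Two copies of the dataset with identical bytes produce the same digest, so any change to the data yields a new hash and therefore a new version to commit.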

Finally, commit the change and tag it so that this version of the dataset can be referenced later.

```zsh
git commit -m 'Initial processed Youtube data'
git tag -a "v1.0" -m "Youtube data v1.0" 
```



## Computing Environment

In [12]:
import sys

print(f'Python version: {sys.version}', end='\n\n')

with open('dataops-requirements.txt', 'r') as file:
    for line in file:
        print(line.strip())


Python version: 3.12.7 | packaged by Anaconda, Inc. | (main, Oct  4 2024, 08:28:27) [Clang 14.0.6 ]

dvc==3.59.0
great_expectations==1.3.1
jupyterlab==4.1.6
matplotlib==3.8.0
numpy==1.26.4
pandas<=2.2
python-dotenv==0.21.0
tqdm==4.63.0
youtube_transcript_api==0.6.2


