# Building a Data Pipeline in Python

The goal of this project is to load data for a YouTube channel through the YouTube Data API, extract the useful fields into a pandas DataFrame, and then upload that to a database on AWS. This version contains my entire thought process; for a version with only code, see: 

In [7]:
import requests 
import time
import pandas as pd
from dotenv import load_dotenv
import os
import psycopg2 as ps 
import html

In [8]:
def configure():
    load_dotenv() #securely loading in my credentials from .env

For this project I will be looking at the popular science channel Kurzgesagt. The channel ID can be found in the page source of the channel's YouTube homepage. We also need the base URL that forms the root of our API requests, which can be found in the documentation: https://developers.google.com/youtube/v3/docs/search/list
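Rather than concatenating strings, the request URL can be assembled from a dictionary of query parameters so that the standard library handles the encoding. This is a minimal sketch assuming the same parameters this notebook sends. `build_search_url` and the placeholder key `MY_KEY` are hypothetical, and the value 50 for `maxResults` is the per-page maximum stated in the search documentation.

```python
from urllib.parse import urlencode

SEARCH_URL = "https://www.googleapis.com/youtube/v3/search"


def build_search_url(api_key, channel_id, page_token=None):
    """Assemble a search request URL (hypothetical helper)."""
    params = {
        "key": api_key,
        "channelId": channel_id,
        "part": "snippet,id",
        "order": "date",
        "maxResults": 50,  # documented per-page maximum for search
    }
    if page_token:
        params["pageToken"] = page_token  # only needed after the first page
    # urlencode percent-encodes each value, e.g. the comma in "snippet,id" becomes %2C
    return SEARCH_URL + "?" + urlencode(params)


print(build_search_url("MY_KEY", "UCsXVk37bltHxD1rDPwtNM8Q", page_token="CDIQAA"))
```

With `requests`, passing the same dictionary as `requests.get(SEARCH_URL, params=params)` performs equivalent encoding.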

In [9]:
#API key (loaded from .env, where you should put your own key) and channel ID
configure()
API_KEY = os.getenv("API_KEY")
CHANNEL_ID = "UCsXVk37bltHxD1rDPwtNM8Q"

## 1. Initial Exploration

First, I will build the request URL from the base URL and the parameters listed in the documentation.

In [99]:
# the API returns at most 50 results per page; later pages are reached via nextPageToken
url = f"https://www.googleapis.com/youtube/v3/search?key={API_KEY}&channelId={CHANNEL_ID}&part=snippet,id&order=date&maxResults=50"

video_info = requests.get(url).json()

video_info

{'kind': 'youtube#searchListResponse',
 'etag': 'XlOhiDG6Wfs4prkkDSeARcXxJ2M',
 'nextPageToken': 'CDIQAA',
 'regionCode': 'US',
 'pageInfo': {'totalResults': 206, 'resultsPerPage': 50},
 'items': [{'kind': 'youtube#searchResult',
   'etag': 'Fmk-JUAYW9KB-NAJIx6mh6rRURI',
   'id': {'kind': 'youtube#video', 'videoId': 'LEENEFaVUzU'},
   'snippet': {'publishedAt': '2022-06-28T14:00:23Z',
    'channelId': 'UCsXVk37bltHxD1rDPwtNM8Q',
    'title': 'The Last Human – A Glimpse Into The Far Future',
    'description': 'Because of the potential size of the future, the most important thing about our actions today might be their impact on future ...',
    'thumbnails': {'default': {'url': 'https://i.ytimg.com/vi/LEENEFaVUzU/default.jpg',
      'width': 120,
      'height': 90},
     'medium': {'url': 'https://i.ytimg.com/vi/LEENEFaVUzU/mqdefault.jpg',
      'width': 320,
      'height': 180},
     'high': {'url': 'https://i.ytimg.com/vi/LEENEFaVUzU/hqdefault.jpg',
      'width': 480,
      'height

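The search endpoint only returns each video's snippet and ID, so the statistics have to come from a second request to the `videos` endpoint with `part=statistics`. It offers a few counts to pick from, including view count, like count, comment count, and favorite count. Favorite count is always zero, so we will leave it out. 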

In [88]:
video_df = pd.DataFrame(columns = ['vid_id', 'vid_title', 'upload_time', 'upload_date',
                                   'view_count', 'like_count', 'comment_count'])

for vid in video_info['items']:
    if vid['id']['kind'] == 'youtube#video':  # skip channel and playlist results
        vid_id = vid['id']['videoId']
        vid_title = vid['snippet']['title']
        upload_date = vid['snippet']['publishedAt']  # e.g. '2022-06-28T14:00:23Z'
        upload_time = str(upload_date).split("T")[1].replace("Z", "")
        upload_date = str(upload_date).split("T")[0]
        
        #obtaining stats using video id (one extra request per video)
        
        vid_url = "https://www.googleapis.com/youtube/v3/videos?key="+API_KEY+"&part=statistics&id="+vid_id
        video_info_vid = requests.get(vid_url).json()
        
        view_count = video_info_vid['items'][0]['statistics']['viewCount']
        like_count = video_info_vid['items'][0]['statistics']['likeCount']
        comment_count = video_info_vid['items'][0]['statistics']['commentCount']
        d = {'vid_id':[vid_id], 'vid_title':[vid_title], 'upload_time':[upload_time],
             'upload_date':[upload_date], 'view_count':[view_count],
             'like_count':[like_count], 'comment_count':[comment_count]}
        # ignore_index keeps the index unique instead of giving every row index 0
        video_df = pd.concat([video_df, pd.DataFrame(data = d)], ignore_index = True)
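The loop above requests statistics one video at a time. The `id` parameter of the `videos` endpoint also accepts a comma-separated list of IDs (the documented limit is 50 per request), so a whole page of search results could be covered by a single call. This is a sketch under that assumption. `fetch_stats` is hypothetical, and so is `fetch_json`, a callable that GETs a URL and returns the decoded JSON (with `requests` it could be `lambda u: requests.get(u).json()`). Counts arrive as strings and are converted to integers. `likeCount` and `commentCount` can be absent when likes are hidden or comments are disabled, so this sketch defaults them to 0, which does conflate "hidden" with "zero".

```python
from urllib.parse import urlencode

VIDEOS_URL = "https://www.googleapis.com/youtube/v3/videos"


def fetch_stats(video_ids, api_key, fetch_json):
    """Return {video_id: {view_count, like_count, comment_count}} for up to 50 IDs."""
    if len(video_ids) > 50:
        raise ValueError("request statistics for at most 50 IDs at a time")
    query = urlencode({"key": api_key, "part": "statistics", "id": ",".join(video_ids)})
    response = fetch_json(VIDEOS_URL + "?" + query)

    stats = {}
    for item in response.get("items", []):
        s = item["statistics"]
        stats[item["id"]] = {
            "view_count": int(s.get("viewCount", 0)),
            # absent when the uploader hides likes or disables comments
            "like_count": int(s.get("likeCount", 0)),
            "comment_count": int(s.get("commentCount", 0)),
            # favoriteCount is ignored because it is always 0
        }
    return stats
```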

## 2. Cleaning and Optimizing Code

This has only collected videos from the first page of results; we want to follow the page tokens through every page. It would also be better to wrap this loop in a function that returns the same data for any channel.
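The page-token loop can be kept separate from the per-video processing. This minimal sketch assumes a hypothetical `fetch_page(token)` callable that returns one decoded search response, with `token=None` meaning the first page. It yields items until a response arrives without a `nextPageToken`, which is how the last page is signalled. Checking for the key explicitly avoids relying on a `KeyError` to end the loop.

```python
def iter_search_items(fetch_page):
    """Yield every item across all result pages (fetch_page is hypothetical)."""
    token = None
    while True:
        response = fetch_page(token)
        yield from response.get("items", [])
        token = response.get("nextPageToken")
        if not token:  # no token means this was the final page
            break
```

In this notebook's setting, `fetch_page` would wrap `requests.get(...).json()`, adding `pageToken` to the URL when `token` is set. The one-second pause between pages could live there too.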

In [10]:
def get_youtube_data(API_KEY, CHANNEL_ID):
    page = ""
    vid_df = pd.DataFrame(columns=["vid_id","vid_title","upload_time","upload_date","view_count","like_count","comment_count"]) 
    
    while True:
        url = "https://www.googleapis.com/youtube/v3/search?key="+API_KEY+"&channelId="+CHANNEL_ID+"&order=date&maxResults=50&part=snippet,id&"+page

        video_info = requests.get(url).json()
        time.sleep(1) #pause for one second between page requests
        for video in video_info['items']:
            if video['id']['kind'] == "youtube#video":
                vid_id = video['id']['videoId']
                vid_title = video['snippet']['title']
                upload_date = video['snippet']['publishedAt']
                upload_time = str(upload_date).split("T")[1]
                upload_time = upload_time.replace("Z","")
                upload_date = str(upload_date).split("T")[0]
                
                #making a separate api call to pull the video stats
                url_vid_stats = "https://www.googleapis.com/youtube/v3/videos?id="+vid_id+"&part=statistics&key="+API_KEY
                stats = requests.get(url_vid_stats).json()['items'][0]['statistics']
                
                #like and comment counts can be absent (hidden likes, disabled comments), so default to 0
                view_count = stats['viewCount']
                like_count = stats.get('likeCount', 0)
                comment_count = stats.get('commentCount', 0)
                
                #concatenating into the dataframe
                d = {'vid_id':[vid_id], 'vid_title':[vid_title], 'upload_time':[upload_time],
                     'upload_date':[upload_date], 'view_count':[view_count], 
                     'like_count':[like_count], 'comment_count':[comment_count]}
                vid_df = pd.concat([vid_df, pd.DataFrame(data = d)], ignore_index = True)
                
        #the final page has no nextPageToken, which ends the loop
        next_token = video_info.get('nextPageToken')
        if not next_token:
            break
        page = "pageToken=" + next_token
        
    #translating HTML entities in titles (e.g. &#39;) to their corresponding symbols
    vid_df['vid_title'] = vid_df['vid_title'].map(html.unescape)
    vid_df['upload_date'] = pd.to_datetime(vid_df['upload_date'])

    return vid_df

In [128]:
video_df = get_youtube_data(API_KEY, CHANNEL_ID)

Now, instead of an exploratory loop tied to a single response, we have one function that pulls this data for any channel whose ID we have. 

## 3. Performing NLP Sentiment Analysis

For NLP, we have several options. NLTK and TextBlob offer rules-based (lexicon-driven) sentiment scoring, whereas Flair uses an embedding-based model. Flair offers higher accuracy at the cost of speed; since we are only running our NLP on short titles, the slowdown should not be an issue. The Flair package ships with pre-trained models, including one for sentiment analysis and one for offensive language detection. I will use the sentiment analysis model here. 

In [69]:
from flair.models import TextClassifier
from flair.data import Sentence

A quick test shows that the two sentences below are correctly classified as positive and negative, each with a confidence score. 

In [123]:
classifier = TextClassifier.load("en-sentiment")
pos_sentence = Sentence("I really like Flair!")
neg_sentence = Sentence("Flair is bad!")
classifier.predict(pos_sentence)
classifier.predict(neg_sentence)
print(pos_sentence.labels, neg_sentence.labels)

2022-07-06 16:48:06,645 loading file /Users/drew/.flair/models/sentiment-en-mix-distillbert_4.pt
['Sentence: "I really like Flair !"'/'POSITIVE' (0.9991)] ['Sentence: "Flair is bad !"'/'NEGATIVE' (0.997)]


Some of these titles are neutral rather than clearly positive or negative. To reflect this, I am setting a cutoff: the model must be more than 75% confident to classify a title as positive or negative, and otherwise the title is classified as "NEUTRAL". 
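The cutoff rule described here is small enough to isolate as a pure function, which makes the boundary behaviour easy to check. This sketch mirrors the notebook's logic, where a score at or below 0.75 becomes NEUTRAL. `apply_cutoff` is a hypothetical name, and its inputs are the label value and score that Flair attaches to a classified sentence.

```python
def apply_cutoff(label_value, score, cutoff=0.75):
    """Keep label_value only if the model is more than `cutoff` confident."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score must be a probability, got {score}")
    return label_value if score > cutoff else "NEUTRAL"


print(apply_cutoff("POSITIVE", 0.9991))  # POSITIVE
print(apply_cutoff("NEGATIVE", 0.60))    # NEUTRAL
print(apply_cutoff("POSITIVE", 0.75))    # NEUTRAL (the boundary counts as neutral)
```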

In [121]:
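def add_sentiment(video_df):
    """Classify each title with Flair and store the result in a title_sentiment column (in place)."""
    title_sentiment = []

    for i,vid in video_df.iterrows():
        title = Sentence(vid['vid_title'])
        classifier.predict(title)
        label = title.labels[0]  # the sentiment model assigns one label per sentence
        if label.score <= .75:
            title_sentiment.append("NEUTRAL")  # not confident enough either way
        else:
            title_sentiment.append(label.value)

    video_df["title_sentiment"] = title_sentiment
    return video_df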
        


In [129]:
add_sentiment(video_df)

In [130]:
pd.set_option("display.max_rows", 300)
video_df

| | vid_id | vid_title | upload_time | upload_date | view_count | like_count | comment_count | title_sentiment |
|---|---|---|---|---|---|---|---|---|
| 0 | LEENEFaVUzU | The Last Human – A Glimpse Into The Far Future | 14:00:23 | 2022-06-28 | 4610766 | 327083 | 15615 | POSITIVE |
| 1 | 75d_29QWELk | Change Your Life – One Tiny Step at a Time | 14:00:05 | 2022-06-07 | 4804185 | 352943 | 10226 | POSITIVE |
| 2 | Pj-h6MEgE7I | You Are Not Where You Think You Are | 13:59:44 | 2022-05-17 | 5891319 | 326020 | 13903 | NEUTRAL |
| 3 | 7OPg-ksxZ4Y | The Most Horrible Parasite: Brain Eating Amoeba | 13:59:29 | 2022-05-03 | 5285961 | 311886 | 15902 | NEGATIVE |
| 4 | LxgMdjyw8uw | We WILL Fix Climate Change! | 13:59:18 | 2022-04-05 | 7997748 | 548150 | 38513 | POSITIVE |
| 5 | KRvv0QdruMQ | Are There Lost Alien Civilizations in Our Past? | 14:59:23 | 2022-03-01 | 9133465 | 389262 | 16463 | NEUTRAL |
| 6 | lheapd7bgLA | What Happens if the Moon Crashes into Earth? | 14:59:49 | 2022-02-08 | 12012169 | 444148 | 25131 | NEGATIVE |
| 7 | xAUJYP8tnRE | Why We Should NOT Look For Aliens - The Dark F... | 15:00:03 | 2021-12-14 | 10999445 | 551581 | 28343 | NEGATIVE |
| 8 | XFqn3uy238E | ...And We'll Do it Again | 14:59:44 | 2021-12-07 | 9815397 | 626959 | 24865 | POSITIVE |
| 9 | F1Hq8eVOMHs | Is Meat Really that Bad? | 15:01:34 | 2021-11-30 | 6607490 | 369291 | 43106 | NEGATIVE |


There are certainly some flaws here. For example, a video titled "the last light before eternal darkness - white dwarfs & black dwarfs" was classified as "POSITIVE", even though its language is more ominous. One way to improve this would be to train our own model on titles from many science YouTube channels, which may be a good expansion to this project in the future.

## 4. Porting to AWS

Next I will export this pandas DataFrame to a PostgreSQL database hosted on AWS, first loading the credentials from the .env file on my system and then connecting to the database.
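A missing or misspelled variable otherwise only surfaces later as a confusing connection error, so it can help to check the required settings up front. This is a sketch using a hypothetical `require_env` helper. The variable names are the ones this notebook reads, and the `environ` argument exists only so the check can be exercised with a plain dictionary.

```python
import os

REQUIRED = ["ENDPOINT", "PORT", "DB_NAME", "USERNAME", "PASSWORD"]


def require_env(names, environ=os.environ):
    """Return {name: value} for each variable, raising if any are missing or empty."""
    missing = [n for n in names if not environ.get(n)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {n: environ[n] for n in names}
```

Called after `configure()`, `settings = require_env(REQUIRED)` either returns all five values or names exactly which ones are absent.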

In [131]:
configure()
ENDPOINT=os.getenv("ENDPOINT")
PORT=os.getenv("PORT")
DB_NAME=os.getenv("DB_NAME")
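#note: load_dotenv() does not override variables already set in the environment by default,
#and on Windows USERNAME is preset to the OS user, so a name like DB_USER avoids that clash
USERNAME=os.getenv("USERNAME")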
PASSWORD=os.getenv("PASSWORD")

In [132]:
def db_connect(host, database, user, password, port):
    #psycopg2.OperationalError propagates if the host is unreachable or the credentials are rejected
    connection = ps.connect(host=host, database=database, user=user, password=password, port=port)
    print('Connected!')
    return connection

In [133]:
connection = db_connect(host = ENDPOINT,database = DB_NAME,
                        user = USERNAME,password = PASSWORD,
                        port = PORT)

Connected!


In [134]:
def initialize_vid_table(curs):
    sql_create_df = ("""CREATE TABLE IF NOT EXISTS video_data (
                vid_id VARCHAR(255) PRIMARY KEY,
                vid_title VARCHAR(255) NOT NULL,
                upload_time VARCHAR(255) NOT NULL,
                upload_date VARCHAR(255) NOT NULL,
                view_count INTEGER NOT NULL,
                like_count INTEGER NOT NULL,
                comment_count INTEGER NOT NULL,
                title_sentiment VARCHAR(255) NOT NULL
            )""")
    curs.execute(sql_create_df)
    curs.connection.commit()  # commit via the cursor's own connection instead of a global
#would ideally store upload time and date as TIME and DATE columns,
#but that was causing errors
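On the comment above about storing proper time and date types, the `publishedAt` strings can be parsed into `datetime.date` and `datetime.time` objects, which psycopg2 adapts to PostgreSQL `DATE` and `TIME` values. This sketch assumes every timestamp has the `YYYY-MM-DDTHH:MM:SSZ` shape seen in the API output earlier (UTC, no fractional seconds). `split_published_at` is a hypothetical helper. Using it would also mean declaring `upload_date` and `upload_time` as `DATE` and `TIME` in the table definition, which the notebook does not do.

```python
from datetime import datetime


def split_published_at(published_at):
    """Split a UTC timestamp like '2022-06-28T14:00:23Z' into (date, time) objects."""
    # the trailing Z marks UTC; strptime matches it literally
    dt = datetime.strptime(published_at, "%Y-%m-%dT%H:%M:%SZ")
    return dt.date(), dt.time()


d, t = split_published_at("2022-06-28T14:00:23Z")
print(d, t)  # 2022-06-28 14:00:23
```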

In [135]:
curs = connection.cursor()
connection.commit()

Now we create a table on the AWS database with SQL, using the same column names as our pandas DataFrame. After running the following line, I checked the backend in my database management software by running `SELECT * FROM video_data`, and the table was there. 

In [136]:
initialize_vid_table(curs)

Next, I will write the code that adds a video to the SQL table if it is not there yet, or updates it if it is already present. One thought is that if the loop inserted new videos as it found them, it could also end up needlessly updating those new videos, adding to the execution time. This is negligible with only a few new videos, but if we were pulling from many channels and updating many videos, it could add substantially to processing time. Therefore I will store the new rows in a separate DataFrame and insert them together at the end.
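Deciding between an update and an insert requires knowing which videos are already stored. Checking one video at a time costs a database round trip per video. A variation on the same plan fetches all stored IDs in one query and splits the rows in memory. This sketch uses Python's built-in `sqlite3` as a stand-in for the PostgreSQL connection, since the cursor calls shown are the same DB-API methods psycopg2 provides. `split_new_and_existing` is a hypothetical helper, and rows are plain dicts with a `vid_id` key.

```python
import sqlite3


def split_new_and_existing(cursor, rows):
    """Partition rows into (to_update, to_insert) using a single SELECT."""
    cursor.execute("SELECT vid_id FROM video_data")
    stored_ids = {vid_id for (vid_id,) in cursor.fetchall()}  # one round trip
    to_update = [r for r in rows if r["vid_id"] in stored_ids]
    to_insert = [r for r in rows if r["vid_id"] not in stored_ids]
    return to_update, to_insert


# Demo: an in-memory SQLite table standing in for the PostgreSQL one
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE video_data (vid_id TEXT PRIMARY KEY)")
cur.execute("INSERT INTO video_data VALUES ('a')")
update, insert = split_new_and_existing(cur, [{"vid_id": "a"}, {"vid_id": "b"}])
print([r["vid_id"] for r in update], [r["vid_id"] for r in insert])  # ['a'] ['b']
```

This trades a little memory for fewer round trips. For one channel's few hundred IDs, the set of stored IDs is tiny.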

In [137]:
# rough outline of what I want to code:

# for row in video_df:
#     if row in sql_table:
#         update(row)
#     else:
#         df = pd.concat([df, row])
# insert(df)
        
def vid_in_table(curs, vid_id):
    sql_query = ("""SELECT vid_id FROM video_data WHERE vid_id = %s""")
    curs.execute(sql_query,(vid_id,))
    
    return curs.fetchone() is not None
    
def update_vid(curs, vid_id, vid_title, view_count, like_count, comment_count, title_sentiment):
    #no trailing comma after the last SET assignment, or PostgreSQL raises a syntax error
    sql_query = ("""UPDATE video_data
                    SET vid_title = %s,
                        view_count = %s,
                        like_count = %s,
                        comment_count = %s,
                        title_sentiment = %s
                    WHERE vid_id = %s;""")
    #one value per %s placeholder, in order; vid_id comes last for the WHERE clause
    update_vars = (vid_title, view_count, like_count, comment_count, title_sentiment, vid_id)
    curs.execute(sql_query, update_vars)
    
def insert_vids(curs, vid_id, vid_title, upload_time, upload_date, view_count, like_count, comment_count, title_sentiment):
    sql_query = ("""INSERT INTO video_data (
                        vid_id, vid_title, upload_time,
                        upload_date, view_count, like_count, 
                        comment_count, title_sentiment)
                    VALUES(%s,%s,%s,%s,%s,%s,%s,%s);""")
    insert_vars = (vid_id, vid_title, upload_time, upload_date, view_count, like_count, comment_count, title_sentiment)
    curs.execute(sql_query, insert_vars)
        

Now that we have the functions we need to build our loop, I will pull them together into one final function that ports the DataFrame into the database. 

In [138]:
def df_to_db(curs, video_df):
    new_rows = []  # videos not yet in the table; inserted after all updates are done
    for i,vid in video_df.iterrows():
        if vid_in_table(curs, vid["vid_id"]):
            update_vid(curs, vid["vid_id"], vid["vid_title"], vid["view_count"],
                       vid["like_count"], vid["comment_count"], vid["title_sentiment"])
        else:
            new_rows.append(vid)
    #build the separate insert DataFrame once instead of concatenating row by row
    insert_df = pd.DataFrame(new_rows, columns=video_df.columns)
    
    for i,vid in insert_df.iterrows():
        insert_vids(curs, vid["vid_id"], vid["vid_title"], 
                    vid["upload_time"], vid["upload_date"], 
                    vid["view_count"], vid["like_count"], 
                    vid["comment_count"], vid["title_sentiment"])
    curs.connection.commit()
        

And with that, I have a function (df_to_db) that takes our pandas DataFrame, goes through it row by row, and either updates the values if a video is already in the database or adds the video if it is not yet there.
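An alternative to checking for each video and then choosing between an update and an insert: PostgreSQL 9.5 and later can do both in one statement with `INSERT ... ON CONFLICT (vid_id) DO UPDATE`, so no separate existence check is needed. The sketch below uses Python's built-in `sqlite3` as a stand-in database so it runs anywhere. SQLite 3.24 and later accept the same `ON CONFLICT ... DO UPDATE SET col = excluded.col` form, but with psycopg2 the placeholders would be `%s` instead of `?`. The trimmed-down three-column table and the `upsert_videos` name are illustrative only.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO video_data (vid_id, vid_title, view_count)
VALUES (?, ?, ?)
ON CONFLICT (vid_id) DO UPDATE SET
    vid_title = excluded.vid_title,
    view_count = excluded.view_count
"""


def upsert_videos(conn, rows):
    """Insert new videos and refresh existing ones; `excluded` holds the incoming row."""
    conn.executemany(UPSERT_SQL, rows)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE video_data (vid_id TEXT PRIMARY KEY, vid_title TEXT, view_count INTEGER)")
upsert_videos(conn, [("a", "First", 10), ("b", "Second", 5)])
upsert_videos(conn, [("a", "First (renamed)", 12)])  # existing row is updated, not duplicated
print(conn.execute("SELECT * FROM video_data ORDER BY vid_id").fetchall())
# [('a', 'First (renamed)', 12), ('b', 'Second', 5)]
```

The trade-off is that the database, rather than Python, decides between update and insert. This also avoids a race if two pipeline runs ever overlap.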

In [139]:
df_to_db(curs, video_df)

Now all of the data taken via the YouTube API, along with the title classifications from the sentiment analysis, has been imported into a PostgreSQL database on AWS, and this pipeline is complete. Some next steps would be:

1. Generating visualizations of this data in R, Python, or Tableau
2. Statistical analysis of title sentiment and video performance
3. Further improving the sentiment analysis via additional training
4. Importing and performing sentiment analysis on additional data - such as video transcripts
5. Expansion of dataset to additional channels and types of videos

With additional development, starting by performing sentiment analysis across many channels' titles and transcripts, I could transform this from the analysis of a single science YouTube channel into a tool which would help creators to better determine the optimal tone and title for videos depending on the category of channel those videos are a part of. 