# Unveiling Insights into the Hong Kong YouTube Scene: A Data-Driven Exploration of Top Channels and User Sentiment

## by Patty Lau

### Description: This is an exploratory data analysis project focused on the most-subscribed YouTube video channels in Hong Kong. A range of tools and techniques were utilised to build a data pipeline and perform sentiment analysis on video comments. Data visualizations were also created to enhance insights and communicate findings.

### Tech Stack: Python, Jupyter Notebook, MongoDB, Google Cloud Platform Compute Engine, BigQuery, Natural Language API, Apache Spark, Microsoft Power BI, Pandas


In [3]:
from pymongo import MongoClient
import os
from dotenv import load_dotenv
import datetime as dt
import isodate


##### Initial Setup


In [4]:
# Load environment variables (e.g. MONGODB_URI, POSTGRES_*) from a .env file.
load_dotenv()
# Connect to MongoDB: use MONGODB_URI when set, otherwise localhost on port 27017
client = MongoClient(os.getenv('MONGODB_URI') or 'mongodb://localhost:27017')

# Choose the MongoDB database
db = client.youtubedataapi0416

# PostgreSQL data-warehouse settings, read from the environment (None when unset).
# NOTE: no PostgreSQL connection is opened here; these values are only collected.
POSTGRES_DB = os.getenv('POSTGRES_DB')
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")


##### Collect data from YouTube API


In [5]:
# Build a YouTube Data API v3 client authenticated with an API key.
# Instructions for running the Google API code samples locally:
# https://developers.google.com/explorer-help/code-samples#python

import pandas as pd
from googleapiclient.discovery import build

api_service_name, api_version = "youtube", "v3"

# The key is taken from the environment rather than hard-coded in the notebook.
api_key = os.getenv('YOUTUBE_API_KEY')

youtube = build(api_service_name, api_version, developerKey=api_key)


In [47]:
# Search for channels whose name matches ANY of the queries below.
# The YouTube search `q` parameter supports `|` as a Boolean OR; joining the names
# with "," instead searches for one combined phrase and can miss channels that
# match only one of the names.
search_query = ["Emi Wong", "Coffee Lam"]
search_params = dict(
    q="|".join(search_query),
    type="channel",
    part="id,snippet",
    maxResults=10,
)
request = youtube.search().list(**search_params)

# Execute the search and print the results, following pagination.
# NOTE: per the YouTube Data API quota docs each search.list call costs 100 units,
# so every extra page is expensive.
next_page_token = ''
while True:
    response = request.execute()
    for channel in response['items']:
        channel_id = channel['id']['channelId']
        channel_name = channel['snippet']['title']
        print(f"{channel_name} (ID: {channel_id})")

    # Stop when the API reports no further pages.
    if 'nextPageToken' not in response:
        break
    next_page_token = response['nextPageToken']
    request = youtube.search().list(**search_params, pageToken=next_page_token)


Coffee林芊妤 (ID: UCxCZqbizSsnntlz6w0fN8hA)
Fanny Wang - Topic (ID: UCTEy93F0GaoP_ycEIgqWjbg)
Stephy Tang - Topic (ID: UCtiL38iHIIUi10k_zbpqCRQ)


In [31]:

# function to get channel stats

def get_channel_stats(youtube, channel_ids):
    """Fetch summary statistics for a list of YouTube channels.

    Args:
        youtube: A YouTube Data API v3 client (``googleapiclient`` resource).
        channel_ids: Iterable of channel ids.

    Returns:
        A list with one dict per channel returned by the API. Keys:
        channelId, ChannelTitle, subscribers, totalViews, totalVideos,
        channelPublishedAt, playlistId (the channel's "uploads" playlist) and
        DataRetrievedAt (local ISO timestamp of the fetch). Counts the API
        omits (e.g. a hidden subscriber count) are reported as 0.
    """
    channel_ids = list(channel_ids)
    all_channel_stats = []  # list to store all channel stats

    # channels.list takes a comma-separated id list; query in batches of 50,
    # the same batch size used for videos.list in get_video_details.
    for start in range(0, len(channel_ids), 50):
        response = youtube.channels().list(
            part="snippet,contentDetails,statistics",
            id=','.join(channel_ids[start:start + 50]),
        ).execute()

        # 'items' may be absent when none of the ids match a channel.
        for item in response.get('items', []):
            statistics = item['statistics']
            all_channel_stats.append(dict(
                channelId=item['id'],
                ChannelTitle=item['snippet']['title'],
                # subscriberCount is not returned when the channel hides it.
                subscribers=int(statistics.get('subscriberCount', 0)),
                totalViews=int(statistics.get('viewCount', 0)),
                totalVideos=int(statistics.get('videoCount', 0)),
                channelPublishedAt=item['snippet']['publishedAt'],
                playlistId=item['contentDetails']['relatedPlaylists']['uploads'],
                DataRetrievedAt=dt.datetime.now().isoformat(),
            ))
    return all_channel_stats  # return all channel stats for each channel ID


In [32]:
# Function to get videoIds from playlistId

def get_video_ids(youtube, playlist_id):
    """Collect every video id in a playlist, following pagination.

    Args:
        youtube: A YouTube Data API v3 client.
        playlist_id: Id of the playlist to walk (e.g. a channel's uploads playlist).

    Returns:
        List of video ids in the order the API returned them.
    """
    query = {'part': 'contentDetails', 'playlistId': playlist_id, 'maxResults': 50}
    collected = []

    while True:
        page = youtube.playlistItems().list(**query).execute()
        collected.extend(entry['contentDetails']['videoId'] for entry in page['items'])

        token = page.get('nextPageToken')
        if token is None:
            return collected  # no more pages
        # Only requests after the first one carry a page token.
        query['pageToken'] = token


In [33]:
# Function to get video details
def get_video_details(youtube, video_ids):
    """Fetch metadata and statistics for a list of videos.

    Videos are requested in batches of 50 ids per ``videos.list`` call.

    Args:
        youtube: A YouTube Data API v3 client.
        video_ids: Sequence of video ids (sliced, so must support indexing).

    Returns:
        A list with one dict per video returned by the API. Keys: VideoId,
        ChannelTitle, VideoTitle, Description, Tags (None when the video has
        no tags), Published_date, Views, Likes, Comments, Duration (seconds),
        Definition, Caption and DataRetrievedAt. Views/Likes/Comments default
        to 0 when the API omits the corresponding statistic.
    """
    all_video_info = []

    for start in range(0, len(video_ids), 50):
        response = youtube.videos().list(
            part="snippet,contentDetails,statistics",
            id=','.join(video_ids[start:start + 50]),
        ).execute()

        for video in response.get('items', []):
            snippet = video['snippet']
            # Read optional fields with defaults instead of mutating the API response.
            statistics = video.get('statistics', {})
            content_details = video['contentDetails']
            all_video_info.append(dict(
                VideoId=video['id'],
                ChannelTitle=snippet['channelTitle'],
                VideoTitle=snippet['title'],
                Description=snippet['description'],
                Tags=snippet.get('tags'),
                Published_date=snippet['publishedAt'],
                Views=int(statistics.get('viewCount', 0)),
                Likes=int(statistics.get('likeCount', 0)),
                Comments=int(statistics.get('commentCount', 0)),
                # ISO-8601 duration (e.g. "PT13S") converted to seconds.
                Duration=isodate.parse_duration(
                    content_details['duration']).total_seconds(),
                Definition=content_details['definition'],
                Caption=content_details['caption'],
                DataRetrievedAt=dt.datetime.now().isoformat(),
            ))
    return all_video_info


In [7]:
# Function to get video comments
def get_comments_in_videos(youtube, video_ids):
    """Fetch the first page of top-level comments for each video.

    One ``commentThreads.list`` request is made per video with ``maxResults=100``,
    so at most 100 comment threads are collected per video. Replies are requested
    but not stored.

    Args:
        youtube: A YouTube Data API v3 client.
        video_ids: Iterable of video ids.

    Returns:
        A tuple ``(all_comments_info, all_disabled_comments_info)``.

        * ``all_comments_info``: one dict per video with ``videoId``, the parallel
          lists ``commentText`` / ``commentPublishedAt`` / ``likeCount`` (one
          element per top-level comment) and ``DataRetrievedAt``.
        * ``all_disabled_comments_info``: one dict per video whose comments are
          disabled, with the placeholder ``"disabled comments"`` in each field.

        Videos that fail for any other reason (quota, network, deleted video, ...)
        are reported on stdout and left out of both lists. This keeps them from
        being mislabelled as having comments disabled.
    """
    all_comments_info = []
    all_disabled_comments_info = []

    for video_id in video_ids:
        try:
            response = youtube.commentThreads().list(
                part="snippet,replies",
                videoId=video_id,
                maxResults=100  # set max number of comments to retrieve
            ).execute()

            # Read each thread's top-level comment once, then split into parallel lists.
            top_level = [thread['snippet']['topLevelComment']['snippet']
                         for thread in response['items']]
            comments_info = dict(videoId=video_id,
                                 commentText=[c['textOriginal'] for c in top_level],
                                 commentPublishedAt=[c['publishedAt'] for c in top_level],
                                 likeCount=[int(c['likeCount']) for c in top_level],
                                 DataRetrievedAt=dt.datetime.now().isoformat()
                                 )
        except Exception as e:
            print(e)
            print('Could not get comments for video ' + video_id)

            # The API signals this case with reason "commentsDisabled" and the message
            # "... has disabled comments."; any other failure is not recorded as such.
            message = str(e)
            if 'commentsDisabled' in message or 'disabled comments' in message:
                all_disabled_comments_info.append(
                    dict(videoId=video_id, commentText="disabled comments",
                         commentPublishedAt="disabled comments",
                         likeCount="disabled comments",
                         DataRetrievedAt=dt.datetime.now().isoformat()
                         ))
        else:
            all_comments_info.append(comments_info)

    return all_comments_info, all_disabled_comments_info


#### Insert the collected raw data into MongoDB and display temporary DataFrames with Pandas


##### Get channel statistics


In [34]:
# Channel ids of the five Hong Kong channels analysed in this notebook
# (the trailing comment on each line names the channel).
channel_ids = ['UCvGEK5_U-kLgO6-AMDPeTUQ',  # Emi Wong
               'UCxCZqbizSsnntlz6w0fN8hA',  # Coffee Lam
               'UCXnWjmQ8BDE0sDIeZLK5yJg',  # 點 Cook Guide
               'UC4nsi0oM9WBNFv1RdLh3c2g',  # JASON816
               'UCDpK1rg5I9Zc3ToY13vbR3w'  # 笑波子
               ]


In [35]:
# Fetch statistics for every channel in channel_ids (list of dicts, one per channel).
channel_data = get_channel_stats(youtube, channel_ids)


In [36]:
# Display-only preview of the channel statistics; the DataFrame is not stored.
pd.DataFrame(channel_data)


Unnamed: 0,channelId,ChannelTitle,subscribers,totalViews,totalVideos,channelPublishedAt,playlistId,DataRetrievedAt
0,UCDpK1rg5I9Zc3ToY13vbR3w,笑波子,936000,1051897493,4354,2006-09-09T19:59:59Z,UUDpK1rg5I9Zc3ToY13vbR3w,2023-04-16T09:38:02.175488
1,UC4nsi0oM9WBNFv1RdLh3c2g,JASON,1040000,521248369,2955,2013-06-16T13:50:59Z,UU4nsi0oM9WBNFv1RdLh3c2g,2023-04-16T09:38:02.175504
2,UCXnWjmQ8BDE0sDIeZLK5yJg,點 Cook Guide,1110000,202603292,1356,2014-02-07T15:44:04Z,UUXnWjmQ8BDE0sDIeZLK5yJg,2023-04-16T09:38:02.175526
3,UCvGEK5_U-kLgO6-AMDPeTUQ,emi wong,5960000,787128269,442,2014-11-02T14:43:34Z,UUvGEK5_U-kLgO6-AMDPeTUQ,2023-04-16T09:38:02.175529
4,UCxCZqbizSsnntlz6w0fN8hA,Coffee林芊妤,1700000,261803319,353,2015-06-02T07:09:15Z,UUxCZqbizSsnntlz6w0fN8hA,2023-04-16T09:38:02.175533


In [37]:
# Persist the raw channel statistics to MongoDB (one document per channel).
db.top5hkchannelsinfo.insert_many(channel_data)


<pymongo.results.InsertManyResult at 0x7f1488c3ac20>

##### Get video ids from playlist id


In [54]:
playlist_id = "UUxCZqbizSsnntlz6w0fN8hA"  # Coffee Lam's uploads playlist
# Collect every video id in that playlist (all pages).
video_ids = get_video_ids(youtube, playlist_id)


In [55]:
# Store the playlist's video ids as a single document: {playlistId, videoId: [ids...]}.
db.allvideoids.insert_one(dict(playlistId=playlist_id, videoId=video_ids))


<pymongo.results.InsertOneResult at 0x7f1424b1a260>

##### Get video details from video ids


In [56]:
# Fetch per-video metadata/statistics for every collected video id and preview it.
get_video_details_result = get_video_details(youtube, video_ids)
pd.DataFrame(get_video_details_result)


Unnamed: 0,VideoId,ChannelTitle,VideoTitle,Description,Tags,Published_date,Views,Likes,Comments,Duration,Definition,Caption,DataRetrievedAt
0,WxTxeyqRM4E,Coffee林芊妤,趁着兒童節同大家宣佈CoffeeSweat代言人🌟陳伯個fd～翟伯😎!! #coffeesw...,,,2023-04-04T09:18:00Z,10622,207,5,13.0,hd,false,2023-04-16T09:42:16.977984
1,ibZ6t_lTTIA,Coffee林芊妤,28分鐘 女性日常養生瑜伽｜由內到外保養｜全身保健拉筋♥️愛自己♥️｜孕婦都能做,On coffee lam\nWrap around long sleeve- white\...,"[coffee, coffee lam, coffeelam, 林芊妤, coffeeyog...",2023-04-01T07:50:53Z,55057,1937,123,1762.0,hd,false,2023-04-16T09:42:16.978017
2,1AqSRo535vs,Coffee林芊妤,22分鐘 極有效全身局部肌肉訓練｜用body weight增肌｜14天肌力挑戰🔥,On Coffee Lam\nChic Sports Bra（Mercury Grey）\n...,"[coffee, coffee lam, coffeelam, 林芊妤, coffeeyog...",2023-03-20T04:51:15Z,95416,2072,156,1470.0,hd,false,2023-04-16T09:42:16.978034
3,8ZLyuY01Dqo,Coffee林芊妤,每天都要穿着腰封去為腰部塑形🔥,,,2023-03-18T04:44:13Z,15493,206,7,17.0,hd,false,2023-04-16T09:42:16.978050
4,pmOdWGRcNvc,Coffee林芊妤,CoffeeSweat 2023 Spring Collection♥️,www.CoffeeSweat.com,,2023-03-13T08:52:26Z,8326,231,5,61.0,hd,false,2023-04-16T09:42:16.978065
...,...,...,...,...,...,...,...,...,...,...,...,...,...
348,11tdiQZb-BM,Coffee林芊妤,COFFEE YOGA EP4. 升CUP瑜珈！幫你減肚腩！,一連兩集升級瑜珈篇，第一集先教大家減！肚！腩 ！先有兩個動作改善寒背問題，再加兩個動作同幫你...,"[coffee, coffee lam, coffeelam, 林芊妤, coffeeyog...",2015-07-26T03:08:00Z,146074,1330,33,200.0,hd,false,2023-04-16T09:42:18.180162
349,SUy3-htFkys,Coffee林芊妤,COFFEE YOGA EP3. 瑜珈男女大不同,男仔女仔做瑜珈動作各有各困難，今次就同大家簡單示範下啦！\n想學更多，記得SUBSCRIBE...,"[coffee, coffee lam, coffeelam, 林芊妤, coffeeyog...",2015-07-18T05:30:01Z,171174,594,35,148.0,hd,false,2023-04-16T09:42:18.180173
350,uFRn2Jg1vbk,Coffee林芊妤,COFFEE YOGA EP2. 私影瑜珈 (𡃁模MODELS必學),想擺靚pose影相，SHOWSHOW靚身材？\n今次就教各位美女們兩招動作旁身啦\n想學更多...,"[私影, 𡃁模, Yoga (Sport), Model (Profession), pos...",2015-07-10T12:32:18Z,67662,383,22,140.0,hd,false,2023-04-16T09:42:18.260663
351,SCUXhKmOulc,Coffee林芊妤,COFFEE YOGA SPECIAL 柔軟咖啡 X 剛硬男仔（男女瑜珈沙灘篇）,今次去到沙灘遇上硬崩崩嘅伍仔\n於是邀請佢同我一齊做瑜珈\n希望向大家示範男女都啱做嘅瑜珈動...,"[COFFEE, Yoga (Sport), Hong Kong (Country), 瑜珈...",2015-07-05T03:00:01Z,50866,345,36,205.0,hd,false,2023-04-16T09:42:18.260689


In [57]:
# Persist the per-video details to MongoDB (one document per video).
db.allvideodetails.insert_many(get_video_details_result)


<pymongo.results.InsertManyResult at 0x7f1425974eb0>

##### Remove duplicate comments from the sentiment collection


In [None]:
# The below code removes duplicate comments from the sentiment collection,
# keeping one document per distinct 'Comment' value.

from pymongo import MongoClient

# Connect to MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['youtubedataapi0412']
collection = db['allcomments_sample_sentiment_0414']

# Find every comment text that occurs more than once, with the _ids of its
# documents, in a single aggregation. This replaces a distinct() call plus a
# count_documents()/find() round trip per distinct comment, and avoids
# distinct()'s 16 MB result limit. The first _id pushed for each group is kept,
# mirroring the original "keep the first document returned by find()" rule.
duplicate_groups = collection.aggregate([
    {'$group': {'_id': '$Comment', 'ids': {'$push': '$_id'}, 'count': {'$sum': 1}}},
    {'$match': {'count': {'$gt': 1}}},
], allowDiskUse=True)

# Keep the first document of each group and delete the rest in one request.
for group in duplicate_groups:
    collection.delete_many({'_id': {'$in': group['ids'][1:]}})


##### Get video comments from video ids


In [31]:
# Fetch the first page of comments for every video id.
# Result is a tuple: (comments per video, videos with comments disabled).
all_comments_in_videos = get_comments_in_videos(youtube, video_ids)
# pd.DataFrame(all_comments_in_videos[1])  # disabled comments


<HttpError 403 when requesting https://youtube.googleapis.com/youtube/v3/commentThreads?part=snippet%2Creplies&videoId=NJZ95sZlXL4&maxResults=100&key=REDACTED_API_KEY&alt=json returned "The video identified by the <code><a href="/youtube/v3/docs/commentThreads/list#videoId">videoId</a></code> parameter has disabled comments.">
Could not get comments for video NJZ95sZlXL4


In [32]:
# Display-only preview of the per-video comment documents (element 0 of the tuple).
pd.DataFrame(all_comments_in_videos[0])


Unnamed: 0,videoId,commentText,commentPublishedAt,likeCount,DataRetrievedAt
0,pgGoBihIUiU,"[Do you have exercise for foot sprain?, thanks...","[2023-04-15T09:53:26Z, 2023-04-15T09:37:06Z, 2...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",2023-04-15T11:16:34.802343
1,G0cBlx-Jfdo,"[Thank you emi. ❤, Recipe for the banana cake ...","[2023-04-14T00:01:09Z, 2023-04-12T01:12:11Z, 2...","[0, 1, 0, 5, 3, 3, 3, 7, 4, 3, 3, 4]",2023-04-15T11:16:35.048075
2,6sLXvyL-JEc,[this vlog was filmed more than a year ago😳 so...,"[2023-04-09T12:52:58Z, 2023-04-14T05:19:00Z, 2...","[56, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0,...",2023-04-15T11:16:35.342696
3,D8V_No7wxVQ,"[Fun video! Thanks for sharing 👌, in this peri...","[2023-04-12T16:22:46Z, 2023-04-08T15:42:04Z, 2...","[0, 1, 1, 0, 0, 1, 0, 0, 2, 0, 0, 0, 1, 1, 6, ...",2023-04-15T11:16:36.038890
4,MJapk3ZX5SE,[good job emi wong youre spanish sounds pretty...,"[2023-04-08T15:47:21Z, 2023-04-06T09:34:54Z, 2...","[1, 1, 0, 1, 0, 6, 1, 0, 0, 2, 1, 6, 1, 0, 16,...",2023-04-15T11:16:36.294262
...,...,...,...,...,...
437,WwqYFnBGjT8,[I also post what I eat everyday on my instagr...,"[2019-07-16T10:24:46Z, 2022-07-02T04:45:57Z, 2...","[14, 0, 1, 0, 0, 1, 0, 3, 0, 0, 0, 0, 0, 0, 6,...",2023-04-15T11:19:37.889696
438,DZ6voLaG9_I,[I also post what I eat everyday on my instagr...,"[2019-07-16T08:54:18Z, 2023-03-09T13:32:52Z, 2...","[255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0...",2023-04-15T11:19:38.265562
439,Fo470c993yA,[I also post what I eat everyday on my instagr...,"[2019-07-16T10:18:01Z, 2022-03-10T12:28:13Z, 2...","[18, 0, 0, 0, 3, 3, 0, 0, 5, 6, 1, 0, 1, 0, 5,...",2023-04-15T11:19:38.661458
440,p1DJhMy0yCs,[I also post what I eat everyday on my instagr...,"[2019-07-16T09:54:35Z, 2023-04-09T15:09:19Z, 2...","[87, 0, 1, 0, 0, 3, 0, 1, 1, 0, 0, 0, 3, 0, 1,...",2023-04-15T11:19:39.284293


In [33]:
# Persist one document per video, each holding parallel comment arrays.
db.allcommentsinfo2.insert_many(all_comments_in_videos[0])


<pymongo.results.InsertManyResult at 0x7f8a24c816f0>

##### Unwind comments collected in mongoDB


In [34]:
# Flatten allcommentsinfo2. Each source document holds the parallel arrays
# commentText / commentPublishedAt / likeCount for one video. Emit one document
# per comment and write the result to allcommentsinfo_unwinded2.

from pymongo import MongoClient

# Connect to MongoDB and pick the source collection
client = MongoClient('mongodb://localhost:27017/')
db = client['youtubedataapi0415']
collection = db['allcommentsinfo2']

# One output document per commentText element; its position goes into 'arrayIndex'.
unwind_stage = {
    '$unwind': {'path': '$commentText', 'includeArrayIndex': 'arrayIndex'}
}

# Use that position to pick the matching timestamp and like count from the
# parallel arrays. _id must be excluded: unwound documents would otherwise all
# share their parent's _id, and $out would reject the duplicates.
project_stage = {
    '$project': {
        '_id': 0,
        'videoId': 1,
        'commentText': 1,
        'commentPublishedAt': {'$arrayElemAt': ['$commentPublishedAt', '$arrayIndex']},
        'likeCount': {'$arrayElemAt': ['$likeCount', '$arrayIndex']},
        'DataRetrievedAt': 1,
    }
}

# Replace the target collection with the flattened documents.
out_stage = {'$out': 'allcommentsinfo_unwinded2'}

pipeline = [unwind_stage, project_stage, out_stage]
collection.aggregate(pipeline)


<pymongo.command_cursor.CommandCursor at 0x7f8a24cc3f40>

##### Creating small test samples


In [None]:

# Build a small test sample. Copy the first SAMPLE_SIZE documents of
# allcommentsinfo into allcommentsinfo_sample2, then flatten them into
# allcommentsinfo_aggregated_sample2.
import pymongo

# Connect to MongoDB
client = pymongo.MongoClient('mongodb://localhost:27017/')
db = client['youtubedataapi0412']

SAMPLE_SIZE = 5

# Select the first SAMPLE_SIZE documents in the collection
pipeline = [
    {'$limit': SAMPLE_SIZE},
]

cursor = db['allcommentsinfo'].aggregate(pipeline)
sample_docs = list(cursor)

# Copied documents keep their source _id. Re-running the cell against a non-empty
# sample collection would therefore raise a duplicate-key error, so start fresh.
new_collection = db['allcommentsinfo_sample2']
new_collection.drop()

if sample_docs:
    # Insert the selected documents into the new collection
    result = new_collection.insert_many(sample_docs)
    print(result.inserted_ids)
else:
    # insert_many() rejects an empty list.
    print('allcommentsinfo is empty; no sample documents copied')

# aggregation pipeline: one document per comment, picking the matching
# timestamp/like count from the parallel arrays by index
pipeline = [
    {
        '$unwind': {
            'path': '$commentText',
            'includeArrayIndex': 'arrayIndex'
        }
    },
    {
        '$project': {
            '_id': 0,
            'videoId': 1,
            'commentText': 1,
            'commentPublishedAt': {'$arrayElemAt': ['$commentPublishedAt', '$arrayIndex']},
            'likeCount': {'$arrayElemAt': ['$likeCount', '$arrayIndex']},
            'DataRetrievedAt': 1
        }
    },
    {
        '$out': 'allcommentsinfo_aggregated_sample2'
    }
]

# execute aggregation pipeline
new_collection.aggregate(pipeline)


[ObjectId('64366db41663f97dab7cf6a2'), ObjectId('64366db41663f97dab7cf6a3'), ObjectId('64366db41663f97dab7cf6a4'), ObjectId('64366db41663f97dab7cf6a5'), ObjectId('64366db41663f97dab7cf6a6')]


<pymongo.command_cursor.CommandCursor at 0x7f00d1b7bd90>

## Google Cloud Natural Language Sentiment Analysis


##### Get sentiment analysis scores from comments collected in mongoDB using Google Cloud Natural Language API


In [None]:
import csv
import time
from pymongo import MongoClient
from google.cloud import language_v1

# Connect to MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['youtubedataapi0415']
collection = db['allcommentsinfo_unwinded2']

# Set up Google Cloud Natural Language API client. It gets its own name instead of
# rebinding `client`, which already refers to the MongoDB connection.
language_client = language_v1.LanguageServiceClient()

# Collection for sentiment analysis results. Index 'Comment' so the per-comment
# "already analyzed" lookup below is an index hit rather than a collection scan.
sentiment_collection = db['allcommentsinfo_unwinded_sentiment2']
sentiment_collection.create_index('Comment')

# Open CSV file for writing. The encoding is set explicitly because comments
# contain CJK text and emoji. NOTE: mode 'w' truncates the file, so after a
# re-run the CSV only holds comments analysed in that run; MongoDB keeps all results.
with open('sentiment0415emi.csv', mode='w', newline='', encoding='utf-8') as sentiment_file:
    sentiment_writer = csv.writer(sentiment_file)
    sentiment_writer.writerow(
        ['Comment', 'Language', 'LikeCount', 'CommentPublishedTime', 'Score', 'Magnitude'])

    # Iterate over the flattened comment documents
    for source_doc in collection.find():
        comment = source_doc['commentText']
        likecount = source_doc['likeCount']
        commentpublishedtime = source_doc['commentPublishedAt']
        # videoId = source_doc['videoId'] (0415 no videoId)

        # Skip comment texts that are already stored (from an earlier run, or the same
        # text on another video). The `continue` matters: without it the comment is
        # analysed and inserted again, creating duplicate documents.
        if sentiment_collection.find_one({'Comment': comment}):
            print(f"Comment '{comment}' already analyzed")
            continue

        try:
            # Call analyze_sentiment, which detects the language and scores the sentiment
            nl_document = language_v1.Document(
                content=comment, type_=language_v1.Document.Type.PLAIN_TEXT)
            response = language_client.analyze_sentiment(
                request={'document': nl_document, 'encoding_type': language_v1.EncodingType.UTF8})

            # Extract language and sentiment
            language = response.language
            sentiment = response.document_sentiment

            # Write result to CSV file
            sentiment_writer.writerow(
                [comment, language, likecount, commentpublishedtime, sentiment.score, sentiment.magnitude])

            # Save result to MongoDB collection
            # TODO: add videoId from the allcommentsinfo unwinded collection
            sentiment_collection.insert_one({
                'Comment': comment,
                'Language': language,
                'LikeCount': likecount,
                'CommentPublishedTime': commentpublishedtime,
                'Score': sentiment.score,
                'Magnitude': sentiment.magnitude
            })

        except Exception as e:
            # Skip unsupported language comments
            if 'is not supported for document_sentiment analysis' in str(e):
                print(
                    f"Skipping comment '{comment}' due to unsupported language")
                continue

            # Best effort: report other failures and move on to the next comment
            print(f"Error processing comment '{comment}': {e}")

        # Add a 0.05-second delay between each analysis to avoid hitting the rate limit
        time.sleep(0.05)


##### Check and Remove duplicate comments in the sentiment collection


In [30]:
# The below code removes duplicate comments from the sentiment collection,
# keeping one document per distinct 'Comment' value.

from pymongo import MongoClient

# Connect to MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['youtubedataapi0412']
collection = db['allcomments_sample_sentiment_0414']

# Find every comment text that occurs more than once, with the _ids of its
# documents, in a single aggregation. This replaces a distinct() call plus a
# count_documents()/find() round trip per distinct comment, and avoids
# distinct()'s 16 MB result limit. The first _id pushed for each group is kept,
# mirroring the original "keep the first document returned by find()" rule.
duplicate_groups = collection.aggregate([
    {'$group': {'_id': '$Comment', 'ids': {'$push': '$_id'}, 'count': {'$sum': 1}}},
    {'$match': {'count': {'$gt': 1}}},
], allowDiskUse=True)

# Keep the first document of each group and delete the rest in one request.
for group in duplicate_groups:
    collection.delete_many({'_id': {'$in': group['ids'][1:]}})


# Pyspark


In [5]:
import findspark
# Locate the local Spark installation and make pyspark importable; the printed
# return value is None (see the cell output).
print(findspark.init())


None


In [7]:


# setup pyspark
from pyspark.sql import SparkSession

# Maven coordinates of the connector jars Spark resolves and downloads at startup.
packages = [
    "org.apache.hadoop:hadoop-aws:3.2.0",
    # NOTE(review): spark-avro 2.4.4 was built for Spark 2.4, while the BigQuery
    # connector below targets Spark 3.1 — confirm the avro version matches the cluster.
    "org.apache.spark:spark-avro_2.12:2.4.4",
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    "org.postgresql:postgresql:42.2.18",
    "com.google.cloud.spark:spark-3.1-bigquery:0.30.0"
]

# Connect to the standalone Spark master on localhost:7077 with those packages.
spark = SparkSession.builder.appName("Transform youtube video data")\
    .master('spark://localhost:7077')\
    .config("spark.jars.packages", ",".join(packages))\
    .getOrCreate()


:: loading settings :: url = jar:file:/home/patty/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/patty/.ivy2/cache
The jars for the packages stored in: /home/patty/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
com.google.cloud.spark#spark-3.1-bigquery added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-360b06ed-8585-4a75-8844-128a45b1ce10;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
	found org.apache.spark#spark-avro_2.12;2.4.4 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found org.postgresql#postgresql

23/04/18 03:57:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Load data from mongoDB to spark


##### Load top5hkchannelsinfo into a Spark DataFrame


In [7]:
# Load the channel statistics collection from MongoDB into a Spark DataFrame,
# dropping MongoDB's internal _id column.
df_top5hkchannelsinfo_0415 = (
    spark.read.format('mongo')
    .option('spark.mongodb.input.uri',
            'mongodb://172.1.0.10/youtubedataapi0416.top5hkchannelsinfo')
    .load()
    .drop('_id')
)

# Inspect the inferred schema and the first rows.
df_top5hkchannelsinfo_0415.printSchema()
df_top5hkchannelsinfo_0415.show()


                                                                                

root
 |-- ChannelTitle: string (nullable = true)
 |-- DataRetrievedAt: string (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelPublishedAt: string (nullable = true)
 |-- playlistId: string (nullable = true)
 |-- subscribers: integer (nullable = true)
 |-- totalVideos: integer (nullable = true)
 |-- totalViews: integer (nullable = true)



[Stage 1:>                                                          (0 + 1) / 1]

+-------------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+----------+
| ChannelTitle|     DataRetrievedAt|           channelId|  channelPublishedAt|          playlistId|subscribers|totalVideos|totalViews|
+-------------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+----------+
|       笑波子|2023-04-16T09:38:...|UCDpK1rg5I9Zc3ToY...|2006-09-09T19:59:59Z|UUDpK1rg5I9Zc3ToY...|     936000|       4354|1051897493|
|        JASON|2023-04-16T09:38:...|UC4nsi0oM9WBNFv1R...|2013-06-16T13:50:59Z|UU4nsi0oM9WBNFv1R...|    1040000|       2955| 521248369|
|點 Cook Guide|2023-04-16T09:38:...|UCXnWjmQ8BDE0sDIe...|2014-02-07T15:44:04Z|UUXnWjmQ8BDE0sDIe...|    1110000|       1356| 202603292|
|     emi wong|2023-04-16T09:38:...|UCvGEK5_U-kLgO6-A...|2014-11-02T14:43:34Z|UUvGEK5_U-kLgO6-A...|    5960000|        442| 787128269|
| Coffee林芊妤|2023-04-16T09:38:...|UCxCZqbizSsnntlz6...|2015-

                                                                                

##### Load allvideodetails into a Spark DataFrame


In [8]:
# Load the per-video details collection from MongoDB into Spark.
df_allvideodetails0418 = spark.read.format('mongo').option(
    'spark.mongodb.input.uri', 'mongodb://172.1.0.10/youtubedataapi0416.allvideodetails').load()

# Register it for Spark SQL under the same name.
df_allvideodetails0418.createOrReplaceTempView('df_allvideodetails0418')

# Derive calendar features (year, month, day, hour, weekday name) from the ISO-8601
# Published_date string, keeping all original columns via *.
# NOTE(review): these are evaluated in the Spark session time zone; the sample output
# maps 09:06:10Z to hour 9 (i.e. UTC), not Hong Kong time — confirm that is intended
# for posting-time analysis.
df_allvideodetails0418 = spark.sql("""
    SELECT
        EXTRACT(year from df_allvideodetails0418.Published_date) as year,
        EXTRACT(month from df_allvideodetails0418.Published_date) as month,
        EXTRACT(day from df_allvideodetails0418.Published_date) as day,
        EXTRACT(hour from df_allvideodetails0418.Published_date) as hour,
        date_format(df_allvideodetails0418.Published_date, 'EEEE') as weekday,
        *
    FROM df_allvideodetails0418
""")
# Drop MongoDB's internal _id column (pulled in by SELECT *).
df_allvideodetails0418 = df_allvideodetails0418.drop("_id")
df_allvideodetails0418.printSchema()
df_allvideodetails0418.show()


                                                                                

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- Caption: string (nullable = true)
 |-- ChannelTitle: string (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- DataRetrievedAt: string (nullable = true)
 |-- Definition: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Duration: double (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Published_date: string (nullable = true)
 |-- Tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- VideoId: string (nullable = true)
 |-- VideoTitle: string (nullable = true)
 |-- Views: integer (nullable = true)



[Stage 1:>                                                          (0 + 1) / 1]

+----+-----+---+----+---------+-------+------------+--------+--------------------+----------+-------------------------------------+--------+-----+--------------------+-------------------------+-----------+-----------------------------------+------+
|year|month|day|hour|  weekday|Caption|ChannelTitle|Comments|     DataRetrievedAt|Definition|                          Description|Duration|Likes|      Published_date|                     Tags|    VideoId|                         VideoTitle| Views|
+----+-----+---+----+---------+-------+------------+--------+--------------------+----------+-------------------------------------+--------+-----+--------------------+-------------------------+-----------+-----------------------------------+------+
|2023|    4| 15|   9| Saturday|   true|      笑波子|      90|2023-04-16T09:38:...|        hd|    無人能抗拒這麼可愛的肉桂狗Café...|   566.0| 1342|2023-04-15T09:06:10Z|[笑波子, 波子, hk, vlo...|yOwSdw2Ep60|     【😍肉桂狗主題Café 】把肉桂...| 33366|
|2023|    4| 15|   3| Saturday|  fal

                                                                                

##### Calculating ratios of likes/comments/duration

In [22]:
from pyspark.sql.functions import size, array, when, col


# Replace null tag lists with empty arrays so every video has a Tags array.
df_allvideodetails0418 = df_allvideodetails0418.withColumn("Tags", when(
    df_allvideodetails0418.Tags.isNull(), array()).otherwise(df_allvideodetails0418.Tags))

# Number of tags per video. Tags can no longer be null after the step above, so
# size() never reaches its null case (the -1 result the old follow-up pass
# replaced with 0). That pass was dead code and has been removed.
df_allvideodetails0418 = df_allvideodetails0418.withColumn(
    "TagNum", size(df_allvideodetails0418.Tags))

# Engagement per view. Guarding on Views != 0 yields null for zero-view videos
# instead of dividing by zero. Division by zero raises when Spark's ANSI mode is
# enabled; without ANSI mode Spark already returns null there, so results are
# unchanged in that setting.
df_allvideodetails0418 = df_allvideodetails0418.withColumn(
    "likeRatio", when(col("Views") != 0, col("Likes") / col("Views")))

df_allvideodetails0418 = df_allvideodetails0418.withColumn(
    "commentRatio", when(col("Views") != 0, col("Comments") / col("Views")))

# print the schema
df_allvideodetails0418.printSchema()

# Show the updated DataFrame
df_allvideodetails0418.show()


root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- Caption: string (nullable = true)
 |-- ChannelTitle: string (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- DataRetrievedAt: string (nullable = true)
 |-- Definition: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Duration: double (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Published_date: string (nullable = true)
 |-- Tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- VideoId: string (nullable = true)
 |-- VideoTitle: string (nullable = true)
 |-- Views: integer (nullable = true)
 |-- TagNum: integer (nullable = false)
 |-- likeRatio: double (nullable = true)
 |-- commentRatio: double (nullable = true)

+----+-----+---+----+---------+-------+------------+--------+--------------------+----------+-------

##### Load dataframes of all video comments without sentiment scores


In [35]:
# Spark SQL that prefixes every comment row with calendar fields derived from
# its commentPublishedAt value; `{view}` is replaced by the temp-view name.
_COMMENT_DATE_PARTS_SQL = """
    SELECT
        EXTRACT(year from {view}.commentPublishedAt) as year,
        EXTRACT(month from {view}.commentPublishedAt) as month,
        EXTRACT(day from {view}.commentPublishedAt) as day,
        EXTRACT(hour from {view}.commentPublishedAt ) as hour,
        date_format({view}.commentPublishedAt, 'EEEE') as weekday,
        *
    FROM {view}
"""


def _load_comments_with_date_parts(mongo_uri, view_name):
    """Load one MongoDB comment collection and add year/month/day/hour/weekday.

    The raw collection is registered as a temp view named *view_name*, and the
    enriched DataFrame is returned.
    """
    raw_comments = spark.read.format('mongo').option(
        'spark.mongodb.input.uri', mongo_uri).load()
    raw_comments.createOrReplaceTempView(view_name)
    return spark.sql(_COMMENT_DATE_PARTS_SQL.format(view=view_name))


df_allcommentsinfo_unwinded0415_coffee = _load_comments_with_date_parts(
    'mongodb://172.1.0.10/youtubedataapi0415.allcommentsinfo_unwinded',
    'df_allcommentsinfo_unwinded0415_coffee')

df_allcommentsinfo_unwinded0415_emi = _load_comments_with_date_parts(
    'mongodb://172.1.0.10/youtubedataapi0415.allcommentsinfo_unwinded2',
    'df_allcommentsinfo_unwinded0415_emi')

# Preview both enriched comment sets.
df_allcommentsinfo_unwinded0415_coffee.show()
df_allcommentsinfo_unwinded0415_emi.show()


+----+-----+---+----+---------+--------------------+--------------------+--------------------+-------------------------------------+---------+-----------+
|year|month|day|hour|  weekday|     DataRetrievedAt|                 _id|  commentPublishedAt|                          commentText|likeCount|    videoId|
+----+-----+---+----+---------+--------------------+--------------------+--------------------+-------------------------------------+---------+-----------+
|2023|    4|  7|   4|   Friday|2023-04-15T07:07:...|{643a546c332a3928...|2023-04-07T04:23:51Z|                           长大了许多|        0|WxTxeyqRM4E|
|2023|    4|  6|  23| Thursday|2023-04-15T07:07:...|{643a546c332a3928...|2023-04-06T23:17:43Z|                           So cute❤~~|        2|WxTxeyqRM4E|
|2023|    4|  5|   7|Wednesday|2023-04-15T07:07:...|{643a546c332a3928...|2023-04-05T07:42:41Z|                   最可愛的代言人😄🥰|        2|WxTxeyqRM4E|
|2023|    4|  4|  14|  Tuesday|2023-04-15T07:07:...|{643a546c332a3928...|2023-04-04T

In [36]:
# merge the dataframes by column name
# unionByName matches columns by name rather than by position, so the merge
# stays correct even if the two MongoDB collections' inferred schemas list their
# columns in a different order. Plain union() would silently misalign them.
merged_df_allcommentsinfo_unwinded = df_allcommentsinfo_unwinded0415_coffee.unionByName(
    df_allcommentsinfo_unwinded0415_emi)

# show the merged dataframe
merged_df_allcommentsinfo_unwinded.show()


+----+-----+---+----+---------+--------------------+--------------------+--------------------+-------------------------------------+---------+-----------+
|year|month|day|hour|  weekday|     DataRetrievedAt|                 _id|  commentPublishedAt|                          commentText|likeCount|    videoId|
+----+-----+---+----+---------+--------------------+--------------------+--------------------+-------------------------------------+---------+-----------+
|2023|    4|  7|   4|   Friday|2023-04-15T07:07:...|{643a546c332a3928...|2023-04-07T04:23:51Z|                           长大了许多|        0|WxTxeyqRM4E|
|2023|    4|  6|  23| Thursday|2023-04-15T07:07:...|{643a546c332a3928...|2023-04-06T23:17:43Z|                           So cute❤~~|        2|WxTxeyqRM4E|
|2023|    4|  5|   7|Wednesday|2023-04-15T07:07:...|{643a546c332a3928...|2023-04-05T07:42:41Z|                   最可愛的代言人😄🥰|        2|WxTxeyqRM4E|
|2023|    4|  4|  14|  Tuesday|2023-04-15T07:07:...|{643a546c332a3928...|2023-04-04T

In [37]:
# Rename the comment columns to the capitalised names the sentiment dataset uses
# (Comment / LikeCount / CommentPublishedTime), then discard the MongoDB _id.
merged_df_allcommentsinfo_unwinded = (
    merged_df_allcommentsinfo_unwinded
    .withColumnRenamed("commentText", "Comment")
    .withColumnRenamed("likeCount", "LikeCount")
    .withColumnRenamed("commentPublishedAt", "CommentPublishedTime")
    .drop("_id")
)


In [38]:
# Preview the merged comments after the column renames and the _id drop.
merged_df_allcommentsinfo_unwinded.show(n=20, truncate=True)


+----+-----+---+----+---------+--------------------+--------------------+-------------------------------------+---------+-----------+
|year|month|day|hour|  weekday|     DataRetrievedAt|CommentPublishedTime|                              Comment|LikeCount|    videoId|
+----+-----+---+----+---------+--------------------+--------------------+-------------------------------------+---------+-----------+
|2023|    4|  7|   4|   Friday|2023-04-15T07:07:...|2023-04-07T04:23:51Z|                           长大了许多|        0|WxTxeyqRM4E|
|2023|    4|  6|  23| Thursday|2023-04-15T07:07:...|2023-04-06T23:17:43Z|                           So cute❤~~|        2|WxTxeyqRM4E|
|2023|    4|  5|   7|Wednesday|2023-04-15T07:07:...|2023-04-05T07:42:41Z|                   最可愛的代言人😄🥰|        2|WxTxeyqRM4E|
|2023|    4|  4|  14|  Tuesday|2023-04-15T07:07:...|2023-04-04T14:00:06Z|                  Coffee Sweet 好得意|        2|WxTxeyqRM4E|
|2023|    4|  4|  11|  Tuesday|2023-04-15T07:07:...|2023-04-04T11:33:17Z|      

##### Load dataframes of all video comments with sentiment scores


In [40]:
df_allcommentsinfo_unwinded_sentiment_coffee = spark.read.format('mongo').option(
    'spark.mongodb.input.uri', 'mongodb://172.1.0.10/youtubedataapi0415.allcommentsinfo_unwinded_sentiment').load()

df_allcommentsinfo_unwinded_sentiment_emi = spark.read.format('mongo').option(
    'spark.mongodb.input.uri', 'mongodb://172.1.0.10/youtubedataapi0415.allcommentsinfo_unwinded_sentiment2').load()

# NOTE(review): these temp-view names drop the "a" of "all" ("df_llcomments...").
# They are kept unchanged in case ad-hoc SQL elsewhere refers to them; nothing
# in this cell queries the views.
df_allcommentsinfo_unwinded_sentiment_coffee.createOrReplaceTempView(
    'df_llcommentsinfo_unwinded_sentiment_coffee')
df_allcommentsinfo_unwinded_sentiment_emi.createOrReplaceTempView(
    'df_llcommentsinfo_unwinded_sentiment_emi')

# merge the dataframes by column name (not by position, which would silently
# misalign columns if the two inferred schemas differ in order), then drop _id
merged_df_allcommentsinfo_sentiment = df_allcommentsinfo_unwinded_sentiment_emi.unionByName(
    df_allcommentsinfo_unwinded_sentiment_coffee).drop("_id")

# show the merged dataframe
merged_df_allcommentsinfo_sentiment.show()


+---------------------+--------------------+--------+---------+-------------------+--------------------+
|              Comment|CommentPublishedTime|Language|LikeCount|          Magnitude|               Score|
+---------------------+--------------------+--------+---------+-------------------+--------------------+
| Do you have exerc...|2023-04-15T09:53:26Z|      en|        0|0.10000000149011612|-0.10000000149011612|
|         thanks girl!|2023-04-15T09:37:06Z|      en|        0| 0.8999999761581421|  0.8999999761581421|
|             Love it❤|2023-04-15T01:20:47Z|      en|        0| 0.8999999761581421|  0.8999999761581421|
|                   Me|2023-04-14T23:53:37Z|      en|        0|0.30000001192092896| 0.30000001192092896|
| Yesss pleaseee mo...|2023-04-14T19:45:52Z|      en|        0| 0.4000000059604645|  0.4000000059604645|
|               𝕎𝕆𝕎|2023-04-14T19:21:14Z|      en|        0|0.30000001192092896| 0.30000001192092896|
|                  Wow|2023-04-14T19:20:59Z|      en|     

##### Combine dataframes of allvideocommentsinfo and allvideocomments with sentiment scores


In [41]:
# Outer-join the comment metadata with the sentiment scores. The sentiment rows
# carry no video or comment id, so rows are matched on text + likes + time.
# NOTE(review): this key is not guaranteed to be unique (e.g. identical short
# comments posted in the same second), which would multiply matching rows. The
# outer join also keeps unmatched rows from either side with nulls. Verify the
# row counts against both inputs.
_comment_join_keys = ["Comment", "LikeCount", "CommentPublishedTime"]
combined_df_allcomments = merged_df_allcommentsinfo_unwinded.join(
    merged_df_allcommentsinfo_sentiment, on=_comment_join_keys, how="outer")

# Inspect the resulting columns, then preview rows.
combined_df_allcomments.printSchema()
combined_df_allcomments.show()


root
 |-- Comment: string (nullable = true)
 |-- LikeCount: integer (nullable = true)
 |-- CommentPublishedTime: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- DataRetrievedAt: string (nullable = true)
 |-- videoId: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Score: double (nullable = true)





+--------------------+---------+--------------------+----+-----+---+----+---------+--------------------+-----------+--------+-------------------+--------------------+
|             Comment|LikeCount|CommentPublishedTime|year|month|day|hour|  weekday|     DataRetrievedAt|    videoId|Language|          Magnitude|               Score|
+--------------------+---------+--------------------+----+-----+---+----+---------+--------------------+-----------+--------+-------------------+--------------------+
|                    |        0|2017-11-05T17:54:55Z|2017|   11|  5|  17|   Sunday|2023-04-15T07:10:...|MdhbH0HNr2w|      en|                0.0|                 0.0|
|                    |        0|2021-04-23T15:58:26Z|2021|    4| 23|  15|   Friday|2023-04-15T11:17:...|s4fwetu2Px4|      en|                0.0|                 0.0|
|              加油⛽️|        1|2015-07-28T08:55:26Z|2015|    7| 28|   8|  Tuesday|2023-04-15T07:10:...|zT2jhR66ADo|      zh| 0.8999999761581421|  0.8999999761581421|


                                                                                

In [42]:
from pyspark.sql.functions import col

# Lookup of video id -> channel title, taken from df_allvideodetails0418.
# The details column is actually named "VideoId"; selecting "videoId" relies on
# Spark's default case-insensitive column resolution.
df_allvideodetails0416_selected = df_allvideodetails0418.select("videoId", "ChannelTitle")

# Left-join so every comment is kept even when its video has no details row.
# The trailing drop targets the right-hand copy of the join key.
# NOTE(review): if a VideoId appears more than once in the details DataFrame,
# its comments are duplicated here — confirm the ids are unique.
joined_comment_df = (
    combined_df_allcomments
    .join(df_allvideodetails0416_selected, on="videoId", how="left_outer")
    .drop(df_allvideodetails0416_selected["videoId"])
)


In [43]:
# Preview comments with their sentiment scores and channel title attached.
joined_comment_df.show(n=20, truncate=True)


[Stage 84:>                                                         (0 + 1) / 1]

+-----------+----------------------------+---------+--------------------+----+-----+---+----+---------+--------------------+--------+-------------------+--------------------+------------+
|    videoId|                     Comment|LikeCount|CommentPublishedTime|year|month|day|hour|  weekday|     DataRetrievedAt|Language|          Magnitude|               Score|ChannelTitle|
+-----------+----------------------------+---------+--------------------+----+-----+---+----+---------+--------------------+--------+-------------------+--------------------+------------+
|vAPTEmKqywk|                           "|        0|2022-03-12T07:11:17Z|2022|    3| 12|   7| Saturday|2023-04-15T11:16:...|      en|0.20000000298023224| 0.20000000298023224|    emi wong|
|xtdozPQbSv8|        " oh it's good fo...|        2|2018-06-29T16:35:46Z|2018|    6| 29|  16|   Friday|2023-04-15T11:19:...|      en| 0.8999999761581421|  0.8999999761581421|    emi wong|
|FL_fAd_K720|        "Don't just print...|       31|2020-10-

                                                                                

Optional: write avro


In [None]:
# Optionally persist the combined comments (the frame before the ChannelTitle
# join) as Avro. Writing Avro needs the spark-avro package on the Spark
# classpath — TODO confirm it is configured.
# "errorifexists" is Spark's default save mode: the write fails if the path
# already exists.
combined_df_allcomments.write.format("avro").mode("errorifexists").save(
    "combined_df_allcomments.avro")


#### Load data from Spark into BigQuery


##### Write dataframe of allcomments_sentiment to BigQuery


In [None]:
from google.cloud import bigquery
# configure BigQuery client
# NOTE(review): this rebinds `client`, which the setup cell assigned to the
# MongoClient. From here on, `client` refers to the BigQuery client.
client = bigquery.Client(project="spheric-temple-380502")

# Fully-qualified table id, shared by table creation and the Spark write below
# so the two cannot drift apart. A plain "project.dataset.table" string replaces
# the deprecated Client.dataset(...).table(...) reference builder and matches the
# other BigQuery cells.
table_id = "spheric-temple-380502.youtubedata.allcomments_sentiment_0417"

# Columns mirror joined_comment_df: comment fields, publish-time parts,
# sentiment scores and the channel title.
schema = [
    bigquery.SchemaField("videoId", "STRING"),
    bigquery.SchemaField("Comment", "STRING"),
    bigquery.SchemaField("LikeCount", "INTEGER"),
    bigquery.SchemaField("CommentPublishedTime", "STRING"),
    bigquery.SchemaField("year", "INTEGER"),
    bigquery.SchemaField("month", "INTEGER"),
    bigquery.SchemaField("day", "INTEGER"),
    bigquery.SchemaField("hour", "INTEGER"),
    bigquery.SchemaField("weekday", "STRING"),
    bigquery.SchemaField("DataRetrievedAt", "STRING"),
    bigquery.SchemaField("Language", "STRING"),
    bigquery.SchemaField("Magnitude", "FLOAT"),
    bigquery.SchemaField("Score", "FLOAT"),
    bigquery.SchemaField("ChannelTitle", "STRING"),
]
table = bigquery.Table(table_id, schema=schema)
# Without exists_ok, create_table fails when the table already exists. That also
# stops a re-run of this cell from appending the same rows a second time.
table = client.create_table(table)

# write the dataframe to BigQuery
joined_comment_df.write \
    .format("bigquery") \
    .option("writeMethod", "direct") \
    .mode("append") \
    .save(table_id)


##### Write dataframe of allvideodetails to BigQuery


In [62]:
from google.cloud import bigquery
# configure BigQuery client
client = bigquery.Client(project="spheric-temple-380502")

table_id = "spheric-temple-380502.youtubedata.allvideodetails0416"

# Table schema for the v1 video-details export. The title column is "VideoTitle",
# matching the DataFrame. It was previously declared as "Video_Title", a column
# df_allvideodetails0418 does not have.
schema = [
    bigquery.SchemaField("VideoTitle", "STRING"),
    bigquery.SchemaField("year", "INTEGER"),
    bigquery.SchemaField("month", "INTEGER"),
    bigquery.SchemaField("day", "INTEGER"),
    bigquery.SchemaField("hour", "INTEGER"),
    bigquery.SchemaField("weekday", "STRING"),
    bigquery.SchemaField("Caption", "STRING"),
    bigquery.SchemaField("ChannelTitle", "STRING"),
    bigquery.SchemaField("Comments", "INTEGER"),
    bigquery.SchemaField("DataRetrievedAt", "STRING"),
    bigquery.SchemaField("Definition", "STRING"),
    bigquery.SchemaField("Description", "STRING"),
    bigquery.SchemaField("Duration", "FLOAT"),
    bigquery.SchemaField("Likes", "INTEGER"),
    bigquery.SchemaField("Published_date", "STRING"),
    bigquery.SchemaField("Tags", "STRING", mode="REPEATED"),
    bigquery.SchemaField("VideoId", "STRING"),
    bigquery.SchemaField("Views", "INTEGER")
]

table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)

print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")


# Write only the columns declared above, in the same order. This keeps the Spark
# write aligned with the table schema even when the DataFrame carries extra
# derived columns (TagNum / likeRatio / commentRatio from the ratio cell).
df_allvideodetails0418.select([field.name for field in schema]).write \
    .format("bigquery") \
    .option("writeMethod", "direct") \
    .mode("append") \
    .save(table_id)


Created table spheric-temple-380502.youtubedata.allvideodetails0416


                                                                                

In [23]:
# v2

from google.cloud import bigquery
# configure BigQuery client
client = bigquery.Client(project="spheric-temple-380502")

table_id = "spheric-temple-380502.youtubedata.allvideodetails0418"


def _bq_field(name, field_type, mode="NULLABLE"):
    """Return one BigQuery schema field in its dict (API resource) form."""
    return {"name": name, "type": field_type, "mode": mode}


# Same columns, in the same order, as df_allvideodetails0418 after the ratio
# cell. Tags is an array (REPEATED) and TagNum is non-nullable there (REQUIRED).
schema = [
    _bq_field("year", "INTEGER"),
    _bq_field("month", "INTEGER"),
    _bq_field("day", "INTEGER"),
    _bq_field("hour", "INTEGER"),
    _bq_field("weekday", "STRING"),
    _bq_field("Caption", "STRING"),
    _bq_field("ChannelTitle", "STRING"),
    _bq_field("Comments", "INTEGER"),
    _bq_field("DataRetrievedAt", "STRING"),
    _bq_field("Definition", "STRING"),
    _bq_field("Description", "STRING"),
    _bq_field("Duration", "FLOAT"),
    _bq_field("Likes", "INTEGER"),
    _bq_field("Published_date", "STRING"),
    _bq_field("Tags", "STRING", "REPEATED"),
    _bq_field("VideoId", "STRING"),
    _bq_field("VideoTitle", "STRING"),
    _bq_field("Views", "INTEGER"),
    _bq_field("TagNum", "INTEGER", "REQUIRED"),
    _bq_field("likeRatio", "FLOAT"),
    _bq_field("commentRatio", "FLOAT"),
]

table = client.create_table(bigquery.Table(table_id, schema=schema))
print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

# Append the DataFrame to the freshly created table via the direct write path.
df_allvideodetails0418.write \
    .format("bigquery") \
    .option("writeMethod", "direct") \
    .mode("append") \
    .save(table_id)


Created table spheric-temple-380502.youtubedata.allvideodetails0418


                                                                                