In [0]:
pip install --upgrade google-api-python-client

In [0]:
pip install isodate

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,udf,to_date,to_timestamp,try_to_timestamp,expr,regexp_replace,lower,current_date
from pyspark.sql.types import StructField,StructType,StringType,IntegerType,LongType

from datetime import date,timedelta

# Reuse the active Spark session if there is one, otherwise create it (app name 'youtube').
spark = (
    SparkSession.builder
    .appName('youtube')
    .getOrCreate()
)

In [0]:
from googleapiclient.discovery import build
import time
import isodate

In [0]:
# Notebook parameters (dbutils widgets always hand their values back as strings).
# channel_id defaults to UCx8Z14PpntdaxCt2hakbQLQ (lallantop).
dbutils.widgets.text(name='channel_id', defaultValue='UCx8Z14PpntdaxCt2hakbQLQ', label='channel Id')
# 'true' selects the full-history load below; any other value runs the incremental load.
dbutils.widgets.text(name="is_history", defaultValue='true', label="Is History")

In [0]:
# Incremental-load window used by the search-based fetch: from midnight (Z) yesterday up to
# midnight (Z) today. publishedAfter must be EARLIER than publishedBefore; the previous
# version had them swapped (after=today, before=yesterday), so the search window was empty.
# NOTE(review): date.today() is the driver's local date - confirm the cluster runs in UTC.
end_date = date.today()
start_date = end_date - timedelta(days=1)

published_after = f"{start_date}T00:00:00Z"
published_before = f"{end_date}T00:00:00Z"


print(f"Start: {published_after}")
print(f"End: {published_before}")

In [0]:

# Read the two notebook parameters (channel_id, then is_history) as strings.
channel_ids, is_history = (dbutils.widgets.get(name) for name in ('channel_id', 'is_history'))

In [0]:
import os

# The YouTube Data API v3 key is read from the environment instead of being hard-coded:
# keys committed to a notebook leak through exports, repos and shared outputs.
# On Databricks, set the cluster environment variable from a secret, e.g.
#   YOUTUBE_API_KEY={{secrets/<scope>/<key>}}
# NOTE(review): the keys that were previously hard-coded in this cell are exposed and
# should be revoked/rotated in the Google Cloud console.
API_KEY = os.environ.get('YOUTUBE_API_KEY')
if not API_KEY:
    raise RuntimeError(
        'YOUTUBE_API_KEY is not set; provide a YouTube Data API v3 key via the environment.'
    )

api_service_name = 'youtube'
api_version_name = 'v3'

# Shared API client used by every fetch function below.
youtube = build(api_service_name, api_version_name, developerKey=API_KEY)

In [0]:
%run ./table_creation

## Get channel data from YouTube

In [0]:
def get_channel_data(youtube, channel_id):
    """Fetch channel metadata and uploads-playlist IDs from the YouTube Data API.

    Args:
        youtube: API client exposing ``channels().list(...).execute()``.
        channel_id: a channel ID (the API also accepts a comma-separated list).

    Returns:
        ``(channel_data, playlist_id)``: a list of per-channel dicts (all values as
        returned by the API, counts still strings) and the list of each channel's
        uploads playlist ID, in the same order. Both lists are empty when no
        channel matched.
    """
    response = youtube.channels().list(
        part='snippet,contentDetails,statistics,status',
        id=channel_id
    ).execute()

    channel_data = []
    playlist_id = []
    # .get: a response with no 'items' key yields empty lists instead of a KeyError.
    for item in response.get('items', []):
        snippet = item['snippet']
        stats = item['statistics']
        playlist_id.append(item['contentDetails']['relatedPlaylists']['uploads'])
        channel_data.append({
            'channel_id': item['id'],
            'title': snippet['title'],
            'description': snippet['description'],
            'createdAt': snippet['publishedAt'],
            # NOTE(review): counts may be omitted (e.g. hidden subscriber count);
            # .get stores None instead of raising.
            'subscribers_count': stats.get('subscriberCount'),
            'views_count': stats.get('viewCount'),
            'videos_count': stats.get('videoCount'),
            'privacyStatus': item['status']['privacyStatus'],
        })
    return channel_data, playlist_id
channel_data, playlist_id = get_channel_data(youtube, channel_ids)

# Explicit all-string schema (converted below) so an empty API result or a missing
# count (None) cannot break createDataFrame's type inference. Downstream code refers
# to these columns by name only.
channel_schema = StructType([
    StructField('channel_id', StringType(), True),
    StructField('createdAt', StringType(), True),
    StructField('description', StringType(), True),
    StructField('privacyStatus', StringType(), True),
    StructField('subscribers_count', StringType(), True),
    StructField('title', StringType(), True),
    StructField('videos_count', StringType(), True),
    StructField('views_count', StringType(), True),
])
channel_df = spark.createDataFrame(channel_data, channel_schema)

# createdAt: strip fractional seconds, then parse. 'X' reads the trailing 'Z' as a UTC
# offset; a quoted 'Z' literal would make Spark interpret the value in the session time zone.
channel_df = channel_df.withColumn('createdAt', regexp_replace(col("createdAt"), r'\.\d{1,6}(?=Z)', ''))
channel_df = channel_df.withColumn('createdAt', to_timestamp(col("createdAt"), "yyyy-MM-dd'T'HH:mm:ssX"))

# The API returns counts as strings.
for count_col in ('subscribers_count', 'views_count', 'videos_count'):
    channel_df = channel_df.withColumn(count_col, col(count_col).cast(LongType()))

# Staging view consumed by the channel MERGE cell.
channel_df.createOrReplaceTempView('temp_channel')




In [0]:
%sql
-- Upsert the channel row(s) staged in the temp_channel view into youtube.channel.
-- Existing channel_id: refresh only subscribers_count, videos_count and views_count
-- (title, description, privacyStatus and createdAt keep their first-insert values).
-- New channel_id: insert the full row.
merge into youtube.channel t 
using temp_channel s 
on t.channel_id=s.channel_id
when matched
then
update set t.subscribers_count=s.subscribers_count,t.videos_count=s.videos_count,t.views_count=s.views_count
when not matched
then insert(channel_id,description,privacyStatus,createdAt,subscribers_count,title,videos_count,views_count)
values (s.channel_id,s.description,s.privacyStatus,s.createdAt,s.subscribers_count,s.title,s.videos_count,s.views_count)

## Get the video IDs for a channel

In [0]:

def get_all_video_ids_playlist(playlist_id):
    """Return every video ID in a playlist, following pagination.

    Uses the module-level ``youtube`` client, requesting 50 items per page and
    pausing one second between page requests.

    Args:
        playlist_id: playlist to walk (here, a channel's uploads playlist).

    Returns:
        List of video ID strings in the order the API returned them.
    """
    collected = []
    page_token = None
    while True:
        page = youtube.playlistItems().list(
            part='contentDetails',
            playlistId=playlist_id,
            maxResults=50,
            pageToken=page_token
        ).execute()
        collected.extend(entry['contentDetails']['videoId'] for entry in page['items'])

        page_token = page.get('nextPageToken')
        if page_token is None:
            return collected
        # Throttle between page requests.
        time.sleep(1)



In [0]:
def get_all_video_ids(channel_id, after=None, before=None):
    """Return IDs of a channel's videos published inside a time window.

    Pages through ``search.list`` with the module-level ``youtube`` client
    (50 results per page, one-second pause between page requests).

    Args:
        channel_id: YouTube channel ID to search.
        after: RFC 3339 timestamp sent as ``publishedAfter``; defaults to the
            notebook-level ``published_after`` value at call time.
        before: RFC 3339 timestamp sent as ``publishedBefore``; defaults to the
            notebook-level ``published_before`` value at call time.

    Returns:
        List of video ID strings in API order.
    """
    # Explicit overrides make one-off backfills possible without editing globals.
    window_start = published_after if after is None else after
    window_end = published_before if before is None else before

    video_id = []
    next_page_token = None
    while True:
        response = youtube.search().list(
            part='snippet',
            maxResults=50,
            publishedAfter=window_start,
            publishedBefore=window_end,
            type="video",
            channelId=channel_id,
            pageToken=next_page_token
        ).execute()
        # type="video" guarantees each hit carries id.videoId.
        for item in response.get('items', []):
            video_id.append(item['id']['videoId'])

        next_page_token = response.get('nextPageToken')
        if next_page_token is None:
            break
        time.sleep(1)
    return video_id



In [0]:
# Collect the video IDs to load:
#   history mode     -> every upload of the widget's channel, via its uploads playlist(s)
#   incremental mode -> videos published between published_after and published_before
#                       for every channel already stored in youtube.channel
# The flag is normalised so 'True' / ' true ' also select history mode.
all_videos_ids = []
if is_history.strip().lower() == 'true':
    for uploads_playlist in playlist_id:
        all_videos_ids.extend(get_all_video_ids_playlist(uploads_playlist))
else:
    channels = spark.sql('select distinct channel_id from youtube.channel')
    for row in channels.toLocalIterator():
        channel_id = row['channel_id']
        video_ids = get_all_video_ids(channel_id)
        print(channel_id, len(video_ids))
        # extend() instead of list + list avoids re-copying the accumulator each time.
        all_videos_ids.extend(video_ids)
print(len(all_videos_ids))

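## Why `toLocalIterator()`
- Streams the channel rows to the driver one partition at a time, so driver memory use stays low (unlike `.collect()`, which loads every row at once).
- Keeps the per-channel loop simple to debug.
- ⚠️ The API calls still run on the driver, one channel at a time. That is acceptable, even desirable, for a rate-limited service like the YouTube Data API.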

In [0]:
def get_videos_details(youtube, video_ids, pause_seconds=1):
    """Fetch snippet, contentDetails, status and statistics for many videos.

    IDs are sent to ``videos().list`` in batches of 50 (the API's per-request
    ID limit), pausing ``pause_seconds`` between batches.

    Args:
        youtube: API client exposing ``videos().list(...).execute()``.
        video_ids: list of video ID strings.
        pause_seconds: delay between consecutive batch requests; 0 disables it.

    Returns:
        A list of flat dicts, one per video the API returned. Count fields are
        the API's strings, or None when the statistic is absent.
    """
    batch_size = 50
    all_details = []
    for start in range(0, len(video_ids), batch_size):
        batch = video_ids[start:start + batch_size]
        response = youtube.videos().list(
            part='snippet,contentDetails,status,statistics',
            id=','.join(batch)
        ).execute()
        # Distinct name from the batch index (the old code reused `i` for both).
        # .get: a response without 'items' is treated as an empty batch.
        for item in response.get('items', []):
            snippet = item['snippet']
            status = item.get('status', {})
            stats = item.get('statistics', {})
            all_details.append({
                'channel_id': snippet['channelId'],
                'video_id': item['id'],
                'publishedAt': snippet['publishedAt'],
                'title': snippet['title'],
                'description': snippet['description'],
                'duration': item['contentDetails']['duration'],
                'uploadStatus': status.get('uploadStatus'),
                'viewCount': stats.get('viewCount'),
                'likeCount': stats.get('likeCount'),
                'dislikeCount': stats.get('dislikeCount'),  # May be None or missing
                'commentCount': stats.get('commentCount'),
            })
        # Throttle only between requests; no point sleeping after the last batch.
        if pause_seconds and start + batch_size < len(video_ids):
            time.sleep(pause_seconds)
    return all_details

# Fetch full metadata for every collected video ID.
all_videos_details = get_videos_details(youtube, all_videos_ids)

In [0]:
@udf(IntegerType())
def iso_sec(duration):
    """Spark UDF: convert an ISO-8601 duration string (e.g. 'PT4M13S') to whole seconds.

    Returns None when the value is missing or cannot be parsed, so one bad row
    does not fail the whole job.
    """
    try:
        return int(isodate.parse_duration(duration).total_seconds())
    except (TypeError, ValueError):
        # TypeError: duration is None / not a string.
        # ValueError (isodate.ISO8601Error subclasses it): malformed duration.
        # Anything else is unexpected and is allowed to surface instead of being
        # swallowed by a bare except.
        return None


In [0]:

# Explicit all-string schema: values are converted below, and optional statistics
# that came back as None cannot break type inference.
schema=StructType([
                   StructField('channel_id',StringType(),True),
                   StructField('video_id',StringType(),True),
                   StructField('publishedAt',StringType(),True),
                   StructField('title',StringType(),True),
                   StructField('description',StringType(),True),
                   StructField('duration',StringType(),True),
                   StructField('uploadStatus',StringType(),True),
                   StructField('viewCount',StringType(),True),
                   StructField('likeCount',StringType(),True),
                   StructField('dislikeCount',StringType(),True),
                   StructField('commentCount',StringType(),True)
                   ])
videos_df=spark.createDataFrame(all_videos_details,schema)

# publishedAt: strip fractional seconds, then parse. 'X' reads the trailing 'Z' as a UTC
# offset; a quoted 'Z' literal would make Spark interpret the value in the session time zone.
videos_df=videos_df.withColumn('publishedAt',regexp_replace(col("publishedAt"), r'\.\d{1,6}(?=Z)', ''))
videos_df=videos_df.withColumn('publishedAt',to_timestamp(col("publishedAt"), "yyyy-MM-dd'T'HH:mm:ssX"))
# duration: ISO-8601 string -> whole seconds (None when unparseable).
videos_df=videos_df.withColumn('duration',iso_sec(videos_df['duration']))
# The API returns counts as strings.
for count_col in ('viewCount', 'likeCount', 'dislikeCount', 'commentCount'):
    videos_df = videos_df.withColumn(count_col, col(count_col).cast(LongType()))



In [0]:
# Attach each video's channel title. Join against the youtube.channel table, not channel_df:
# channel_df holds only the widget's channel, but the incremental branch fetches videos for
# every channel in youtube.channel, so an inner join with channel_df silently dropped all
# other channels' videos. (The table's title is the one written when the channel was first
# inserted; the channel MERGE does not update it.)
channel_titles = (
    spark.table('youtube.channel')
    .select('channel_id', col('title').alias('channel_title'))
    .dropDuplicates(['channel_id'])
)
videos_df_title = videos_df.join(channel_titles, on='channel_id', how='inner')

## Shorts Videos

In [0]:


# Shorts: processed uploads shorter than 120 seconds whose title contains "#shorts"
# (matched case-insensitively via lower()).
processed_only = col('uploadStatus') == 'processed'
short_and_tagged = (col('duration') < 120) & lower(col('title')).contains('#shorts')

shorts_video = videos_df_title.filter(processed_only & short_and_tagged)
shorts_video.createOrReplaceTempView('shorts_video')

In [0]:
%sql
-- Insert shorts from the shorts_video view that are not yet in youtube.shorts_video (keyed by video_id).
-- Insert-only: shorts already in the table are never updated, so their viewCount/likeCount/
-- commentCount stay at first-seen values, unlike youtube.videos_silver, which refreshes them
-- on match. NOTE(review): confirm this asymmetry is intended.
merge into youtube.shorts_video t 
using shorts_video s 
on t.video_id=s.video_id
when not matched
then insert(channel_id,channel_title,video_id,publishedAt,title,description,duration,uploadStatus,viewCount,likeCount,dislikeCount,commentCount) values(s.channel_id,s.channel_title,s.video_id,s.publishedAt,s.title,s.description,s.duration,s.uploadStatus,s.viewCount,s.likeCount,s.dislikeCount,s.commentCount)

## Regular (long-form) videos

In [0]:
# Regular (long-form) videos: processed uploads of at least 120 seconds without "#shorts"
# in the title. `>= 120` (previously `> 120`) so a video of exactly 120 s is no longer
# dropped from both this table and the shorts table (which takes `< 120`).
# A new name is used instead of rebinding videos_df_title, so re-running the shorts cell
# afterwards still sees the unfiltered frame.
# NOTE(review): videos under 120 s without "#shorts", and videos of 120 s or more with it,
# still land in neither table - confirm that is intended.
long_videos_df = videos_df_title.filter(
    (col('uploadStatus') == 'processed')
    & (col('duration') >= 120)
    & (~lower(col('title')).contains('#shorts'))
)

long_videos_df.createOrReplaceTempView('videos_temp')

In [0]:
%sql
-- Upsert long-form videos from the videos_temp view into youtube.videos_silver (keyed by video_id).
-- Existing video: refresh viewCount, likeCount and commentCount only.
-- New video: insert the full row.
merge into youtube.videos_silver t 
using videos_temp s 
on t.video_id=s.video_id
when matched then
update set t.viewCount=s.viewCount,t.likeCount=s.likeCount,t.commentCount=s.commentCount
when not matched 
then insert(channel_id,channel_title,video_id,publishedAt,title,description,duration,uploadStatus,viewCount,likeCount,dislikeCount,commentCount) values(s.channel_id,s.channel_title,s.video_id,s.publishedAt,s.title,s.description,s.duration,s.uploadStatus,s.viewCount,s.likeCount,s.dislikeCount,s.commentCount)