In [1]:
import os
os.environ['SPARK_HOME'] = '/work/rc/g.pillai/spark-3.3.1-bin-hadoop3'

In [2]:
import json
from datetime import datetime
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, ArrayType

In [3]:
spark = SparkSession.builder.appName("YoutubeData").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/12/18 22:00:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [31]:
# -*- coding: utf-8 -*-

# Sample Python code for youtube.search.list
# See instructions for running these code samples locally:
# https://developers.google.com/explorer-help/code-samples#python

import os

import google_auth_oauthlib.flow
import googleapiclient.discovery
import googleapiclient.errors

scopes = ["https://www.googleapis.com/auth/youtube.force-ssl"]

def search_data():
    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    # os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

    api_key = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'  #Use your API key here
    api_service_name = "youtube"
    api_version = "v3"

    # Get credentials and create an API client
    youtube = googleapiclient.discovery.build(
        api_service_name, api_version, developerKey=api_key)

    request = youtube.search().list(
        part="snippet",
        channelType="any",
        maxResults=50,
        q="Pyspark using Airflow in GCP",
        order="relevance",
        videoDefinition="any"
    )
    response = request.execute()

    return response

search_data = search_data()

In [32]:
def video_data(video_id):
    api_key = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'  #Use your API key here
    api_service_name = "youtube"
    api_version = "v3"

    youtube = googleapiclient.discovery.build(
        api_service_name, api_version, developerKey=api_key)
    

    request = youtube.videos().list(
        part = "snippet",
        id = video_id
    )
    
    response = request.execute()
    
    return response

In [33]:
def get_description(search_data):
    desc_data = []
    for item in search_data['items']:
        try:
            video_id = item['id']['videoId']
            json_data = video_data(str(video_id))
            description = json_data['items'][0]['snippet']['description']
            tags = json_data['items'][0]['snippet']['tags']
            desc_data.append({'videoId': video_id,
                              'description': description,
                              'tags' : tags
                            })
        except KeyError:
            pass

    return desc_data

In [34]:
video_data_from_search = []
for item in search_data['items']:
    try:
        video_id = item['id']['videoId']
    except KeyError:
        video_id = None

    try:
        title = item['snippet']['title']
    except KeyError:
        title = None

    try:
        channel_title = item['snippet']['channelTitle']
    except KeyError:
        channel_title = None

    try:
        published_at = item['snippet']['publishedAt']
    except KeyError:
        published_at = None

    video_data_from_search.append({
        'videoId': video_id,
        'title': title,
        'channelTitle': channel_title,
        'publishedAt': published_at
        # Add more fields as needed
    })

In [35]:
video_descriptions = get_description(search_data)

In [36]:
from pyspark.sql.functions import current_date
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_timestamp

search_schema = StructType([
    StructField('videoId', StringType(), True),
    StructField('title', StringType(), True),
    StructField('channelTitle', StringType(), True),
    StructField('publishedAt', StringType(), True)
])

desc_schema = StructType([
    StructField('videoId', StringType(), True),
    StructField('description', StringType(), True),
    StructField('tags', ArrayType(StringType()), True)
])

video_df = spark.createDataFrame(video_data_from_search, schema=search_schema)
desc_df = spark.createDataFrame(video_descriptions, schema=desc_schema)

spark_df = video_df.join(desc_df, 'videoId', 'inner')

# Convert 'publishedAt' column to timestamp
spark_df = spark_df.withColumn('publishedAt', to_timestamp('publishedAt', 'yyyy-MM-dd\'T\'HH:mm:ss\'Z\''))

# Adding 'date_created' column with today's date
spark_df = spark_df.withColumn('date_created', current_date())

# Show the DataFrame
spark_df.show()

+-----------+--------------------+--------------------+-------------------+--------------------+--------------------+------------+
|    videoId|               title|        channelTitle|        publishedAt|         description|                tags|date_created|
+-----------+--------------------+--------------------+-------------------+--------------------+--------------------+------------+
|2v9AKewyUEo|Learning Apache A...|         Soumil Shah|2021-01-24 15:05:38|Code :https://git...|[airflow, apche, ...|  2023-12-18|
|5peQThvQmQk|Learn Apache Airf...|      Darshil Parmar|2023-10-07 13:00:04|Join My Data Engi...|[darshil parmar, ...|  2023-12-18|
|7_HfVnjZ924|Installing Airflo...|       futureXskills|2023-01-14 19:47:29|Airflow Playlist ...|[airflow, apache ...|  2023-12-18|
|DzxtCxi4YaA|Data Engineer Pro...|      Data with Marc|2023-08-08 14:11:37|Data Engineer Pro...|[data engineer, d...|  2023-12-18|
|GqAcTrqKcrY|Realtime Data Str...|          CodeWithYu|2023-09-06 09:28:13|In this 

In [37]:
spark_df.count()

32

In [38]:
# Save the joined DataFrame as a CSV file
output_path = "/work/rc/g.pillai/learnSpark/twitter-airflow/youtube_sample_data"  # Replace with your desired output path

# Write the DataFrame to parquet table
spark_df.write \
        .partitionBy("date_created") \
        .parquet(output_path, mode="append")

In [39]:
# Define the base path of the partitioned table
base_path = "/work/rc/g.pillai/learnSpark/twitter-airflow/youtube_sample_data"

# Read the entire partitioned table
df = spark.read.parquet(base_path)

# Show the data in the DataFrame
df.show()

+-----------+--------------------+--------------------+-------------------+--------------------+--------------------+------------+
|    videoId|               title|        channelTitle|        publishedAt|         description|                tags|date_created|
+-----------+--------------------+--------------------+-------------------+--------------------+--------------------+------------+
|2v9AKewyUEo|Learning Apache A...|         Soumil Shah|2021-01-24 15:05:38|Code :https://git...|[airflow, apche, ...|  2023-12-18|
|5peQThvQmQk|Learn Apache Airf...|      Darshil Parmar|2023-10-07 13:00:04|Join My Data Engi...|[darshil parmar, ...|  2023-12-18|
|7_HfVnjZ924|Installing Airflo...|       futureXskills|2023-01-14 19:47:29|Airflow Playlist ...|[airflow, apache ...|  2023-12-18|
|DzxtCxi4YaA|Data Engineer Pro...|      Data with Marc|2023-08-08 14:11:37|Data Engineer Pro...|[data engineer, d...|  2023-12-18|
|GqAcTrqKcrY|Realtime Data Str...|          CodeWithYu|2023-09-06 09:28:13|In this 

In [40]:
df.count()

98