In [1]:
import os
import shutil
from git import Repo
import pandas as pd
import numpy as np
from datetime import datetime
from pytz import timezone
import json
import seaborn as sns
import matplotlib.pyplot as plt
from kaggle_secrets import UserSecretsClient
import google.generativeai as genai
from youtube_transcript_api import YouTubeTranscriptApi
import textwrap

In [2]:
def _fetch_transcript_text(video_id, language_code):
    """Return the transcript of ``video_id`` in ``language_code`` as one string.

    The segment texts returned by ``YouTubeTranscriptApi.get_transcript`` are
    joined with single spaces. Exceptions raised by the API propagate to the
    caller.
    """
    segments = YouTubeTranscriptApi.get_transcript(video_id, languages=[language_code])
    return " ".join(segment['text'] for segment in segments)


def _first_non_empty_transcript(video_id, language_codes, empty_message, attempted, log_parts):
    """Try ``language_codes`` in order and return the first non-blank transcript.

    Args:
        video_id: YouTube video id to fetch transcripts for.
        language_codes: Candidate language codes, in priority order.
        empty_message: Log entry appended when a fetched transcript is blank.
        attempted: Set of codes already tried for this video. Codes in it are
            skipped and every code tried here is added, so no language is
            fetched twice across fallback stages.
        log_parts: List that log entries are appended to (mutated in place).

    Returns:
        The transcript text, or None when no candidate produced any text.
    """
    for language_code in language_codes:
        if language_code in attempted:
            continue
        attempted.add(language_code)
        try:
            transcript_text = _fetch_transcript_text(video_id, language_code)
        except Exception as e:
            # One broken language must not abort the remaining fallbacks.
            log_parts.append(
                f"Error getting transcript for video ID {video_id} ({language_code}): {str(e)} "
            )
            continue
        if transcript_text.strip():
            return transcript_text
        log_parts.append(empty_message)
    return None


def _find_transcript(video_id):
    """Find the best available transcript for a single video.

    Preference order: standard English (``en``), any other listed language
    code starting with ``en``, then every remaining listed language.

    Returns:
        A ``(transcript_text, log_text)`` tuple. ``transcript_text`` is None
        when nothing usable was found; ``log_text`` describes what was tried.
    """
    try:
        # List once; the same listing drives every fallback stage.
        available_langs = [
            transcript.language_code
            for transcript in YouTubeTranscriptApi.list_transcripts(video_id)
        ]
    except Exception as e:
        return None, f"Error getting transcript for video ID {video_id}: {str(e)} "

    english_transcripts = [code for code in available_langs if code.startswith('en')]
    log_parts = [f"english:{english_transcripts}. "]
    attempted = set()
    transcript_text = None

    if english_transcripts:
        # Only ask for plain 'en' when it is actually listed.
        if 'en' in english_transcripts:
            transcript_text = _first_non_empty_transcript(
                video_id, ['en'],
                'Standard English: No transcripts available. ',
                attempted, log_parts,
            )
        if transcript_text is None:
            transcript_text = _first_non_empty_transcript(
                video_id, english_transcripts,
                'Any available English variant: No transcripts available. ',
                attempted, log_parts,
            )
        if transcript_text is None:
            # Record which languages are left before falling back to them.
            other_langs = [code for code in available_langs if code not in attempted]
            log_parts.append(f"non-english:{other_langs}. ")

    if transcript_text is None:
        transcript_text = _first_non_empty_transcript(
            video_id, available_langs,
            'Any available transcript other than english: No transcripts available. ',
            attempted, log_parts,
        )

    return transcript_text, "".join(log_parts)


def GenAI(FeatureEngineering_File, api_key):
    """Attach YouTube transcripts to every row of ``FeatureEngineering_File``.

    Two columns are (re)created on the frame:

    * ``videoTranscript`` - transcript text, or None when none was found.
    * ``videoTranscriptLog`` - trace of the languages tried and any errors.

    Args:
        FeatureEngineering_File: DataFrame with a ``videoId`` column. It is
            modified in place.
        api_key: Key passed to ``genai.configure``.

    Returns:
        The same DataFrame object, with the two columns filled in.
    """
    # The configure call is kept for its module-level side effect. The
    # GenerativeModel instance the original built was never used, so it is
    # no longer created here.
    genai.configure(api_key=api_key)

    # Initialize columns
    FeatureEngineering_File['videoTranscript'] = None
    FeatureEngineering_File['videoTranscriptLog'] = ""
    for index, row in FeatureEngineering_File.iterrows():
        transcript_text, log_text = _find_transcript(row['videoId'])
        if transcript_text is not None:
            FeatureEngineering_File.at[index, 'videoTranscript'] = transcript_text
        # Defensive: an empty log is stored as None, as the original did.
        FeatureEngineering_File.at[index, 'videoTranscriptLog'] = log_text or None
    return FeatureEngineering_File
    

In [3]:
# def GenAI(FeatureEngineering_File, api_key):
#     genai.configure(api_key=api_key)
#     model = genai.GenerativeModel('gemini-pro')

#     # Initialize columns
#     FeatureEngineering_File['videoTranscript'] = None
#     FeatureEngineering_File['videoTranscriptNonEnglish'] = None
#     FeatureEngineering_File['videoTranscriptLog'] = None
    
#     for index, row in FeatureEngineering_File.iterrows():
#         video_id = row['videoId']
#         try:
#             # First try to list all available transcripts
#             transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            
#             # Find all English variants (any language starting with 'en')
#             english_transcripts = [t for t in transcript_list if t.language_code.startswith('en')]
            
#             if english_transcripts:
#                 # Try to get the most standard English first
#                 try:
#                     transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'])
#                     transcript_text = " ".join([t['text'] for t in transcript])
#                     FeatureEngineering_File.at[index, 'videoTranscript'] = transcript_text
#                     FeatureEngineering_File.at[index, 'videoTranscriptLog'] = "Used standard English (en)"
#                 except:
#                     # Fall back to any available English variant
#                     english_transcripts[0].fetch()
#                     transcript = english_transcripts[0].fetch()
#                     transcript_text = " ".join([t['text'] for t in transcript])
#                     FeatureEngineering_File.at[index, 'videoTranscript'] = transcript_text
#                     FeatureEngineering_File.at[index, 'videoTranscriptLog'] = f"Used {english_transcripts[0].language_code}"
#             else:
#                 # No English available - try to get any available transcript
#                 try:
#                     # Get the first available transcript
#                     transcript = transcript_list[0].fetch()
#                     transcript_text = " ".join([t['text'] for t in transcript])
#                     FeatureEngineering_File.at[index, 'videoTranscriptNonEnglish'] = transcript_text
                    
#                     # Log available languages
#                     available_langs = [t.language_code for t in transcript_list]
#                     FeatureEngineering_File.at[index, 'videoTranscriptLog'] = (
#                         f"No English available. Used {transcript_list[0].language_code}. "
#                         f"Available languages: {available_langs}"
#                     )
#                 except Exception as fallback_error:
#                     FeatureEngineering_File.at[index, 'videoTranscriptLog'] = (
#                         f"No transcripts available. Error: {str(fallback_error)}"
#                     )
                
#         except Exception as e:
#             print(f"Error getting transcript for video ID {video_id}: {e}")
#             FeatureEngineering_File.at[index, 'videoTranscriptLog'] = str(e)
    
#     return FeatureEngineering_File

In [4]:
def FeatureEngineering_File_Extraction(repo_url, kaggle_repo_url, FeatureEngineering_path):
    """Sync the git repository and load the newest feature-engineering file.

    If ``kaggle_repo_url`` already exists on disk the repository is opened and
    pulled; otherwise it is cloned from ``repo_url``. The git identity is taken
    from the module-level ``name`` and ``email`` variables.

    Args:
        repo_url: Remote URL used when the repository has to be cloned.
        kaggle_repo_url: Local path of the working copy.
        FeatureEngineering_path: Directory scanned for ``FE_*records.json``.

    Returns:
        A DataFrame read from the matching file whose name sorts last.

    Raises:
        ValueError: If no file in ``FeatureEngineering_path`` matches.
    """
    repo_exists = os.path.exists(kaggle_repo_url)
    if repo_exists:
        print("Repository already exists locally.")
        repo = Repo(kaggle_repo_url)
    else:
        repo = Repo.clone_from(repo_url, kaggle_repo_url)

    # Set the identity before pulling, as the original did.
    repo.config_writer().set_value("user", "name", name).release()
    repo.config_writer().set_value("user", "email", email).release()

    if repo_exists:
        repo.remote(name='origin').pull()
        print("Successfully pulled the latest changes.")
    else:
        print("Successfully cloned the repository.")

    candidates = [
        file for file in os.listdir(FeatureEngineering_path)
        if file.startswith("FE_") and file.endswith('records.json')
    ]
    if not candidates:
        # Same exception type max() raised on an empty list, with a usable message.
        raise ValueError(
            f"No file matching 'FE_*records.json' found in {FeatureEngineering_path}"
        )

    # Plain string comparison picks the file.
    # NOTE(review): presumably the names embed a sortable timestamp - confirm.
    latest_file = max(candidates)
    return pd.read_json(os.path.join(FeatureEngineering_path, latest_file))

In [5]:
def RawFile(dataframe):
    """Write ``dataframe`` to a timestamped JSON file in the current directory.

    The file is named ``YTSGENAI_<timestamp>_<row count>_records.json``. The
    timestamp is the current time in the module-level ``ist`` timezone. An
    empty DataFrame is not written.

    Returns:
        True when the frame was written or was empty; False if any exception
        was raised (the error is printed, not re-raised).
    """
    try:
        # Nothing to persist: report it and treat the step as successful.
        if dataframe.empty:
            print("No data to save since empty DataFrame returned.")
            return True

        row_total = len(dataframe)
        stamp = datetime.now(ist).strftime("%Y-%m-%d_%H_%M_%S")

        # Timestamp plus row count keeps each output file name distinct.
        filename = f"YTSGENAI_{stamp}_{row_total}_records.json"

        dataframe.to_json(filename, orient="records", indent=4)
        print(f"DataFrame saved as {filename}")
        return True
    except Exception as e:
        # Best-effort step: report the problem and signal failure to the caller.
        print(f"Error during raw file creation: {e}")
        return False

In [6]:
def PushToGithub(destination_path, working_dir='/kaggle/working',
                 branch='functionality-or-workflow-development'):
    """Copy the newest ``YTSGENAI_*_records.json`` into the repo and push it.

    The repository at the module-level ``kaggle_repo_url`` is pulled if it
    exists, otherwise cloned from ``repo_url``. The git identity comes from the
    module-level ``name`` and ``email``; the commit timestamp uses ``ist``.

    Args:
        destination_path: Directory inside the working copy that receives the
            file. It is created when missing.
        working_dir: Directory scanned for output files.
        branch: Remote branch that ``HEAD`` is pushed to.

    Returns:
        True when the push reported no error, False when the push failed or
        any exception was raised (the error is printed, not re-raised).
    """
    try:
        candidates = [
            file for file in os.listdir(working_dir)
            if file.startswith("YTSGENAI_") and file.endswith("_records.json")
        ]
        if not candidates:
            raise ValueError("No JSON files found!")
        # Resolve names against working_dir, not the current directory, so the
        # latest-by-ctime lookup works wherever the notebook is run from.
        filename = max(
            candidates,
            key=lambda file: os.path.getctime(os.path.join(working_dir, file)),
        )

        repo_exists = os.path.exists(kaggle_repo_url)
        if repo_exists:
            print("Already cloned and the repo file exists")
            repo = Repo(kaggle_repo_url)
        else:
            repo = Repo.clone_from(repo_url, kaggle_repo_url)

        # Set the identity before pulling, as the original did.
        repo.config_writer().set_value("user", "name", name).release()
        repo.config_writer().set_value("user", "email", email).release()

        if repo_exists:
            repo.remote(name='origin').pull()
            print("Successfully pulled the git repo before push")
        else:
            print("Successfully cloned the git repo")

        os.makedirs(destination_path, exist_ok=True)
        target_file = os.path.join(destination_path, filename)
        shutil.copyfile(os.path.join(working_dir, filename), target_file)

        repo.index.add([target_file])
        timestamp = datetime.now(ist).strftime("%Y-%m-%d_%H:%M:%S")
        repo.index.commit(f"{timestamp} Added files from Kaggle notebook, {filename}")
        push_result = repo.remote(name="origin").push(refspec=f"HEAD:refs/heads/{branch}")

        # push() returns one PushInfo per ref. The list is non-empty even when
        # the remote rejects the push, so the ERROR flag must be checked.
        push_succeeded = bool(push_result) and not any(
            info.flags & info.ERROR for info in push_result
        )
        if push_succeeded:
            print("Output files successfully pushed to GitHub!")
        else:
            print("Output files pushed to GitHub failed:(")
        return push_succeeded

    except Exception as e:
        print(f"An error occurred at git automation code: {e}")
        return False

In [7]:
def main():
    """Run the pipeline: load features, fetch transcripts, save, push.

    Reads the module-level ``repo_url``, ``kaggle_repo_url``,
    ``FeatureEngineering_path``, ``api_key`` and ``VideoSummarization_path``.

    Returns:
        True when the output file was saved and pushed. False when saving
        failed (the push is then skipped) or when the push failed.
    """
    FeatureEngineering_File = FeatureEngineering_File_Extraction(
        repo_url, kaggle_repo_url, FeatureEngineering_path
    )
    dataframe = GenAI(pd.DataFrame(FeatureEngineering_File), api_key)

    # PushToGithub uploads the newest YTSGENAI_ file it finds, so pushing after
    # a failed save would publish output from an earlier run.
    if not RawFile(dataframe):
        print("Skipping GitHub push because the output file could not be saved.")
        return False

    return bool(PushToGithub(VideoSummarization_path))

In [8]:
# Script entry point. The names bound here are module-level globals that the
# functions above read directly: `name`/`email` (git identity), `repo_url` and
# `kaggle_repo_url` (clone/pull), `api_key` (GenAI), the two *_path values
# (main) and `ist` (timestamps in RawFile and PushToGithub).
if __name__ == "__main__":    
    # Credentials and the repository URL come from the Kaggle secrets client.
    user_secrets = UserSecretsClient()
    secret_value_0 = user_secrets.get_secret("AuthorName")
    secret_value_1 = user_secrets.get_secret("gitEmail")
    secret_value_2 = user_secrets.get_secret("GOOGLE_API_KEY")
    secret_value_3 = user_secrets.get_secret("repo_url_youtube_analysis")

    # Give the secrets the names the functions look up.
    name = secret_value_0
    email = secret_value_1
    api_key = secret_value_2
    repo_url = secret_value_3
    
    # Local working copy of the repository and the folders used inside it.
    kaggle_repo_url = '/kaggle/working/DevOps-YouTube-Trends'
    FeatureEngineering_path = '/kaggle/working/DevOps-YouTube-Trends/FeatureEngineering/Daily'
    VideoSummarization_path = '/kaggle/working/DevOps-YouTube-Trends/VideoSummarization/Daily'

    # Timezone used for file-name and commit timestamps.
    ist = timezone("Asia/Kolkata")
    
    main()

Successfully cloned the repository.
DataFrame saved as YTSGENAI_2025-04-20_00_06_23_390_records.json
Already cloned and the repo file exists
Successfully pulled the git repo before push
Output files successfully pushed to GitHub!


In [9]:
# Scratch cell: a bare list of video IDs that no other visible cell references.
# NOTE(review): presumably kept for manual transcript spot checks - confirm or delete.
['XvtFppcynYM','mg1ZqahIpVw','xH5EY7FCFQw','an8SrFtJBdM','Qm79wDSCZ-w','vPd7H8EMmD0']

['XvtFppcynYM',
 'mg1ZqahIpVw',
 'xH5EY7FCFQw',
 'an8SrFtJBdM',
 'Qm79wDSCZ-w',
 'vPd7H8EMmD0']