In [28]:
import json
import os
import re
from datetime import datetime, timedelta, timezone

import pandas as pd
from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi

class YoutubeChannelSubtitles:
    """Collect recent videos of a YouTube channel and export their subtitles.

    The YouTube Data API client (``self.youtube``) is used to look up the
    channel and list its videos; ``YouTubeTranscriptApi`` is used to fetch the
    subtitles themselves.
    """

    # Largest value the YouTube Data API accepts for ``search.list`` maxResults.
    _MAX_SEARCH_RESULTS = 50
    # Transcript languages tried, in order, by ``process_channel_to_csv``.
    _DEFAULT_CSV_LANGUAGES = ('en', 'es')
    # Column order of the exported CSV / returned DataFrame.
    _CSV_COLUMNS = ['Title', 'Date', 'Text', 'Link', 'Language']
    # Timestamp layouts accepted for ``publishedAt`` (with and without
    # fractional seconds); both denote UTC through the trailing ``Z``.
    _PUBLISHED_AT_FORMATS = ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ')

    def __init__(self, api_key):
        """Build the YouTube Data API v3 client with the given developer key."""
        self.youtube = build('youtube', 'v3', developerKey=api_key)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @classmethod
    def _parse_published_at(cls, value):
        """Parse a ``publishedAt`` string into a timezone-aware UTC datetime.

        Raises:
            ValueError: if ``value`` matches none of the accepted layouts.
        """
        for layout in cls._PUBLISHED_AT_FORMATS:
            try:
                return datetime.strptime(value, layout).replace(tzinfo=timezone.utc)
            except ValueError:
                continue
        raise ValueError(f'Unrecognised publishedAt timestamp: {value!r}')

    @staticmethod
    def _as_raw_entries(fetched):
        """Return the fetched transcript as a list of ``{'text': ...}`` dicts.

        Older youtube-transcript-api releases return that list directly; newer
        ones return an object exposing ``to_raw_data()``. Both are accepted.
        """
        to_raw = getattr(fetched, 'to_raw_data', None)
        return to_raw() if callable(to_raw) else fetched

    @staticmethod
    def _pick_transcript(video_id, languages):
        """Pick a transcript for ``video_id``, preferring manually created ones."""
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            # The language codes are required here; omitting them raises
            # TypeError, which used to be hidden by a bare ``except``.
            return transcript_list.find_manually_created_transcript(languages)
        except Exception:
            # No manual transcript in those languages: accept any transcript
            # (manual or auto-generated) in the requested languages.
            return transcript_list.find_transcript(languages)

    @staticmethod
    def _clean_transcript_text(segments):
        """Join subtitle segments into one single-spaced line of text.

        Backslashes are dropped and every run of whitespace (including
        newlines and the gaps left by empty segments) becomes one space.
        """
        joined = ' '.join(segment.replace('\\', '') for segment in segments)
        return re.sub(r'\s+', ' ', joined).strip()

    def _find_recent_videos(self, channel_name, max_videos, days_back):
        """Resolve the channel and list its recent videos.

        Returns:
            ``(videos, message)``. ``message`` is ``None`` when there are
            videos to process, otherwise the user-facing explanation.
        """
        channel_id = self.get_channel_id(channel_name)
        if not channel_id:
            return [], f"Channel not found: {channel_name}"

        videos = self.get_recent_videos(channel_id, max_videos, days_back)
        if not videos:
            return [], f"No videos found in the last {days_back} days for channel: {channel_name}"
        return videos, None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def get_channel_id(self, channel_name):
        """Gets the channel ID from the channel name.

        Runs a channel search for ``channel_name`` and returns the ID of the
        first hit, or ``None`` when the search returns nothing.
        """
        response = self.youtube.search().list(
            q=channel_name,
            type='channel',
            part='id',
            maxResults=1
        ).execute()

        items = response.get('items') or []
        return items[0]['id']['channelId'] if items else None

    def get_recent_videos(self, channel_id, max_results=50, days_back=7):
        """Gets videos from the channel published in the last ``days_back`` days.

        Args:
            channel_id: ID of the channel to list.
            max_results: number of most recent videos to request; values above
                50 are clamped because the API rejects them.
            days_back: size of the time window, in days, counted back from now.

        Returns:
            A list of ``{'title', 'video_id', 'published_at'}`` dicts, newest
            first, where ``published_at`` is the untouched API string.
        """
        response = self.youtube.search().list(
            channelId=channel_id,
            order='date',  # newest first
            part='snippet',
            maxResults=min(max_results, self._MAX_SEARCH_RESULTS),
            type='video'
        ).execute()

        # publishedAt is expressed in UTC, so the cutoff must be UTC as well;
        # a naive local "now" would shift the window by the local UTC offset.
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_back)

        videos = []
        for item in response.get('items', []):
            snippet = item['snippet']
            if self._parse_published_at(snippet['publishedAt']) >= cutoff_date:
                videos.append({
                    'title': snippet['title'],
                    'video_id': item['id']['videoId'],
                    'published_at': snippet['publishedAt']
                })

        return videos

    def download_subtitles(self, video_id, languages=['es', 'en'], output_dir='subtitles'):
        """Downloads subtitles for a video in the specified languages.

        One JSON file per language is written to
        ``<output_dir>/<video_id>_<language>.json``.

        Returns:
            A dict mapping each language to ``'Success'`` or
            ``'Failed: <reason>'``. If the transcript list itself cannot be
            obtained, the string ``'Failed to get subtitles: <reason>'`` is
            returned instead.
        """
        # NOTE: the list default is never mutated here, so sharing it between
        # calls is harmless.
        try:
            os.makedirs(output_dir, exist_ok=True)

            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            results = {}

            for language in languages:
                try:
                    transcript = transcript_list.find_transcript([language])
                    subtitles = self._as_raw_entries(transcript.fetch())

                    # Save in JSON format
                    filename = os.path.join(output_dir, f'{video_id}_{language}.json')
                    with open(filename, 'w', encoding='utf-8') as f:
                        json.dump(subtitles, f, ensure_ascii=False, indent=2)

                    results[language] = 'Success'

                except Exception as e:
                    # Best effort: one missing language must not abort the rest.
                    results[language] = f'Failed: {str(e)}'

            return results

        except Exception as e:
            return f'Failed to get subtitles: {str(e)}'

    def process_channel(self, channel_name, max_videos=50, languages=['es', 'en'], days_back=7):
        """Processes videos from a channel within the specified time frame.

        Downloads the subtitles of every recent video and writes a summary to
        ``results_<timestamp>.json`` in the working directory.

        Returns:
            The list of per-video result dicts, or an explanatory string when
            the channel is not found or has no recent videos.
        """
        videos, message = self._find_recent_videos(channel_name, max_videos, days_back)
        if message:
            return message

        results = [
            {
                'title': video['title'],
                'video_id': video['video_id'],
                'published_at': video['published_at'],
                'subtitles': self.download_subtitles(video['video_id'], languages)
            }
            for video in videos
        ]

        # Save results to a log file
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        with open(f'results_{timestamp}.json', 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

        return results

    def process_channel_to_csv(self, channel_name, max_videos=5, days_back=7, languages=None):
        """Processes videos from the last X days and creates a CSV with subtitles.

        Args:
            channel_name: name (or handle) used to search for the channel.
            max_videos: number of most recent videos to inspect.
            days_back: size of the time window, in days.
            languages: transcript language codes in order of preference;
                defaults to ``['en', 'es']``.

        Returns:
            A DataFrame with the columns Title, Date, Text, Link and Language
            (also written to ``subtitles_<timestamp>.csv``), or an explanatory
            string when the channel is not found or has no recent videos.
            Videos whose subtitles cannot be fetched are reported on stdout
            and skipped, so the frame may be empty.
        """
        videos, message = self._find_recent_videos(channel_name, max_videos, days_back)
        if message:
            return message

        if isinstance(languages, str):
            preferred = [languages]
        elif languages:
            preferred = list(languages)
        else:
            preferred = list(self._DEFAULT_CSV_LANGUAGES)

        data = []
        for video in videos:
            video_id = video['video_id']
            # Keep only the calendar date for readability.
            date = self._parse_published_at(video['published_at']).strftime('%Y-%m-%d')

            try:
                transcript = self._pick_transcript(video_id, preferred)
                entries = self._as_raw_entries(transcript.fetch())

                data.append({
                    'Title': video['title'],
                    'Date': date,
                    'Text': self._clean_transcript_text(entry['text'] for entry in entries),
                    'Link': f"https://www.youtube.com/watch?v={video_id}",
                    'Language': transcript.language
                })

            except Exception as e:
                # Best effort: report the video and carry on with the others.
                print(f"Error processing video {video_id}: {str(e)}")

        # Passing the columns explicitly keeps the schema even when every
        # video failed and ``data`` is empty.
        df = pd.DataFrame(data, columns=self._CSV_COLUMNS)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        csv_filename = f'subtitles_{timestamp}.csv'
        df.to_csv(csv_filename, index=False, encoding='utf-8')

        return df



In [29]:
# Usage example
if __name__ == "__main__":
    # The API key is read from the environment so that it is never stored in
    # the notebook. A key that was previously committed in plain text should
    # be treated as compromised and revoked.
    API_KEY = os.environ.get('YOUTUBE_API_KEY')
    if not API_KEY:
        raise RuntimeError(
            "Set the YOUTUBE_API_KEY environment variable before running this cell."
        )
    yt = YoutubeChannelSubtitles(API_KEY)

    # Despite its name, ``csv_file`` holds the returned DataFrame, or an
    # explanatory string when the channel or its recent videos are not found.
    csv_file = yt.process_channel_to_csv(
        channel_name="@veritasium",
        max_videos=5,
        days_back=7
    )

    print(f"CSV generado: {csv_file}")



CSV generado: No videos found in the last 7 days for channel: @veritasium


In [25]:
csv_file

'No videos found in the last 7 days for channel: @veritasium'