In [None]:
!pip install youtube-transcript-api

In [None]:
%pip install pysentimiento
pip install vader-multi
!pip install transformers

# RoBERTuito

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
from pysentimiento import create_analyzer

# Build the pysentimiento analyzer for the "sentiment" task in Spanish
analyzer = create_analyzer(task="sentiment", lang="es")

# Fetch the Spanish captions ("videoID" is presumably a placeholder for a real video ID)
srt = YouTubeTranscriptApi.get_transcript("videoID", languages=['es'])

# Flatten the caption segments into one space-separated string
combined_text = " ".join(entry['text'] for entry in srt)

# Last expression of the cell, so the prediction is shown as the cell output
analyzer.predict(combined_text)

# VADER

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Fetch the Spanish captions ("videoID" is presumably a placeholder for a real video ID)
srt = YouTubeTranscriptApi.get_transcript("videoID", languages=['es'])

# Flatten the caption segments into one space-separated string
combined_text = " ".join(entry['text'] for entry in srt)

# Score the whole transcript in a single call; the result is the cell output
analyzer = SentimentIntensityAnalyzer()
analyzer.polarity_scores(combined_text)

In [None]:
# For longer captions, the text is analyzed in chunks
from youtube_transcript_api import YouTubeTranscriptApi
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Fetch the Spanish captions ("videoID" is presumably a placeholder for a real video ID)
srt = YouTubeTranscriptApi.get_transcript("videoID", languages=['es'])

# Flatten the caption segments into one space-separated string
combined_text = " ".join(entry['text'] for entry in srt)

# One analyzer instance, reused for every chunk
analyzer = SentimentIntensityAnalyzer()

# Score a single chunk of text with the shared analyzer
def analyze_sentiment(text_chunk):
    """Return what the module-level `analyzer.polarity_scores` yields for `text_chunk`."""
    chunk_scores = analyzer.polarity_scores(text_chunk)
    return chunk_scores

import textwrap

# Define chunk size (maximum number of characters per chunk)
chunk_size = 1000

# Split the combined text into chunks on word boundaries. Slicing at fixed
# character offsets can cut a word in half, and the two fragments can no longer
# be recognised as the original word when the chunk is scored.
text_chunks = textwrap.wrap(
    combined_text,
    width=chunk_size,
    break_long_words=False,   # keep over-long tokens whole instead of cutting them
    break_on_hyphens=False,   # only break at whitespace
)

# Analyze sentiment for each chunk
sentiment_scores = [analyze_sentiment(chunk) for chunk in text_chunks]

# Aggregate sentiment scores: unweighted mean of each score over all chunks.
# An empty transcript produces no chunks; report 0.0 instead of dividing by zero.
score_keys = {'positive': 'pos', 'negative': 'neg', 'neutral': 'neu', 'compound': 'compound'}
chunk_count = len(sentiment_scores)
aggregate_scores = {
    label: (sum(score[key] for score in sentiment_scores) / chunk_count if chunk_count else 0.0)
    for label, key in score_keys.items()
}

# Print overall sentiment scores
print("Overall Sentiment Scores:")
print("Positive:", aggregate_scores['positive'])
print("Negative:", aggregate_scores['negative'])
print("Neutral:", aggregate_scores['neutral'])
print("Compound:", aggregate_scores['compound'])

# MSPM

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
from transformers import pipeline

# Text-classification pipeline backed by the multilingual sentiment checkpoint
classifier = pipeline(
    task="text-classification",
    model="clampert/multilingual-sentiment-covid19",
)

# Fetch the Spanish captions ("videoID" is presumably a placeholder for a real video ID)
srt = YouTubeTranscriptApi.get_transcript("videoID", languages=['es'])

# Flatten the caption segments into one space-separated string
combined_text = " ".join(entry['text'] for entry in srt)

# Analysis: classify the whole transcript in one call
result1 = classifier(combined_text)

# Get the scores for negative and positive sentiments
negative_score = None
positive_score = None

for res in result1:
    if res['label'] == 'negative':
        negative_score = res['score']
    elif res['label'] == 'positive':
        positive_score = res['score']

# Fill in whichever score the classifier did not report
if negative_score is None and positive_score is None:
    # Neither expected label came back: there is nothing to take a complement
    # of, so report 0 for both instead of failing on `1 - None`.
    negative_score = 0
    positive_score = 0
elif negative_score is None:
    negative_score = 1 - positive_score
elif positive_score is None:
    positive_score = 1 - negative_score

# Print the results
print([{'label': 'negative', 'score': negative_score}, {'label': 'positive', 'score': positive_score}])

In [None]:
# For longer captions, the text is analyzed in chunks
from youtube_transcript_api import YouTubeTranscriptApi
from transformers import pipeline
import numpy as np

# Text-classification pipeline backed by the multilingual sentiment checkpoint
classifier = pipeline(
    task="text-classification",
    model="clampert/multilingual-sentiment-covid19",
)

# Fetch the Spanish captions ("videoID" is presumably a placeholder for a real video ID)
srt = YouTubeTranscriptApi.get_transcript("videoID", languages=['es'])

# Flatten the caption segments into one space-separated string and show it
combined_text = " ".join(entry['text'] for entry in srt)
print("Combined Text:\n", combined_text, "\n")

# Function to split text into chunks of a specified max length
def chunk_text(text, max_length=512):
    """Split `text` into chunks made of whole, space-joined words.

    Each word is counted as its length plus one (for the separating space). A
    new chunk is started when adding the next word would push the running count
    above `max_length`. A single word that exceeds `max_length` on its own is
    kept whole in a chunk by itself; no empty chunk is emitted for it.

    Args:
        text: The string to split; words are separated on any whitespace.
        max_length: Upper bound on the per-chunk character count (default 512).

    Returns:
        A list of non-empty strings in original word order; `[]` when `text`
        contains no words.
    """
    chunks = []
    current_chunk = []
    current_length = 0
    for word in text.split():
        word_cost = len(word) + 1  # +1 for the space
        # Only flush when there is something to flush: flushing an empty chunk
        # would append a spurious "" entry for an over-long word.
        if current_chunk and current_length + word_cost > max_length:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_length = 0
        current_chunk.append(word)
        current_length += word_cost
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks

# Split the combined text into chunks
chunks = chunk_text(combined_text)

# Classify sentiment for each chunk
negative_scores = []
positive_scores = []

for i, chunk in enumerate(chunks):
    result = classifier(chunk)
    chunk_negative_score = None
    chunk_positive_score = None

    for res in result:
        if res['label'] == 'negative':
            chunk_negative_score = res['score']
        elif res['label'] == 'positive':
            chunk_positive_score = res['score']

    # Fill in whichever score the classifier did not report. The both-missing
    # case is decided first: filling the scores one after the other would set
    # negative to 0 and then derive positive as 1 - 0 = 1.
    if chunk_negative_score is None and chunk_positive_score is None:
        chunk_negative_score = 0
        chunk_positive_score = 0
    elif chunk_negative_score is None:
        chunk_negative_score = 1 - chunk_positive_score
    elif chunk_positive_score is None:
        chunk_positive_score = 1 - chunk_negative_score

    negative_scores.append(chunk_negative_score)
    positive_scores.append(chunk_positive_score)

    # Print results for each chunk
    print(f"Chunk {i + 1}:")
    print(chunk)
    print([
        {'label': 'negative', 'score': chunk_negative_score},
        {'label': 'positive', 'score': chunk_positive_score}
    ])
    print()

# Calculate average scores (0 when there were no chunks at all)
average_negative_score = np.mean(negative_scores) if negative_scores else 0
average_positive_score = np.mean(positive_scores) if positive_scores else 0

# Print the average results
print("Average Results:")
print([
    {'label': 'negative', 'score': average_negative_score},
    {'label': 'positive', 'score': average_positive_score}
])


# Google Perspective API

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
from googleapiclient import discovery

# Key passed to the Perspective API client ('API_KEY' is presumably a placeholder to replace)
API_KEY = 'API_KEY'

# Fetch the Spanish captions ("videoID" is presumably a placeholder for a real video ID)
srt = YouTubeTranscriptApi.get_transcript("videoID", languages=['es'])

# Flatten the caption segments into one space-separated string and show it
combined_text = " ".join(entry['text'] for entry in srt)
print(combined_text)

# Build the Comment Analyzer (Perspective API) client from its discovery document
client = discovery.build(
    serviceName="commentanalyzer",
    version="v1alpha1",
    discoveryServiceUrl="https://commentanalyzer.googleapis.com/$discovery/rest?version=v1alpha1",
    developerKey=API_KEY,
    static_discovery=False,
)

# Function to analyze text and return results
def analyze_text(text):
    """Score `text` with the module-level Perspective `client`.

    Requests the TOXICITY, IDENTITY_ATTACK, INSULT, PROFANITY and THREAT
    attributes and returns the response of the `analyze` call as-is.

    If the request raises, a message is printed and a response-shaped dict is
    returned in which every requested attribute has a summary score of 0, so
    callers can always read `['attributeScores'][name]['summaryScore']['value']`.

    Args:
        text: The comment text to analyze.

    Returns:
        The API response dict, or the all-zero fallback dict on error.
    """
    attributes = ('TOXICITY', 'IDENTITY_ATTACK', 'INSULT', 'PROFANITY', 'THREAT')
    try:
        analyze_request = {
            'comment': {'text': text},
            'requestedAttributes': {name: {} for name in attributes},
        }
        return client.comments().analyze(body=analyze_request).execute()
    except Exception as e:
        # Not every exception carries `error_details`, and when it does the
        # value is not guaranteed to be indexable in a useful way (indexing a
        # string yields one character). Normalise to text before matching, and
        # fall back to the exception message when no details are available.
        error_details = str(getattr(e, 'error_details', None) or e)
        # The longer marker must be tested first: it contains the shorter one.
        if 'LANGUAGE_NOT_SUPPORTED_BY_ATTRIBUTE' in error_details:
            print(f"Skipping analysis for text due to unsupported language: {text}")
        elif 'LANGUAGE_NOT_SUPPORTED' in error_details:
            print(f"Skipping analysis for text due to undefined language: {text}")
        else:
            print(f"Skipping analysis due to error: {error_details}")
        # Return default values of 0 for each category
        return {
            'attributeScores': {
                name: {'summaryScore': {'value': 0}} for name in attributes
            }
        }

# Analyze the combined text
response = analyze_text(combined_text)

# Flatten the response: one entry per attribute holding its summary value,
# plus an "<attribute>_<type>" entry for each span score when any are present
result = {}
for attribute, scores in response['attributeScores'].items():
    result[attribute] = scores['summaryScore']['value']
    for span in scores.get('spanScores', []):
        span_score = span['score']
        result[f"{attribute}_{span_score['type']}"] = span_score['value']

print(result)

In [None]:
# For longer captions, the text is analyzed in chunks
from youtube_transcript_api import YouTubeTranscriptApi
from googleapiclient import discovery

# Key passed to the Perspective API client ('API_KEY' is presumably a placeholder to replace)
API_KEY = 'API_KEY'

# Fetch the Spanish captions ("videoID" is presumably a placeholder for a real video ID)
srt = YouTubeTranscriptApi.get_transcript("videoID", languages=['es'])

# Flatten the caption segments into one space-separated string and show it
combined_text = " ".join(entry['text'] for entry in srt)
print(combined_text)

# Build the Comment Analyzer (Perspective API) client from its discovery document
client = discovery.build(
    serviceName="commentanalyzer",
    version="v1alpha1",
    discoveryServiceUrl="https://commentanalyzer.googleapis.com/$discovery/rest?version=v1alpha1",
    developerKey=API_KEY,
    static_discovery=False,
)

# Function to analyze text and return results
def analyze_text(text):
    """Score `text` with the module-level Perspective `client`.

    Requests the TOXICITY, IDENTITY_ATTACK, INSULT, PROFANITY and THREAT
    attributes and returns the response of the `analyze` call as-is.

    If the request raises, a message is printed and a response-shaped dict is
    returned in which every requested attribute has a summary score of 0, so
    callers can always read `['attributeScores'][name]['summaryScore']['value']`.

    Args:
        text: The comment text to analyze.

    Returns:
        The API response dict, or the all-zero fallback dict on error.
    """
    attributes = ('TOXICITY', 'IDENTITY_ATTACK', 'INSULT', 'PROFANITY', 'THREAT')
    try:
        analyze_request = {
            'comment': {'text': text},
            'requestedAttributes': {name: {} for name in attributes},
        }
        return client.comments().analyze(body=analyze_request).execute()
    except Exception as e:
        # Not every exception carries `error_details`, and when it does the
        # value is not guaranteed to be indexable in a useful way (indexing a
        # string yields one character). Normalise to text before matching, and
        # fall back to the exception message when no details are available.
        error_details = str(getattr(e, 'error_details', None) or e)
        # The longer marker must be tested first: it contains the shorter one.
        if 'LANGUAGE_NOT_SUPPORTED_BY_ATTRIBUTE' in error_details:
            print(f"Skipping analysis for text due to unsupported language: {text}")
        elif 'LANGUAGE_NOT_SUPPORTED' in error_details:
            print(f"Skipping analysis for text due to undefined language: {text}")
        else:
            print(f"Skipping analysis due to error: {error_details}")
        # Return default values of 0 for each category
        return {
            'attributeScores': {
                name: {'summaryScore': {'value': 0}} for name in attributes
            }
        }

# Function to split text into chunks
def split_text(text, max_length):
    """Split `text` into chunks made of whole, space-joined words.

    Each word is counted as its length plus one (for the separating space). A
    new chunk is started when adding the next word would push the running count
    above `max_length`. A single word that exceeds the limit on its own is kept
    whole in a chunk by itself; no empty chunk is emitted for it.

    Args:
        text: The string to split; words are separated on any whitespace.
        max_length: Upper bound on the per-chunk character count.

    Returns:
        A list of non-empty strings in original word order; `[]` when `text`
        contains no words.
    """
    chunks = []
    chunk = []
    chunk_length = 0
    for word in text.split():
        word_cost = len(word) + 1  # +1 for the joining space
        # Flush only a non-empty chunk; otherwise an over-long first word would
        # cause an empty string to be appended.
        if chunk and chunk_length + word_cost > max_length:
            chunks.append(' '.join(chunk))
            chunk = []
            chunk_length = 0
        chunk.append(word)
        chunk_length += word_cost
    if chunk:
        chunks.append(' '.join(chunk))
    return chunks

# Analyze the combined text in chunks
max_chunk_length = 5000  # Maximum length of text for Perspective API
text_chunks = split_text(combined_text, max_chunk_length)

# Score every chunk, keeping one response per chunk in chunk order
chunk_results = [analyze_text(chunk) for chunk in text_chunks]

# Aggregate the results
def aggregate_results(chunk_results):
    """Average each attribute's summary score over all chunk responses.

    Args:
        chunk_results: List of response dicts. Each must have an
            'attributeScores' mapping from attribute name to a dict holding
            `['summaryScore']['value']`; attribute names must be among
            TOXICITY, IDENTITY_ATTACK, INSULT, PROFANITY and THREAT.

    Returns:
        Dict mapping each of the five attributes to the sum of its summary
        values divided by `len(chunk_results)`. With no responses, every
        attribute is 0.

    Raises:
        KeyError: If a response contains an attribute outside the five above.
    """
    attribute_sums = {
        'TOXICITY': 0,
        'IDENTITY_ATTACK': 0,
        'INSULT': 0,
        'PROFANITY': 0,
        'THREAT': 0
    }
    count = len(chunk_results)
    if count == 0:
        # Nothing to average (e.g. an empty transcript produced no chunks);
        # return the all-zero totals rather than dividing by zero.
        return dict(attribute_sums)
    for response in chunk_results:
        for attribute, scores in response['attributeScores'].items():
            attribute_sums[attribute] += scores['summaryScore']['value']
    return {attribute: total / count for attribute, total in attribute_sums.items()}

# Average the per-chunk summary scores into one value per attribute
result = aggregate_results(chunk_results)

# Print the aggregated results
print(result)
