<a href="https://colab.research.google.com/github/TheHidden1/yt-virality-prediction/blob/main/Youtube_Analyzer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install google-api-python-client pandas numpy matplotlib seaborn scikit-learn textblob



In [2]:
import os
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from googleapiclient.discovery import build
from datetime import datetime, timedelta
import json
import pickle
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.feature_extraction.text import TfidfVectorizer
from textblob import TextBlob
import warnings
warnings.filterwarnings('ignore')

In [None]:
class YouTubeViralityPredictor:
    def __init__(self, api_key):

        self.youtube = build('youtube', 'v3', developerKey=api_key)
        self.scaler = StandardScaler()
        self.tfidf_vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
        self.model = None
        self.features_columns = None

    def extract_video_id(self, url):
      #Vadenje na video id od linkot
        patterns = [
            r"(?:v=|\/)([0-9A-Za-z_-]{11}).*"
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        raise ValueError("Нема videoId во линкот.")

    def get_video_data(self, video_id):
        """Собирање податоци за видео преку YouTube API"""
        try:
            # Основни видео податоци
            video_request = self.youtube.videos().list(
                part='statistics,snippet,contentDetails',
                id=video_id
            )
            video_response = video_request.execute()

            if not video_response['items']:
                raise ValueError(f"Видео со ID {video_id} не е пронајден")

            video_item = video_response['items'][0]
            stats = video_item['statistics']
            snippet = video_item['snippet']
            content_details = video_item['contentDetails']

            # Податоци за каналот
            channel_request = self.youtube.channels().list(
                part='statistics,snippet',
                id=snippet['channelId']
            )
            channel_response = channel_request.execute()
            channel_stats = channel_response['items'][0]['statistics']

            # Коментари
            comments_data = self.get_comments_sample(video_id, max_results=500)

            return {
                'video_id': video_id,
                'title': snippet['title'],
                'description': snippet.get('description', ''),
                'tags': snippet.get('tags', []),
                'category_id': snippet['categoryId'],
                'published_at': snippet['publishedAt'],
                'duration': content_details['duration'],
                'views': int(stats.get('viewCount', 0)),
                'likes': int(stats.get('likeCount', 0)),
                'comments_count': int(stats.get('commentCount', 0)),
                'channel_subscriber_count': int(channel_stats.get('subscriberCount', 0)),
                'channel_video_count': int(channel_stats.get('videoCount', 0)),
                'channel_view_count': int(channel_stats.get('viewCount', 0)),
                'comments_sentiment': comments_data
            }

        except Exception as e:
            print(f"Грешка при собирање на податоци: {e}")
            return None

    def get_comments_sample(self, video_id, max_results=500):
        """Собирање на коментари за sentiment анализа овде дефаулт вредност е 500"""
        try:
            comments = []
            request = self.youtube.commentThreads().list(
                part='snippet',
                videoId=video_id,
                maxResults=max_results,
                order='relevance'
            )
            response = request.execute()

            for item in response['items']:
                comment_text = item['snippet']['topLevelComment']['snippet']['textOriginal']
                comments.append(comment_text)

            # Sentiment анализа
            if comments:
                sentiment_scores = [TextBlob(comment).sentiment.polarity for comment in comments]
                return {
                    'avg_sentiment': np.mean(sentiment_scores),
                    'sentiment_variance': np.var(sentiment_scores),
                    'positive_ratio': len([s for s in sentiment_scores if s > 0]) / len(sentiment_scores)
                }
            else:
                return {'avg_sentiment': 0, 'sentiment_variance': 0, 'positive_ratio': 0}

        except Exception:
            return {'avg_sentiment': 0, 'sentiment_variance': 0, 'positive_ratio': 0}

    def parse_duration(self, duration):
        """Конвертирање на duration во секунди"""
        try:
            import re
            pattern = r'PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?'
            match = re.match(pattern, duration)
            if match:
                hours = int(match.group(1) or 0)
                minutes = int(match.group(2) or 0)
                seconds = int(match.group(3) or 0)
                return hours * 3600 + minutes * 60 + seconds
            return 0
        except:
            return 0

    def calculate_days_since_publication(self, published_at):
        """Пресметка на денови од објавување"""
        try:
            pub_date = datetime.strptime(published_at, '%Y-%m-%dT%H:%M:%SZ')
            return (datetime.now() - pub_date).days
        except:
            return 0

    def extract_features(self, video_data):
        """Екстракција на features за моделот"""
        if not video_data:
            return None

        # Основни метрики
        views = video_data['views']
        likes = video_data['likes']
        comments = video_data['comments_count']
        duration_seconds = self.parse_duration(video_data['duration'])
        days_since_pub = self.calculate_days_since_publication(video_data['published_at'])

        # Engagement метрики [3]
        engagement_rate = ((likes + comments) / max(views, 1)) * 100 if views > 0 else 0
        like_to_view_ratio = (likes / max(views, 1)) * 100 if views > 0 else 0
        comment_to_view_ratio = (comments / max(views, 1)) * 100 if views > 0 else 0

        # Канал карактеристики [21]
        channel_authority = min(video_data['channel_subscriber_count'] / 1000000, 10)  # Нормализирано до 10
        channel_productivity = video_data['channel_video_count']

        # Содржински карактеристики [5]
        title_length = len(video_data['title'])
        description_length = len(video_data['description'])
        tags_count = len(video_data['tags'])

        # Временски фактори [4]
        pub_hour = datetime.strptime(video_data['published_at'], '%Y-%m-%dT%H:%M:%SZ').hour
        is_weekend = datetime.strptime(video_data['published_at'], '%Y-%m-%dT%H:%M:%SZ').weekday() >= 5

        # Sentiment карактеристики
        avg_sentiment = video_data['comments_sentiment']['avg_sentiment']
        sentiment_variance = video_data['comments_sentiment']['sentiment_variance']
        positive_ratio = video_data['comments_sentiment']['positive_ratio']

        # Виралност лабел (базирано на истражувања) [22]
        is_viral = 1 if views >= 1000000 and days_since_pub <= 7 else 0

        return {
            'views': views,
            'likes': likes,
            'comments': comments,
            'duration_minutes': duration_seconds / 60,
            'days_since_publication': days_since_pub,
            'engagement_rate': engagement_rate,
            'like_to_view_ratio': like_to_view_ratio,
            'comment_to_view_ratio': comment_to_view_ratio,
            'channel_authority': channel_authority,
            'channel_productivity': min(channel_productivity / 1000, 10),
            'title_length': title_length,
            'description_length': min(description_length / 1000, 10),
            'tags_count': tags_count,
            'pub_hour': pub_hour,
            'is_weekend': int(is_weekend),
            'avg_sentiment': avg_sentiment,
            'sentiment_variance': sentiment_variance,
            'positive_ratio': positive_ratio,
            'category_id': int(video_data['category_id']),
            'is_viral': is_viral
        }

    def collect_training_data(self, video_urls_file=None, trending_videos=True):
        """Собирање на податоци за тренирање на моделот"""
        training_data = []

        if trending_videos:
            # Собирање на trending видеа [2]
            print("Собирам trending видеа...")
            try:
                trending_request = self.youtube.videos().list(
                    part='id',
                    chart='mostPopular',
                    regionCode='US',
                    maxResults=50
                )
                trending_response = trending_request.execute()

                for item in trending_response['items']:
                    video_id = item['id']
                    print(f"Обработувам: {video_id}")
                    video_data = self.get_video_data(video_id)
                    if video_data:
                        features = self.extract_features(video_data)
                        if features:
                            training_data.append(features)

            except Exception as e:
                print(f"Грешка при собирање на trending видеа: {e}")

        if video_urls_file and os.path.exists(video_urls_file):
            # Собирање од фајл со URLs
            print("Собирам од фајл...")
            with open(video_urls_file, 'r') as f:
                urls = f.read().splitlines()

            for url in urls:
                try:
                    video_id = self.extract_video_id(url.strip())
                    print(f"Обработувам: {video_id}")
                    video_data = self.get_video_data(video_id)
                    if video_data:
                        features = self.extract_features(video_data)
                        if features:
                            training_data.append(features)
                except Exception as e:
                    print(f"Грешка за {url}: {e}")
                    continue

        return pd.DataFrame(training_data)

    def train_model(self, training_df):
        """Тренирање на модел за предвидување на виралност"""
        if training_df.empty:
            raise ValueError("Нема податоци за тренирање")

        # Подготовка на податоци [14]
        feature_columns = [col for col in training_df.columns if col != 'is_viral']
        X = training_df[feature_columns]
        y = training_df['is_viral']

        # Handle missing values
        X = X.fillna(0)

        # Скалирање на features
        X_scaled = self.scaler.fit_transform(X)

        # Поделба на податоци
        X_train, X_test, y_train, y_test = train_test_split(
            X_scaled, y, test_size=0.2, random_state=42, stratify=y
        )

        # Тренирање на повеќе модели [1]
        models = {
            'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
            'Gradient Boosting': GradientBoostingClassifier(random_state=42),
            'Logistic Regression': LogisticRegression(random_state=42)
        }

        best_model = None
        best_score = 0

        print("\nТренирам модели...")
        for name, model in models.items():
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            auc_score = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])

            print(f"{name} - AUC Score: {auc_score:.3f}")

            if auc_score > best_score:
                best_score = auc_score
                best_model = model

        self.model = best_model
        self.features_columns = feature_columns

        print(f"\nНајдобар модел: {type(best_model).__name__} со AUC: {best_score:.3f}")

        # Feature importance [21]
        if hasattr(best_model, 'feature_importances_'):
            feature_importance = pd.DataFrame({
                'feature': feature_columns,
                'importance': best_model.feature_importances_
            }).sort_values('importance', ascending=False)

            print("\nТоп 10 најважни features:")
            print(feature_importance.head(10))

        return best_model, best_score

    def predict_virality(self, video_url):
        """Предвидување на виралност за конкретно видео"""
        if not self.model:
            raise ValueError("Моделот не е истрениран. Прво повикај train_model()")

        try:
            video_id = self.extract_video_id(video_url)
            video_data = self.get_video_data(video_id)

            if not video_data:
                return None

            features = self.extract_features(video_data)
            if not features:
                return None

            # Подготовка на features
            feature_df = pd.DataFrame([features])
            X = feature_df[self.features_columns].fillna(0)
            X_scaled = self.scaler.transform(X)

            # Предвидување
            virality_prob = self.model.predict_proba(X_scaled)[0][1]
            is_viral_pred = self.model.predict(X_scaled)[0]

            return {
                'video_id': video_id,
                'title': video_data['title'],
                'current_views': video_data['views'],
                'virality_probability': virality_prob,
                'predicted_viral': bool(is_viral_pred),
                'confidence': 'Висока' if virality_prob > 0.8 or virality_prob < 0.2 else 'Средна',
                'features': features
            }

        except Exception as e:
            print(f"Грешка при предвидување: {e}")
            return None

    def visualize_predictions(self, predictions_list):
        """Визуелизација на резултатите од предвидувањата"""
        if not predictions_list:
            return

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Веројатност за виралност
        probs = [p['virality_probability'] for p in predictions_list]
        axes[0, 0].hist(probs, bins=20, alpha=0.7, color='skyblue')
        axes[0, 0].set_title('Распределба на Веројатности за виралност')
        axes[0, 0].set_xlabel('Веројатност')
        axes[0, 0].set_ylabel('Број на видеа')

        # Engagement vs Virality Probability
        engagement_rates = [p['features']['engagement_rate'] for p in predictions_list]
        axes[0, 1].scatter(engagement_rates, probs, alpha=0.6)
        axes[0, 1].set_title('Engagement Rate vs Virality Probability')
        axes[0, 1].set_xlabel('Engagement Rate (%)')
        axes[0, 1].set_ylabel('Virality Probability')

        # Категории на предвидувања
        predictions = [p['predicted_viral'] for p in predictions_list]
        viral_counts = pd.Series(predictions).value_counts()

        # Динамички создај лабели базирано на постоечките податоци
        labels = []
        values = []

        # Провери кои категории постојат
        if True in viral_counts.index:
          labels.append('Viral')
          values.append(viral_counts[True])

        if False in viral_counts.index:
          labels.append('Non-Viral')
          values.append(viral_counts[False])

        # Создај pie chart со правилен број лабели
        axes[1, 0].pie(values, labels=labels, autopct='%1.1f%%')
        axes[1, 0].set_title('Предвидени категории')

        # Views vs Predicted Virality
        current_views = [p['current_views'] for p in predictions_list]
        colors = ['red' if p else 'blue' for p in predictions]
        axes[1, 1].scatter(current_views, probs, c=colors, alpha=0.6)
        axes[1, 1].set_title('Current Views vs Virality Prediction')
        axes[1, 1].set_xlabel('Current Views')
        axes[1, 1].set_ylabel('Virality Probability')
        axes[1, 1].set_xscale('log')

        plt.tight_layout()
        plt.show()

    def save_model(self, filename='virality_model.pkl'):
        """Зачувување на истрениран модел"""
        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'features_columns': self.features_columns
        }
        with open(filename, 'wb') as f:
            pickle.dump(model_data, f)
        print(f"Моделот е зачуван во {filename}")

    def load_model(self, filename='virality_model.pkl'):
        """Вчитување на претходно зачуван модел"""
        try:
            with open(filename, 'rb') as f:
                model_data = pickle.load(f)

            self.model = model_data['model']
            self.scaler = model_data['scaler']
            self.features_columns = model_data['features_columns']
            print(f"Моделот е успешно вчитан од {filename}")
            return True
        except Exception as e:
            print(f"Грешка при вчитување на модел: {e}")
            return False

def create_sample_urls_file():
    """Создавање на примерок фајл со YouTube URLs за тренирање"""
    sample_urls = [
        "https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # Популарно видео
        "https://www.youtube.com/watch?v=kJQP7kiw5Fk",  # Луис Фонси - Despacito
        "https://www.youtube.com/watch?v=fJ9rUzIMcZQ",  # Bohemian Rhapsody
        # Додај повеќе URLs овде
    ]

    with open('sample_urls.txt', 'w') as f:
        for url in sample_urls:
            f.write(url + '\n')

    print("Создаден е примерок фајл: sample_urls.txt")

if __name__ == "__main__":
    print("=== YouTube AI Virality Predictor ===")

    api_key = input("Внесете го вашиот YouTube Data API клуч: ")
    # Иницијализација
    predictor = YouTubeViralityPredictor(api_key)
    predictions_history = []

    # Мени за избор
    print("\nИзберете опција:")
    print("1. Тренирај нов модел")
    print("2. Вчитај постоечки модел")
    print("3. Предвиди виралност за едно видео")

    choice = input("Внесете избор (1/2/3/4): ")

    if choice == "1":
        print("\nТренирање на нов модел...")

        # Можност за користење на сопствени URLs
        use_custom_urls = input("Дали сакате да користите сопствени URLs? (y/n): ").lower() == 'y'

        if use_custom_urls:
            create_sample_urls_file()
            print("Уредете го фајлот 'sample_urls.txt' со ваши URLs и притиснете Enter...")
            input()
            training_data = predictor.collect_training_data('sample_urls.txt', trending_videos=True)
        else:
            training_data = predictor.collect_training_data(trending_videos=True)

        if not training_data.empty:
            print(f"\nСобрани се {len(training_data)} примероци за тренирање")
            print(f"Вирални видеа: {training_data['is_viral'].sum()}")
            print(f"Не-вирални видеа: {len(training_data) - training_data['is_viral'].sum()}")

            # Тренирање
            model, score = predictor.train_model(training_data)

            # Зачувување
            predictor.save_model()

            # Зачувување на податоците
            training_data.to_csv('training_data.csv', index=False)
            print("Податоците се зачувани во training_data.csv")
        else:
            print("Нема собрани податоци за тренирање!")

    elif choice == "2":
        if predictor.load_model():
            print("Моделот е успешно вчитан!")
        else:
            print("Не можам да го вчитам моделот!")
            exit()
    else:
        print("Потребни се барем 2 предвидувања за визуелизација!")
        print(f"Моментално имате {len(predictions_history)} предвидувања.")

    if choice in ["2", "3"] or (choice == "1" and predictor.model is not None):
        # Предвидување за нови видеа
        while True:
            video_url = input("\nВнесете YouTube URL (или 'exit' за излез): ")
            if video_url.lower() == 'exit':
                break

            try:
                result = predictor.predict_virality(video_url)

                if result:
                    predictions_history.append(result)
                    print(f"\n=== РЕЗУЛТАТ ===")
                    print(f"Наслов: {result['title']}")
                    print(f"Моментални views: {result['current_views']:,}")
                    print(f"Веројатност за виралност: {result['virality_probability']:.1%}")
                    print(f"Предвидување: {'ВИРАЛНО' if result['predicted_viral'] else 'Обично'}")
                    print(f"Доверба: {result['confidence']}")

                    # Дополнителни insights
                    features = result['features']
                    print(f"\nКлучни метрики:")
                    print(f"• Engagement Rate: {features['engagement_rate']:.2f}%")
                    print(f"• Авторитет на канал: {features['channel_authority']:.1f}/10")
                    print(f"• Sentiment: {'Позитивен' if features['avg_sentiment'] > 0 else 'Негативен'}")
                    print(f"• Времетраење: {features['duration_minutes']:.1f} минути")
                    if(len(predictions_history)>=2):
                      show_viz = input("\nСакате ли да видите визуелизации сега? (y/n): ").strip().lower()
                      if show_viz == 'y':
                        try:
                          predictor.visualize_predictions(predictions_history)
                          print("Визуелизациите се прикажани!")
                        except Exception as e:
                          print(f"Грешка при создавање на визуелизации: {e}")
                else:
                    print("Не можам да анализирам ова видео!")

            except Exception as e:
                print(f"Грешка: {e}")