In [None]:
from moviepy.editor import VideoFileClip
import pandas as pd
import re
import holidays
from textblob import TextBlob
import pytesseract
import os
import cv2
from scenedetect import SceneManager, open_video
from scenedetect.detectors import ContentDetector
from PIL import Image
import torch

In [None]:
# Module-level model instances. They are created once when this cell runs and
# TikTokVideoAnalytics.__init__ stores references to them on every instance.
# NOTE(review): `pipeline`, `whisper`, `BlipProcessor` and
# `BlipForConditionalGeneration` are not imported in the visible import cell;
# presumably they come from `transformers` and `whisper` -- confirm and add the
# imports, otherwise this cell raises NameError.
# NOTE(review): the classifier below is a hate-speech model, while
# TikTokVideoAnalytics.identify_witty_type checks its output for the labels
# 'Humor', 'Witty' and 'Sarcasm' -- presumably this model never emits those;
# confirm against the model card.
humor_analyzer = pipeline("text-classification", model="Hate-speech-CNERG/dehatebert-mono-english", device=0)  # Replace with a suitable model
whisper_model = whisper.load_model("medium")  # Load Whisper model for audio transcription
# BLIP processor/model pair used by extract_text_from_key_frames to caption key frames.
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

In [None]:

class TikTokVideoAnalytics:
    """Extracts engagement-related features from a single TikTok video.

    The class combines metadata supplied by the caller (sound name, post
    timestamp, description, interaction and follower counts) with features
    derived from the video file itself (length, audio transcript, key frames).
    ``run_pipeline`` is the main entry point and returns a flat feature dict.

    The keyword-based detectors (collaboration, series, wittiness) use plain
    substring matching, so e.g. 'with' also matches 'without'.
    """

    # Placeholder trending sounds used when the caller does not supply a list.
    DEFAULT_TRENDING_SOUNDS = ('Sound1', 'Sound2', 'Sound_ID_or_Name')

    # Month name -> meteorological season.
    _MONTH_TO_SEASON = {
        'January': 'Winter', 'February': 'Winter', 'March': 'Spring',
        'April': 'Spring', 'May': 'Spring', 'June': 'Summer',
        'July': 'Summer', 'August': 'Summer', 'September': 'Fall',
        'October': 'Fall', 'November': 'Fall', 'December': 'Winter',
    }

    # Keys of the dict returned by extract_post_time (used for the fallback
    # in run_pipeline when the timestamp cannot be parsed).
    _POST_TIME_KEYS = (
        'Day', 'Hour', 'Month', 'Season',
        'Next_Day_Holiday', 'Next_Day_Holiday_Name', 'Is_Long_Weekend',
    )

    # Collaboration type -> keywords; checked in insertion order.
    _COLLAB_TYPES = {
        'Duet': ('duet',),
        'Collaboration': ('collaboration', 'collab'),
        'With': ('with',),
    }

    # "part <number>" marks a part series ("part 1", "part 12", "part4", ...).
    _PART_NUMBER_RE = re.compile(r'part\s*\d+')

    # Series type -> keywords; checked in insertion order after the
    # "part <number>" pattern above.
    _SERIES_TYPES = {
        'Part Series': ('episode',),
        'Sequel': ('sequel', 'continuation', 'follow-up'),
        'Multi-Part': ('multi-part', 'series'),
    }

    # Wittiness type -> keywords; checked in insertion order.
    _WITTY_TYPES = {
        'Humor': ('funny', 'joke', 'hilarious', 'comedy'),
        'Sarcasm': ('sarcasm', 'irony', 'sarcastic', 'mocking'),
        'Clever': ('clever', 'smart', 'witty', 'sharp'),
    }

    # Classifier labels that identify_witty_type accepts as a wittiness type.
    _WITTY_MODEL_LABELS = ('Humor', 'Witty', 'Sarcasm')

    def __init__(self, video_path, sound_name, post_timestamp, description,
                 total_interactions, follower_count, trending_sounds=None):
        """Stores the video metadata and references to the shared models.

        Parameters:
        - video_path: Path to the video file.
        - sound_name: Name/ID of the sound used in the video.
        - post_timestamp: Epoch seconds (int or digit string) or anything
          ``pandas.to_datetime`` can parse.
        - description: Caption text of the video.
        - total_interactions: Total number of interactions on the video.
        - follower_count: Follower count of the posting account.
        - trending_sounds: Optional iterable of trending sound names; defaults
          to ``DEFAULT_TRENDING_SOUNDS``.
        """
        self.video_path = video_path
        self.sound_name = sound_name
        self.post_timestamp = post_timestamp
        self.description = description
        self.total_interactions = total_interactions
        self.follower_count = follower_count
        if trending_sounds is None:
            trending_sounds = self.DEFAULT_TRENDING_SOUNDS
        self.trending_sounds_list = list(trending_sounds)
        # Module-level model instances shared by all analytics objects.
        self.humor_analyzer = humor_analyzer
        self.whisper_model = whisper_model
        self.processor = processor
        self.model = model

    def _description_text(self):
        """Returns the description as a string ('' when it is missing or not text)."""
        return self.description if isinstance(self.description, str) else ''

    @staticmethod
    def _first_matching_type(text, type_keywords):
        """Returns the first label whose keywords occur in ``text``, else 'None'."""
        for label, keywords in type_keywords.items():
            if any(keyword in text for keyword in keywords):
                return label
        return 'None'

    def get_video_length(self):
        """Extracts the duration of the video in seconds."""
        clip = VideoFileClip(self.video_path)
        try:
            return clip.duration
        finally:
            # Release the reader even if reading the duration fails.
            clip.close()

    def is_trending_sound(self):
        """Checks if a trending sound is used in the video."""
        return self.sound_name in self.trending_sounds_list

    def _parse_post_time(self):
        """Parses ``post_timestamp`` into a pandas Timestamp.

        Non-negative integers (including NumPy integers) and digit-only
        strings are interpreted as epoch seconds; everything else is handed to
        ``pandas.to_datetime`` as is.

        Returns:
        - The parsed timestamp, or None (after printing a message) when the
          value is missing or cannot be parsed.
        """
        import numbers

        raw = self.post_timestamp
        try:
            looks_like_epoch = (
                isinstance(raw, (str, numbers.Integral))
                and str(raw).strip().isdigit()
            )
            if looks_like_epoch:
                parsed = pd.to_datetime(int(raw), unit='s')
            else:
                parsed = pd.to_datetime(raw)
        except Exception as e:
            # Best effort: any parsing failure means "no time features".
            print(f"Error parsing timestamp: {e}")
            return None

        # A missing value parses without raising but has no calendar fields.
        if parsed is None or parsed is pd.NaT:
            print(f"Error parsing timestamp: {raw!r} is not a valid date/time")
            return None
        return parsed

    def extract_post_time(self):
        """
        Extracts the day, hour, month, season of posting, checks if the next day is a holiday in the U.S., and checks for long weekends.

        Returns:
        - dict with keys 'Day', 'Hour', 'Month', 'Season', 'Next_Day_Holiday',
          'Next_Day_Holiday_Name' (the string 'None' when there is no holiday)
          and 'Is_Long_Weekend'; or None when the timestamp cannot be parsed.
        """
        post_time = self._parse_post_time()
        if post_time is None:
            return None

        day_of_week = post_time.strftime('%A')
        month = post_time.strftime('%B')

        # Look the holiday up by plain calendar date; the calendar is built
        # for the year of the next day so Dec 31 -> Jan 1 is covered.
        next_day = (post_time + pd.Timedelta(days=1)).date()
        us_holidays = holidays.UnitedStates(years=next_day.year)
        next_day_holiday = next_day in us_holidays
        next_day_holiday_name = us_holidays.get(next_day) if next_day_holiday else 'None'
        is_long_weekend = (
            day_of_week in ('Friday', 'Saturday', 'Sunday', 'Thursday')
            and next_day_holiday
        )

        return {
            'Day': day_of_week,
            'Hour': post_time.hour,
            'Month': month,
            'Season': self._MONTH_TO_SEASON.get(month, 'Unknown'),
            'Next_Day_Holiday': next_day_holiday,
            'Next_Day_Holiday_Name': next_day_holiday_name,
            'Is_Long_Weekend': is_long_weekend,
        }

    def count_hashtags(self):
        """Counts the number of hashtags in the video description."""
        return len(re.findall(r'#\w+', self._description_text()))

    def calculate_engagement_rate(self):
        """Calculates the engagement rate as (total interactions / follower count) * 100.

        Returns 0.0 when the follower count is zero or missing, because the
        rate is undefined in that case.
        """
        if not self.follower_count:
            return 0.0
        return (self.total_interactions / self.follower_count) * 100

    def analyze_sentiment(self):
        """Analyzes the sentiment score of the video description."""
        return TextBlob(self._description_text()).sentiment.polarity

    def is_collaboration(self):
        """Identifies if the video is a collaboration or duet based on keywords.

        Derived from ``identify_collaboration_type`` so both methods always
        agree on the keyword set.
        """
        return self.identify_collaboration_type() != 'None'

    def identify_collaboration_type(self):
        """Identifies the type of collaboration or duet based on keywords.

        Returns 'Duet', 'Collaboration', 'With' or the string 'None'.
        """
        return self._first_matching_type(
            self._description_text().lower(), self._COLLAB_TYPES
        )

    def is_series(self):
        """Identifies if the video is part of a series based on general keywords.

        True when the description mentions 'part' or when
        ``identify_series_type`` recognises a series type.
        """
        # A bare "part" (without a number) still counts as a series hint.
        if 'part' in self._description_text().lower():
            return True
        return self.identify_series_type() != 'None'

    def identify_series_type(self):
        """Identifies the type of series based on keywords.

        Returns 'Part Series', 'Sequel', 'Multi-Part' or the string 'None'.
        """
        description_lower = self._description_text().lower()
        # Any "part <number>" counts, not only parts 1-3.
        if self._PART_NUMBER_RE.search(description_lower):
            return 'Part Series'
        return self._first_matching_type(description_lower, self._SERIES_TYPES)

    def extract_audio_text(self, audio_path=None):
        """Extracts audio from the video and converts it to text using Whisper.

        Parameters:
        - audio_path: Optional path for the intermediate WAV file. When
          omitted, a temporary file is created and deleted afterwards; a path
          given by the caller is left in place.

        Returns:
        - The transcribed text, or '' when the video has no audio track.
        """
        import tempfile

        owns_audio_file = audio_path is None
        if owns_audio_file:
            handle, audio_path = tempfile.mkstemp(suffix='.wav')
            # Only the path is needed; the audio is written to it by path.
            os.close(handle)

        try:
            clip = VideoFileClip(self.video_path)
            try:
                if clip.audio is None:
                    return ''
                clip.audio.write_audiofile(audio_path, codec='pcm_s16le')
            finally:
                clip.close()
            result = self.whisper_model.transcribe(audio_path)
            return result['text']
        finally:
            if owns_audio_file and os.path.exists(audio_path):
                os.remove(audio_path)

    def identify_witty_type(self, text):
        """Analyzes the text to determine the type of wittiness using NLP models.

        Keyword matching is tried first; only when no keyword matches is the
        text passed to ``self.humor_analyzer``.

        Returns 'Humor', 'Sarcasm', 'Clever', a matching classifier label, or
        the string 'None' (also for empty or non-string text).
        """
        if not isinstance(text, str) or not text.strip():
            return 'None'

        keyword_match = self._first_matching_type(text.lower(), self._WITTY_TYPES)
        if keyword_match != 'None':
            return keyword_match

        # Transcripts can be longer than the classifier's maximum input, so
        # let the tokenizer truncate instead of failing.
        # NOTE(review): the classifier loaded at module level is a hate-speech
        # model; presumably its labels never match the ones below -- confirm.
        analysis = self.humor_analyzer(text, truncation=True)
        for result in analysis:
            if result['label'] in self._WITTY_MODEL_LABELS:
                return result['label']
        return 'None'

    def check_wittiness(self, audio_text):
        """Returns the wittiness type of the transcribed audio text."""
        return self.identify_witty_type(audio_text)

    def extract_scene_key_frames(self, output_folder, threshold=30):
        """
        Detects scenes and saves the first frame of each scene as a JPEG.

        Parameters:
        - output_folder (str): Folder the key frames are written to (created
          if it does not exist).
        - threshold: Threshold passed to the content detector.

        Returns:
        - key_frames (list): Paths of the key frames that were written.
        """
        # Create a scene manager and add a content detector
        video = open_video(self.video_path)
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector(threshold=threshold))

        # Perform scene detection
        scene_manager.detect_scenes(video)
        scene_list = scene_manager.get_scene_list()

        os.makedirs(output_folder, exist_ok=True)

        key_frames = []
        cap = cv2.VideoCapture(self.video_path)
        try:
            # Extract and save the frame at the start of each detected scene
            for number, scene in enumerate(scene_list, start=1):
                cap.set(cv2.CAP_PROP_POS_FRAMES, scene[0].get_frames())
                ret, frame = cap.read()
                if not ret:
                    continue
                frame_filename = os.path.join(output_folder, f"scene_key_frame_{number}.jpg")
                # Only report frames that were actually written to disk.
                if cv2.imwrite(frame_filename, frame):
                    key_frames.append(frame_filename)
        finally:
            cap.release()

        return key_frames

    def extract_text_from_key_frames(self, frames):
        """
        Extracts text descriptions from key frames using the BLIP model.

        Parameters:
        - frames (list): List of file paths of the key frames.

        Returns:
        - captions (list): List of extracted text descriptions from the key frames.
        """
        captions = []
        for frame_path in frames:
            # Close the image file as soon as the pixels are loaded.
            with Image.open(frame_path) as raw_image:
                image = raw_image.convert('RGB')
            inputs = self.processor(images=image, return_tensors="pt")
            # Keep the inputs on the same device as the captioning model.
            inputs = inputs.to(self.model.device)

            with torch.no_grad():
                output = self.model.generate(**inputs)
            captions.append(self.processor.decode(output[0], skip_special_tokens=True))

        return captions

    def run_pipeline(self):
        """Runs the complete pipeline to extract all features including specific types.

        Key-frame extraction and captioning (``extract_scene_key_frames`` and
        ``extract_text_from_key_frames``) are currently not part of the
        returned feature set.

        Returns:
        - dict of features. The post-time features are None when the
          timestamp could not be parsed.
        """
        # Extract day, hour, month, season, and holiday information
        post_time_features = self.extract_post_time()
        if post_time_features is None:
            # Unparseable timestamp: keep the other features, blank these.
            post_time_features = dict.fromkeys(self._POST_TIME_KEYS)

        # Extract transcribed text from the video using Whisper
        transcribed_text = self.extract_audio_text()

        # Compile all features
        return {
            'Video_Length': self.get_video_length(),
            'Trending_Sounds': self.is_trending_sound(),
            'Post_Day': post_time_features['Day'],
            'Post_Hour': post_time_features['Hour'],
            'Post_Month': post_time_features['Month'],
            'Post_Season': post_time_features['Season'],
            'Next_Day_Holiday': post_time_features['Next_Day_Holiday'],
            'Next_Day_Holiday_Name': post_time_features['Next_Day_Holiday_Name'],
            'Is_Long_Weekend': post_time_features['Is_Long_Weekend'],
            'Hashtags': self.count_hashtags(),
            'Engagement_Rate': self.calculate_engagement_rate(),
            'Sentiment': self.analyze_sentiment(),
            'Collaborations': self.is_collaboration(),
            'Collaboration_Type': self.identify_collaboration_type(),
            'Series': self.is_series(),
            'Series_Type': self.identify_series_type(),
            'Witty': self.check_wittiness(transcribed_text),
        }
