# Douyin Live Recorder

Notebook นี้ใช้สำหรับรัน DouyinLiveRecorder เพื่อบันทึกวิดีโอไลฟ์จาก Douyin (TikTok จีน)

In [None]:
# Clone repositories
!git clone https://github.com/ihmily/DouyinLiveRecorder.git
!git clone https://github.com/chaiz64/l777k.git

In [None]:
# Copy configuration files
!cp l777k/config.ini DouyinLiveRecorder/config/
!cp l777k/URL_config.ini DouyinLiveRecorder/config/

In [None]:
# เข้าไปยังโฟลเดอร์โปรเจค
%cd DouyinLiveRecorder

# ติดตั้ง dependencies
!pip3 install -r requirements.txt

In [None]:
# ติดตั้ง ffmpeg
!apt update
!apt install ffmpeg -y

In [None]:
# รันโปรแกรม
!python main.py

In [None]:
import time
import datetime

def keep_colab_alive(duration_hours=24):
    """
    Keeps the Google Colab runtime active by running a loop.

    Args:
        duration_hours (int): The duration in hours for which the script
                              should attempt to keep the runtime alive.
                              Default is 24 hours. Set to 0 for indefinite run
                              (requires manual stop).
    """
    print(f"Script will attempt to keep Colab alive for {duration_hours} hours. (Set to 0 for indefinite run)")
    start_time = datetime.datetime.now()
    end_time = start_time + datetime.timedelta(hours=duration_hours)

    counter = 0
    try:
        while True:
            current_time = datetime.datetime.now()
            if duration_hours > 0 and current_time >= end_time:
                print(f"Maximum duration of {duration_hours} hours reached. Stopping script.")
                break

            counter += 1
            print(f"[{current_time.strftime('%Y-%m-%d %H:%M:%S')}] Colab is alive! Iteration: {counter}")
            # Sleep for a short period to prevent excessive CPU usage
            time.sleep(60) # Sleep for 60 seconds (1 minute)

    except KeyboardInterrupt:
        print("\nScript stopped manually by user (Ctrl+C).")
    except Exception as e:
        print(f"\nAn error occurred: {e}")
    finally:
        print("Colab keep-alive script finished.")

# --- How to use ---
# Call the function to start keeping Colab alive.
# You can specify the duration in hours.
# For example, to keep it alive for 12 hours:
# keep_colab_alive(duration_hours=12)

# To run indefinitely until manually stopped (Ctrl+C):
keep_colab_alive(duration_hours=0) # Set to 0 for indefinite run


In [None]:
from google.colab import drive
import os
import shutil
import subprocess
from datetime import timedelta
import pandas as pd
import logging
from tqdm.notebook import tqdm # Use tqdm.notebook for Google Colab

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Configuration ---
# Source directory path
SOURCE_DIR = '/content/DouyinLiveRecorder/downloads/抖音直播'
# Target directory path (in Google Drive)
TARGET_DIR = '/content/drive/MyDrive/抖音直播备份' # Can be modified as needed
# --------------------

# Mount Google Drive
logging.info("Mounting Google Drive...")
try:
    drive.mount('/content/drive')
    logging.info("Google Drive mounted successfully.")
except Exception as e:
    logging.error(f"Failed to mount Google Drive: {e}")
    raise

# Check if source directory exists
if not os.path.exists(SOURCE_DIR):
    logging.error(f"Source directory not found: {SOURCE_DIR}")
    raise FileNotFoundError(f"Source directory not found: {SOURCE_DIR}")
logging.info(f"Source directory found: {SOURCE_DIR}")

# Create target directory (if it doesn't exist)
try:
    os.makedirs(TARGET_DIR, exist_ok=True)
    logging.info(f"Checked and created target directory: {TARGET_DIR}")
except Exception as e:
    logging.error(f"Failed to create target directory: {e}")
    raise

# Get video file duration (using ffprobe)
def get_video_duration(file_path):
    """
    Retrieves the duration of a video file using ffprobe.

    Args:
        file_path (str): The path to the video file.

    Returns:
        str: The duration in HH:MM:SS format, or "N/A" if an error occurs.
    """
    try:
        cmd = [
            'ffprobe', '-v', 'error', '-show_entries',
            'format=duration', '-of',
            'default=noprint_wrappers=1:nokey=1', file_path
        ]
        duration_str = subprocess.check_output(cmd, stderr=subprocess.PIPE).decode('utf-8').strip()
        duration = float(duration_str)
        return str(timedelta(seconds=duration))
    except (subprocess.CalledProcessError, ValueError) as e:
        logging.warning(f"Could not get duration for file {file_path}: {e}. Returning 'N/A'.")
        return "N/A"
    except FileNotFoundError:
        logging.error(" 'ffprobe' not found. Please ensure FFmpeg is installed.")
        return "N/A"

# Scan video files and return as DataFrame
def scan_video_files(directory):
    """
    Scans a directory for video files (.mp4, .ts) and collects their information.

    Args:
        directory (str): The directory to scan.

    Returns:
        pd.DataFrame: A DataFrame containing video file information.
    """
    logging.info(f"Scanning video files in: {directory}")
    video_files = []
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith(('.mp4', '.ts')):
                file_path = os.path.join(root, file)
                try:
                    size = os.path.getsize(file_path) / (1024 * 1024) # MB
                    duration = get_video_duration(file_path)
                    video_files.append({
                        "File Name": file,
                        "Path": file_path,
                        "Size (MB)": round(size, 2),
                        "Duration": duration
                    })
                except OSError as e:
                    logging.warning(f"Could not access file {file_path}: {e}. Skipping this file.")
    logging.info(f"Scan complete. Found {len(video_files)} video files.")
    return pd.DataFrame(video_files)

# Scan and display file information
df = scan_video_files(SOURCE_DIR)
if len(df) > 0:
    print("\n📁 Found video files:")
    # Add an Index column for selection
    df_display = df.reset_index().rename(columns={'index': 'Index'})
    print(df_display[['Index', 'File Name', 'Size (MB)', 'Duration']].to_markdown(index=False))
else:
    print("❌ No MP4/TS video files found in the source directory.")
    logging.info("No MP4/TS video files found.")

# Select files to copy
copied_count = 0
skipped_count = 0
failed_count = 0

if len(df) > 0:
    print("\n🔄 Please select files to copy (enter Index numbers, comma-separated, or 'all' to copy all):")
    choice = input("Your choice: ").strip()

    selected_files_paths = []
    if choice.lower() == 'all':
        selected_files_paths = df['Path'].tolist()
    else:
        try:
            selected_indices = [int(i.strip()) for i in choice.split(',')]
            for idx in selected_indices:
                if 0 <= idx < len(df):
                    selected_files_paths.append(df.iloc[idx]['Path'])
                else:
                    print(f"⚠️ Invalid Index number {idx}. It will be skipped.")
                    logging.warning(f"User entered invalid Index: {idx}")
        except ValueError:
            print("❌ Invalid input. Please enter Index numbers or 'all'.")
            logging.error(f"User entered invalid input: {choice}")
            selected_files_paths = []

    if selected_files_paths:
        print(f"\nCopying {len(selected_files_paths)} files...")
        for src in tqdm(selected_files_paths, desc="Copying"):
            file_name = os.path.basename(src)
            dst = os.path.join(TARGET_DIR, file_name)

            if os.path.exists(dst):
                print(f"⏩ Skipping: '{file_name}' already exists in the target.")
                logging.info(f"Skipping existing file: {file_name}")
                skipped_count += 1
                continue

            try:
                shutil.copy2(src, dst)
                print(f"✅ Copied: '{file_name}'")
                logging.info(f"Successfully copied: {file_name}")
                copied_count += 1
            except Exception as e:
                print(f"❌ Failed to copy: '{file_name}' - {e}")
                logging.error(f"Failed to copy: {file_name} - {e}")
                failed_count += 1
    else:
        print("No files selected for copying.")
        logging.info("No files selected for copying.")

    # Option to delete source files
    if copied_count > 0:
        print("\n❓ Do you want to delete the successfully copied source files? (y/n)")
        delete_choice = input("Your choice: ").strip().lower()
        if delete_choice == 'y':
            deleted_count = 0
            print("\nDeleting source files...")
            for src in tqdm(selected_files_paths, desc="Deleting"):
                file_name = os.path.basename(src)
                # Check if the file was successfully copied before attempting to delete
                if os.path.exists(os.path.join(TARGET_DIR, file_name)):
                    try:
                        os.remove(src)
                        print(f"🗑️ Deleted: '{file_name}'")
                        logging.info(f"Successfully deleted source file: {file_name}")
                        deleted_count += 1
                    except Exception as e:
                        print(f"⚠️ Failed to delete: '{file_name}' - {e}")
                        logging.warning(f"Failed to delete source file: {file_name} - {e}")
                else:
                    logging.info(f"Skipping deletion of {file_name} as it was not found in target (might have been skipped during copy).")
            print(f"\n🗑️ Deleted {deleted_count} source files.")
            logging.info(f"Deletion summary: Deleted {deleted_count} files.")
        else:
            print("Source files were not deleted.")
            logging.info("User chose not to delete source files.")

# Summary Report
print("\n--- Summary Report ---")
print(f"✅ Successfully copied: {copied_count} files")
print(f"⏩ Skipped: {skipped_count} files")
print(f"❌ Failed: {failed_count} files")
logging.info(f"Operation Summary: Copied={copied_count}, Skipped={skipped_count}, Failed={failed_count}")

# Unmount Google Drive (optional)
logging.info("Unmounting Google Drive...")
try:
    drive.flush_and_unmount()
    logging.info("Google Drive unmounted successfully.")
except Exception as e:
    logging.error(f"Failed to unmount Google Drive: {e}")

print("\n🎉 Operation complete!")


In [None]:
import os
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
import google.colab.files # Import the Google Colab files module
import time # For simulating processing time and managing batch downloads

# --- Setup and Initialization ---
# Path to your download folder
download_dir = "/content/DouyinLiveRecorder/downloads/抖音直播/"
download_limit_per_batch = 3 # Limit downloads to 3 files at a time

# List of common video file extensions to support
video_extensions = ('.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.ogg', '.3gp', '.ts')

# Create a temporary download folder (if necessary)
os.makedirs("download_temp", exist_ok=True)

# File name for saving downloaded URLs
download_links_file = "downloaded_links.txt"

# --- Helper Functions ---
def get_video_info(file_path):
    """
    Function to retrieve video information (size and duration).
    In a real scenario, you might use libraries like moviepy or ffprobe
    to extract this information from actual video files.
    For this example, we will simulate the data.
    """
    file_size = 0
    duration_seconds = 0
    file_name = os.path.basename(file_path)

    try:
        file_size = os.path.getsize(file_path) # File size in bytes
        # Simulate duration from file name, or use random/default values
        # If the file name is in the format video_X_duration_Ys.ext
        duration_match_str = next((s for s in file_name.split('_') if 's' in s and s.replace('s', '').isdigit()), None)
        if duration_match_str:
            duration_seconds = int(duration_match_str.replace('s', ''))
        else:
            # If no duration in file name, randomize for dummy files
            duration_seconds = 600 + (os.path.getsize(file_path) % 600) # At least 10 minutes (600 seconds)
    except FileNotFoundError:
        print(f"⚠️ File not found: {file_path}")
        return None

    return {
        "name": file_name,
        "path": file_path,
        "size_mb": round(file_size / (1024 * 1024), 2),
        "duration_min": round(duration_seconds / 60, 2),
        "raw_duration_sec": duration_seconds
    }

def create_dummy_files(base_dir, num_files=10):
    """Creates dummy files for testing if the actual folder is empty"""
    print(f"✨ Creating dummy files in {base_dir} for testing...")
    os.makedirs(base_dir, exist_ok=True)
    for i in range(1, num_files + 1):
        duration = 5 if i % 3 == 0 else (600 + (i * 30)) # Some files have duration less than 10 minutes
        # Use a different extension for some dummy files to test the new logic
        ext = video_extensions[i % len(video_extensions)]
        size = 1024 * 1024 * (i * 5) # Dummy file size (5MB, 10MB, ...)
        file_name = f"video_{i}_duration_{duration}s{ext}"
        file_path = os.path.join(base_dir, file_name)
        with open(file_path, "wb") as f:
            f.write(os.urandom(size)) # Write random data of specified size
    print(f"✅ Created {num_files} dummy files successfully.")

# --- Check and Collect Video File Information ---
video_files_info = []

if not os.path.exists(download_dir) or not os.listdir(download_dir):
    print(f"⚠️ Folder {download_dir} not found or is empty. Please check the path.")
    create_dummy_files(download_dir) # Create dummy files if no actual files

for root, _, files in os.walk(download_dir):
    for file in files:
        # Check if the file ends with any of the specified video extensions
        if file.lower().endswith(video_extensions):
            file_path = os.path.join(root, file)
            info = get_video_info(file_path)
            if info and info["raw_duration_sec"] >= 600: # Skip files with duration less than 10 minutes (600 seconds)
                video_files_info.append(info)

# Sort files by name
video_files_info.sort(key=lambda x: x['name'])

print(f"\n🎬 Found {len(video_files_info)} videos available for download (length >= 10 minutes):")
if not video_files_info:
    print("  ❌ No video files matching the criteria (length >= 10 minutes) in the specified folder.")
else:
    for video in video_files_info:
        print(f"  - {video['name']} | Size: {video['size_mb']} MB | Duration: {video['duration_min']} minutes")

# --- UI for Single File Download ---
print("\n--- ⬇️ Download Single File (Click button to download each file) ---")
output_single_file = widgets.Output()

def create_single_download_widget(file_info):
    """Creates a button and info for single file download"""
    file_label = widgets.HTML(
        value=f"<span>📁 <b>{file_info['name']}</b> | Size: {file_info['size_mb']} MB | Duration: {file_info['duration_min']} minutes</span>"
    )
    download_button = widgets.Button(description="⬇️ Download", button_style='info')

    # Define on_button_click before it's potentially used for removal
    def on_button_click(b):
        b.disabled = True # Disable button to prevent double clicks
        with output_single_file:
            clear_output(wait=True)
            try:
                # google.colab.files.download() will print "Downloading 'filename':"
                google.colab.files.download(file_info['path'])
                # Save URL to .txt file
                with open(download_links_file, "a") as f:
                    f.write(f"{file_info['path']}\n")
                print(f"✅ Download of {file_info['name']} initiated! Check your browser's download bar. (URL saved)")
            except Exception as e:
                print(f"❌ Error downloading {file_info['name']}: {e}")
            finally:
                b.disabled = False # Re-enable button after process

    # Remove old event handlers to prevent duplicate registrations,
    # which is a common cause of repeated execution when running cells multiple times.
    if hasattr(download_button, '_click_handlers'):
        download_button.on_click(on_button_click, remove=True) # Now on_button_click is defined

    download_button.on_click(on_button_click)
    return widgets.HBox([download_button, file_label])

if not video_files_info:
    display(HTML("<p>🚫 No video files available for single download.</p>"))
else:
    for video in video_files_info:
        display(create_single_download_widget(video))
    display(output_single_file)

# --- UI for Batch Download ---
print("\n--- 📦 Download in Order (Select files to skip, then click button) ---")
output_batch_download = widgets.Output()

# Checkboxes for selecting files to skip
skip_checkboxes = []
for video in video_files_info:
    checkbox = widgets.Checkbox(
        value=False, # Default value is not skipped (will be downloaded)
        description=f"Skip: {video['name']} | Size: {video['size_mb']} MB | Duration: {video['duration_min']} minutes",
        indent=False
    )
    skip_checkboxes.append(checkbox)

if not skip_checkboxes:
    display(HTML("<p>🚫 No video files available for batch download.</p>"))
else:
    skip_selection_box = widgets.VBox(children=skip_checkboxes)
    download_batch_button = widgets.Button(
        description=f"🚀 Start Download Selected Files ({download_limit_per_batch} files at a time)",
        button_style='success'
    )

    # Define on_download_batch_click before it's potentially used for removal
    def on_download_batch_click(b):
        b.disabled = True # Disable button to prevent multiple clicks during batch processing
        with output_batch_download:
            clear_output(wait=True) # Clear previous output
            files_to_download = []
            for i, checkbox in enumerate(skip_checkboxes):
                if not checkbox.value: # If not selected to skip (checkbox.value is False)
                    files_to_download.append(video_files_info[i])

            if not files_to_download:
                print("🚫 No files to download. Please select at least one file or uncheck skip options.")
                b.disabled = False # Re-enable if no files selected
                return

            print(f"📦 Preparing {len(files_to_download)} selected files for download...")

            downloaded_count = 0
            try:
                for file_info in files_to_download:
                    if downloaded_count >= download_limit_per_batch:
                        print(f"\n⚠️ Stop! Reached the download limit of {download_limit_per_batch} files per batch.")
                        print("Please finish downloading the ready files, then click the '🚀 Start Download Selected Files' button again to download the remaining files.")
                        break

                    # google.colab.files.download() will print "Downloading 'filename':"
                    try:
                        google.colab.files.download(file_info['path'])
                        # Save URL to .txt file
                        with open(download_links_file, "a") as f:
                            f.write(f"{file_info['path']}\n")
                        print(f"✅ Download of {file_info['name']} initiated! Check your browser's download bar. (URL saved)")
                    except Exception as e:
                        print(f"❌ Error downloading {file_info['name']}: {e}")

                    downloaded_count += 1
                    time.sleep(1) # Add a slightly longer delay between downloads to prevent issues with multiple concurrent downloads

                if downloaded_count < len(files_to_download) and downloaded_count < download_limit_per_batch:
                     print("\n🎉 File preparation for download completed!")
                elif downloaded_count == len(files_to_download):
                    print("\n🎉 All selected files have been prepared for download!")
            finally:
                b.disabled = False # Re-enable button after batch process completes or breaks


    # Remove old event handlers to prevent duplicate registrations
    if hasattr(download_batch_button, '_click_handlers'):
        download_batch_button.on_click(on_download_batch_click, remove=True) # Now on_download_batch_click is defined

    download_batch_button.on_click(on_download_batch_click)

    display(skip_selection_box)
    display(download_batch_button, output_batch_download)

print("\n--- End of Setup ---")
print("👆 You can scroll up to the 'Download Single File' and 'Download in Order' sections to use them!")