<a href="https://colab.research.google.com/github/ImNotDanish05/2025_PBO_TI-1A/blob/main/Jobsheet_Tugas_Besar.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🔥 Program Penghapus Komentar Judol (Permanent Delete)

🎯 **Tujuan Program:**  
Program ini dibuat secara khusus untuk **menghapus komentar Judol secara permanen** dari video YouTube yang terhubung melalui API.  
Tidak hanya menyembunyikan komentar, tapi langsung **menghapus dari database YouTube**, tanpa ampun! 😤💢

📌 **Catatan Penting:**
- Gunakan akun yang memiliki izin pemilik channel (owner) untuk mengakses komentar.
- Program ini **tidak dapat membatalkan penghapusan komentar**, jadi pastikan semua tindakan sudah dipertimbangkan dengan matang sebelum menjalankannya.
- Program ini **menggunakan OAuth 2.0** untuk autentikasi akun Google.
- Harus sudah login sebelum bisa menjalankan proses penghapusan.

🛠️ **Fitur Utama:**
- Menampilkan daftar komentar dari video tertentu.
- Memfilter komentar yang termasuk kategori Judol.
- Menghapus komentar Judol secara PERMANEN (bukan sekadar hidden).
- Log hasil penghapusan.

❤️ Dibuat dengan sepenuh hati oleh ImNotDanish05

---

⚠️ **PERINGATAN!**  
Komentar yang sudah dihapus **tidak dapat dikembalikan**. Pastikan untuk mengecek komentar sebelum menghapusnya.


In [20]:
%%writefile config.py
import os
# @title Config
BASE_DIR = "/content/drive/MyDrive/Danish05/File Belajar/Coding Fun/Data/Pemrograman Berbasis Objek/Jobsheet Tugas Besar/database" # @param {"type": "string"}
NAMA_DB = 'DB_Youtube.db' # @param {"type": "string"}
DB_PATH = os.path.join(BASE_DIR, NAMA_DB)
LOGIN_SESSION = "login_session.csv" # @param {"type": "string"}

# Lokasi file
CREDENTIALS_PATH = '/content/drive/MyDrive/Danish05/Account/OAuth2/credentials_ImNotDanish05API.json' # @param {"type": "string"}
NGROK_TOKEN_PATH = '/content/drive/MyDrive/Danish05/Account/ngrok.txt' # @param {"type": "string"}

# Scope yang dibutuhkan
SCOPES = [
    "https://www.googleapis.com/auth/youtube.force-ssl",
    "openid",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile"
]

# Loggin Session
TIME_SESSION = 60# @param {"type": "integer", "min":1, "max": 1440}

# Setting
DB_COMMENT_DELETEAFTERFINISHEXECUTE = False # @param {"type": "boolean"}

Overwriting config.py


# **The Program (DO NOT EDIT)**

Only edit if you UNDERSTAND what are you doing

In [5]:
%%writefile setup_db.py
import sqlite3
import os
from config import DB_PATH

def create_database():
    # Ensure the directory exists
    print(f"Creating database at {DB_PATH}")
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = None
    try:
        # Connect to the SQLite database (it will be created if it doesn't exist)
        conn = sqlite3.connect(DB_PATH)

        # Create a cursor object
        cursor = conn.cursor()

        # Create a table if it doesn't exist
        sql_code = """
        CREATE TABLE IF NOT EXISTS user (
            id INTEGER PRIMARY KEY AUTOINCREMENT,

            google_id TEXT NOT NULL UNIQUE,
            email TEXT NOT NULL UNIQUE,
            name TEXT DEFAULT 'NotName',
            picture TEXT DEFAULT 'https://placehold.co/400x400?text=?',

            token TEXT NOT NULL,
            refresh_token TEXT NOT NULL,
            token_uri TEXT NOT NULL,
            client_id TEXT NOT NULL,
            client_secret TEXT NOT NULL,
            scopes TEXT NOT NULL,
            expiry TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS channel (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            channel_id TEXT NOT NULL UNIQUE,
            title TEXT DEFAULT 'Unknown',
            subscribers INTEGER DEFAULT 0,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            is_active INTEGER DEFAULT 1,

            FOREIGN KEY(user_id) REFERENCES user(id)
        );

        CREATE TABLE IF NOT EXISTS video (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            channel_id TEXT NOT NULL,
            video_id TEXT NOT NULL UNIQUE,
            title TEXT DEFAULT 'Unknown',
            description TEXT,
            published_at TEXT,
            comment_count INTEGER DEFAULT 0,
            FOREIGN KEY(channel_id) REFERENCES channel(channel_id)
        );

        CREATE TABLE IF NOT EXISTS comment (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            video_id TEXT NOT NULL,
            comment_id TEXT NOT NULL UNIQUE,
            author TEXT,
            comment TEXT NOT NULL,
            published_at TEXT,
            FOREIGN KEY(video_id) REFERENCES video(video_id)
        );

        """

        cursor.executescript(sql_code)
        # Commit the changes and close the connection
        conn.commit()
        print("Database and tables created successfully.")
        conn.close()
        return True
    except sqlite3.Error as e:
        print(f"An error occurred: {e}")
        if conn:
            conn.close()
        return False
    finally:
        if conn:
            conn.close()
            print("Connection closed.")

if __name__ == "__main__":
    # Run the function to create the database
    if create_database():
        print("✅ Database setup complete!")
    else:
        print("❌ Database setup failed.")

Writing setup_db.py


In [6]:
%%writefile database.py
import sqlite3
import pandas as pd
from config import DB_PATH

# Mencari Koneksi ke database SQLite
# Jika tidak ada, akan mengembalikan None
def get_db_connection() -> sqlite3.Connection | None:
    """Create a database connection to the SQLite database specified by DB_PATH."""
    try:
        conn = sqlite3.connect(
            DB_PATH,
            detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
            timeout=10  # Wait up to 10 seconds for the database lock to clear
        )
        conn.row_factory = sqlite3.Row  # Supaya akses kolom pakai nama
        return conn
    except sqlite3.Error as e:
        print(f"ERROR [database.py] Koneksi DB gagal: {e}")
        return None

# Eksekusi seperti CRUD: INSERT, UPDATE, DELETE
def execute_query(query: str, params: tuple = ()) -> bool:
    """Execute a single SQL query."""
    conn = get_db_connection()
    if conn is None:
        return False
    try:
        cursor = conn.cursor()
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)
        conn.commit()
        if query.strip().upper().startswith("INSERT"):
            return cursor.lastrowid
        return True
    except sqlite3.Error as e:
        print(f"ERROR [database.py] Gagal eksekusi query: {e}")
        conn.rollback()
        return False
    finally:
        if conn:
            conn.close()

# Eksekusi query yang mengembalikan data, seperti SELECT
def fetch_query(query: str, params: tuple = None, fetch_all: bool = True):
    """
    Menjalankan query SELECT dan mengembalikan hasil:
    - fetch_all=True: list of rows
    - fetch_all=False: single row
    """
    conn = get_db_connection()
    if not conn:
        return None

    try:
        cursor = conn.cursor()
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)
        result = cursor.fetchall() if fetch_all else cursor.fetchone()
        return result
    except sqlite3.Error as e:
        print(f"ERROR [database.py] Fetch gagal: {e} | Query: {query[:60]}")
        return None
    finally:
        if conn:
            conn.close()

# Eksekusi query SELECT dan mengembalikan hasil sebagai DataFrame Pandas
def get_dataframe(query: str, params: tuple = None) -> pd.DataFrame:
    """Menjalankan query SELECT dan mengembalikan hasil sebagai DataFrame Pandas."""
    conn = get_db_connection()
    if not conn:
        return pd.DataFrame()

    try:
        df = pd.read_sql_query(query, conn, params=params)
        return df
    except Exception as e:
        print(f"ERROR [database.py] Gagal baca ke DataFrame: {e}")
        return pd.DataFrame()
    finally:
        if conn:
            conn.close()

# Setup awal database, bikin tabel jika belum ada
# Fungsi ini menjadi auto setup saat pertama kali dijalankan
def setup_database_initial():
    print(f"Memeriksa/membuat tabel di database (via database.py): {DB_PATH}")
    conn = get_db_connection()
    if not conn:
        return False

    try:
        cursor = conn.cursor()
        sql_create = """
        CREATE TABLE IF NOT EXISTS user (
            id INTEGER PRIMARY KEY AUTOINCREMENT,

            google_id TEXT NOT NULL UNIQUE,
            email TEXT NOT NULL UNIQUE,
            name TEXT DEFAULT 'NotName',
            picture TEXT DEFAULT 'https://placehold.co/400x400?text=?',

            token TEXT NOT NULL,
            refresh_token TEXT NOT NULL,
            token_uri TEXT NOT NULL,
            client_id TEXT NOT NULL,
            client_secret TEXT NOT NULL,
            scopes TEXT NOT NULL,
            expiry TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS channel (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            channel_id TEXT NOT NULL UNIQUE,
            title TEXT DEFAULT 'Unknown',
            subscribers INTEGER DEFAULT 0,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            is_active INTEGER DEFAULT 1,

            FOREIGN KEY(user_id) REFERENCES user(id)
        );

        CREATE TABLE IF NOT EXISTS video (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            channel_id TEXT NOT NULL,
            video_id TEXT NOT NULL UNIQUE,
            title TEXT DEFAULT 'Unknown',
            description TEXT,
            published_at TEXT,
            comment_count INTEGER DEFAULT 0,
            FOREIGN KEY(channel_id) REFERENCES channel(channel_id)
        );

        CREATE TABLE IF NOT EXISTS comment (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            video_id TEXT NOT NULL,
            comment_id TEXT NOT NULL UNIQUE,
            author TEXT,
            comment TEXT NOT NULL,
            published_at TEXT,
            FOREIGN KEY(video_id) REFERENCES video(video_id)
        );

        """
        cursor.executescript(sql_create)
        conn.commit()
        print("-> Semua tabel siap.")
        return True
    except sqlite3.Error as e:
        print(f"Error SQLite saat setup tabel: {e}")
        return False
    finally:
        if conn:
            conn.close()


Writing database.py


In [7]:
%%writefile model.py
from datetime import datetime
from typing import Optional

class User:
    def __init__(
        self,
        id: Optional[int] = None,
        google_id: str = '',
        email: str = '',
        name: Optional[str] = 'NotName',
        picture: Optional[str] = 'https://placehold.co/400x400?text=?',
        token: str = '',
        refresh_token: str = '',
        token_uri: str = '',
        client_id: str = '',
        client_secret: str = '',
        scopes: str = '',
        expiry: str = ''
    ):
        self.id = id
        self.google_id = google_id.strip()
        self.email = email.strip()
        self.name = name.strip()
        self.picture = picture.strip() or "https://placehold.co/400x400?text=?"
        self.token = token
        self.refresh_token = refresh_token
        self.token_uri = token_uri
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.expiry = expiry

    def __repr__(self):
        return f"<User email={self.email}>"


class Channel:
    def __init__(
        self,
        id: Optional[int] = None,
        user_id: int = 0,
        channel_id: str = '',
        title: Optional[str] = 'Unknown',
        subscribers: int = 0,
        created_at: Optional[str] = None,
        is_active: int = 1
    ):
        self.id = id
        self.user_id = user_id
        self.channel_id = channel_id.strip()
        self.title = title
        self.subscribers = subscribers
        self.created_at = created_at or datetime.now().isoformat()
        self.is_active = is_active

    def __repr__(self):
        return f"<Channel id={self.channel_id} title='{self.title}'>"


class Video:
    def __init__(
        self,
        id: Optional[int] = None,
        channel_id: str = '',
        video_id: str = '',
        title: Optional[str] = 'Unknown',
        description: Optional[str] = '',
        published_at: Optional[str] = None,
        comment_count: int = 0
    ):
        self.id = id
        self.channel_id = channel_id.strip()
        self.video_id = video_id.strip()
        self.title = title
        self.description = description
        self.published_at = published_at or datetime.now().isoformat()
        self.comment_count = comment_count

    def __repr__(self):
        return f"<Video id={self.video_id} title='{self.title}'>"


class Comment:
    def __init__(
        self,
        id: Optional[int] = None,
        video_id: str = '',
        comment_id: str = '',
        author: Optional[str] = '',
        comment: str = '',
        published_at: Optional[str] = None
    ):
        self.id = id
        self.video_id = video_id.strip()
        self.comment_id = comment_id.strip()
        self.author = author.strip() if author else None
        self.comment = comment.strip()
        self.published_at = published_at or datetime.now().isoformat()

    def __repr__(self):
        return f"<Comment id={self.comment_id} author={self.author}>"


Writing model.py


In [8]:
%%writefile app_komentar_manager.py
import os
import csv
import datetime
import re
import uuid
import unicodedata

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow

import database
import config
from model import User, Channel, Video, Comment

# Constants
PREFIX = "[App_Komentar_Manager]"
SCOPES = config.SCOPES
CREDENTIALS_PATH = config.CREDENTIALS_PATH
LOGIN_SESSION = config.LOGIN_SESSION
TIME_SESSION = config.TIME_SESSION
DB_COMMENT_DELETEAFTERFINISHEXECUTE = config.DB_COMMENT_DELETEAFTERFINISHEXECUTE

fetch_query = database.fetch_query
execute_query = database.execute_query


import uuid
from model import Comment
from database import execute_query

PREFIX = "[App_Youtube_Commenter]"

def load_user_and_youtube():
    with open(LOGIN_SESSION, newline='', encoding='utf-8') as file:
        reader = csv.reader(file)
        rows = list(reader)
    email = rows[1][0]

    user_data = fetch_query("SELECT * FROM user WHERE email = ?", (email,), fetch_all=False)
    if not user_data:
        raise ValueError("User tidak ditemukan.")

    user = User(**user_data)
    creds = Credentials(
        token=user.token,
        refresh_token=user.refresh_token,
        token_uri=user.token_uri,
        client_id=user.client_id,
        client_secret=user.client_secret,
        scopes=user.scopes.split()
    )
    youtube = build("youtube", "v3", credentials=creds)
    return user, youtube

def get_youtube_channels():
    user, youtube = load_user_and_youtube()
    response = youtube.channels().list(part="id,snippet,statistics", mine=True).execute()
    channels = []
    if not response.get("items"):
        print(f"{PREFIX} Tidak ada channel ditemukan.")
        return user, []
    for item in response.get("items", []):
        # channels.append({
        #     "id": item["id"],
        #     "title": item["snippet"]["title"],
        #     "subscribers": int(item["statistics"].get("subscriberCount", "0"))
        # })
        ch = Channel(
            id=None,
            user_id=user.id,
            channel_id=item["id"],
            title=item["snippet"]["title"],
            subscribers=int(item["statistics"].get("subscriberCount", "0")),
            created_at=item["snippet"]["publishedAt"],
            is_active=0
        )
        channels.append(ch)
        execute_query("INSERT OR IGNORE INTO channel (user_id, channel_id, title, subscribers, created_at, is_active) VALUES (?, ?, ?, ?, ?, ?)", (
            ch.user_id,
            ch.channel_id,
            ch.title,
            ch.subscribers,
            ch.created_at,
            ch.is_active
        ))
    return user, channels

def activate_channel(user, selected_channel_id):
    execute_query("UPDATE channel SET is_active = 0 WHERE user_id = ?", (user.id,))
    execute_query("UPDATE channel SET is_active = 1 WHERE user_id = ? AND channel_id = ?", (user.id, selected_channel_id))
    with open(LOGIN_SESSION, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        writer.writerow(["email", "created_at", "channel_id"])
        writer.writerow([user.email, datetime.datetime.now().isoformat(), selected_channel_id])


def is_weird_text(text: str) -> bool:
    normalized_text = unicodedata.normalize("NFKD", text)
    if text != normalized_text:
        return True
    non_ascii_count = 0
    total_count = 0
    for char in text:
        if not char.isprintable():
            continue
        category = unicodedata.category(char)
        name = unicodedata.name(char, '')
        if category in ('Ll', 'Lu', 'Nd', 'Zs', 'Po'):
            continue
        if any(keyword in name for keyword in [
            "EMOJI", "HEART", "SMILING", "FACE", "HAND",
            "MUSIC", "NOTE", "SYMBOL", "STAR", "FIRE", "HUNDRED POINTS",
            "EMOTICON", "PARTY", "ANIMAL", "FOOD", "EYES", "DRAGON",
            "SQUARED", "CIRCLED", "ENCLOSED", "NEGATIVE"]):
            continue
        if ord(char) > 0x2E80:
            non_ascii_count += 1
        total_count += 1
    return total_count > 0 and (non_ascii_count / len(text)) > 0.3

def get_suspicious_comments(video_id: str):
    user, youtube = load_user_and_youtube()
    video_response = youtube.videos().list(part="snippet,statistics", id=video_id).execute()
    owner_channel_id = video_response.get("items", [{}])[0].get("snippet", {}).get("channelId", "")
    is_owner = False
    active_channel = fetch_query("SELECT * FROM channel WHERE user_id = ? AND is_active = 1", (user.id,), fetch_all=False)
    if not video_response.get("items"):
        return None, None, None
    if active_channel and active_channel["channel_id"] == owner_channel_id:
        is_owner = True
    suspicious = []
    next_page_token = None

    # Identitas video
    video_data = video_response["items"][0]
    snippet_video = video_data["snippet"]
    statistics = video_data["statistics"]

    channel_id = snippet_video.get("channelId", "")
    title = snippet_video.get("title", "")
    description = snippet_video.get("description", "")
    published_at = snippet_video.get("publishedAt", datetime.datetime.now().isoformat())
    comment_count = int(statistics.get("commentCount", 0))
    vd = Video(
        id=None,
        channel_id=channel_id,
        video_id=video_id,
        title=title,
        description=description,
        published_at=published_at,
        comment_count=comment_count
    )
    execute_query("INSERT OR IGNORE INTO video (channel_id, video_id, title, description, published_at, comment_count) VALUES (?, ?, ?, ?, ?, ?)", (
        vd.channel_id,
        vd.video_id,
        vd.title,
        vd.description,
        vd.published_at,
        vd.comment_count
    ))
    # Identitas komentar
    while True:
        request = youtube.commentThreads().list(
            part="snippet",
            videoId=video_id,
            maxResults=100,
            textFormat="plainText",
            pageToken=next_page_token
        )
        response = request.execute()
        for item in response.get("items", []):
            snippet = item["snippet"]["topLevelComment"]["snippet"]
            comment_id = item["id"]
            author = snippet.get("authorDisplayName", "Unknown")
            author_channel_id = snippet.get("authorChannelId", {}).get("value", "")
            text = snippet.get("textDisplay", "")
            published_at = snippet.get("publishedAt", "")
            if author_channel_id == owner_channel_id:
                continue
            if is_weird_text(text):
                suspicious.append({
                    "unique_code": str(uuid.uuid4())[:8],
                    "comment_id": comment_id,
                    "author": author,
                    "text": text,
                    "published_at": published_at
                })
                cmt = Comment(
                    id=None,
                    video_id=video_id,
                    comment_id=comment_id,
                    author=author,
                    comment = text,
                    published_at=published_at
                )
                execute_query("INSERT OR IGNORE INTO comment (video_id, comment_id, author, comment, published_at) VALUES (?, ?, ?, ?, ?)", (
                    cmt.video_id,
                    cmt.comment_id,
                    cmt.author,
                    cmt.comment,
                    cmt.published_at
                ))
        next_page_token = response.get("nextPageToken")
        if not next_page_token:
            break
    return suspicious, youtube, is_owner

def handle_selected_comments(youtube, selected_comments, is_owner):
    results = []
    for selected in selected_comments:
        try:
            youtube.comments().markAsSpam(id=selected["comment_id"]).execute()
            if is_owner:
                youtube.comments().setModerationStatus(
                    id=selected["comment_id"], moderationStatus="rejected").execute()
                youtube.comments().delete(id=selected["comment_id"]).execute()
            results.append((selected["comment_id"], True))
            if DB_COMMENT_DELETEAFTERFINISHEXECUTE:
                execute_query("DELETE FROM comment")
        except Exception as e:
            results.append((selected["comment_id"], False))
    return results


def extract_video_id(url: str):
    # Accept both full and short YouTube URLs
    patterns = [
        r"(?:https?://)?(?:www\.)?youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})",
        r"(?:https?://)?youtu\.be/([a-zA-Z0-9_-]{11})"
    ]
    for pattern in patterns:
        match = re.match(pattern, url)
        if match:
            return match.group(1)
    return None


def get_user_info(credentials):
    try:
        oauth2 = build('oauth2', 'v2', credentials=credentials)
        return oauth2.userinfo().get().execute()
    except Exception as e:
        print(f"{PREFIX} Error get_user_info: {e}")
        return None


def login_session_is_valid():
    if not os.path.exists(LOGIN_SESSION):
        return False
    try:
        with open(LOGIN_SESSION, mode="r") as file:
            reader = csv.DictReader(file)
            row = next(reader)
            created_at = datetime.datetime.fromisoformat(row["created_at"])
            limit = created_at + datetime.timedelta(minutes=TIME_SESSION)
        return datetime.datetime.now() <= limit
    except Exception as e:
        print(f"{PREFIX} Error reading login session: {e}")
        return False


def login_user():
    if not os.path.exists(CREDENTIALS_PATH):
        print(f"{PREFIX} Credentials not found! {CREDENTIALS_PATH}")
        return

    if login_session_is_valid():
        print(f"{PREFIX} User already logged in!")
        return
    if os.path.exists(LOGIN_SESSION):
        os.remove(LOGIN_SESSION)
    flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_PATH, SCOPES)
    credentials = flow.run_local_server(port=0)
    user_info = get_user_info(credentials)

    if not user_info:
        print(f"{PREFIX} Failed Login. Cannot save user info!")
        return

    user = User(
        id=None,
        google_id=user_info["id"],
        email=user_info["email"],
        name=user_info.get("name", ""),
        picture=user_info.get("picture", ""),
        token=credentials.token,
        refresh_token=credentials.refresh_token,
        token_uri=credentials.token_uri,
        client_id=credentials.client_id,
        client_secret=credentials.client_secret,
        scopes=" ".join(credentials.scopes),
        expiry=credentials.expiry.isoformat()
    )

    existing = fetch_query("SELECT id FROM user WHERE google_id = ?", (user.google_id,), fetch_all=False)

    if existing:
        user.id = existing["id"]
        query = """
        UPDATE user SET email=?, name=?, picture=?, token=?, refresh_token=?,
        token_uri=?, client_id=?, client_secret=?, scopes=?, expiry=?
        WHERE google_id=?;
        """
        params = (
            user.email, user.name, user.picture, user.token,
            user.refresh_token, user.token_uri, user.client_id,
            user.client_secret, user.scopes, user.expiry, user.google_id
        )
    else:
        query = """
        INSERT INTO user (google_id, email, name, picture, token, refresh_token,
        token_uri, client_id, client_secret, scopes, expiry)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
        """
        params = (
            user.google_id, user.email, user.name, user.picture,
            user.token, user.refresh_token, user.token_uri,
            user.client_id, user.client_secret, user.scopes, user.expiry
        )

    success = execute_query(query, params)

    if success:
        with open(LOGIN_SESSION, mode="w", newline="") as file:
            writer = csv.writer(file)
            writer.writerow(["email", "created_at"])
            writer.writerow([user.email, datetime.datetime.now().isoformat()])
        print(f"{PREFIX} ✅ User {user.email} berhasil login dan disimpan.")
    else:
        print(f"{PREFIX} ❌ Gagal menyimpan user ke database.")

#
# Choose Channel
#

# def get_youtube_channels():
#     with open(LOGIN_SESSION, newline='', encoding='utf-8') as file:
#         reader = csv.reader(file)
#         rows = list(reader)
#     email = rows[1][0]

#     user_data = fetch_query("SELECT * FROM user WHERE email = ?", (email,), fetch_all=False)
#     if not user_data:
#         print(f"{PREFIX} User tidak ditemukan.")
#         return None, None

#     user = User(**user_data)

#     creds = Credentials(
#         token=user.token,
#         refresh_token=user.refresh_token,
#         token_uri=user.token_uri,
#         client_id=user.client_id,
#         client_secret=user.client_secret,
#         scopes=user.scopes.split()
#     )

#     youtube = build("youtube", "v3", credentials=creds)
#     request = youtube.channels().list(part="id,snippet,statistics", mine=True)
#     response = request.execute()

#     channels = []
#     for item in response.get("items", []):
#         ch = Channel(
#             id=None,
#             user_id=user.id,
#             channel_id=item["id"],
#             title=item["snippet"]["title"],
#             subscribers=int(item["statistics"].get("subscriberCount", "0")),
#             created_at=item["snippet"]["publishedAt"],
#             is_active=0
#         )
#         channels.append(ch)

#     return channels, user

def clear_database():
    input2 = input("Are you sure you want to clear all data ? (type 'yes' if you agree)")
    if input2 == 'yes':
        execute_query("DELETE FROM comment")
        execute_query("DELETE FROM channel")
        execute_query("DELETE FROM user")
        execute_query("DELETE FROM video")
    else:
        print(f"Oke, gk dihapus ya xD")

if __name__ == "__main__":
    clear_database()
    # database.setup_database_initial()
    # if DB_COMMENT_DELETEAFTERFINISHEXECUTE:
    #     execute_query("DELETE FROM comment")  # ← Komentar aja kalau mau nyimpen histori~
    # login_user()

Writing app_komentar_manager.py


In [9]:
%%writefile app.py
import streamlit as st
import uuid
from app_komentar_manager import login_user, login_session_is_valid, get_youtube_channels, activate_channel, get_suspicious_comments, handle_selected_comments, extract_video_id

google_logo_url = "https://upload.wikimedia.org/wikipedia/commons/c/c1/Google_%22G%22_logo.svg"
refresh = """
            <script>
                setTimeout(function(){
                    window.location.reload();
                }, 3000);  // Delay 1 detik biar user lihat dulu pesan suksesnya
            </script>
        """
# https://www.youtube.com/watch?v=7EAh20TSxzA&t=1s
def main():
    if 'chosen_channel' not in st.session_state:
        st.session_state.chosen_channel = None
    if 'youtube' not in st.session_state:
        st.session_state.youtube = None
    if 'is_owner' not in st.session_state:
        st.session_state.is_owner = False
    if 'comments' not in st.session_state:
        st.session_state.comments = []

    st.title("YouTube Komentar Manager 🎥")

    # Langkah 1: Pilih Channel
    st.header("1. Pilih Channel Aktif")
    user, channels = get_youtube_channels()
    if not channels:
        st.error("Tidak ada channel ditemukan.")
        st.stop()

    channel_titles = [f"{c.title} ({c.subscribers} subs)" for c in channels]
    selected_title = st.selectbox("Pilih salah satu channel:", channel_titles)

    selected_channel = channels[channel_titles.index(selected_title)]
    if st.button("Aktifkan Channel Ini"):
        activate_channel(user, selected_channel.channel_id)
        st.session_state.chosen_channel = selected_channel
        st.success(f"Channel '{selected_channel.title}' berhasil diaktifkan!")

    # Langkah 2: Masukkan link video
    st.header("2. Masukkan Link Video YouTube")
    video_link = st.text_input("Tempelkan link video YouTube kamu di sini:")
    if st.button("Ambil Komentar Mencurigakan"):
        video_id = extract_video_id(video_link)
        if not video_id:
            st.error("Link video tidak valid.")
        else:
            comments, youtube, is_owner = get_suspicious_comments(video_id)
            if not comments:
                st.info("Tidak ada komentar mencurigakan ditemukan.")
            else:
                st.session_state.youtube = youtube
                st.session_state.comments = comments
                st.session_state.is_owner = is_owner
                st.success(f"Ditemukan {len(comments)} komentar mencurigakan.")

    # Langkah 3: Tampilkan komentar dan beri opsi centang
    if st.session_state.comments:
        st.header("3. Komentar Mencurigakan")
        selected_comments = []
        for comment in st.session_state.comments:
            checked = st.checkbox(f"[{comment['unique_code']}] {comment['author']}: {comment['text']}", key=comment['unique_code'], value=True)
            if checked:
                selected_comments.append(comment)

        if st.button("Report Komentar sebagai Spam dan Delete Komentar"):
            handle_selected_comments(st.session_state.youtube, selected_comments, st.session_state.is_owner)
            st.success("Komentar yang dipilih sudah diproses.")

def login():
    if not login_session_is_valid():
        st.title("Login Page!")

        login_clicked = st.button("🔒 Login with Google", key="google_login_button")
        if login_clicked:
            login_user()
            st.success("Login berhasil!")
            st.markdown(refresh, unsafe_allow_html=True)
            main()
    # State untuk menyimpan channel yang dipilih
    else:
          main()


if __name__ == "__main__":
    login()

Writing app.py


In [17]:
!pip install pyngrok streamlit

Collecting streamlit
  Downloading streamlit-1.46.0-py3-none-any.whl.metadata (9.0 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.46.0-py3-none-any.whl (10.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m65.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m46.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl (79 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.1/79.1 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25hI

In [18]:
from pyngrok import ngrok
import time
import subprocess
from config import NGROK_TOKEN_PATH
from google.colab import drive
drive.mount('/content/drive')
file_path = NGROK_TOKEN_PATH

with open(file_path, 'r') as f:
    lines = f.readlines()
    for line in lines:
        if "Token:" in line:
            token = line.split("Token:")[1].strip()
def web(apps):
  !ngrok config add-authtoken {token}
  time.sleep(5)
  ngrok.kill()
  process = subprocess.Popen(['streamlit', 'run', f'/content/{apps}', '--server.headless', 'true', '--server.port', '8501', '--server.enableCORS', 'false'])

  # Ngrok tunnel
  public_url = ngrok.connect("http://localhost:8501")
  print("Streamlit ready at:", public_url)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [19]:
web("app.py")

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml
Streamlit ready at: NgrokTunnel: "https://b61a-34-16-151-103.ngrok-free.app" -> "http://localhost:8501"
