# YT live chat automod
Credentials generation:


In [2]:
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
import pandas as pd
import asyncio
import numpy as np
import json
import requests


## Auth generation
Used to generate the credentials file (`credentials.json`) from the client credentials.

In [5]:
from google_auth_oauthlib.flow import InstalledAppFlow

# Replace with your own information
scopes = ['https://www.googleapis.com/auth/youtube']
client_secrets_file = 'client_secret.json'

def main():
    flow = InstalledAppFlow.from_client_secrets_file(
        client_secrets_file,
        scopes=scopes)

    # Google has deprecated the out-of-band redirect (urn:ietf:wg:oauth:2.0:oob),
    # so use the loopback flow instead: it opens the consent page in a browser
    # and receives the authorization code on a temporary local port.
    credentials = flow.run_local_server(port=0, prompt='consent')

    # Create the credentials file
    with open("credentials.json", "w") as f:
        f.write(credentials.to_json())
    
    print(credentials.to_json())

if __name__ == '__main__':
    main()


### Class request
Class that handles a YouTube `list` request as an object, since the YouTube API does not provide a request object.

In [6]:
class Request:
    """ Class Request handling youtube request as an object """
    def __init__(self, requestType,part=None, id=None, chart=None, regionCode=None, maxResults=None, pageToken=None, videoId=None, liveChatId=None):
        self.requestType = requestType
        self.part = part
        self.id = id
        self.chart = chart
        self.regionCode = regionCode
        self.maxResults = maxResults
        self.pageToken = pageToken
        self.videoId = videoId
        self.liveChatId = liveChatId
        
    def execute(self):
        # Keep every attribute that is set, except the API resource itself
        param = {k: v for k, v in vars(self).items() if k != "requestType" and v}
        
        request = self.requestType.list(**param)
        return request.execute()
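
The filtering done by `execute` can be checked without calling the API. The sketch below uses a condensed copy of the class and two hypothetical stand-ins (`FakeResource`, `FakeCall`) that simply echo the parameters they receive; they are assumptions for illustration and are not part of the Google client library.

```python
class Request:
    """Condensed copy of the class above (fewer parameters, for illustration)."""
    def __init__(self, requestType, part=None, id=None, pageToken=None, liveChatId=None):
        self.requestType = requestType
        self.part = part
        self.id = id
        self.pageToken = pageToken
        self.liveChatId = liveChatId

    def execute(self):
        # Keep every attribute that is set, except the API resource itself
        param = {k: v for k, v in vars(self).items() if k != "requestType" and v}
        return self.requestType.list(**param).execute()


class FakeCall:
    """Hypothetical stand-in for the object returned by resource.list()."""
    def __init__(self, params):
        self.params = params

    def execute(self):
        return {"sent": self.params}


class FakeResource:
    """Hypothetical stand-in for youtube.liveChatMessages()."""
    def list(self, **params):
        return FakeCall(params)


request = Request(FakeResource(), part="id,snippet", liveChatId="chat-1", pageToken="")
first = request.execute()      # pageToken is empty, so it is not sent
request.pageToken = "token-2"  # reuse the same object for the next page
second = request.execute()
print(first)
print(second)
```

Empty and `None` attributes are dropped, which is why the first call can be built with `pageToken=""` and the same object reused once a token is known.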

## Logger 
Set up the file logger.

In [7]:
import logging
import os

logFolder = 'logs'
os.makedirs(logFolder, exist_ok=True)  # FileHandler raises if the folder does not exist

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(f'{logFolder}/automod.log')
file_handler.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(levelname)s : %(asctime)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.propagate = False

### Live chat ID from video ID
Used to get the live chat ID (and basic stream details) from the video ID of a live stream.

In [8]:
def get_liveChat(youtube, video_id) -> dict:
    """ Return the id, start time, concurrent viewers and chat ID of a live video """
    video_response = youtube.videos().list(
        part="liveStreamingDetails",
        id=video_id
    ).execute()

    items = video_response.get('items', [])
    if not items:
        raise ValueError(f"No video found for id {video_id}")

    video: dict = items[0]
    details = video.get("liveStreamingDetails", {})
    videoData = {
        "id" : video.get('id', ""),
        "startTime" : details.get("actualStartTime", ""),
        "concurrentViewers" : int(details.get("concurrentViewers", 0)),
        "chatID" : details.get("activeLiveChatId", ""),
    }

    # Raise instead of printing, so callers never receive None
    if videoData['chatID'] == "":
        raise ValueError("This live stream has no chat available")
    return videoData
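
As an offline illustration of the extraction step, the sketch below applies the same lookups to a made-up response. The sample values and the helper name `extract_live_details` are assumptions; only the key names come from the function above.

```python
# Made-up response using the keys read by get_liveChat
sample_response = {
    "items": [{
        "id": "VIDEO_ID",
        "liveStreamingDetails": {
            "actualStartTime": "2024-01-01T18:00:00Z",
            "concurrentViewers": "42",  # given as a string here, hence the int() conversion
            "activeLiveChatId": "CHAT_ID",
        },
    }]
}


def extract_live_details(video_response: dict) -> dict:
    """Hypothetical helper: flatten the first item of a videos().list response."""
    items = video_response.get("items", [])
    if not items:
        raise ValueError("no video in the response")
    video = items[0]
    details = video.get("liveStreamingDetails", {})
    return {
        "id": video.get("id", ""),
        "startTime": details.get("actualStartTime", ""),
        "concurrentViewers": int(details.get("concurrentViewers", 0)),
        "chatID": details.get("activeLiveChatId", ""),
    }


live = extract_live_details(sample_response)
# A video without an active chat yields an empty chatID
ended = extract_live_details({"items": [{"id": "OLD_VIDEO", "liveStreamingDetails": {}}]})
print(live)
print(ended)
```

An empty `chatID` is the case the function above treats as "no chat available".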


### Get Channel info

In [9]:
def format_channel_data(channel_data: dict):
    """ Structure raw channel data """
    data = {
        "channel_name": channel_data.get('snippet', {}).get('title'),
        "channel_id": channel_data.get('id'),
        "country": channel_data.get('snippet', {}).get('country',""),
        **{k:int(v) for k,v in channel_data.get('statistics', {}).items() if k != "hiddenSubscriberCount"},
        "topics": [wikilink.split('/')[-1] for wikilink in channel_data.get('topicDetails', {}).get('topicCategories', [])],
    }
    return data

In [10]:
def get_channel_data(youtube, channel_id: str) -> dict:
    """ Request (by id) for most important channel stats """
    request = Request(
        requestType=youtube.channels(),
        part="snippet,contentDetails,statistics,topicDetails",
        id=channel_id
    )
    response = request.execute()
    rawData = response.get('items', [])[0]
    return format_channel_data(rawData)


# Gorgias functions

In [11]:
with open("gorgias_auth.json") as f:
    gorgias_auth = json.load(f)

BASE_GORGIAS_URL = gorgias_auth["url"]
# Example of a function using the Prolog API to prove a policy option
def queryGorgias(facts=None, gorgiasFile="", query="", auth=(gorgias_auth["user"], gorgias_auth["pass"])):
    # query = "challenge(Agent, Resource)"  # Prolog query
    data = {
        "facts": facts or [],  # Facts as a list of str
        "gorgiasFiles": [
            gorgiasFile  # Gorgias file name preceded by project name: project/file.pl
        ],
        "query": query,
        "resultSize": 1
    }

    r = requests.post(f"{BASE_GORGIAS_URL}/GorgiasQuery", json=data, auth=auth)

    if r.status_code != 200:
        print(f"error: HTTP {r.status_code}")
        return False
    return r.json()

# Example of a function using the Prolog API to create a project
def createProject(project_name = "", auth=(gorgias_auth["user"], gorgias_auth["pass"])):
    r = requests.post(f"{BASE_GORGIAS_URL}/createProject?project_name={project_name}", auth=auth)
    if r.status_code != 200:
        print("error")
        return False
    return r.json()

# Example of a function using the Prolog API to add a file
def addFile(file = "", project= "", type= "", auth=(gorgias_auth["user"], gorgias_auth["pass"])):
    # Open the file in a context manager so the handle is closed after the upload
    with open(file, 'rb') as f:
        r = requests.post(f"{BASE_GORGIAS_URL}/addFile?project={project}&type={type}", files={'file': f}, auth=auth)
    if r.status_code != 200:
        print(r.status_code)
        return False
    return r.json()

# Example of a function using the Prolog API to delete a project
def deleteProject(project = "", auth=(gorgias_auth["user"], gorgias_auth["pass"])):
    r = requests.post(f"{BASE_GORGIAS_URL}/deleteProject?project={project}", auth=auth)
    if r.status_code != 200:
        print("error")
        return False
    return r.json()


# Example of a function using the Prolog API to delete a file
def deleteFile(filename = "", project="", auth=(gorgias_auth["user"], gorgias_auth["pass"])):
    r = requests.post(f"{BASE_GORGIAS_URL}/deleteFile?filename={filename}.pl&project={project}", auth=auth)
    if r.status_code != 200:
        print("error")
        return False
    return r.json() 


def initializeGorgias(projectName, policyFile):
    """ Creates the project and adds the policy file """
    if not(createProject(projectName) == 'OK' and addFile(policyFile, projectName, "gorgias") == ['OK']):
        raise Exception('Failed to initialize')
    
# Returns the result of the query
def askGorgias(facts=None, projectName="", policyFile=""):
    response = queryGorgias(facts=facts or [], query="action(X)", gorgiasFile=f"{projectName}/{policyFile}")
    try:
        return response.get("result", [])[0].get('variables', {}).get("X", "")
    except Exception:
        # queryGorgias returned False, or the response holds no result
        print(response)
        # raise Exception("Invalid Facts")
        return None
    
def terminateGorgias(projectName):
    """ Delete the project """
    if not(deleteProject(projectName) == 'OK'):
        raise Exception('Failed to delete')
   
   
# initializeGorgias(projectName, policyFile)
# askGorgias(["", "negative_message"], projectName, policyFile)
# terminateGorgias(projectName)
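
The request body and the response parsing can be tried without reaching the Gorgias server. The sketch below is a minimal illustration under assumptions: `build_query_payload` and `extract_action` are hypothetical helper names, and the sample response is made up from the keys that `askGorgias` reads (`result`, `variables`, `X`).

```python
def build_query_payload(facts, project, policy_file, query="action(X)"):
    """Body sent to /GorgiasQuery, mirroring queryGorgias above."""
    return {
        "facts": list(facts),
        "gorgiasFiles": [f"{project}/{policy_file}"],
        "query": query,
        "resultSize": 1,
    }


def extract_action(response):
    """Return the value bound to X, or None when there is no usable result."""
    if not isinstance(response, dict):
        return None  # queryGorgias returns False on HTTP errors
    results = response.get("result") or []
    if not results:
        return None
    return results[0].get("variables", {}).get("X")


payload = build_query_payload(["toxic_message"], "TER2024", "decision.pl")
# Made-up response, shaped after the keys askGorgias reads
sample = {"result": [{"variables": {"X": "sanction_level_one"}}]}
print(payload)
print(extract_action(sample))
print(extract_action(False))
```

Returning `None` for every failure case gives the caller a single value to test before applying a sanction.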

### Chat main function
Timeout, ban and unban functions.

In [12]:
def timeout_user(youtube, liveChatId, user_id, duration=360):
    request = youtube.liveChatBans().insert(
        part="snippet",
        body={
          "snippet": {
            "liveChatId": liveChatId,
            "type": "temporary",
            "banDurationSeconds": duration,
            "bannedUserDetails": {
              "channelId": user_id
            }
          }
        }
    )
    response = request.execute()
    print(response)
    return response.get('id', "")  # Ban ID, needed later by unban_user
    
def perma_ban_user(youtube, liveChatId, user_id):
    request = youtube.liveChatBans().insert(
        part="snippet",
        body={
          "snippet": {
            "liveChatId": liveChatId,
            "type": "permanent",
            "bannedUserDetails": {
              "channelId": user_id
            }
          }
        }
    )
    response = request.execute()
    print(response)
    return response.get('id', "")  # Ban ID, needed later by unban_user

def unban_user(youtube, ban_id):
    request = youtube.liveChatBans().delete(
        id=ban_id
    )
    request.execute()

In [13]:
def convert_messageList(messages: list[dict]) -> str:
    return "".join([f"{el['author']} : {el['message']}\n" for el in messages])

def get_message_info(items:list[dict]) -> pd.DataFrame:
    data = []
    for item in items:
        data.append({
            "username" : item.get('authorDetails', {}).get('displayName', ""),
            "message_id": item.get('id', ""),
            "message" : item.get('snippet', {}).get('displayMessage', ""),
            "channel_id" : item.get('snippet', {}).get('authorChannelId', ""),
            "publishedAt" : item.get('snippet', {}).get('publishedAt', "")
        })
    # messages = pd.DataFrame(data)
    # logger.debug(messages.columns)
    # messages["publishedAt"] = pd.to_datetime(messages["publishedAt"])
    # return messages.sort_values(by='publishedAt')
    return pd.DataFrame(data)
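
To see which fields are kept from each chat item, the sketch below flattens one made-up item with the same lookups, using plain dictionaries instead of a DataFrame. The sample values and the helper name `flatten_item` are assumptions for illustration.

```python
# Made-up item using the keys read by get_message_info
sample_item = {
    "id": "MSG_ID",
    "snippet": {
        "displayMessage": "hello chat",
        "authorChannelId": "UC_example",
        "publishedAt": "2024-01-01T18:05:00Z",
    },
    "authorDetails": {"displayName": "alice"},
}


def flatten_item(item: dict) -> dict:
    """Hypothetical helper: one flat record per chat message."""
    snippet = item.get("snippet", {})
    return {
        "username": item.get("authorDetails", {}).get("displayName", ""),
        "message_id": item.get("id", ""),
        "message": snippet.get("displayMessage", ""),
        "channel_id": snippet.get("authorChannelId", ""),
        "publishedAt": snippet.get("publishedAt", ""),
    }


# The second record shows the defaults used when a field is missing
rows = [flatten_item(sample_item), flatten_item({})]
print(rows)
```

Missing fields fall back to empty strings, so every record has the same five keys.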

## Chat Logs

In [14]:
def get_chat_logs(live_chat_id):
    """ Load the saved chat log (one row per known user), or an empty one if none exists """
    try:
        df = pd.read_csv(f"{logFolder}/{live_chat_id}.csv")
        return df
    except FileNotFoundError:
        columns = ['username', "last_msg_id", 'last_msg', "channel_id", 'nb_msg', 'nb_ban', 'total_timeout_time']
        return pd.DataFrame(columns=columns) # Empty Dataframe
      
def update_known_users(row: pd.Series, messages:pd.DataFrame):
    msg = messages.copy()
    try:
        # Drop every message that precedes the last message seen
        if row['last_msg_id'] in msg['message_id'].to_list():
            # print("last message here")
            drop_index:pd.Index = msg.loc[msg['message_id'] == row['last_msg_id']].index
            msg = msg.drop(msg[(msg['channel_id'] == row['channel_id']) & (msg.index <= drop_index[0])].index)
        
        # If the id is present in the latest messages
        if row['channel_id'] in msg['channel_id'].to_list():
            row["last_msg"] = msg[msg['channel_id'] == row["channel_id"]].groupby('channel_id').last()["message"].iloc[-1]
            # print(f"{row['username']} - Added {len(msg[msg['channel_id'] == row['channel_id']])} messages to {row['nb_msg']}")
            row["nb_msg"] += len(msg[msg['channel_id'] == row['channel_id']])
            row['last_msg_id'] = msg.loc[msg['channel_id'] == row['channel_id'], 'message_id'].iloc[-1]
        return row
    except Exception:
        print("\033[92mCrashed \033[0m")
        # print(messages.columns + "\033[92m <- Crashed \033[0m")
        return row  # Return the row so DataFrame.apply still receives a Series

def update_chat_logs(messages: pd.DataFrame, chat_logs: pd.DataFrame) -> pd.DataFrame:
    # +1 to the message count for users already present
    # chat_logs.loc[(chat_logs['channel_id'].isin(messages['channel_id']) & ~chat_logs['last_msg_id'].isin(messages['message_id'])), "nb_msg"] += 1
    # print(f"Number of known users : {len(chat_logs[chat_logs['channel_id'].isin(messages['channel_id'])])}")
    
    chat_logs = chat_logs.apply(lambda row: update_known_users(row, messages), axis=1)
    
    # Add a row for users that are not in the log yet
    unknown_users = messages[~messages['channel_id'].isin(chat_logs['channel_id'])].copy()
    unknown_users = unknown_users.drop(columns=["publishedAt"])  # drop() returns a copy, so assign it
    unknown_users.rename(columns={'message_id': 'last_msg_id', "message" : 'last_msg'}, inplace=True)
    unknown_users = unknown_users.reindex(columns=chat_logs.columns)
    unknown_users['nb_msg'] = 1 # Initialise the message count to 1
    
    # Special case: several messages from the same unknown user arrive at the same time
    unknown_users = unknown_users.groupby('channel_id').agg({'last_msg_id': 'last', 'nb_msg': 'sum', 'username': 'last', 'last_msg':'last'}).reset_index()
    
    unknown_users['nb_ban'] = 0
    unknown_users['total_timeout_time'] = 0
    
    return pd.concat([chat_logs, unknown_users], ignore_index=True)
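
The special case handled by the `groupby` (several messages from the same new user in one batch) can be reproduced on a small made-up batch. The usernames, IDs and messages below are assumptions for illustration.

```python
import pandas as pd

# Made-up batch: alice posts twice, bob once
messages = pd.DataFrame([
    {"username": "alice", "message_id": "m1", "message": "hi", "channel_id": "UC_a"},
    {"username": "bob", "message_id": "m2", "message": "hello", "channel_id": "UC_b"},
    {"username": "alice", "message_id": "m3", "message": "nice stream", "channel_id": "UC_a"},
])

new_users = messages.rename(columns={"message_id": "last_msg_id", "message": "last_msg"})
new_users["nb_msg"] = 1  # one per message, summed below

# One row per channel: keep the latest message and count how many were posted
new_users = (
    new_users.groupby("channel_id")
    .agg({"last_msg_id": "last", "nb_msg": "sum", "username": "last", "last_msg": "last"})
    .reset_index()
)
print(new_users)
```

Each new user ends up with a single row whose `nb_msg` counts all of their messages in the batch.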
 


## Sentiment Analysis


In [15]:

# text_example = "Horrible Gameplay"
# result = process_message(
#     text_example,
#     pipelines={
#         'insult': insult_detector,
#         'sentiment': sentiment_detector,
#         'emotion': emotion_detector
#     },
#     config=config
# )

# print("Processing Result:", result)

## Handling Messages
Sentiment analysis

In [None]:
from sentiment import insult_detector, sentiment_detector, emotion_detector, config, process_message

In [17]:
timeout_lvl = { # Timeout durations in seconds, to be adjusted by the user
    "sanction_level_one" : 600,
    "sanction_level_two": 3600,
    "sanction_level_three": 86400
}


def analyse_message(youtube, row: pd.Series, live_id, chat_logs:pd.DataFrame):
    projectName = "TER2024"
    policyFile = "decision.pl"
    
    logs = chat_logs.copy(deep=True)
    nb_bans = logs.set_index('channel_id').loc[row['channel_id'], 'nb_ban'] if row['channel_id'] in logs['channel_id'].values else 0
    
    try:
        facts = process_message(
            row["message"],
            pipelines = {
                'insult': insult_detector,
                'sentiment': sentiment_detector,
                'emotion': emotion_detector
            },
            config = config,
            timeout = nb_bans
        )
        
        punish_id = askGorgias(facts, projectName, policyFile)
        logger.debug(f"{row['message']} + {facts} -> {punish_id}")
        print(f"\"{row['message']}\" + {facts} -> {punish_id}")
        
        # Keys must match the actions returned by the policy ("do_nothing", "warning", ...)
        punishements = {"do_nothing" : None,
            "warning": None, # Insert message?
            "sanction_level_one": lambda: timeout_user(youtube, live_id, row["channel_id"], timeout_lvl["sanction_level_one"]), 
            "sanction_level_two": lambda: timeout_user(youtube, live_id, row["channel_id"], timeout_lvl["sanction_level_two"]), 
            "sanction_level_three": lambda: timeout_user(youtube, live_id, row["channel_id"], timeout_lvl["sanction_level_three"]), 
            "ban_user" : lambda: perma_ban_user(youtube, live_id, row["channel_id"])
            }
        
        action = punishements.get(punish_id) # None for unknown or missing results
        if action: 
            action() # Calls the function
            chat_logs.loc[chat_logs['channel_id'] == row['channel_id'], 'nb_ban'] += 1
        
    except Exception as e:
        # print(row.to_dict())
        print(e)
        print()
    # A plain return (instead of finally) avoids swallowing KeyboardInterrupt
    return row
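
The dispatch table used in `analyse_message` can be exercised without the YouTube API. In the sketch below, `make_actions` and the two recording callbacks are hypothetical names introduced for illustration; only the action names and durations come from the cell above.

```python
timeout_lvl = {
    "sanction_level_one": 600,
    "sanction_level_two": 3600,
    "sanction_level_three": 86400,
}


def make_actions(timeout, ban):
    """Map each policy result to a callable taking a channel ID (None means no action)."""
    actions = {"do_nothing": None, "warning": None, "ban_user": ban}
    for level, seconds in timeout_lvl.items():
        # Bind seconds as a default argument so each lambda keeps its own value
        actions[level] = lambda user_id, seconds=seconds: timeout(user_id, seconds)
    return actions


calls = []  # records what the hypothetical moderation callbacks were asked to do
actions = make_actions(
    timeout=lambda user_id, seconds: calls.append(("timeout", user_id, seconds)),
    ban=lambda user_id: calls.append(("ban", user_id)),
)

for result in ["do_nothing", "sanction_level_two", "ban_user", "unexpected_value"]:
    action = actions.get(result)  # unknown results fall back to None
    if action:
        action("UC_example")

print(calls)
```

Looking the result up with `.get` means an unexpected policy answer is ignored instead of raising a `KeyError`.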


In [18]:
async def handle_messages(youtube, messages: pd.DataFrame, live_id, chat_logs: pd.DataFrame) -> pd.DataFrame:
    if not messages.empty:
        chat_logs = update_chat_logs(messages, chat_logs)
        # analyse_message(youtube, messages, live_id, chat_logs)
        
        logger.debug(f"Added {len(messages)} msg")
        print(f"\033[92mAdded {len(messages)} msg\033[0m")
        
        # messages.apply(lambda el: print(f"{el['username']} : {el['message']}"), axis=1)
        
        
        # Analyse every single message and decide whether to sanction its author
        messages.apply(lambda row: analyse_message(youtube, row, live_id, chat_logs), axis=1)
    
    
    
    return chat_logs

## Fetching chat messages

In [20]:
async def fetch_live_chat_messages(youtube, live_chat_id):
    chat_logs = get_chat_logs(live_chat_id)
    
    request = Request(youtube.liveChatMessages(), liveChatId=live_chat_id, part='id,snippet,authorDetails', pageToken="")
    response = request.execute()
    
    messages = get_message_info(response.get('items', []))
    
    try:
        while True:
            chat_logs = await handle_messages(youtube, messages, live_chat_id, chat_logs)
            # await process_messages(youtube, messages, live_chat_id)
            
            # Wait for the polling interval given by the API before the next request
            logger.debug(f'PollingRate : {response.get("pollingIntervalMillis", 5000)}')
            await asyncio.sleep(response.get("pollingIntervalMillis", 5000) / 1000.0)  
            
            request.pageToken = response.get("nextPageToken", "")
            response = request.execute()
            
            messages = get_message_info(response.get('items', []))
            
    except asyncio.CancelledError: # program interrupted
        print("interrupt")
        chat_logs.to_csv(f"{logFolder}/{live_chat_id}.csv", index=False)   
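
The polling pattern (fetch a page, wait for the advertised interval, continue from `nextPageToken`, save state when interrupted) can be sketched offline. Everything below is an assumption for illustration: `PAGES` and `fetch_page` are made-up stand-ins for `request.execute()`, and a `finally` block is used instead of `except asyncio.CancelledError` so that state is also saved on other errors. The sketch is written as a standalone script; inside a notebook, `await main()` would replace `asyncio.run(main())`.

```python
import asyncio

# Made-up pages standing in for successive liveChatMessages responses
PAGES = {
    "": {"items": ["m1", "m2"], "nextPageToken": "p2", "pollingIntervalMillis": 10},
    "p2": {"items": ["m3"], "nextPageToken": "p3", "pollingIntervalMillis": 10},
}


def fetch_page(page_token):
    """Hypothetical replacement for request.execute()."""
    empty = {"items": [], "nextPageToken": page_token, "pollingIntervalMillis": 10}
    return PAGES.get(page_token, empty)


async def poll(seen, saved):
    token = ""
    try:
        while True:
            response = fetch_page(token)
            seen.extend(response.get("items", []))
            # Respect the interval requested by the server before the next call
            await asyncio.sleep(response.get("pollingIntervalMillis", 5000) / 1000.0)
            token = response.get("nextPageToken", "")
    finally:
        # Runs on cancellation and on errors alike: the place to persist state
        saved.append(list(seen))


async def main():
    seen, saved = [], []
    task = asyncio.create_task(poll(seen, saved))
    await asyncio.sleep(0.3)  # let a few polling cycles run
    task.cancel()             # simulate interrupting the notebook cell
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen, saved


seen, saved = asyncio.run(main())
print(seen, saved)
```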


In [None]:
# Configure and authenticate the YouTube API
projectName = "TER2024"
policyFile = "decision.pl"
credentials = Credentials.from_authorized_user_file('credentials.json')
# The OAuth credentials authenticate the client, so no API key is hard-coded here
youtube = build('youtube', 'v3', credentials=credentials)

videoData = get_liveChat(youtube, 'oXGELAErKx0')

live_chat_id = videoData["chatID"]
print(f"Awaiting messages on chat ID: {live_chat_id}")

# initializeGorgias(projectName, policyFile)

# Start the message-fetching loop
# try:
await fetch_live_chat_messages(youtube, live_chat_id) 
# except Exception:
#     pass
# finally:
# print('Terminate')
# terminateGorgias(projectName)

## Sentiment analysis tests

In [17]:
test_phrases = [
    "This is amazing, I love how you're explaining everything!",
    "You're so dumb, how do you not get this?",
    "Can you please show that part again?",
    "Wow, this stream is boring.",
    "You're doing a great job, keep it up!",
    "I can't believe you're this stupid.",
    "This is the best stream ever!",
    "Why are you even trying, you suck at this.",
    "I'm so happy I found this stream.",
    "Your voice is really annoying.",
    "Great content, very informative!",
    "I hope you fail at everything you do.",
    "Thanks for answering my question, you're awesome!",
    "This is a waste of time.",
    "You're such a kind and thoughtful person.",
    "Nobody cares about what you're saying.",
    "I'm learning so much from you!",
    "You're the worst streamer ever.",
    "Can you give me a shoutout?",
    "Your setup looks really cool, love it!"
]

projectName = "TER2024"
policyFile = "decision.pl"
for p in test_phrases:
    facts = process_message(
            p,
            pipelines = {
                'insult': insult_detector,
                'sentiment': sentiment_detector,
                'emotion': emotion_detector
            },
            config = config,
            timeout = 0
        )
    
    punish_id = askGorgias(facts, projectName, policyFile)   
    print(f"\"{p}\" detected as {facts} -> {punish_id}")

"This is amazing, I love how you're explaining everything!" detected as ['not_toxic_message'] -> do_nothing
"You're so dumb, how do you not get this?" detected as ['very_toxic_message'] -> sanction_level_three
"Can you please show that part again?" detected as ['not_toxic_message'] -> do_nothing
"Wow, this stream is boring." detected as ['not_toxic_message'] -> do_nothing
"You're doing a great job, keep it up!" detected as ['not_toxic_message'] -> do_nothing
"I can't believe you're this stupid." detected as ['very_toxic_message'] -> sanction_level_three
"This is the best stream ever!" detected as ['not_toxic_message'] -> do_nothing
"Why are you even trying, you suck at this." detected as ['very_toxic_message'] -> sanction_level_three
"I'm so happy I found this stream." detected as ['not_toxic_message'] -> do_nothing
"Your voice is really annoying." detected as ['toxic_message', 'negative_message', 'negative_high_intensity_emotion'] -> sanction_level_two
"Great content, very informative