In [1]:
import pprint
from apiclient.discovery import build
from urllib.parse import urlparse, parse_qs
import pandas as pd
import numpy as np
from confluent_kafka import Producer
import logging
import socket
import json
import time
import os
import tensorflow as tf
import pickle
from tensorflow.keras.preprocessing import sequence
from preprocessing import preprocessing

In [2]:
# Build the service object used to call the YouTube Data API.
## Arguments that need to be passed to the build function
# The API key is a credential, so it is read from the environment rather than
# being stored in the notebook (where it would leak through version control
# and shared copies of the file).
DEVELOPER_KEY = os.environ.get("YOUTUBE_API_KEY")
if not DEVELOPER_KEY:
    raise RuntimeError(
        "Set the YOUTUBE_API_KEY environment variable to a YouTube Data API key "
        "before running this notebook."
    )
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
# video_link = "https://www.youtube.com/watch?v=-1X6Ak94Acs"
video_link = "https://youtu.be/Wb_-uXCyeYI"

## Creating the YouTube resource object
youtube_service = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,
                        developerKey=DEVELOPER_KEY)

In [3]:
# Create a producer
def create_producer():
    """Build the Kafka producer used to publish classified comments.

    Returns:
        A ``Producer`` configured for the broker at ``localhost:9092``, or
        ``None`` when construction raises. In that case the exception is
        logged with its traceback instead of being propagated.
    """
    try:
        settings = {
            "bootstrap.servers": "localhost:9092",
            "client.id": socket.gethostname(),
            "enable.idempotence": True,  # EOS processing
            "compression.type": "lz4",
            "batch.size": 64000,
            "linger.ms": 10,
            "acks": "all",  # wait for the leader and all ISR to respond
            "retries": 5,
            "delivery.timeout.ms": 1000,  # total time budget for retries
        }
        return Producer(settings)
    except Exception:
        logging.exception("Couldn't create the producer")
        return None

### Function to get youtube video id.
# source:
# https://stackoverflow.com/questions/45579306/get-youtube-video-url-or-youtube-video-id-from-a-string-using-regex
def get_id(url):
    """Extract the video ID from a YouTube URL.

    The ``v`` query parameter is preferred (``.../watch?v=<id>``). When it is
    absent, the last non-empty path segment is used, which covers short links
    such as ``https://youtu.be/<id>``.

    Args:
        url: The video URL as a string.

    Returns:
        The video ID string, or ``None`` when ``url`` is empty or contains
        neither a ``v`` parameter nor a path segment.
    """
    if not url:
        return None
    parsed = urlparse(url)
    video_ids = parse_qs(parsed.query).get('v')
    if video_ids:
        return video_ids[0]
    # str.split never returns an empty list, so filter out empty segments:
    # a trailing slash or a missing path must not yield '' as the ID.
    segments = [segment for segment in parsed.path.split('/') if segment]
    return segments[-1] if segments else None
    
def get_comments(url, num_comment):
    """Fetch top-level comments of a YouTube video as a DataFrame.

    Uses the module-level ``youtube_service`` client. The API caps
    ``maxResults`` at 100 per request, so larger ``num_comment`` values are
    collected over several requests by following ``nextPageToken``.

    Args:
        url: Video URL; the video ID is extracted with ``get_id``.
        num_comment: Maximum number of comments to return.

    Returns:
        pandas.DataFrame with the columns ``datetime`` (the comment's
        ``updatedAt``), ``author``, ``authorUrl`` and ``comment``, one row per
        comment thread, in the order returned by the API (``order='time'``).
    """
    page_size_limit = 100  # per-request ceiling of commentThreads.list
    video_id = get_id(url)

    items = []
    page_token = None
    while len(items) < num_comment:
        request_args = {
            'part': 'snippet',
            'maxResults': min(page_size_limit, num_comment - len(items)),
            'textFormat': 'plainText',
            'order': 'time',
            'videoId': video_id,
        }
        if page_token:
            request_args['pageToken'] = page_token
        response = youtube_service.commentThreads().list(**request_args).execute()

        items.extend(response.get('items', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            break  # no further pages: the video has no more comments

    # Extract the fields of interest from each top-level comment.
    records = []
    for item in items[:num_comment]:
        snippet = item['snippet']['topLevelComment']['snippet']
        records.append({
            'datetime': snippet['updatedAt'],
            'author': snippet['authorDisplayName'],
            'authorUrl': snippet['authorChannelUrl'],
            'comment': snippet['textDisplay'],
        })

    # Passing ``columns`` keeps the schema stable even when no comment exists.
    return pd.DataFrame(records, columns=['datetime', 'author', 'authorUrl', 'comment'])

In [4]:
# Classify the latest comments of `video_link` and publish each result to the
# Kafka topic "detected".
MAX_SEQUENCE_LENGTH = 80   # padded token-sequence length fed to the model
FLUSH_TIMEOUT_S = 30       # max seconds to wait for delivery of each message
SEND_INTERVAL_S = 5        # pause between messages

producer = create_producer()
if producer is None:
    # create_producer() logs the cause and returns None on failure; stop here
    # instead of failing later with an AttributeError on producer.produce().
    raise RuntimeError("Kafka producer could not be created; see the logged exception.")

response = youtube_service.commentThreads().list(
      part='snippet',
      maxResults=100,
      textFormat='plainText',
      order='time',
      videoId=get_id(video_link)
  ).execute()

# response = youtube_service.liveChatMessages().list(
#       part='snippet',
#       maxResults=100,
#       liveChatId=get_id(video_link)
#   ).execute()

results = response.get('items',[])

## Hate speech detection
# Load the DNN model and its tokenizer once, before the loop: both are
# loop-invariant and reloading them for every comment is needlessly slow.
model_path = os.path.abspath('../model/Text_CNN_model_PhoW2V.h5')
model = tf.keras.models.load_model(model_path)
tknz_path = os.path.abspath('../model/tokenizer.pickle')
# NOTE(security): pickle.load can execute arbitrary code from the file; only
# load a tokenizer file that comes from a trusted source.
with open(tknz_path,"rb") as f:
    tokenizer = pickle.load(f)

# Extract, classify and publish the video comments.
try:
    for item in results:
        snippet = item['snippet']['topLevelComment']['snippet']
        author = snippet['authorDisplayName']
        comment = snippet['textDisplay']
        updated_at = snippet['updatedAt']

        # dnn
        processed_comment = preprocessing(comment)
        seq_comment = tokenizer.texts_to_sequences([processed_comment])
        ds_comment = sequence.pad_sequences(seq_comment,maxlen=MAX_SEQUENCE_LENGTH)
        pred = model.predict(ds_comment)
        hsd_dt = pred.argmax(-1)  # index of the highest-scoring class

        record = {"author":author,"datetime":updated_at,"raw_comment":comment,
                  "clean_comment":processed_comment,"label":int(hsd_dt[0])}
        record = json.dumps(record).encode("utf-8")
        print('produce message')
        print(record)

        producer.produce(topic="detected",value=record)
        producer.flush(FLUSH_TIMEOUT_S)
        time.sleep(SEND_INTERVAL_S)
except KeyboardInterrupt:
    # Allow the stream to be stopped manually from the notebook.
    print('Stop flush!')



produce message
b'{"author": "Tuy\\u1ec1n V\\u0103n H\\u00f3a", "datetime": "2021-10-07T19:35:06Z", "raw_comment": "Highlights Trung Qu\\u1ed1c vs Vi\\u1ec7t Nam: https://youtu.be/HRGLauElf6g", "clean_comment": "highlights trung_qu\\u1ed1c v\\u1edbi vi\\u1ec7t_nam", "label": 0}'
produce message
b'{"author": "Ngoc bao Ngo", "datetime": "2021-10-21T06:00:41Z", "raw_comment": "tuy t\\u00f4i kh\\u00f4ng h\\u1ee9 th\\u00fa b\\u00f3ng \\u0111\\u00e1 nh\\u01b0 \\nC\\u00e1c b\\u1ea1n h\\u00e3y c\\u1ed1 l\\u00ean thua c\\u0169ng \\u0111\\u01b0\\u1ee3c kh\\u00f4ng sao c\\u1ea3 \\nmi\\u1ec5n l\\u00e0 kh\\u00f4ng b\\u1ecb tr\\u1ea5n th\\u01b0\\u01a1ng l\\u00e0 \\u0111\\u01b0\\u1ee3c \\nkh\\u00f4ng c\\u1ea7n ph\\u1ea3i c\\u1ed1 qu\\u00e1 l\\u00e0m j l\\u1ea1i \\u1ea3nh h\\u01b0\\u1edfng \\u0111\\u1ebfn s\\u1ee9c kho\\u1ebb th\\u00ec kh\\u1ed5 l\\u1eafm", "clean_comment": "tuy t\\u00f4i kh\\u00f4ng h\\u1ee9 th\\u00fa b\\u00f3ng_\\u0111\\u00e1 nh\\u01b0 c\\u00e1c b\\u1ea1n h\\u00e3y c\\u1ed1 l\\u00ea