In [None]:
import re
import json
import html
import getpass
import warnings
from time import time

from googleapiclient.discovery import build
from langdetect import detect, DetectorFactory
import pandas as pd
from bs4 import BeautifulSoup

import sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError

In [None]:
def LoadKeyYotube() -> str:
    
    # Load the API key of youtube v3 found in .env
    
    with open("../.env","r") as iJSON:
        key = json.load(iJSON)["keys"]["key_youtube"]
    return key

def GetVideoComments(youtube:build, video_id:str) -> "list(dict)":

    '''
    This function retrieves the comment, date, user and likes of a youtube video
    
    Parameters
    ----------
    
    video_id: str
        id of youtube video
    
    Return
    ------
        list(dict): List of diccionaries containing the comment, date, user and likes of the comment
    
    Examples
    --------

    >>> from googleapiclient.discovery import build
    >>> my_video = "dGiQaabX3_o"
    >>> youtube = build('youtube', 'v3', developerKey=api_key) # replace by your api key
    >>> comments = get_video_comments(my_video) 

    '''
    
    comments = []
    next_page_token = None

    iteration = 1
    while True:
        
        # Show on screen current pagination
        if (iteration % 10) == 0:
            print(f"\tPages checked: {iteration}", end="\r")
        
        # Calling youtube to retrieve comments
        response = youtube.commentThreads().list(
            part='snippet',
            videoId=video_id,
            pageToken=next_page_token,
            maxResults=10000
        ).execute()

        # Formating into a dict the data requested
        for item in response['items']:
            comment = item['snippet']['topLevelComment']['snippet']['textDisplay']
            comment_date = item['snippet']['topLevelComment']['snippet']['publishedAt']
            comment_user = item['snippet']['topLevelComment']['snippet']['authorDisplayName']
            comment_likes = item['snippet']['topLevelComment']['snippet']['likeCount']

            comment_data = {
                'comment': comment,
                'published_date': comment_date,
                'name': comment_user,
                'likes': comment_likes
            }
            
            comments.append(comment_data)

        next_page_token = response.get('nextPageToken')

        # End while lopp if there is no more pages to retrieve comments
        if not next_page_token:
            break
            
        iteration += 1
    
    print("\nAll done")
    return comments

def GetLanguage(text:str) -> str:
    
    # This function detect the language of a text. If there is no enough evidence
    # to define the language returns "undefined"

    try:
        return detect(text)
    except:
        return "undefined"

# The following classes represent a Object–role modeling (ORM) of the expected tables
# found in the database with the corresponding constraints. It's useful to validate data
# before insert
    
class Languages(declarative_base()):
    __tablename__ = 'languages'

    # Here Integer represents SERIAL in id column
    id_language = Column(Integer, primary_key=True, nullable=False, unique=True)
    code = Column(String(9), nullable=False, unique=True)
    
class Titles(declarative_base()):
    __tablename__ = 'titles'

    id_video = Column(String(12), primary_key=True, nullable=False, unique=True)
    title = Column(String, nullable=False)
    
class Users(declarative_base()):
    __tablename__ = 'users'
    
    # Here Integer represents SERIAL in id column
    id_user = Column(Integer, primary_key=True, nullable=False, unique=True)
    name = Column(String, nullable=False)
    
class Comments(declarative_base()):
    __tablename__ = 'comments'

    # Here Integer represents SERIAL in id column
    id_comment = Column(Integer, primary_key=True, nullable=False, unique=True)
    comment = Column(Text, nullable=True)
    published_date = Column(DateTime, nullable=True)
    likes = Column(Integer, nullable=True)
    id_user = Column(Integer, nullable=False)
    id_video = Column(String, nullable=False)
    id_language = Column(Integer, nullable=False)

class DataValidator:

    '''
    This class validates data before inserting into a database
    '''

    def __init__(self, engine:sqlalchemy.create_engine,
                orm_model:sqlalchemy.orm.declarative_base) -> None:
        
        self.engine = engine
        self.Session = sessionmaker(bind=self.engine, autoflush=False)
        self.orm_model = orm_model
    
    def ValidateData(self, dataframe:pd.DataFrame, errors:str="stop") -> list:
        
        '''
        Validate if the data is not violating any restriction by defined for the ORM model
        in a database

        Parameters
        ----------

        dataframe: pd.DataFrame
            A pandas DataFrame to verify each row. It's expected
            the same name in the columns of dataframe and the database 

        error: str, ["stop","ignore"]; default="stop"
            How the errors should be handle if some registers are not valid.
            errors="ignore" ignore the current register and continue validating. Current register is not valid
            errors="stop" ends the execution of the script. All the table is not valid

        Returns
        -------

        list: A list containing the names of the indexes of the dataframe that should be dropped

        '''

        session = self.Session()
        bad_rows = []
        
        for row in dataframe.itertuples():

            row = row._asdict()
            index = row.pop("Index")
            
            # If flush failed. It means there is a constraint violation
            try:
                model = self.orm_model(**row)
                session.add(model)
                session.flush()
                
            except SQLAlchemyError as ErrorDataType:
                
                if errors == "ignore":
                    bad_rows.append(index)
                    warnings.warn(f"Data validation failed: {ErrorDataType}")
                else:
                    session.rollback()
                    session.close()
                    if errors == "stop":
                        raise Exception(f"Data validation failed: {ErrorDataType}")
                    else:
                        raise ValueError(f"Argument 'error' not valid: {errors}. Expected ['stop', 'ignore']")
        
        # Undo all changes in the database and close session
        session.rollback()
        print("Done")
        session.close()
        
        return bad_rows

def Extract(video_names:list, videos_id:list, api_key:str) -> "list(dict)":

    '''
    This function extract youtube comments based on the video id

    Parameters
    ----------
    video_names: list
        List containing the titles of the youtube videos. Useful to inform current state of extraction
    
    videos_id: list
        List containing the ids of the youtube videos. It's used to extract comments
        
    api_key: str
        API key to request info from youtube

    Returns
    -------
    
    list(dict): A list of diccionaries with the comments and metadata about them
    
    '''

    start = time()
    print("Exctracting comments")

    # Set up YouTube Data API client
    youtube = build('youtube', 'v3', developerKey=api_key)

    # Call the function to retrieve comments for the specified video
    full_comment_metadata_videos = []
    for name, ID in zip(video_names, videos_id):

        print(f"Retrieving from video: {name}")
        video_comments = GetVideoComments(youtube, ID)
        full_comment_metadata_videos.append(video_comments)

        print(f"Comments retrieved: {len(video_comments)}")

    print(f"Extraction took {time() - start} secs")
    return full_comment_metadata_videos

def Transform(comments:"list(dict)",
              video_names:list, videos_id:list) -> "(pd.DataFrame, pd.DataFrame)":
    
    '''
    This functions transform the comments retrieved from youtube and metadata about
    youtube video title and youtube video id and returns two parsed dataframe
    
    Parameters
    ----------
    
    comments: list(dict)
        List of dictionaries containing the comments for each video and metadata
        
    video_names: list
        List containing the titles of the youtube videos
        
    videos_id: list
        List containing the ids of the youtube videos
        
    Notes
    -----
    
        len(comments) = len(video_names) = len(videos_id)
    
    Returns
    -------
        (pd.DataFrame, pd.DataFrame): A tuple containing the comments dataframe
            and the video dataframe respectively
    
    '''
    
    print("Transfoming data")
    start = time()

    video_id_df = pd.DataFrame({"id_video": video_names,"title": videos_id})
    comments_df = pd.DataFrame()

    # Adding id_video where the comments came from
    for comments_by_video, ID in zip(comments, videos_id):
        df = pd.DataFrame(comments_by_video)
        df["id_video"] = ID
        comments_df = pd.concat([comments_df, df])

    comments_df = comments_df.reset_index(drop=True).copy()

    # Casting correctly date time
    date_series = comments_df["published_date"].str.replace("(T|Z)", " ", regex=True)
    date_format = pd.to_datetime(date_series, format="%Y-%m-%d %H:%M:%S")
    comments_df["published_date"] = date_format

    print(f"Transformation took {time() - start} secs")

    return comments_df, video_id_df

def Clean(comments_df:pd.DataFrame) -> pd.DataFrame:

    '''
    This function remove emojis, html tags, html "codes" and non-printable characters
    
    Parameters
    ----------
    
    comments_df: pd.DataFrame
        A pandas DataFrame with the comments retrieved from youtube
    
    Returns
    -------
    
    pd.DataFrame: A pandas DataFrame cleaned
    
    '''
    
    print("Cleaning data")
    start = time()
    
    # Transforming HTML "codification" to utf-8
    map_columns_to_function = {"comment":html.unescape, "name":html.unescape}
    comments_df[["comment","name"]] = comments_df[["comment","name"]].agg(map_columns_to_function)
    
    # Emoji code patterns
    emoji_patterns = re.compile("["
            u"\U0001F600-\U0001F64F"
            u"\U0001F300-\U0001F5FF"
            u"\U0001F680-\U0001F6FF"
            u"\U0001F1E0-\U0001F1FF"
            u"\U00002500-\U00002BEF"
            u"\U00002702-\U000027B0"
            u"\U00002702-\U000027B0"
            u"\U000024C2-\U0001F251"
            u"\U0001f926-\U0001f937"
            u"\U00010000-\U0010ffff"
            u"\u2640-\u2642" 
            u"\u2600-\u2B55"
            u"\u200d"
            u"\u23cf"
            u"\u23e9"
            u"\u231a"
            u"\ufe0f"
            u"\u3030"
                          "]+", re.UNICODE)
    
    # Non printable characters
    non_printable_patterns = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F\r]')
    
    # Clean emojis, html tags and detect language
    clean_text = []
    for text in comments_df["comment"]:
        clean_emojis_text = emoji_patterns.sub(r"", text)
        
        # More expensive than ReGex but more useful using BeatifulSoup for html tags
        clean_html_text = BeautifulSoup(clean_emojis_text, "lxml").text
        clean_non_print = non_printable_patterns.sub(r" ", clean_html_text)
        clean_text.append(clean_non_print)
        
    comments_df["comment"] = clean_text

    print(f"Cleaning took {time() - start} secs")

    return comments_df

def Complement(comments_df:pd.DataFrame) -> pd.DataFrame:
    
    # This function try to recognize the language of the comments and added it into
    # the dataframe
    
    # Detecting language
    print("Detecting language")
    start = time()

    comment_languages = []
    for i, text in enumerate(comments_df["comment"], start=1):

        print(f"\tProcessing comment number {i}", end="\r")

        # langdetect doesn't work very well in short sentences. Threshold > 4
        if len(re.split("\s+", text)) > 4:
            language = GetLanguage(text)
        else:
            language = "undefined"

        comment_languages.append(language)

    comments_df["code"] = comment_languages
    
    print(f"\nDetecting language took {(time() - start) / 60} min")

    return comments_df

def Validate(tables:"list(tuple)", engine:sqlalchemy.create_engine, inplace:bool=False) -> "list|(list, list(dict))":

    '''
    This function validate if the data found in a dataframe is not violating any
    constraint defined in the tables of the database
    
    Parameters
    ----------
    
    tables: list(tuple)
        A list of tuple of 3 elements containing respectively
        1. Name of the table in the database
        2. ORM simulating table in the database created with sqlalchemy.orm.declarative_base
        3. DataFrame where the validation will take place
        
    engine: sqlalchemy.create_engine
        An sqlalchemy.create_engine object to connect to the database
        
    inplace:bool; default=False
        Remove the registers not valids inplace
        
    Returns
    -------
    "list|(list, list(dict))": Return a list containing the name of the indexes of the
        DataFrame that should be dropped. if the argument 'inplace' is set True. It returns
        a tuple containing the list plus a list of dictionaries containing the dataframes
        modified where key is the database base name and value the modified dataframe
    
    '''
    
    validated_registers_in_table = {}
    for table_name, orm_model, df in tables:

        print(f"Validating registers in table: {table_name}")
        validator = DataValidator(engine, orm_model)
        bad_rows = validator.ValidateData(df, errors="ignore")

        if inplace:
            df.drop(bad_rows, axis=0,inplace=True)
            validated_registers_in_table[table_name] = df
    
    return (bad_rows, validated_registers_in_table) if inplace else bad_rows
            
def Load(tables:dict, engine:sqlalchemy.create_engine) -> list:
    
    # This function takes a dict containing as keys the name of the tables in the database
    # and values as a DataFrame to load into the database. Return the number of
    # registers modified in the database
    
    modified_registers = []
    for table_name in tables.keys():
        
        print(f"Loading {table_name}")
        registers = tables[table_name].to_sql(table_name,
                                              con=engine,
                                              index=False,
                                              if_exists="append")
        modified_registers.append(registers)

    return modified_registers

def ConnectDataBase() -> sqlalchemy.create_engine:
    
    username = input("User postgres:")
    host = input("Host:")
    database = input("Database:")
    password = getpass.getpass("Password:")

    engine = create_engine(f'postgresql://{username}:{password}@{host}/{database}')
    return engine


In [None]:
if __name__ == "__main__":
    
    # Video title and ID for which I want to retrieve comments
    video_names = ["What Happened Before History? Human Origins",
                "The Past We Can Never Return To - The Anthropocene Reviewed",
                "Why Blue Whales Don't Get Cancer - Peto's Paradox",
                "What If We Detonated All Nuclear Bombs at Once?",
                "We WILL Fix Climate Change!",
                "Building a Marsbase is a Horrible Idea: Let's do it!",
                "What if We Nuke a City?"]

    videos_id = ["dGiQaabX3_o","YbgnlkJPga4",
                "1AElONvi9WQ","JyECrGp-Sw8",
                "LxgMdjyw8uw","uqKGREZs6-w",
                "5iPH-br_eJQ"]
    
    api_key = LoadKeyYotube()
    
    # Extract, convert into a dataframe, clean and add language of the comment
    comments_dict = Extract(video_names, videos_id, api_key)
    comments_df, titles_df = Transform(comments_dict, video_names, videos_id)
    comments_df = Clean(comments_df)
    comments_df = Complement(comments_df)

    # Tables without foreign keys in the data base
    languages_df = pd.DataFrame(comments_df["code"].unique(), columns=["code"])
    titles_df = pd.DataFrame({"id_video":videos_id, "title":video_names})
    users_df = pd.DataFrame(comments_df["name"].unique(), columns=["name"])

    tables_isolated = [("languages", Languages, languages_df),
                    ("titles", Titles, titles_df),
                    ("users", Users, users_df)]
    
    engine = ConnectDataBase()

    # Validate and load dataframes into the postgres database
    _, tables_isolated_validated = Validate(tables_isolated, engine, True)
    _ = Load(tables_isolated_validated, engine)

    # Retrieve id from new registers added by the "isolated" tables
    registers_languages = pd.read_sql("SELECT * FROM languages", engine)
    registers_users = pd.read_sql("SELECT * FROM users", engine)

    # Mapping with the appropiate values in the foreign keys columns
    # and the new registers added
    comments_df_mapped = comments_df.copy().merge(registers_languages,
                                                    how="left",
                                                    on="code")

    comments_df_mapped.drop(["code"], axis=1, inplace=True)

    comments_df_mapped = comments_df_mapped.merge(registers_users,
                                                how="left",
                                                on="name")

    comments_df_mapped.drop(["name"], axis=1, inplace=True)
    
    # Table with foreign key
    tables_no_isolated = [("comments", Comments, comments_df_mapped)]

    _, tables_no_isolated_validated = Validate(tables_no_isolated, engine, True)
    _ = Load(tables_no_isolated_validated, engine)

    engine.dispose()
