### Base class for SQL writer

In [7]:
# ##### MySQL Writer Class
import pandas as pd
import pymysql
from datetime import datetime, timezone
import pytz

class MySQLWriter:
    """
    A class to handle MySQL database operations.
    """
    def __init__(self, host, user, password, database):
        # Initializes the MySQLWriter with connection parameters and ensures required tables exist.
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor
        )

        # Check if database exists, create if it doesn't
        self.create_database_if_not_exists(database)
        self.connection.select_db(database)

        # Create necessary tables if they do not exist
        self.create_table_archives()
        self.create_table_articles()
        self.create_table_contents()

    def create_database_if_not_exists(self, database):
        with self.connection.cursor() as cursor:
            cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database}")
        self.connection.commit()

    def add_import_timestamp(self, df):
        # Adds an 'import_timestamp' column to the DataFrame.
        df['import_date'] = datetime.now()
        return df

    def record_exists(self, table_name, primary_key_column, primary_key_value):
        # Checks if a record with the specified primary key exists in the given table.
        query = f"SELECT COUNT(1) AS count FROM {table_name} WHERE {primary_key_column} = %s"
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, (primary_key_value,))
                result = cursor.fetchone()
                return result['count'] > 0
        except Exception as e:
            print(f"Error checking record existence in table '{table_name}':", e)
            return False

    # -----------------------------------------------------------------------------------
    # Archives Table Methods
    # -----------------------------------------------------------------------------------

    def create_table_archives (self):
        # Creates the 'archives' table in the database if it does not already exist.
        create_table_query = """
        CREATE TABLE IF NOT EXISTS archives (
            archive_url VARCHAR(500) PRIMARY KEY,
            volume_number VARCHAR(255),
            title VARCHAR(500) ,
            publication_date DATE,
            editor TEXT,
            import_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            status VARCHAR(10)
        );
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(create_table_query)
                self.connection.commit()
                print("Table 'archives' is ready.")
        except Exception as e:
            print("Error creating table 'archives':", e)
    
    def read_table_archives (self):
        query = "SELECT * FROM archives order by archive_url;"
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
            # Convert the result into a Pandas DataFrame
            df = pd.DataFrame(result)
            # print("Successfully read data from 'archives' table.")
            return df
        except Exception as e:
            print(f"Error reading 'archives' table: {e}")
            return pd.DataFrame()  # Return an empty DataFrame in case of error 
    
    def insert_archives (self, archives_df):
        # Inserts into the 'archives' table

        for _, row in archives_df.iterrows():
            archive_url = row['archive_url']
            if not self.record_exists("archives", "archive_url", archive_url):
                query = """
                INSERT INTO archives (archive_url, volume_number, title, publication_date, editor, import_date, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    volume_number = VALUES (volume_number),
                    title = VALUES (title),
                    publication_date = VALUES (publication_date),
                    editor = VALUES (editor),
                    import_date = VALUES (import_date),
                    status = VALUES (status)
                """
                try:
                    with self.connection.cursor() as cursor:
                        cursor.execute(query, (
                            row['archive_url'], row['volume_number'], row['title'], 
                            row['publication_date'], row['editor'], row['import_date'], row['status']
                        ))
                        self.connection.commit()
                        print(f"Inserted archive URL: {archive_url}")
                except Exception as e:
                    self.connection.rollback()
                    print(f"Error inserting archive URL '{archive_url}':", e)
            else:
                print(f"Archive URL '{archive_url}' already exists. Skipping.")

    def update_archives (self, archives_df):
        # Update 'archives' table

        for _, row in archives_df.iterrows():
            archive_url = row['archive_url']
            query = """
            INSERT INTO archives (archive_url, volume_number, title, publication_date, editor, import_date, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                volume_number = VALUES (volume_number),
                title = VALUES (title),
                publication_date = VALUES (publication_date),
                editor = VALUES (editor),
                import_date = VALUES (import_date),
                status = VALUES (status)
            """
            try:
                with self.connection.cursor() as cursor:
                    cursor.execute(query, (
                        row['archive_url'], row['volume_number'], row['title'], 
                        row['publication_date'], row['editor'], row['import_date'], row['status']
                    ))
                    self.connection.commit()
                    print(f"Updated archive_url: {archive_url}")
            except Exception as e:
                self.connection.rollback()
                print(f"Error updating archive_url '{archive_url}':", e)

    # -----------------------------------------------------------------------------------
    # Articles Table Methods
    # -----------------------------------------------------------------------------------

    def create_table_articles (self):
        # Creates the 'articles' table in the database if it does not already exist.
        create_table_query = """
        CREATE TABLE IF NOT EXISTS articles (
            article_url VARCHAR(600) PRIMARY KEY,
            title VARCHAR(500),
            doi VARCHAR(500),
            publication_date DATE,
            author TEXT,
            abstract TEXT,
            archive_url VARCHAR(600),
            content_url VARCHAR(600),
            import_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            status VARCHAR(10)
        );
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(create_table_query)
                self.connection.commit()
                print("Table 'articles' is ready.")
        except Exception as e:
            print("Error creating table 'articles':", e)

    def read_table_articles (self, status):
        if(status=="PENDING"):
            query = f"SELECT * FROM articles where status='PENDING';"
        else:
            query = f"SELECT * FROM articles where status is null or status ='';"
        # query = "SELECT * FROM articles WHERE abstract = '';"
        # query = "SELECT * FROM articles where article_url='https://firstmonday.org/ojs/index.php/fm/article/view/10005';"
    
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
            # Convert the result into a Pandas DataFrame
            df = pd.DataFrame(result)
            # print("Successfully read data from 'articles' table.")
            return df
        except Exception as e:
            print(f"Error reading 'articles' table: {e}")
            return pd.DataFrame()  # Return an empty DataFrame in case of error

    def insert_articles (self, articles_df):
        # Inserts into the 'articles' table

        for _, row in articles_df.iterrows():
            article_url = row['article_url']
            if not self.record_exists("articles", "article_url", article_url):
                query = """
                INSERT INTO articles (article_url, title, doi, publication_date, author, abstract, archive_url, content_url, import_date, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    title = VALUES (title),
                    doi = VALUES (doi),
                    publication_date = VALUES (publication_date),
                    author = VALUES (author),
                    abstract = VALUES (abstract),
                    archive_url = VALUES (archive_url),
                    content_url = VALUES (content_url),
                    import_date = VALUES (import_date),
                    status = VALUES (status)
                """
                try:
                    with self.connection.cursor() as cursor:
                        cursor.execute(query, (
                            row['article_url'], row['title'], row['doi'],
                            row['publication_date'], row['author'], row['abstract'],
                            row['archive_url'], row['content_url'],
                            row['import_date'], row['status']
                        ))
                        self.connection.commit()
                        print(f"Inserted article URL: {article_url}")
                except Exception as e:
                    self.connection.rollback()
                    print(f"Error inserting article URL '{article_url}':", e)
            else:
                print(f"Article URL '{article_url}' already exists. Skipping.")

    def update_articles (self, articles_df):
        # Update 'articles' table

        for _, row in articles_df.iterrows():
            article_url = row['article_url']
            query = """
            INSERT INTO articles (article_url, title, doi, publication_date, author, abstract, archive_url, content_url, import_date, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                title = VALUES (title),
                doi = VALUES (doi),
                publication_date = VALUES (publication_date),
                author = VALUES (author),
                abstract = VALUES (abstract),
                archive_url = VALUES (archive_url),
                content_url = VALUES (content_url),
                import_date = VALUES (import_date), 
                status = VALUES (status)
            """
            try:
                with self.connection.cursor() as cursor:
                    cursor.execute(query, (
                            row['article_url'], row['title'], row['doi'],
                            row['publication_date'], row['author'], row['abstract'],
                            row['archive_url'], row['content_url'],
                            row['import_date'], row['status']
                    ))
                    self.connection.commit()
                    print(f"Updated article_url: {article_url}")
                    
            except Exception as e:
                self.connection.rollback()
                print(f"Error updating article_url '{article_url}':", e)

    # -----------------------------------------------------------------------------------
    # Contents Table Methods
    # -----------------------------------------------------------------------------------

    def create_table_contents (self):
        # Creates the 'contents' table in the database if it does not already exist.
        create_table_query = """
        CREATE TABLE IF NOT EXISTS contents (
            content_url VARCHAR(600) PRIMARY KEY,
            content TEXT
        );
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(create_table_query)
                self.connection.commit()
                print("Table 'contents' is ready.")
        except Exception as e:
            print("Error creating table 'contents':", e)

    def read_table_contents (self):
        query = "SELECT * FROM contents;"
    
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
            # Convert the result into a Pandas DataFrame
            df = pd.DataFrame(result)
            # print("Successfully read data from 'contents' table.")
            return df
        except Exception as e:
            print(f"Error reading 'contents' table: {e}")
            return pd.DataFrame()  # Return an empty DataFrame in case of error
    
    def read_table_articles_wo_contents (self):
        query = "SELECT content_url FROM articles where content_url not in (SELECT content_url from contents) and content_url <> '' and content_url is not null LIMIT 1;"
    
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
            # Convert the result into a Pandas DataFrame
            df = pd.DataFrame(result)
            # print("Successfully read data from 'contents' table.")
            return df
        except Exception as e:
            print(f"Error reading 'contents' table: {e}")
            return pd.DataFrame()  # Return an empty DataFrame in case of error
        
    def insert_contents (self, contents_df):
        # Inserts into the 'contents' table

        for _, row in contents_df.iterrows():
            content_url = row['content_url']
            if not self.record_exists("contents", "content_url", content_url):
                query = """
                INSERT INTO contents (content_url, content)
                VALUES (%s, %s)
                ON DUPLICATE KEY UPDATE
                    content_url = VALUES (content_url),
                    content = VALUES (content)
                """
                try:
                    with self.connection.cursor() as cursor:
                        cursor.execute(query, (
                            row['content_url'], row['content']
                        ))
                        self.connection.commit()
                        print(f"Inserted article URL: {content_url}")
                except Exception as e:
                    self.connection.rollback()
                    print(f"Error inserting article URL '{content_url}':", e)
            else:
                print(f"Article URL '{content_url}' already exists. Skipping.")

    def close_connection(self):
        # Close the MySQL database connection.
        if self.connection:
            self.connection.close()
            print("MySQL connection closed.")
    
    def archives_to_csv(self):
        # Reads the 'threads' table from the database and returns it as pandas dataframe, then write to csv
        query = "SELECT * FROM archives order by archive_url;"
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
            # Convert the result into a Pandas DataFrame
            df = pd.DataFrame(result)

            # Write to csv
            csv_file_path = 'data/archives.csv'
            df.to_csv(csv_file_path, index=False)
            print(f"Archives has been exported to '{csv_file_path}' successfully.")
            
        except Exception as e:
            print(f"Error exporting 'archives' table: {e}")

    def articles_to_csv(self):
        # Reads the 'threads' table from the database and returns it as pandas dataframe, then write to csv
        query = "SELECT * FROM articles order by archive_url;"
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
            # Convert the result into a Pandas DataFrame
            df = pd.DataFrame(result)

            # Write to csv
            csv_file_path = 'data/articles.csv'
            df.to_csv(csv_file_path, index=False)
            print(f"Archives has been exported to '{csv_file_path}' successfully.")
            
        except Exception as e:
            print(f"Error exporting 'archives' table: {e}")
    
    def contents_to_csv(self):
        # Reads the 'threads' table from the database and returns it as pandas dataframe, then write to csv
        query = "SELECT * FROM contents;"
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
            # Convert the result into a Pandas DataFrame
            df = pd.DataFrame(result)

            # Write to csv
            csv_file_path = 'data/contents.csv'
            df.to_csv(csv_file_path, index=False)
            print(f"Contents has been exported to '{csv_file_path}' successfully.")
            
        except Exception as e:
            print(f"Error exporting 'contents' table: {e}")