In [1]:
import mysql.connector
from mysql.connector import Error
from sqlalchemy import create_engine
from mysql.connector.cursor import MySQLCursorPrepared
import pandas
import os
import sys
import dotenv as env

# Load the MySQL connection settings from the local env file (the path is relative to the
# notebook's working directory). This has to run before the mysql_Database class is defined,
# because the class body reads MYSQL_SERVER / MYSQL_USER_NAME / MYSQL_USER_PASSWD from
# os.environ at definition time.
env.load_dotenv('../keys/keys.env')

True

In [2]:
class mysql_Database:
    """Thin convenience wrapper around one MySQL connection.

    The instance owns a ``mysql.connector`` connection plus a prepared-statement
    cursor on it (used by ``execute_a_query`` and ``create_table``). The two
    DataFrame helpers (``insert_values_in_table`` and ``search_values_in_table``)
    go through a short-lived SQLAlchemy engine instead.

    Host, user and password are read from the environment once, when this class
    body is executed, so the environment must be populated before the class is
    defined. The database name is tracked per instance, so several instances can
    point at different schemas without affecting each other.
    """

    # Server address and credentials, shared by every instance.
    __host = os.environ.get('MYSQL_SERVER')
    __mysql_user = os.environ.get('MYSQL_USER_NAME')
    __mysql_passwd = os.environ.get('MYSQL_USER_PASSWD')

    def __init__(self, database:str):
        """Try to connect to ``database``.

        A failed connection is reported on stdout (with a traceback) instead of
        being raised; the object is then left in a "not connected" state.
        """
        # Defined before the connection attempt so that every method can test for
        # "no connection" instead of failing with AttributeError after a failed connect.
        self.__connection = None
        self.__cursor = None
        # Instance-level on purpose: a class-level name would be overwritten by
        # whichever instance connected last.
        self.__mysql_database = None
        self.__connect(database)

    def __connect(self, database: str) -> None:
        """Open a connection to ``database`` and adopt it only if that succeeds.

        On failure the error is printed and the previous connection, cursor and
        database name (if any) are left untouched. On success the previous
        connection is closed so it is not leaked.
        """
        try:
            fresh_connection = mysql.connector.connect(
                host=mysql_Database.__host,
                database=database,
                user=mysql_Database.__mysql_user,
                password=mysql_Database.__mysql_passwd,
            )
            if not fresh_connection.is_connected():
                return
            fresh_cursor = fresh_connection.cursor(prepared=True)
        except Exception as e:
            print(f'Exception occurred while connecting to the database. Please check database name or credentials. {e}.')
            import traceback
            traceback.print_exc()
            return

        stale_connection, stale_cursor = self.__connection, self.__cursor
        self.__connection = fresh_connection
        self.__cursor = fresh_cursor
        self.__mysql_database = database

        if stale_connection is not None:
            # Best effort: failing to close the old connection must not undo the switch.
            try:
                mysql_Database.__release(stale_connection, stale_cursor)
            except Exception as e:
                print(f'Previous connection could not be closed cleanly. {e}')

    @staticmethod
    def __release(connection, cursor) -> None:
        """Close ``cursor`` and ``connection`` if the connection is still open."""
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()

    def __is_connected(self) -> bool:
        """Return True only when a connection exists and reports itself as connected."""
        return self.__connection is not None and self.__connection.is_connected()

    def __make_engine(self):
        """Build a SQLAlchemy engine for the database this instance is connected to.

        User name and password are percent-encoded so that characters such as
        ``@``, ``/``, ``:`` or ``%`` in a credential cannot corrupt the URL.
        Callers are expected to ``dispose()`` the engine when done.
        """
        from urllib.parse import quote

        user = quote(str(mysql_Database.__mysql_user), safe='')
        passwd = quote(str(mysql_Database.__mysql_passwd), safe='')
        return create_engine(f'mysql+mysqlconnector://{user}:{passwd}@{mysql_Database.__host}:3306/{self.__mysql_database}')

    @staticmethod
    def __build_search_query(tablename: str, test_field: str, values: list, output_fields: list) -> tuple:
        """Return ``(sql, params)`` for the lookup done by ``search_values_in_table``.

        The values to match are never embedded in the SQL text: one ``%s``
        placeholder is emitted per value and the values (stringified, exactly as
        the query previously quoted them) are returned separately as a tuple.
        Table and column names cannot be bound as parameters, so they are
        interpolated as given and must come from trusted code.
        """
        if not output_fields or output_fields[0] == '*':
            op_fields = '*'
        else:
            op_fields = ', '.join(output_fields)

        params = tuple(str(value) for value in values)
        if params:
            placeholders = ', '.join(['%s'] * len(params))
            condition = f'{test_field} IN ({placeholders})'
        else:
            # "IN ()" is a syntax error; an always-false predicate yields an empty
            # result that still carries the requested columns.
            condition = '1 = 0'

        return f'SELECT {op_fields} FROM {tablename} WHERE {condition}', params

    def execute_a_query(self, query: str, params: tuple=()) -> 'pandas.DataFrame | None':
        """Run ``query`` on the prepared cursor and return all rows as a DataFrame.

        ``params`` are bound to the placeholders in ``query``. Column names are
        taken from the cursor description. Returns ``None`` when there is no
        live connection.
        """
        if not self.__is_connected():
            return None                     # No dataframe when no connection exists

        self.__cursor.execute(query, params)
        rows = self.__cursor.fetchall()
        column_names = [column[0] for column in self.__cursor.description]
        # The result is deliberately not stored on the class or the instance: every
        # query has its own outcome and there is no need to keep it here.
        return pandas.DataFrame(rows, columns=column_names)

    def create_table(self, tablename: str, column_defs:str) -> 'int | None':
        """Create ``tablename`` with ``column_defs`` unless it already exists.

        Returns ``0`` when the table was created, ``-1`` when creation failed or
        no connection is available, and ``None`` when the table already existed.
        ``tablename`` and ``column_defs`` end up in DDL text and therefore must
        come from trusted code.
        """
        if self.__cursor is None:
            print(f'Error in creating table {tablename} as no database connection is available')
            return -1

        # Values are bound as parameters rather than formatted into the statement.
        check_query = '''
                    SELECT COUNT(*) FROM information_schema.tables WHERE table_name=%s
                    AND table_schema=%s
                    '''
        self.__cursor.execute(check_query, (tablename, self.__mysql_database))
        exists = self.__cursor.fetchone()[0]
        if exists:
            print(f"Table {tablename} already exists. Not created in this execution...")
            return None

        base_query =  '''CREATE TABLE IF NOT EXISTS {} (
                            {}
                        )
                        '''
        final_query = base_query.format(tablename, column_defs)
        try:
            self.__cursor.execute(final_query)
            self.__connection.commit()
            print(f'Table {tablename} has been created.')
            return 0
        except mysql.connector.Error as err:
            print(f'Error in creating table {tablename} as {err}')
            return -1

    def insert_values_in_table(self, tablename: str, df: pandas.DataFrame):
        """Append the rows of ``df`` to ``tablename``.

        The data is expected to already conform to what MySQL accepts: any dtype
        conversion, column re-ordering and similar pre-processing must be done by
        the caller. Errors are printed, not raised.
        """
        try:
            engine = self.__make_engine()
            try:
                df.to_sql(tablename, con=engine, if_exists='append', index=False)
            finally:
                # Release the pooled connections; a new engine is built per call.
                engine.dispose()
            print(f'Inserted {df.shape[0]} values in the {tablename}.')
        except Exception as e:
            print(f'Writing DataFrame to database caused an error. {e}')

    def search_values_in_table(self, tablename:str, test_field: str, match_values: pandas.Series, output_fields: list[str]=['*']) -> 'pandas.DataFrame | None':
        """Return the rows of ``tablename`` whose ``test_field`` is one of ``match_values``.

        Only a single column can be tested: pass just the Series holding the
        values to look for. ``output_fields`` selects the columns to return; if
        it is empty or its first entry is ``'*'``, all columns are returned.
        An empty ``match_values`` gives an empty DataFrame. Errors are printed
        and ``None`` is returned.
        """
        try:
            search_query, params = mysql_Database.__build_search_query(
                tablename, test_field, match_values.tolist(), output_fields
            )
            engine = self.__make_engine()
            try:
                # params=None (rather than an empty tuple) when there is nothing to bind.
                return pandas.read_sql(search_query, con=engine, params=params or None)
            finally:
                engine.dispose()
        except Exception as e:
            print(f'Searching for {test_field} in the  database caused an error. {e}')
            return None

    def new_connection(self, database: str):
        """Switch this object to ``database``.

        If the new connection cannot be established the error is printed and the
        previous connection and database name stay in effect. Otherwise the
        previous connection is closed.
        """
        self.__connect(database)

    def exitinguish_connection(self):
        """Close the cursor and the connection if the connection is still open."""
        mysql_Database.__release(self.__connection, self.__cursor)

In [3]:
# One line tester
#
#cnxn = mysql_Database('timely_feeds')
#cnxn.create_table('fact_feed_table', 'link_id VARCHAR(36) PRIMARY KEY, site_name VARCHAR(64), sub_site_name VARCHAR (32), link_date TIMESTAMP')
#df = pandas.DataFrame({
#            'link_id': ['120831219', '120834074', '120834046', '120833397', '120833347'],
#            'site_name': ['TOI', 'TOI', 'TOI', 'TOI', 'TOI'],
#            'sub_site_name': ['Mumbai', 'Mumbai', 'Mumbai', 'Mumbai', 'Delhi'],
#            'link_date': ['2025-05-02 22:58:48', '2025-05-02 22:15:48', '2025-05-02 21:58:48', '2025-05-02 20:58:48', '2025-05-02 22:58:40']
#        }
#    )
#cnxn.insert_values_in_table('fact_feed_table', df)

In [4]:
#cnxn = mysql_Database('timely_feeds')
#test_df = pandas.Series(['120831219', '120833397', '12083339712'])
#outcomes = cnxn.search_values_in_table('fact_feed_table', 'link_id', test_df, ['*', 'site_name', 'link_date'])
#outcomes

# Load (or reload, if it is already loaded) the `sql` IPython extension that provides the %sql / %%sql magics.
%reload_ext sql

# NOTE(review): presumably a workaround for a table-style incompatibility between the SQL magic and its pretty-printer — confirm it is still needed.
%config SqlMagic.style = '_DEPRECATED_DEFAULT'

%%sql

select * from mysql.user where user='feed_user' and host='%';

In [5]:
#cnxn = mysql_Database('timely_feeds')
#fields = ['site_name', 'sub_site_name', 'link_id', 'link', 'title', 'link_date', 'classification', 'explanation']
#cnxn.create_table('fact_classified_articles', 
#                    '''id CHAR(36) NOT NULL PRIMARY KEY DEFAULT (UUID()), 
#                        site_name VARCHAR(64), 
#                        sub_site_name VARCHAR (32), 
#                        link_id VARCHAR(32), 
#                        link VARCHAR(256), 
#                        title VARCHAR(256), 
#                        link_date TIMESTAMP,
#                        classification VARCHAR(5),
#                        explanation VARCHAR (200)
#                        ''')