In [None]:
# mydic_func.py

import MySQLdb
# import logging
import os

class MySQLDBcon:
    def __init__(self, connection_string=os.environ["CONN"], db=''):
        # connect(host, user, passwd, db)

        connection_string = connection_string + ',' + db

        vars = connection_string.split(',')
        self.host = vars[0]
        self.user = vars[1]
        self.pw = vars[2]
        self.char = "utf8mb4"
        # print(connection_string)
        self.connector = None

    def connect(self, dbname):
        try:
            self.connector = MySQLdb.connect(host=self.host,
                                            user=self.user,
                                            passwd=self.pw,
                                            charset=self.char,)
        except Exception as e:
            print(e)


    """   fetchall(),
      the return value is a sequence of "tuples" that contain
      the "row values".
    """
    def query_db(self, sql_cmd, db_use = ''):
        try:
            cur = self.connector.cursor()
            r = cur.execute(sql_cmd)
            # print(sql_cmd)
            if r != 0:
                r = cur.fetchall()

            self.connector.commit()
        except Exception as e:
            print(e)
            self.connector.rollback()
        finally:
            cur.close()
            # self.connector.close()
            return r

    def __del__(self):
        print('db close')
        self.connector.close()


In [None]:
# test_mydic_func.py

from mydic_func import MySQLDBcon

import pytest

@pytest.fixture(scope='module')  # default scope=function , module, session
def db():
    print('------setup------')
    db = MySQLDBcon()
    db.connect('mydic')
    # return db
    yield db
    print('------teardown------')
    # db.close()

def test_scott_data(db):
    sql = "SELECT * FROM mydic.words;"
    r = db.query_db(sql)
    print(r)
    # assert scott_data['result'] == 'pass'

def test_APWD_data(db):
    sql = "SELECT * FROM mydic.words WHERE word = 'A\.PWD';"
    r = db.query_db(sql)
    print(r)
    # assert scott_data['result'] == 'pass'
    

In [None]:
# db_mysql_v2.py

import MySQLdb
# import logging
import os

class MySQLdb_connection(object):
    """mysql db connection"""
    def __init__(self, connection_string=os.environ["CONN"], db=''):
        # connect(host, user, passwd, db)

        connection_string = connection_string + ',' + db

        vars = connection_string.split(',')
        self.host = vars[0]
        self.user = vars[1]
        self.pw = vars[2]
        self.char = "utf8mb4"
        # print(connection_string)
        self.connector = None

        try:
            self.connector = MySQLdb.connect(host=self.host,
                                            user=self.user,
                                            passwd=self.pw,
                                            charset=self.char,)
        except Exception as e:
            print(e)

    def __del__(self):
        self.connector.close()



    """   fetchall(),
      the return value is a sequence of "tuples" that contain
      the "row values".
    """
    def query_db(self, sql_cmd, db_use = ''):
        cur = self.connector.cursor()
        r = cur.execute(sql_cmd)
        
        if r != 0:
            r = cur.fetchall()
        return r


# executes when your script is called from the command-line
if __name__ == "__main__":
    print('How to use?\n'
          'MySQLdb_connection.query_db("your-sql-cmd")')

"""
    # sql = "INSERT INTO mydic.words (word, descr) VALUES ('%s', '%s')" % \
    # ('00test', 'test data description')
    # MySQLdb_connection.query_db(sql)
    #
    # sql = "SELECT word FROM mydic.words WHERE word = '00test'"
    # results = MySQLdb_connection.query_db(sql)
    # print(results)
    #
    # sql = "DELETE FROM mydic.words WHERE word = '00test'"
    # results = MySQLdb_connection.query_db(sql)
    # print(results)
    #
    # assert results[0][0] == 'first decorator'
"""

In [None]:
# CREATE USER 'newuser'@'localhost' IDENTIFIED BY 'user_password';


import MySQLdb
# import logging
import os

class MySQLdb_connection(object):
#     """mysql db connection"""
    def __init__(self, connection_string=os.environ["CONN"], db='mydic'):
        # connect(host, user, passwd, db)
        if db != 'mydic':
            connection_string = connection_string + ',' + db

        self.connection_string = connection_string.split(',')
        #print(self.connection_string)
        self.connector = None

    '''
    These methods of the context manager are __enter()__ and __exit()__
    and are known popularly as dunder methods, as they are surrounded
    by double underscores. Hence, they are also called dunder-enter and
    dunder-exit respectively.
    Both the enter() and exit() methods are called every time
    the with statement is executed no matter how the code block terminates.
    So, basically a context manager ensures that resources are properly
    and automatically managed around the code that uses these resources.
    '''
    def __enter__(self):
        self.connector = MySQLdb.connect(*self.connection_string)
        return self.connector

    def __exit__(self, exc_type, exc_val, exc_tb):
        # print('MySQLdb_connection.__exit__:'
        #             'Exception Detected\n'
        #             'type={}, value={}, traceback={}'.format(exc_type, exc_val, exc_tb))
        if exc_tb is None:
            self.connector.commit()
        else:
            self.connector.rollback()
        self.connector.close()


    """   fetchall(),
      the return value is a sequence of tuples that contain the row values.
    """
    def query_db(sql_cmd, db_use = ''):
        with MySQLdb_connection(db = db_use) as conn:
            with conn.cursor() as cur:
                r = cur.execute(sql_cmd)
                #print(r)
                if r != 0:
                    r = cur.fetchall()
                return r


# executes when your script is called from the command-line
if __name__ == "__main__":
    print('How to use?\n'
          'MySQLdb_connection.query_db("your-sql-cmd")')

"""
    # sql = "INSERT INTO mydic.words (word, descr) VALUES ('%s', '%s')" % \
    # ('00test', 'test data description')
    # MySQLdb_connection.query_db(sql)
    #
    # sql = "SELECT word FROM mydic.words WHERE word = '00test'"
    # results = MySQLdb_connection.query_db(sql)
    # print(results)
    #
    # sql = "DELETE FROM mydic.words WHERE word = '00test'"
    # results = MySQLdb_connection.query_db(sql)
    # print(results)
    #
    # assert results[0][0] == 'first decorator'
"""

In [None]:
import os
import dbconn as db
import MySQLdb
import unittest
import datetime as dt

### python -m unittest test_db.py

class TestMySQLdb_connection(unittest.TestCase):
    """
    Test the MySQLdb_connection
    """
    def setUp(self):
        """
        Setup here,    create database & create table
        """
        print('setUp...')
        # db.MySQLdb_connection.create_db(db_name = 'mydb')
        sql = "CREATE DATABASE IF NOT EXISTS %s" % 'mydb'
        # db.MySQLdb_connection.query_db(sql, db_use = 'mydic')
        db.MySQLdb_connection.query_db(sql)

        sql = "CREATE TABLE IF NOT EXISTS %s (id INT AUTO_INCREMENT \
            PRIMARY KEY, name VARCHAR(255), salary INT(6))" % 'test'
        r2 = db.MySQLdb_connection.query_db(sql, db_use = 'mydb')
        #print(r2)
        r2 = db.MySQLdb_connection.query_db("SHOW TABLES;", db_use = 'mydb')
        print(r2)
        self.assertEqual(r2[0][0], 'test')

    def tearDown(self):
        """
        Delete the database
        """

        sql = "DROP DATABASE mydb"
        db.MySQLdb_connection.query_db(sql)
        r = db.MySQLdb_connection.query_db("SHOW DATABASES LIKE '%s'" % 'mydb')

        # self.assertEqual(r[0][0], 'mydb')
        print('tearDown...')


    def test_insert(self):
        """
        Tests that we can successfully insert
        """

        sql = "INSERT INTO test (name, salary) VALUES ('%s', '%s')" % \
        ('John', 60000)
        db.MySQLdb_connection.query_db(sql, db_use = 'mydb')
        sql = "INSERT INTO test (name, salary) VALUES ('%s', '%s')" % \
        ('Tony', 50000)
        db.MySQLdb_connection.query_db(sql, db_use = 'mydb')
        sql = "INSERT INTO test (name, salary) VALUES ('%s', '%s')" % \
        ('Chris', 70000)
        r2 = db.MySQLdb_connection.query_db(sql, db_use = 'mydb')
        # self.assertEqual(r2, ())

        sql = "SELECT * FROM test"
        r2 = db.MySQLdb_connection.query_db(sql, 'mydb')
        #print(r2)
        self.assertEqual(r2[0][1], 'John')

    def test_update(self):
        """
        Tests that we can successfully update
        """

        sql = "INSERT INTO test (name, salary) VALUES ('%s', '%s')" % \
        ('Chris', 70000)
        r2 = db.MySQLdb_connection.query_db(sql, db_use = 'mydb')

        sql = "UPDATE test SET salary = 300 WHERE name = 'Chris'"
        db.MySQLdb_connection.query_db(sql, db_use = 'mydb')
        sql = "SELECT * FROM test WHERE name = 'Chris'"
        r2 = db.MySQLdb_connection.query_db(sql, 'mydb')
        #print(r2)
        self.assertEqual(r2[0][2], 300)

    def test_delete(self):
        """
        Tests that we can successfully delete
        """

        sql = "INSERT INTO test (name, salary) VALUES ('%s', '%s')" % \
        ('John', 70000)
        r2 = db.MySQLdb_connection.query_db(sql, db_use = 'mydb')

        sql = "DELETE FROM test WHERE name = 'John'"
        db.MySQLdb_connection.query_db(sql, db_use = 'mydb')
        sql = "SELECT * FROM test WHERE name = 'John'"
        r2 = db.MySQLdb_connection.query_db(sql, 'mydb')
        #print(r2)
        self.assertEqual(r2, 0)


    # def test_insert_update_delete(self):
    #     """
    #     Tests that we can successfully insert, update, delete
    #     """
    #
    #     sql = "INSERT INTO test (name, salary) VALUES ('%s', '%s')" % \
    #     ('John', 60000)
    #     r2 = db.MySQLdb_connection.query_db(sql, db_use = 'mydb')
    #     self.assertEqual(r2, ())
    #
    #     sql = "SELECT * FROM test WHERE name = 'John'"
    #     r2 = db.MySQLdb_connection.query_db(sql, 'mydb')
    #     #print(r2)
    #     self.assertEqual(r2[0][1], 'John')
    #
    #     sql = "UPDATE test SET salary = 300"
    #     db.MySQLdb_connection.query_db(sql, db_use = 'mydb')
    #     sql = "SELECT * FROM test WHERE name = 'John'"
    #     r2 = db.MySQLdb_connection.query_db(sql, 'mydb')
    #     #print(r2)
    #     self.assertEqual(r2[0][2], 300)
    #
    #     sql = "DELETE FROM test WHERE name = 'John'"
    #     db.MySQLdb_connection.query_db(sql, db_use = 'mydb')
    #     sql = "SELECT * FROM test WHERE name = 'John'"
    #     r2 = db.MySQLdb_connection.query_db(sql, 'mydb')
    #     #print(r2)
    #     self.assertEqual(r2, 0)


"""
# Return Values

# For SELECT, SHOW, DESCRIBE, EXPLAIN and other statements returning resultset,
# mysql_query() returns a resource on success, or FALSE on error.

# For other type of SQL statements, INSERT, UPDATE, DELETE, DROP, etc,
# mysql_query() returns TRUE(1) on success or FALSE(0) on error.

"""

In [None]:
## //////// decorator version ///////////

## A decorator is a design pattern in Python that allows
## a user to add new functionality to an existing object
## without modifying its structure.

# Python allows a nested function to access the outer scope of
# the enclosing function.
# This is a critical concept in decorators -- this pattern is
# known as a Closure.

def db_connector(func):
    def with_connection_(*args, **kwargs):
        #print(args, kwargs)
        conn_str = os.environ["CONN"]
        #print(conn_str)
        #conn_str = conn_str + ',' + 'mydic'  # default db
        cnn = MySQLdb.connect(*conn_str.split(','))
        try:
            rv = func(cnn, *args, **kwargs)
            #print(cnn, args, kwargs)
        except Exception:
            cnn.rollback()
            logging.error("Database connection error")
            raise
        else:
            cnn.commit()
        finally:
            cnn.close()
        return rv
    return with_connection_


@db_connector
def do_some_job(cnn, arg1, db_name=None):
    #print(cnn, arg1, db_name)
    cur = cnn.cursor()
    SQL = "SELECT descr FROM mydic.words WHERE word = '%s' " % (arg1)
    #cur.execute(SQL, (arg1, arg2)) # or something else
    cur.execute(SQL)
    print(cur.fetchall())

# executes when your script is called from the command-line
if __name__ == "__main__":
    #main(sys.argv)

    #### using decorator version
    # do_some_job('11test', db_name='mydic')   is   samme as
    # db_connector( do_some_job('11test', db_name='mydic') )
    do_some_job('11test', db_name='mydic')

    #### using class version
    sql = "SELECT * FROM mydic.words WHERE word = '11test' "
    with MySQLdb_connection() as conn:
        data =  pd.read_sql(sql, conn)
        #raise Exception('my exception test simulation')
    print(data)
