In [1]:
import os

In [3]:
import mysql.connector
from mysql.connector import Error
# Establish connection
mydb = mysql.connector.connect(
   host="hostname",
    user="username",
    password="<password>"

)

# Check if connection was successful
if mydb.is_connected():
  print("Connected to MySQL!")
    
else:
  print("Failed to connect to MySQL")

Connected to MySQL!


In [None]:
# Specify the name of the new database to create
db_name = "youtubecommunity"

# Create a cursor object to execute SQL queries
mycursor = mydb.cursor()

# Execute the SQL query to create a new database
mycursor.execute(f"CREATE DATABASE {db_name}")

print(f"Database '{db_name}' created successfully!")


In [5]:
# show databases :-

# Create a cursor object to execute SQL queries
mycursor = mydb.cursor()

# Execute the SQL query to create a new database
mycursor.execute("SHOW DATABASES")

for db in mycursor:
    print(db)


('demo',)
('information_schema',)
('mysql',)
('performance_schema',)
('sys',)
('youtubecommunity',)


In [None]:
# create table

mydb = mysql.connector.connect(
    host="hostname",
    user="username",
    password="<password>",
    database_name="<database_name>"
)
mycursor = mydb.cursor()

mycursor.execute("CREATE TABLE students (name VARCHAR(255), age INTEGER(10))")


In [7]:
# show tables 

mycursor.execute("SHOW TABLES")

for tb in mycursor:
    print(tb)

('students',)


In [7]:
# add data in tables

sqlFormula = "INSERT INTO students (name,age) VALUES (%s,%s)"
students =  [("Bob",22),
             ("Aman",23),
             ("Rahul",24),
             ("Jacob",25),
             ("Michalle",27)]

mycursor.executemany(sqlFormula,students)
mydb.commit()

In [226]:
import mysql.connector
from mysql.connector import Error
import pandas as pd
import json
from typing import Any , Optional , List, Dict , Union
from ensure import ensure_annotations

class mysql_operation:
    @ensure_annotations 
    def __init__(self, host: str, user: str, password: str):
        self.host = host
        self.user = user
        self.password = password
    
    @ensure_annotations 
    def create_connection(self):
        """Create a MySQL connection."""
        try:
            connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
            )
            if connection.is_connected():
                print("Connected to MySQL database")
            return connection
        except Error as e:
            print(f"Error while connecting to MySQL: {e}")
            return None

    @ensure_annotations 
    def create_database(self,database_name:Optional[str] = None):
        """Create a MySQL database if it doesn't exist."""
        connection = None
        cursor = None
        
        try:
            # Connect to the MySQL server (no specific database mentioned yet)
            connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password
            )
            if connection.is_connected():
                print("Connected to MySQL server")

            cursor = connection.cursor()

            # Create the database if it doesn't exist
            cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database_name}")
            print(f"Database '{database_name}' created or already exists.")

        except Error as e:
            print(f"Error creating database: {e}")

        finally:
            # Close cursor and connection
            if cursor:
                cursor.close()
            if connection:
                connection.close()
            print("MySQL connection is closed.")
            
    @ensure_annotations 
    def create_table(self, create_table_sql: str,database_name:Optional[str] = None):
        """Create a table in the specified MySQL database using a SQL query."""
        connection = None
        cursor = None
        
        try:
            # Connect to the MySQL server (specify the database now)
            connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=database_name  # Now include the database name
            )
            if connection.is_connected():
                print(f"Connected to MySQL database: {database_name}")

            cursor = connection.cursor()
            cursor.execute(create_table_sql)
            print("Table created successfully.")
            return True
            
        except Error as e:
            print(f"Error creating table: {e}")
            return None 
            
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
            print("MySQL connection is closed.")
            
            
    # def insert_record(self,record: Union[dict, List[dict]] = {},table_name: Optional[str] = None,database_name:Optional[str] = None):
    #     """Insert one or many records into the specified MySQL table."""
    #     connection = mysql.connector.connect(
    #     host=self.host,
    #     user=self.user,
    #     password=self.password,
    #     database=database_name  # Specify the database here
    # )
    #     cursor = connection.cursor()

    #     if isinstance(record, list):
    #         # Ensure all items in the list are dictionaries
    #         for data in record:
    #             if not isinstance(data, dict):
    #                 raise TypeError("Each record in the list must be a dictionary")

    #         # Prepare SQL for inserting multiple records
    #         columns = ', '.join(record[0].keys())
    #         placeholders = ', '.join(['%s'] * len(record[0]))
    #         sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

    #         # Extract the values for each record as tuples
    #         values = [tuple(data.values()) for data in record]
    #         cursor.executemany(sql, values)
            
    #     elif isinstance(record, dict):
    #         # Prepare SQL for inserting a single record
    #         columns = ', '.join(record.keys())
    #         placeholders = ', '.join(['%s'] * len(record))
    #         sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

    #         # Extract the values as a tuple
    #         values = tuple(record.values())
    #         cursor.execute(sql, values)
    #     else:
    #         raise TypeError("Record must be a dictionary or a list of dictionaries")

    #     # Commit the transaction
    #     connection.commit()
    #     print(f"Record(s) inserted into {table_name}.")
        
    #     cursor.close()
    #     connection.close()
    
    @ensure_annotations
    def insert_single_record(self, record: dict,table_name: Optional[str] = None,database_name:Optional[str] = None):
        """Insert a single record into the specified MySQL table."""
        # Connect to MySQL
        connection = mysql.connector.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=database_name
        )
        cursor = connection.cursor()

        # Prepare SQL for inserting a single record
        columns = ', '.join(record.keys())
        placeholders = ', '.join(['%s'] * len(record))
        sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

        # Extract the values as a tuple
        values = tuple(record.values())

        cursor.execute(sql, values)

        # Commit the transaction
        connection.commit()
        print(f"Single record inserted into {table_name}.")

        # Close the cursor and connection
        cursor.close()
        connection.close()

    @ensure_annotations
    def insert_multiple_records(self, records: list, table_name: Optional[str] = None, database_name: Optional[str] = None):
        """Insert multiple records into the specified MySQL table."""
        if not records:
            print("No records provided to insert.")
            return

        if not isinstance(records, list) or not all(isinstance(record, dict) for record in records):
            raise TypeError("Records must be a list of dictionaries.")

        # Connect to MySQL
        connection = mysql.connector.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=database_name
        )
        cursor = connection.cursor()

        # Prepare SQL for inserting multiple records
        columns = ', '.join(records[0].keys())
        placeholders = ', '.join(['%s'] * len(records[0]))
        sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

        # Extract the values for each record as tuples
        values_list = [tuple(record.values()) for record in records]

        cursor.executemany(sql, values_list)

        # Commit the transaction
        connection.commit()
        print(f"Multiple records inserted into {table_name}.")

        # Close the cursor and connection
        cursor.close()
        connection.close()

        
    @ensure_annotations
    def bulk_insert(self, datafile: str, table_name: Optional[str] = None, database_name:Optional[str] = None,unique_field: Optional[str] = None):
        """Bulk insert records from a CSV or Excel file."""
        # connection = self.create_connection()
        connection = mysql.connector.connect(
        host=self.host,
        user=self.user,
        password=self.password,
        database=database_name  # Specify the database here
        )
        
        cursor = connection.cursor()

        if datafile.endswith('.csv'):
            data = pd.read_csv(datafile)
        elif datafile.endswith('.xlsx'):
            data = pd.read_excel(datafile)
        
        data_json = json.loads(data.to_json(orient='records'))

        for record in data_json:
            columns = ', '.join(record.keys())
            placeholders = ', '.join(['%s'] * len(record))
            if unique_field:
                select_query = f"SELECT COUNT(*) FROM {table_name} WHERE {unique_field} = %s"
                cursor.execute(select_query, (record[unique_field],))
                result = cursor.fetchone()
                if result[0] == 0:
                    sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
                    cursor.execute(sql, tuple(record.values()))
                else:
                    print(f"Record with {unique_field}={record[unique_field]} already exists. Skipping insertion.")
            else:
                sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
                cursor.execute(sql, tuple(record.values()))
        
        connection.commit()
        print("Bulk insert completed.")
        cursor.close()
        connection.close()

    @ensure_annotations
    def find(self, query: dict = {}, table_name: Optional[str] = None,database_name:Optional[str] = None):
        """Retrieve records from the specified MySQL table based on the query."""
        # connection = self.create_connection()
        connection = mysql.connector.connect(
        host=self.host,
        user=self.user,
        password=self.password,
        database=database_name  # Specify the database here
    )
        cursor = connection.cursor(dictionary=True)  # This will return results as dictionaries

        # If no query is specified, fetch all records
        if not query:
            sql = f"SELECT * FROM {table_name}"
            cursor.execute(sql)
        else:
            # Construct the WHERE clause based on the query dictionary
            conditions = ' AND '.join([f"{key} = %s" for key in query.keys()])
            sql = f"SELECT * FROM {table_name} WHERE {conditions}"
            cursor.execute(sql, tuple(query.values()))

        # Fetch all the results
        results = cursor.fetchall()

        cursor.close()
        connection.close()

        return results

    @ensure_annotations
    def update(self, query: dict={}, new_values: dict={},table_name: Optional[str] = None,database_name:Optional[str] = None):
        """Update records in the MySQL table based on the query and new values."""
        # connection = self.create_connection()
        connection = mysql.connector.connect(
        host=self.host,
        user=self.user,
        password=self.password,
        database=database_name  # Specify the database here
        )
        
        cursor = connection.cursor()

        try:
            # Construct the SET clause from the new_values dictionary
            set_clause = ', '.join([f"{key} = %s" for key in new_values.keys()])

            # Construct the WHERE clause from the query dictionary
            where_clause = ' AND '.join([f"{key} = %s" for key in query.keys()])

            # Prepare the SQL UPDATE statement
            sql = f"UPDATE {table_name} SET {set_clause} WHERE {where_clause}"

            # Combine the values for SET and WHERE clauses
            values = tuple(new_values.values()) + tuple(query.values())

            # Execute the SQL statement
            cursor.execute(sql, values)
            connection.commit()

            print(f"Record(s) updated in {table_name} where {query}.")
            
        except mysql.Error as e:  # Make sure to replace `Error` with the relevant exception class
            print(f"Error updating record: {e}")
        
        finally:
            cursor.close()
            connection.close()

    @ensure_annotations
    def delete(self,query: dict={},table_name: Optional[str] = None,database_name:Optional[str] = None):
        """Delete records from the MySQL table based on the query."""
        # connection = self.create_connection()
        connection = mysql.connector.connect(
        host=self.host,
        user=self.user,
        password=self.password,
        database=database_name  # Specify the database here
        )
        cursor = connection.cursor()

        try:
            # Construct the WHERE clause from the query dictionary
            where_clause = ' AND '.join([f"{key} = %s" for key in query.keys()])

            # Prepare the DELETE statement
            sql = f"DELETE FROM {table_name} WHERE {where_clause}"

            # Combine the values for the WHERE clause
            values = tuple(query.values())

            # Execute the SQL statement
            cursor.execute(sql, values)
            connection.commit()

            print(f"Record(s) deleted from {table_name} where {query}.")
        
        except mysql.Error as e:  # Use the appropriate MySQL error handling class
            print(f"Error deleting record: {e}")
        
        finally:
            cursor.close()
            connection.close()



In [227]:
# Creating an instance of mysql_operation
mysql_connector = mysql_operation(
    host="localhost",
    user="root",
    password="Seaflux@1",
)

In [228]:
mysql_connector.create_connection()

Connected to MySQL database


<mysql.connector.connection_cext.CMySQLConnection at 0x7ff778a5cdd0>

In [217]:
database_name = "demo_test_1"
mysql_connector.create_database(database_name)

Connected to MySQL server
Database 'demo_test_1' created or already exists.
MySQL connection is closed.


In [218]:
create_table_sql = """
CREATE TABLE students (
    name VARCHAR(100) NOT NULL,
    age VARCHAR(100) NOT NULL
);
"""

In [219]:
mysql_connector.create_table(create_table_sql,database_name)

Connected to MySQL database: demo_test_1
Table created successfully.
MySQL connection is closed.


True

In [220]:
# Example usage of one insert_record
record = {
    'name': 'John Doe',
    'age': '50'
}
mysql_connector.insert_single_record(table_name="students", database_name='demo_test_1',record=record)


Single record inserted into students.


In [229]:
# Example usage of many insert_record
record =[
    {'name': 'Abhisek','age': '55'},
    {'name': 'Amitabh','age':'81'},
    {'name':'Ashwaria','age':'50'}    
    ]
mysql_connector.insert_multiple_records(records=record,table_name="students", database_name='demo_test_1')


Multiple records inserted into students.


In [207]:
# Find all records in the 'students' table
database_name = 'demo_test_1'
records = mysql_connector.find(table_name="students",database_name=database_name)
print(records)


[{'name': 'John Doe', 'age': '50'}, {'name': 'Abhisek', 'age': '55'}, {'name': 'Amitabh', 'age': '81'}, {'name': 'Ashwaria', 'age': '50'}]


In [208]:
# Find records where the 'name' is 'Alice' and 'age' is 30
query = {'name': 'Amitabh', 'age': 81}
records = mysql_connector.find(query=query, table_name="students",database_name=database_name)
print(records)


[{'name': 'Amitabh', 'age': '81'}]


In [209]:
# Update the 'age' of the student whose 'name' is 'Alice'
query = {'name': 'Amitabh'}
new_values = {'age': 31}
database_name='demo_test_1'

mysql_connector.update(query=query, new_values=new_values,table_name="students",database_name=database_name)


Record(s) updated in students where {'name': 'Amitabh'}.


In [210]:
query = {'name': "John Doe", 'age': 50}
mysql_connector.delete(query=query,table_name='students',database_name='demo_test_1')


Record(s) deleted from students where {'name': 'John Doe', 'age': 50}.


In [211]:
query = {'name': 'Amitabh','age':31}
mysql_connector.delete(query=query,table_name='students',database_name='demo_test_1')

Record(s) deleted from students where {'name': 'Amitabh', 'age': 31}.


In [212]:
datafile = '/home/seaflux/Documents/mysqlconnectorpkg/experiments/students.csv'
mysql_connector.bulk_insert(datafile=datafile, table_name='students',database_name='demo_test_1', unique_field='name')


Bulk insert completed.


In [213]:
def add(a: int, b: int) -> int:
    return a + b

In [25]:
add(2.5,3)

5.5

In [26]:
from ensure import ensure_annotations

@ensure_annotations
def add(a: int, b: int) -> int:
    return a + b

# If you call `add("2", "3")`, it will raise a TypeError because the arguments don't match the expected types (int).


In [27]:
add(2.5,3)

EnsureError: Argument a of type <class 'float'> to <function add at 0x7fd58e58cae0> does not match annotation type <class 'int'>