# MySQL

Here is how you would interact with MySQL using Python and the PyMySQL package. This example will showcase the same concepts as the MongoDB example: setting up a connection, querying with conditions, sorting, limiting, counting, indexing, aggregating, error handling, and closing the connection.

In [None]:
!pip install PyMySQL

To connect to a MySQL database:

In [None]:
import pymysql.cursors

# Establish a connection to the MySQL server.
# Rows are returned as dictionaries because of DictCursor.
try:
    connection = pymysql.connect(
        host='localhost',
        user='user',
        password='passwd',
        database='mydb',  # `db` is a deprecated alias for `database`
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )
except pymysql.MySQLError as exc:
    # Catch only errors raised by the driver. A bare `except:` would also
    # swallow KeyboardInterrupt and ordinary coding mistakes, and it would
    # hide the reason the connection could not be made.
    print(f"Connection failed: {exc}")
else:
    print("Connected successfully!")

To query with conditions:

In [None]:
# Print every user whose `age` exceeds 30. The threshold is passed as a
# query argument rather than being formatted into the SQL text.
with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM `users` WHERE `age` > %s", (30,))
    for row in cursor.fetchall():
        print(row)

Sorting results:

In [None]:
# List all users ordered by `age`, highest first.
with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM `users` ORDER BY `age` DESC")
    rows = cursor.fetchall()

# fetchall() has already retrieved every row, so printing does not need
# the cursor to stay open.
for row in rows:
    print(row)

Limiting results:

In [None]:
# Print at most five users. The query has no ORDER BY, so it does not
# specify which five rows are returned.
with connection.cursor() as cursor:
    limit_query = "SELECT * FROM `users` LIMIT 5"
    cursor.execute(limit_query)
    for user in cursor.fetchall():
        print(user)

Counting rows:

In [None]:
# Count the users whose `age` exceeds 30. The query produces a single row,
# which is printed exactly as the cursor returns it.
with connection.cursor() as cursor:
    cursor.execute("SELECT COUNT(*) FROM `users` WHERE `age` > %s", (30,))
    print(cursor.fetchone())

Creating an index:

In [None]:
# Add an index named `age_index` on the `age` column of `users`.
with connection.cursor() as cursor:
    cursor.execute("CREATE INDEX age_index ON `users`(`age`)")

# Commit once the statement has run; this is skipped if execute() raised.
connection.commit()

Aggregation:

In [None]:
# Average age per city: the query groups `users` by `city` and exposes the
# mean of `age` under the alias `average_age`.
avg_age_by_city = (
    "SELECT `city`, AVG(`age`) as average_age "
    "FROM `users` GROUP BY `city`"
)

with connection.cursor() as cursor:
    cursor.execute(avg_age_by_city)
    for city_row in cursor.fetchall():
        print(city_row)

Error Handling:

In [None]:
from pymysql.err import OperationalError

# Attempt the connection; an OperationalError is reported with a message
# instead of propagating. Any other exception type is not handled here.
try:
    connection = pymysql.connect(
        host='localhost',
        user='user',
        password='passwd',
        db='mydb',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )
except OperationalError:
    print("Server not available")
else:
    # Reached only when connect() returned without raising.
    print("Connected successfully!")

Finally, you close the connection:

In [None]:
# Close the connection now that no further queries will be issued on it.
connection.close()

# An Example Database in an ORM-like Style

In [None]:
class Blog:
    """Plain container for a blog: its identifier and its name."""

    def __init__(self, id, name):
        # Both values are stored exactly as supplied, without validation.
        self.id, self.name = id, name

class Post:
    """Plain container for a post; ``blog_id`` holds the owning blog's id."""

    def __init__(self, id, title, content, blog_id):
        # All values are stored exactly as supplied, without validation.
        self.id, self.title = id, title
        self.content, self.blog_id = content, blog_id

class Comment:
    """Plain container for a comment; ``post_id`` holds the owning post's id."""

    def __init__(self, id, content, post_id):
        # All values are stored exactly as supplied, without validation.
        self.id, self.content, self.post_id = id, content, post_id


We can define our database manager class:

In [None]:
import pymysql

class DatabaseManager:
    """Small data-access layer over one PyMySQL connection.

    Each query method runs a parameterised SELECT and converts the
    resulting dictionary rows into ``Blog``, ``Post`` or ``Comment``
    objects. The manager can be used as a context manager so that the
    connection is closed even when a query raises::

        with DatabaseManager(host, user, password, db) as manager:
            blog = manager.get_blog_by_id(1)
    """

    def __init__(self, host, user, password, db):
        """Open the connection.

        Args:
            host: Host name of the MySQL server.
            user: User name to log in with.
            password: Password for ``user``.
            db: Name of the database to use.
        """
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=db,  # `db=` is a deprecated alias for `database=`
            charset='utf8mb4',
            # DictCursor returns rows as dicts, which the row['col']
            # lookups below rely on.
            cursorclass=pymysql.cursors.DictCursor,
        )

    def __enter__(self):
        """Return the manager itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the body still propagate."""
        self.close()

    def get_blog_by_id(self, id):
        """Return the ``Blog`` with the given id, or ``None`` if none exists.

        Args:
            id: Value matched against the ``id`` column of ``blogs``.
        """
        with self.connection.cursor() as cursor:
            cursor.execute("SELECT * FROM `blogs` WHERE `id` = %s", (id,))
            row = cursor.fetchone()
        # fetchone() yields None when nothing matched; indexing it would
        # fail with an unhelpful TypeError.
        if row is None:
            return None
        return Blog(row['id'], row['name'])

    def get_posts_by_blog_id(self, blog_id):
        """Return a list of ``Post`` objects for the blog (possibly empty).

        Args:
            blog_id: Value matched against the ``blog_id`` column of ``posts``.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM `posts` WHERE `blog_id` = %s", (blog_id,)
            )
            rows = cursor.fetchall()
        return [
            Post(row['id'], row['title'], row['content'], row['blog_id'])
            for row in rows
        ]

    def get_comments_by_post_id(self, post_id):
        """Return a list of ``Comment`` objects for the post (possibly empty).

        Args:
            post_id: Value matched against the ``post_id`` column of
                ``comments``.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM `comments` WHERE `post_id` = %s", (post_id,)
            )
            rows = cursor.fetchall()
        return [
            Comment(row['id'], row['content'], row['post_id'])
            for row in rows
        ]

    def close(self):
        """Close the underlying connection."""
        self.connection.close()

And then we can use it like this:

In [None]:
db = DatabaseManager('localhost', 'user', 'password', 'blog_db')
try:
    # Walk the hierarchy: one blog, its posts, and each post's comments.
    blog = db.get_blog_by_id(1)
    posts = db.get_posts_by_blog_id(blog.id)

    for post in posts:
        print(post.title)
        comments = db.get_comments_by_post_id(post.id)
        for comment in comments:
            print(comment.content)
finally:
    # Close the connection even if one of the queries above raised;
    # otherwise it would be left open.
    db.close()