In [1]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker



In [31]:
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import func

# Define the connection string
DATABASE_URL = "postgresql://postgres:Technman2024@localhost:5432/MyDb"
engine = create_engine(DATABASE_URL)

# Create a new session
Session = sessionmaker(bind=engine)
session = Session()

# Use the updated declarative base
Base = declarative_base()

# Define the User class/table
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String)
    email = Column(String)

# Create the table if it doesn't exist
Base.metadata.create_all(engine)

# Insert a new user (only for the first run; avoid duplicating)
new_user = User(username="dhara", email="d@exmpl.com")
session.add(new_user)
session.commit()

print("Table created and user added!")

# Query all users
users = session.query(User).all()

# Count the number of users
user_count = session.query(func.count(User.id)).scalar()
print(f"Number of users: {user_count}")

# Query a user by id (check if user with id = 2 exists)
user = session.query(User).get(2)

if user:
    # Update the user's email
    user.email = "d2@example.com"
    user.username = "dj123"
    
    # Commit the changes
    session.commit()
    print("User email updated!")
else:
    print("User with id = 2 not found.")

# Close the session
session.close()






Table created and user added!
Number of users: 19
User email updated!


  user = session.query(User).get(2)


In [33]:
#Task 1& 2: Implementing Caching with SQLAlchemy Queries

import time
from sqlalchemy import Column, Integer, String, Float, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from functools import lru_cache

# Define the connection string
DATABASE_URL = "postgresql://postgres:Technman2024@localhost:5432/MyDb"
engine = create_engine(DATABASE_URL)

# Create a new session
Session = sessionmaker(bind=engine)
session = Session()

# Use the updated declarative base
Base = declarative_base()

# Define the Product class/table
class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    category = Column(String)
    price = Column(Float)

# Create the table if it doesn't exist (only for first run)
Base.metadata.create_all(engine)

# Insert sample data for testing (skip this if data already exists)
sample_products = [
    {'name': 'Laptop', 'category': 'Electronics', 'price': 1000},
    {'name': 'Smartphone', 'category': 'Electronics', 'price': 700},
    {'name': 'Table', 'category': 'Furniture', 'price': 150},
    {'name': 'Chair', 'category': 'Furniture', 'price': 85}
]

# Insert products into the database (if they don't already exist)
for p in sample_products:
    if not session.query(Product).filter_by(name=p['name']).first():
        session.add(Product(name=p['name'], category=p['category'], price=p['price']))
session.commit()

# Function to retrieve product details by id
@lru_cache(maxsize=128)
def get_product_by_id(product_id):
    """Retrieve product details by id from the database and cache the result."""
    product = session.query(Product).filter(Product.id == product_id).first()
    if product:
        return (product.id, product.name, product.category, product.price)
    return None

# Testing the caching functionality
def measure_time(product_id):
    """Measure the time taken to retrieve a product."""
    start_time = time.time()
    result = get_product_by_id(product_id)
    end_time = time.time()
    time_taken = end_time - start_time
    return result, time_taken

# Measure time for the first call (expected to be slower)
product_id = 1  # For 'Laptop'
product_details, time_taken_first = measure_time(product_id)
print(f"First call (Product ID = {product_id}): {product_details}")
print(f"Time taken for first call: {time_taken_first:.4f} seconds")

# Measure time for the second call (expected to be faster due to caching)
product_details, time_taken_second = measure_time(product_id)
print(f"Second call (Product ID = {product_id}): {product_details}")
print(f"Time taken for second call: {time_taken_second:.4f} seconds")

# Check cache statistics
print("CacheInfo:", get_product_by_id.cache_info())

# Close the session
session.close()


First call (Product ID = 1): (1, 'Laptop', 'Electronics', 1000.0)
Time taken for first call: 0.0055 seconds
Second call (Product ID = 1): (1, 'Laptop', 'Electronics', 1000.0)
Time taken for second call: 0.0000 seconds
CacheInfo: CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)


In [34]:
#Task 3: Handling Relationships in SQLAlchemy


from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

# Define the connection string
DATABASE_URL = "postgresql://postgres:Technman2024@localhost:5432/MyDb"
engine = create_engine(DATABASE_URL)

# Create a new session
Session = sessionmaker(bind=engine)
session = Session()

# Use the updated declarative base
Base = declarative_base()

# Define the Author class/table
class Author(Base):
    __tablename__ = "authors"
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    books = relationship("Book", back_populates="author")

# Define the Book class/table
class Book(Base):
    __tablename__ = "books"
    id = Column(Integer, primary_key=True)
    title = Column(String)
    author_id = Column(Integer, ForeignKey("authors.id"))
    author = relationship("Author", back_populates="books")

# Create the tables
Base.metadata.create_all(engine)

# Function to retrieve all books by a specific author
def get_books_by_author(author_name):
    author = session.query(Author).filter_by(name=author_name).first()
    if author:
        return [(book.title,) for book in author.books]
    return []

# Function to retrieve author details along with their books
def get_author_with_books(author_name):
    author = session.query(Author).filter_by(name=author_name).first()
    if author:
        books = [(book.title,) for book in author.books]
        return (author.name, books)
    return None

# Test the functions
print("Books by J.K. Rowling:", get_books_by_author('J.K. Rowling'))
print("George R.R. Martin with books:", get_author_with_books('George R.R. Martin'))

# Close the session
session.close()


Books by J.K. Rowling: []
George R.R. Martin with books: None


In [4]:
#Task 3: Handling Relationships in SQLAlchemy II

import time
from sqlalchemy import Column,Integer,String,ForeignKey,create_engine
from sqlalchemy.orm import declarative_base,sessionmaker,relationship

DATABASE_URL= "postgresql://postgres:Technman2024@localhost:5432/MyDb"
engine = create_engine(DATABASE_URL)

Session = sessionmaker(bind=engine)
session = Session()

Base = declarative_base()

class Author(Base):
    __tablename__ = "authors"
    id = Column(Integer,primary_key=True)
    name = Column(String,unique=True)
    books = relationship("Book", back_populates="author")

class Book(Base):
    __tablename__="books"
    id = Column(Integer,primary_key=True)
    title = Column(String)
    author_id = Column(Integer,ForeignKey("authors.id"))
    author = relationship("Author",back_populates="books")

Base.metadata.create_all(engine)

authors_data = [{ 'name':'jk.row'},{'name':'dhruvshah'}]
books_data = [
    { 'title': 'harry potter','author_id':1},
    {'title': 'A Game of Thrones', 'author_id': 2},
    {'title': 'A Clash of Kings', 'author_id': 2}
]

for author_data in authors_data:
    if not session.query(Author).filter_by(name=author_data['name']).first():
        session.add(Author(name=author_data['name']))
session.commit()

for book_data in books_data:
    if not session.query(Book).filter_by(title=book_data['title']).first():
        session.add(Book(title=book_data['title'], author_id=book_data['author_id']))
session.commit()

def get_author_with_books(author_name):
    author = session.query(Author).filter(Author.name == author_name).first()
    if  author: 
        books = [(book.title,) for book in author.books]
        return (author.name,books)
    return None

def test_author_queries():
    #query1
    print("\nquery1: retrieve all books by 'jk.row'")
    books_jk_rowling = get_author_with_books('jk.row')
    print(f"books by jk.row':{books_jk_rowling}")

    #query2
    print("\nquery2: retrives author details along with their books for 'dhruvshah'")

    start_time = time.time()
    author_details = get_author_with_books('dhruvshah')
    end_time = time.time()
    print(f"Author details: {author_details}")
    print(f"Time taken: {end_time-start_time:.4f} seconds")

test_author_queries()
session.close()






query1: retrieve all books by 'jk.row'
books by jk.row':('jk.row', [('harry potter',)])

query2: retrives author details along with their books for 'dhruvshah'
Author details: ('dhruvshah', [('A Game of Thrones',), ('A Clash of Kings',)])
Time taken: 0.0018 seconds


In [6]:
from sqlalchemy import Column, Integer, String, Float, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import func, desc
from functools import lru_cache

# Define the connection string
DATABASE_URL = "postgresql://postgres:Technman2024@localhost:5432/MyDb"
engine = create_engine(DATABASE_URL)

# Create a new session
Session = sessionmaker(bind=engine)
session = Session()

Base = declarative_base()

# Define the Product class/table
class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    category = Column(String)
    price = Column(Float)

# Create the table
Base.metadata.create_all(engine)

# Insert sample products data
products_data = [
    {'name': 'Laptop', 'category': 'Electronics', 'price': 1000},
    {'name': 'Smartphone', 'category': 'Electronics', 'price': 700},
    {'name': 'Table', 'category': 'Furniture', 'price': 150},
    {'name': 'Chair', 'category': 'Furniture', 'price': 85},
    {'name': 'Desk', 'category': 'Furniture', 'price': 200},
    {'name': 'Monitor', 'category': 'Electronics', 'price': 300},
    {'name': 'Headphones', 'category': 'Electronics', 'price': 150},
    {'name': 'Lamp', 'category': 'Furniture', 'price': 50},
]

for product_data in products_data:
    if not session.query(Product).filter_by(name=product_data['name']).first():
        session.add(Product(**product_data))
session.commit()

@lru_cache(maxsize=128)
def get_top3():
    categories = session.query(Product.category).distinct()
    result = []

    for category in categories:
        top_product = (
            session.query(Product.name,Product.price)
            .filter_by(category=category[0])
            .order_by(desc(Product.price))
            .limit(3)
            .all()
        )

        result.append((category[0],top_product))

    return result

def test():
    print("/query: retrive top 3 most expensive products in each catagory")
    top_products = get_top3()
    print(f"top 3 most expensive products by category: {top_products}")

    print("\nRunning query again to test cache performance..")
    top_products = get_top3()
    print(f"Cahched  query result: {top_products}")

test()
session.close()


/query: retrive top 3 most expensive products in each catagory
top 3 most expensive products by category: [('Furniture', [('Desk', 200.0), ('Table', 150.0), ('Chair', 85.0)]), ('Electronics', [('Laptop', 1000.0), ('Smartphone', 700.0), ('Monitor', 300.0)])]

Running query again to test cache performance..
Cahched  query result: [('Furniture', [('Desk', 200.0), ('Table', 150.0), ('Chair', 85.0)]), ('Electronics', [('Laptop', 1000.0), ('Smartphone', 700.0), ('Monitor', 300.0)])]


In [9]:
from sqlalchemy import Column, Integer, String, Boolean, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from functools import lru_cache

# Define the connection string
DATABASE_URL = "postgresql://postgres:Technman2024@localhost:5432/MyDb"
engine = create_engine(DATABASE_URL)

# Create a new session
Session = sessionmaker(bind=engine)
session = Session()

# Define the declarative base
Base = declarative_base()

# Define the User class/table with a soft delete column (is_deleted)
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String)
    email = Column(String)
    is_deleted = Column(Boolean, default=False)  # Soft delete flag

# Recreate the table (this will drop and recreate the table if it already exists)
Base.metadata.drop_all(engine)  # Drop existing tables (optional if you want a clean start)
Base.metadata.create_all(engine)  # Create tables with the updated structure

# Insert some sample users
users_data = [
    {'username': 'Alice', 'email': 'alice@example.com'},
    {'username': 'Bob', 'email': 'bob@example.com'},
]

# Insert data into the users table
for user_data in users_data:
    if not session.query(User).filter_by(username=user_data['username']).first():
        session.add(User(**user_data))
session.commit()

# Function to mark a user as soft-deleted
def soft_delete_user(user_id):
    user = session.query(User).filter_by(id=user_id).first()
    if user:
        user.is_deleted = True
        session.commit()

# Caching function to get a user by id, excluding soft-deleted users
@lru_cache(maxsize=128)
def get_user_by_id(user_id):
    user = session.query(User).filter_by(id=user_id, is_deleted=False).first()
    if user:
        return (user.id, user.username, user.email)
    return None

# Test function to demonstrate soft delete and caching
def test_soft_delete():
    # Retrieve user with id 1 (should return Alice)
    print("\nQuery: Retrieve user by id 1")
    user_1 = get_user_by_id(1)
    print(f"User 1: {user_1}")

    # Soft delete user with id 2 (Bob)
    print("\nMarking user with id 2 as deleted...")
    soft_delete_user(2)

    # Retrieve user with id 2 (should return None after soft delete)
    print("\nQuery: Retrieve user by id 2 (after soft delete)")
    user_2 = get_user_by_id(2)
    print(f"User 2: {user_2}")

    # Check cache statistics
    print(f"\nCacheInfo: {get_user_by_id.cache_info()}")

# Run the test
test_soft_delete()

# Close the session
session.close()



Query: Retrieve user by id 1
User 1: (1, 'Alice', 'alice@example.com')

Marking user with id 2 as deleted...

Query: Retrieve user by id 2 (after soft delete)
User 2: None

CacheInfo: CacheInfo(hits=0, misses=2, maxsize=128, currsize=2)
