In [10]:
from sqlalchemy import create_engine, Column, Integer, String, Float, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

DATABASE_URL = 'postgresql://postgres:sanujT\
@localhost/sqlalchemy'

engine = create_engine(DATABASE_URL, echo=False)

Session = sessionmaker(bind=engine)
session = Session()

Base = declarative_base()

class Product(Base):
    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    category = Column(String)
    price = Column(Float)

# Create the products table
Base.metadata.create_all(engine)

products_data = [
    {'name': 'Laptop', 'category': 'Electronics', 'price': 1000},
    {'name': 'Smartphone', 'category': 'Electronics', 'price': 700},
    {'name': 'Table', 'category': 'Furniture', 'price': 150},
    {'name': 'Chair', 'category': 'Furniture', 'price': 85}
]

for product in products_data:
    new_product = Product(name=product['name'], category=product['category'], price=product['price'])
    session.add(new_product)

session.commit()
# session.close()

def get_products_by_category(category):
    return session.query(Product).filter(Product.category == category).all()

def get_products_above_price(threshold):
    return session.query(Product).filter(Product.price > threshold).all()

def get_average_price_per_category():
    return session.query(Product.category, func.avg(Product.price)).group_by(Product.category).all()


# Test Query 1: Retrieve all products in 'Electronics' category
electronics_products = get_products_by_category('Electronics')
print("Electronics Products:", [(p.name, p.category, p.price) for p in electronics_products])

# Test Query 2: Retrieve products with a price above 200
expensive_products = get_products_above_price(500)
print("Products above price 200:", [(p.name, p.category, p.price) for p in expensive_products])

# Test Query 3: Retrieve the average price of products in each category
average_price_per_category = get_average_price_per_category()
print("Average Price per Category:", average_price_per_category)


Electronics Products: [('Laptop', 'Electronics', 1000.0), ('Smartphone', 'Electronics', 700.0), ('Laptop', 'Electronics', 1000.0), ('Smartphone', 'Electronics', 700.0), ('Laptop', 'Electronics', 1000.0), ('Smartphone', 'Electronics', 700.0)]
Products above price 200: [('Laptop', 'Electronics', 1000.0), ('Smartphone', 'Electronics', 700.0), ('Laptop', 'Electronics', 1000.0), ('Smartphone', 'Electronics', 700.0), ('Laptop', 'Electronics', 1000.0), ('Smartphone', 'Electronics', 700.0)]
Average Price per Category: [('Furniture', 117.5), ('Electronics', 850.0)]


  Base = declarative_base()


In [12]:
from functools import lru_cache
import time

@lru_cache(maxsize=128)
def get_product_by_id(product_id):
    """Retrieve product details by id with caching."""
    product = session.query(Product).filter(Product.id == product_id).one_or_none()
    return (product.id, product.name, product.category, product.price) if product else None


if __name__ == "__main__":
    start_time = time.time()
    product_details = get_product_by_id(1)
    print("First Call Output:", product_details)
    print("Time taken for first call:", round(time.time() - start_time, 8), "seconds")

    start_time = time.time()
    product_details = get_product_by_id(1)
    print("Second Call Output:", product_details)
    print("Time taken for second call:", round(time.time() - start_time, 8), "seconds")

    # Check cache statistics
    print("CacheInfo:", get_product_by_id.cache_info())

First Call Output: (1, 'Laptop', 'Electronics', 1000.0)
Time taken for first call: 0.00234652 seconds
Second Call Output: (1, 'Laptop', 'Electronics', 1000.0)
Time taken for second call: 0.0 seconds
CacheInfo: CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)


In [15]:
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from functools import lru_cache
import time

DATABASE_URL = 'postgresql://postgres:sanujT@localhost/sqlalchemy'  # Update with your credentials

engine = create_engine(DATABASE_URL, echo=False)

Base = declarative_base()

class Author(Base):
    __tablename__ = 'authors'
    
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    
    books = relationship('Book', back_populates='author')

class Book(Base):
    __tablename__ = 'books'
    
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)
    author_id = Column(Integer, ForeignKey('authors.id'))
    
    author = relationship('Author', back_populates='books')

Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

def insert_sample_data():
    authors_data = [{'name': 'J.K. Rowling'}, {'name': 'George R.R. Martin'}]
    books_data = [
        {'title': 'Harry Potter and the Philosopher\'s Stone', 'author_id': 1},
        {'title': 'Harry Potter and the Chamber of Secrets', 'author_id': 1},
        {'title': 'A Game of Thrones', 'author_id': 2},
        {'title': 'A Clash of Kings', 'author_id': 2}
    ]
    
    for author in authors_data:
        new_author = Author(name=author['name'])
        session.add(new_author)
    
    session.commit()  
    
    for book in books_data:
        new_book = Book(title=book['title'], author_id=book['author_id'])
        session.add(new_book)
    
    session.commit()


def get_books_by_author(author_name):
    """Retrieve all books by a specific author."""
    return session.query(Book).join(Author).filter(Author.name == author_name).all()

@lru_cache(maxsize=128)
def get_author_with_books(author_name):
    """Retrieve author details along with their books, cached."""
    author = session.query(Author).filter(Author.name == author_name).one_or_none()
    if author:
        return author.name, [(book.title,) for book in author.books]
    return None


if __name__ == "__main__":
    books_rowling = get_books_by_author('J.K. Rowling')
    print("Books by J.K. Rowling:", [(book.title,) for book in books_rowling])

    start_time = time.time()
    author_details_rowling = get_author_with_books('J.K. Rowling')
    print("Author with Books (first call):", author_details_rowling)
    print("Time taken for first call:", round(time.time() - start_time, 8), "seconds")

    start_time = time.time()
    author_details_rowling_cached = get_author_with_books('J.K. Rowling')
    print("Author with Books (second call):", author_details_rowling_cached)
    print("Time taken for second call:", round(time.time() - start_time, 8), "seconds")

    print("CacheInfo:", get_author_with_books.cache_info())


Books by J.K. Rowling: []
Author with Books (first call): None
Time taken for first call: 0.00199866 seconds
Author with Books (second call): None
Time taken for second call: 0.0 seconds
CacheInfo: CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)


  Base = declarative_base()


In [16]:
from sqlalchemy import create_engine, Column, Integer, String, Float, desc
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from functools import lru_cache

DATABASE_URL = 'postgresql://postgres:sanujT@localhost/sqlalchemy' 

engine = create_engine(DATABASE_URL, echo=False)

Base = declarative_base()

class Product(Base):
    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    category = Column(String)
    price = Column(Float)

Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

def insert_sample_data():
    products_data = [
        {'name': 'Laptop', 'category': 'Electronics', 'price': 1000},
        {'name': 'Smartphone', 'category': 'Electronics', 'price': 700},
        {'name': 'Table', 'category': 'Furniture', 'price': 150},
        {'name': 'Chair', 'category': 'Furniture', 'price': 85},
        {'name': 'Desk', 'category': 'Furniture', 'price': 200},
        {'name': 'Monitor', 'category': 'Electronics', 'price': 300},
        {'name': 'Headphones', 'category': 'Electronics', 'price': 150},
        {'name': 'Lamp', 'category': 'Furniture', 'price': 50}
    ]
    
    for product in products_data:
        new_product = Product(name=product['name'], category=product['category'], price=product['price'])
        session.add(new_product)
    
    session.commit()


# Step 3: Query Function for Top 3 Most Expensive Products in Each Category
@lru_cache(maxsize=128)
def get_top_3_products_by_category():
    """Retrieve the top 3 most expensive products in each category."""
    categories = session.query(Product.category).distinct().all()
    results = []

    for category in categories:
        top_products = session.query(Product).filter(Product.category == category[0]).order_by(desc(Product.price)).limit(3).all()
        results.append((category[0], [(product.name, product.price) for product in top_products]))

    return results


if __name__ == "__main__":
    # Retrieve top 3 most expensive products in each category
    top_products = get_top_3_products_by_category()
    print("Top 3 Most Expensive Products in Each Category:")
    for category, products in top_products:
        print(f"{category}: {products}")

    # Check cache statistics
    print("CacheInfo:", get_top_3_products_by_category.cache_info())


Top 3 Most Expensive Products in Each Category:
Furniture: [('Table', 150.0), ('Table', 150.0), ('Table', 150.0)]
Electronics: [('Laptop', 1000.0), ('Laptop', 1000.0), ('Laptop', 1000.0)]
CacheInfo: CacheInfo(hits=0, misses=1, maxsize=128, currsize=1)


  Base = declarative_base()


In [17]:
from sqlalchemy import create_engine, Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from functools import lru_cache

DATABASE_URL = 'postgresql://postgres:sanujT@localhost/sqlalchemy'

engine = create_engine(DATABASE_URL, echo=False)

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    age = Column(Integer, nullable=False)
    is_deleted = Column(Boolean, default=False)

Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

def insert_sample_data():
    users_data = [
        {'name': 'Alice', 'age': 31},
        {'name': 'Bob', 'age': 25},
        {'name': 'Charlie', 'age': 30}
    ]
    
    for user in users_data:
        new_user = User(name=user['name'], age=user['age'])
        session.add(new_user)
    
    session.commit()


@lru_cache(maxsize=128)
def get_user_by_id(user_id):
    """Retrieve a user by id, excluding soft-deleted users."""
    user = session.query(User).filter(User.id == user_id, User.is_deleted == False).one_or_none()
    return (user.id, user.name, user.age) if user else None

def soft_delete_user(user_id):
    """Mark a user as deleted (soft delete)."""
    user = session.query(User).filter(User.id == user_id).one_or_none()
    if user:
        user.is_deleted = True
        session.commit()


if __name__ == "__main__":
    soft_delete_user(2)

    user_bob = get_user_by_id(2)
    print("User with id 2:", user_bob)

    user_alice = get_user_by_id(1)
    print("User with id 1:", user_alice)

    print("CacheInfo:", get_user_by_id.cache_info())


User with id 2: None
User with id 1: None
CacheInfo: CacheInfo(hits=0, misses=2, maxsize=128, currsize=2)


  Base = declarative_base()
