# Connecting PostgreSQL with Python

In [1]:
import psycopg2

DB_NAME = "my_ecommerce_db"
USER = "postgres"
PASSWORD = "new_password"
HOST = "localhost"
PORT = "5432"

try:
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=USER,
        password=PASSWORD,
        host=HOST,
        port=PORT
    )
    print("Connected to PostgreSQL successfully!")

    cursor = conn.cursor()

    cursor.execute("SELECT version();")
    db_version = cursor.fetchone()
    print(f"PostgreSQL Version: {db_version}")

    cursor.close()
    conn.close()
    print("PostgreSQL connection closed.")

except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")


Connected to PostgreSQL successfully!
PostgreSQL Version: ('PostgreSQL 17.2 on x86_64-windows, compiled by msvc-19.42.34435, 64-bit',)
PostgreSQL connection closed.


In [11]:
import psycopg2

conn = psycopg2.connect(
    dbname="my_ecommerce_db",
    user="postgres",
    password="new_password",
    host="localhost",
    port="5432"
)

cursor = conn.cursor()

# Create a new table (if not exists)
create_table_query = '''
CREATE TABLE IF NOT EXISTS customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL
);
'''
cursor.execute(create_table_query)
conn.commit()
print("Table created successfully!")

cursor.close()
conn.close()


Table created successfully!


In [12]:
import psycopg2

conn = psycopg2.connect(
    dbname="my_ecommerce_db",
    user="postgres",
    password="new_password",
    host="localhost",
    port="5432"
)

cursor = conn.cursor()

# Insert data into customers table
insert_query = "INSERT INTO customers (name, email) VALUES (%s, %s)"
data = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
cursor.executemany(insert_query, data)

conn.commit()
print("Data inserted successfully!")

cursor.close()
conn.close()


Data inserted successfully!


In [13]:
import psycopg2

conn = psycopg2.connect(
    dbname="my_ecommerce_db",
    user="postgres",
    password="new_password",
    host="localhost",
    port="5432"
)

cursor = conn.cursor()

cursor.execute("SELECT * FROM customers;")
rows = cursor.fetchall()

for row in rows:
    print(row)  

cursor.close()
conn.close()


(1, 'Alice', 'alice@example.com')
(2, 'Bob', 'bob@example.com')


In [14]:
import psycopg2

conn = psycopg2.connect(
    dbname="my_ecommerce_db",
    user="postgres",
    password="new_password",
    host="localhost",
    port="5432"
)

cursor = conn.cursor()

update_query = "UPDATE customers SET email = %s WHERE name = %s"
cursor.execute(update_query, ("alice_new@example.com", "Alice"))

conn.commit()
print("Data updated successfully!")

cursor.close()
conn.close()


Data updated successfully!


In [15]:
import psycopg2

conn = psycopg2.connect(
    dbname="my_ecommerce_db",
    user="postgres",
    password="new_password",
    host="localhost",
    port="5432"
)

cursor = conn.cursor()

delete_query = "DELETE FROM customers WHERE name = %s"
cursor.execute(delete_query, ("Bob",))

conn.commit()
print("Data deleted successfully!")

cursor.close()
conn.close()


Data deleted successfully!


In [16]:
import asyncpg
import asyncio

async def connect_and_fetch():
    conn = await asyncpg.connect(
	    dbname="my_ecommerce_db",
	    user="postgres",
	    password="new_password",
	    host="localhost",
	    port="5432"
    )

    rows = await conn.fetch("SELECT * FROM customers;")
    for row in rows:
        print(dict(row))

    await conn.close()

asyncio.run(connect_and_fetch())


RuntimeError: asyncio.run() cannot be called from a running event loop

In [17]:
# because of jupyter notebook

In [20]:
import asyncpg
import asyncio

async def connect_and_fetch():
    conn = await asyncpg.connect(
        database="my_ecommerce_db", 
        user="postgres",
        password="new_password",
        host="localhost",
        port="5432"
    )

    rows = await conn.fetch("SELECT * FROM customers;")
    for row in rows:
        print(dict(row))

    await conn.close()

# In Jupyter Notebook, use await instead of asyncio.run()
await connect_and_fetch()


{'id': 1, 'name': 'Alice', 'email': 'alice_new@example.com'}


In [21]:
import psycopg2
from psycopg2 import pool

try:
    pg_pool = pool.SimpleConnectionPool(
        1, 10,  # Min and max connections
        dbname="my_ecommerce_db",
		    user="postgres",
		    password="new_password",
		    host="localhost",
		    port="5432"
    )

    if pg_pool:
        print("Connection pool created successfully")

    conn = pg_pool.getconn()
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM customers;")
    rows = cursor.fetchall()

    for row in rows:
        print(row)

    cursor.close()
    pg_pool.putconn(conn)

except Exception as e:
    print(f"Error: {e}")

finally:
    if pg_pool:
        pg_pool.closeall()
        print("Connection pool closed.")


Connection pool created successfully
(1, 'Alice', 'alice_new@example.com')
Connection pool closed.


# Connecting MongoDB with python

In [22]:
import pymongo

# Establish Connection (Local MongoDB)
client = pymongo.MongoClient("mongodb://localhost:27017/")

# Select the Database
db = client["shopDB"]

# Select Collections
customers_collection = db["customers"]
orders_collection = db["orders"]

print("Connected to MongoDB successfully!")


Connected to MongoDB successfully!


In [24]:
customers = [
    {"_id": 6, "name": "Ram", "email": "ram@example.com", "address": "123 Kathmandu, City"},
    {"_id": 7, "name": "Shyam", "email": "shyam@example.com", "address": "456 Lalitpur, City"}
]

# Insert multiple customers
customers_collection.insert_many(customers)
print("Customers inserted successfully!")


Customers inserted successfully!


In [28]:
customers = customers_collection.find()
print("Customers List:")
for customer in customers:
    print(customer)


Customers List:
{'_id': 1, 'name': 'Alice', 'age': 29, 'purchases': 3, 'location': 'NYC'}
{'_id': 2, 'name': 'Bob', 'age': 35, 'purchases': 5, 'purchase': 2, 'location': 'Los Angeles'}
{'_id': 3, 'name': 'Charlie Brown', 'age': 23, 'city': 'Seattle', 'purchases': 3}
{'_id': 4, 'name': 'David', 'age': 40, 'purchases': 7, 'location': 'NYC'}
{'_id': 5, 'name': 'Emma', 'age': 30, 'purchases': 4, 'location': 'San Francisco'}
{'_id': 6, 'name': 'Ram', 'email': 'ram@example.com', 'address': '123 Kathmandu, City'}
{'_id': 7, 'name': 'Shyam', 'email': 'shyam@example.com', 'address': '456 Lalitpur, City'}


In [29]:
customer_id = 1
customer_orders = orders_collection.find({"customer_id": customer_id})

print(f"Orders placed by Customer ID {customer_id}:")
for order in customer_orders:
    print(order)


Orders placed by Customer ID 1:
{'_id': 101, 'customer_id': 1, 'product': 'Laptop', 'price': 1200, 'quantity': 1}


In [30]:
# Fetch all orders and their associated customer details
orders_with_customers = orders_collection.aggregate([
    {
        "$lookup": {
            "from": "customers",       # Join with customers collection
            "localField": "customer_id",  # Field in orders
            "foreignField": "_id",    # Matching field in customers
            "as": "customer_info"     # Store result in this field
        }
    }
])

print("Orders with Customer Details:")
for order in orders_with_customers:
    print(order)


Orders with Customer Details:
{'_id': 101, 'customer_id': 1, 'product': 'Laptop', 'price': 1200, 'quantity': 1, 'customer_info': [{'_id': 1, 'name': 'Alice', 'age': 29, 'purchases': 3, 'location': 'NYC'}]}
{'_id': 102, 'customer_id': 2, 'product': 'Phone', 'price': 800, 'quantity': 2, 'customer_info': [{'_id': 2, 'name': 'Bob', 'age': 35, 'purchases': 5, 'purchase': 2, 'location': 'Los Angeles'}]}
{'_id': 103, 'customer_id': 3, 'product': 'Tablet', 'price': 600, 'quantity': 1, 'customer_info': [{'_id': 3, 'name': 'Charlie Brown', 'age': 23, 'city': 'Seattle', 'purchases': 3}]}
{'_id': 104, 'customer_id': 4, 'product': 'Headphones', 'price': 150, 'quantity': 3, 'customer_info': [{'_id': 4, 'name': 'David', 'age': 40, 'purchases': 7, 'location': 'NYC'}]}
{'_id': 105, 'customer_id': 5, 'product': 'Monitor', 'price': 300, 'quantity': 2, 'customer_info': [{'_id': 5, 'name': 'Emma', 'age': 30, 'purchases': 4, 'location': 'San Francisco'}]}


In [31]:
query = {"_id": 1}  # Find Alice
new_values = {"$set": {"email": "alice.new@example.com"}}

customers_collection.update_one(query, new_values)
print("Updated customer email.")


Updated customer email.


In [32]:
delete_query = {"_id": 7}  
customers_collection.delete_one(delete_query)
print("Deleted customer.")


Deleted customer.


# ORM with PostgreSQL 

In [36]:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql://postgres:new_password@localhost:5432/mydatabase"

engine = create_engine(DATABASE_URL)

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String, nullable = False)
    email = Column(String, unique=True, nullable = False)

# Create tables
Base.metadata.create_all(engine)

# Create session
SessionLocal = sessionmaker(bind = engine)
session = SessionLocal()

In [37]:
new_user = User(name="John Doe", email="john@example.com")
session.add(new_user)
session.commit()


In [40]:
users = session.query(User).all()
for user in users:
    print(user.id, user.name, user.email)


1 John Doe john@example.com


In [41]:
user = session.query(User).filter_by(name="John Doe").first()
session.delete(user)
session.commit()


In [44]:
users = session.query(User).all()
if not users:
    print("no users")
else:
    print("Users exist:", [user.name for user in users])

no users


# ORM with MongoDB

In [45]:
import mongoengine as me

# Connect to MongoDB
me.connect('mydatabase', host='localhost', port=27017)

# Define a model
class User(me.Document):
    name = me.StringField(required=True)
    email = me.StringField(required=True, unique=True)


In [46]:
user = User(name="Alice", email="alice@example.com")
user.save()


<User: User object>

In [49]:
Base.metadata.create_all(engine)  


In [50]:
users = User.objects()
for user in users:
    print(user.name, user.email)


Alice alice@example.com


In [51]:
user = User.objects(name="Alice").first()
user.name = "Alice Updated"
user.save()


<User: User object>

In [53]:
user = User.objects(name="Alice Updated").first()
user.delete()


In [54]:
users = User.objects()
for user in users:
    print(user.name, user.email)