In [1]:
from sqlalchemy import create_engine

connection_string = "sqlite:///userdata.db"
engine = create_engine(connection_string, echo=True)

In [2]:
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

In [3]:
from sqlalchemy import String, DateTime, func
from sqlalchemy.orm import mapped_column

class Userdata(Base):
    __tablename__ = "userdata"
    user_id = mapped_column(String(32), primary_key=True, nullable=False)
    user_data = mapped_column(String(640))
    time_stamp = mapped_column(DateTime(), default=func.now(), onupdate=func.now())

In [None]:
print(Base.metadata.tables)

In [4]:
Base.metadata.create_all(engine)

2023-10-19 22:44:17,658 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-19 22:44:17,662 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("userdata")
2023-10-19 22:44:17,663 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-19 22:44:17,665 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("userdata")
2023-10-19 22:44:17,667 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-19 22:44:17,668 INFO sqlalchemy.engine.Engine 
CREATE TABLE userdata (
	user_id VARCHAR(32) NOT NULL, 
	user_data VARCHAR(640), 
	time_stamp DATETIME, 
	PRIMARY KEY (user_id)
)


2023-10-19 22:44:17,669 INFO sqlalchemy.engine.Engine [no key 0.00107s] ()
2023-10-19 22:44:17,677 INFO sqlalchemy.engine.Engine COMMIT


In [6]:
from sqlalchemy.orm import Session

session = Session(engine)

In [7]:
from datetime import datetime
user = Userdata(user_id="Brad", user_data="Brad's data")
session.add(user)

In [8]:
user = Userdata(user_id="Larry", user_data="Data for Larry")
session.add(user)
user = Userdata(user_id="Jane", user_data="More data")
session.add(user)

In [9]:
from sqlalchemy import insert

add_list = [{"user_id": "Barbra", "user_data": "Data for Barbra"},
            {"user_id": "Jessa", "user_data": "Jessas data"},
            {"user_id": "Ed", "user_data": "Ed has lots of data"}]

session.execute(insert(Userdata), add_list)

2023-10-19 22:46:37,734 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-19 22:46:37,738 INFO sqlalchemy.engine.Engine INSERT INTO userdata (user_id, user_data, time_stamp) VALUES (?, ?, CURRENT_TIMESTAMP), (?, ?, CURRENT_TIMESTAMP), (?, ?, CURRENT_TIMESTAMP) RETURNING time_stamp, user_id
2023-10-19 22:46:37,739 INFO sqlalchemy.engine.Engine [generated in 0.00027s (insertmanyvalues) 1/1 (ordered)] ('Brad', "Brad's data", 'Larry', 'Data for Larry', 'Jane', 'More data')
2023-10-19 22:46:37,747 INFO sqlalchemy.engine.Engine INSERT INTO userdata (user_id, user_data, time_stamp) VALUES (?, ?, CURRENT_TIMESTAMP)
2023-10-19 22:46:37,749 INFO sqlalchemy.engine.Engine [generated in 0.00201s] [('Barbra', 'Data for Barbra'), ('Jessa', 'Jessas data'), ('Ed', 'Ed has lots of data')]


<sqlalchemy.engine.result.IteratorResult at 0x18d328f6650>

In [10]:
from sqlalchemy import select

stmt = select(Userdata)
results = session.execute(stmt).scalars()
for x in results:
    print(x.user_id, x.user_data, x.time_stamp)

2023-10-19 22:46:41,133 INFO sqlalchemy.engine.Engine SELECT userdata.user_id, userdata.user_data, userdata.time_stamp 
FROM userdata
2023-10-19 22:46:41,134 INFO sqlalchemy.engine.Engine [generated in 0.00106s] ()
Brad Brad's data 2023-10-20 02:46:37
Larry Data for Larry 2023-10-20 02:46:37
Jane More data 2023-10-20 02:46:37
Barbra Data for Barbra 2023-10-20 02:46:37
Jessa Jessas data 2023-10-20 02:46:37
Ed Ed has lots of data 2023-10-20 02:46:37


In [11]:
session.commit()

2023-10-19 22:46:42,762 INFO sqlalchemy.engine.Engine COMMIT


In [12]:
session.close()

In [13]:
id = "Brad"
brad = session.execute(
    select(Userdata).where(Userdata.user_id == id)
    ).scalar()
if brad:
    brad.user_data = "Changed data"
    session.commit()
else:
    print(f"User '{id}' does not exist")
session.close()

2023-10-19 22:47:21,385 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-19 22:47:21,387 INFO sqlalchemy.engine.Engine SELECT userdata.user_id, userdata.user_data, userdata.time_stamp 
FROM userdata 
WHERE userdata.user_id = ?
2023-10-19 22:47:21,388 INFO sqlalchemy.engine.Engine [generated in 0.00066s] ('Brad',)
2023-10-19 22:47:21,391 INFO sqlalchemy.engine.Engine UPDATE userdata SET user_data=?, time_stamp=CURRENT_TIMESTAMP WHERE userdata.user_id = ?
2023-10-19 22:47:21,392 INFO sqlalchemy.engine.Engine [generated in 0.00089s] ('Changed data', 'Brad')
2023-10-19 22:47:21,394 INFO sqlalchemy.engine.Engine COMMIT


In [15]:
from sqlalchemy import delete

session.execute(delete(Userdata).where(Userdata.user_id == "Larry"))
session.commit()
session.close()

2023-10-19 23:04:03,055 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-19 23:04:03,058 INFO sqlalchemy.engine.Engine DELETE FROM userdata WHERE userdata.user_id = ?
2023-10-19 23:04:03,059 INFO sqlalchemy.engine.Engine [generated in 0.00113s] ('Larry',)
2023-10-19 23:04:03,065 INFO sqlalchemy.engine.Engine COMMIT


In [16]:
stmt = select(Userdata)
results = session.execute(stmt).scalars()
for x in results:
    print(x)

2023-10-19 23:05:04,223 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-19 23:05:04,224 INFO sqlalchemy.engine.Engine SELECT userdata.user_id, userdata.user_data, userdata.time_stamp 
FROM userdata
2023-10-19 23:05:04,225 INFO sqlalchemy.engine.Engine [cached since 1103s ago] ()
<__main__.Userdata object at 0x0000018D31BF2650>
<__main__.Userdata object at 0x0000018D32980790>
<__main__.Userdata object at 0x0000018D32982CD0>
<__main__.Userdata object at 0x0000018D32983350>
<__main__.Userdata object at 0x0000018D32983390>


In [18]:
class Userdata(Base):
    __tablename__ = "userdata"
    __table_args__ = {'extend_existing': True}
    user_id = mapped_column(String(32), primary_key=True, nullable=False)
    user_data = mapped_column(String(640))
    time_stamp = mapped_column(DateTime(), default=func.now(), onupdate=func.now())

    def __repr__(self):
        return f"user_id={self.user_id}, " \
               f"user_data={self.user_data}, " \
               f"time_stamp={self.time_stamp.strftime('%B %d %H:%M')}"

  class Userdata(Base):


In [19]:
stmt = select(Userdata)
results = session.execute(stmt).scalars()
for x in results:
    print(x)

2023-10-19 23:08:23,680 INFO sqlalchemy.engine.Engine SELECT userdata.user_id, userdata.user_data, userdata.time_stamp 
FROM userdata
2023-10-19 23:08:23,682 INFO sqlalchemy.engine.Engine [generated in 0.00137s] ()
user_id=Brad, user_data=Changed data, time_stamp=October 20 02:47
user_id=Jane, user_data=More data, time_stamp=October 20 02:46
user_id=Barbra, user_data=Data for Barbra, time_stamp=October 20 02:46
user_id=Jessa, user_data=Jessas data, time_stamp=October 20 02:46
user_id=Ed, user_data=Ed has lots of data, time_stamp=October 20 02:46


# Stop here

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine = create_engine("sqlite:///userdata.db", echo=True)

In [None]:
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

In [None]:
from sqlalchemy import String, DateTime, func
from sqlalchemy.orm import mapped_column

class Userdata(Base):
    __tablename__ = "userdata"
    user_id = mapped_column(String(32), primary_key=True, nullable=False)
    user_data = mapped_column(String(640))
    time_stamp = mapped_column(DateTime(timezone=True), default=func.now(), onupdate=func.now())

    def __repr__(self):
        return f"user_id={self.user_id}, " \
               f"user_data={self.user_data}, " \
               f"time_stamp={self.time_stamp.strftime('%B %d %H:%M')}"

In [None]:
from sqlalchemy import String, DateTime
from sqlalchemy.orm import mapped_column

class Userdata(Base):
    __tablename__ = "userdata"
    user_id = mapped_column(String(32), primary_key=True, nullable=False)
    user_data = mapped_column(String(640))
    time_stamp = mapped_column(DateTime(timezone=True))

In [None]:
print(DeclarativeBase.metadata.tables)

In [None]:
print(Userdata)

Database is created after the step below. See userdata.db file in folder

In [None]:
Base.metadata.create_all(engine)

In section below, need to manually commit and close session

In [None]:
from datetime import datetime

session = Session(engine)
user = Userdata(user_id="Brad", user_data="Data from brad")
session.add(user)
user = Userdata(user_id="Larry", user_data="More data")
session.add(user)
user = Userdata(user_id="Jane", user_data="Even more data")
session.add(user)
session.commit()
session.close()

In [None]:
from sqlalchemy import update

def db_update(session, id, data):
    stmt = (update(Userdata)
            .where(Userdata.user_id == id)
            .values(user_data=data))
    session.execute(stmt)

In [None]:
db_update(session, "Brad", "New New New Bradley")
user = Userdata(user_id="Sally", user_data="Sally data")
session.add(user)
session.commit()

In [None]:
from datetime import datetime

session = Session(engine)
user1 = Userdata(user_id="x", user_data="Data from brad")
user2 = Userdata(user_id="y", user_data="More data")
user3 = Userdata(user_id="z", user_data="Even more data")
session.add_all([user1, user2, user3])
session.commit()
session.close()

if you just use Session class, conext manager does not automatically write data to DB at end of block, or rollback if error. need to call *session.commit()* before end of block


In [None]:
with Session(engine) as session:
    user = Userdata(user_id="Allan", user_data="Data from Allan", time_stamp=datetime.now())
    session.add(user)
    user = Userdata(user_id="Chad", user_data="More Chad data", time_stamp=datetime.now())
    session.add(user)
    user = Userdata(user_id="Mary", user_data="Even more Mary data", time_stamp=datetime.now())
    session.add(user)
    session.commit()


In the section below, the sessiomaker's *begin* method creates a context manager that will run purge() and commit() at the end of the block so they do not need to be explixitly called using *session.commit()*. Also takes care of rollback if write fails. See https://docs.sqlalchemy.org/en/14/orm/session_basics.html#getting-a-session for more ways to do this

In [None]:
from sqlalchemy.orm import sessionmaker

Session2 = sessionmaker(engine)

with Session2.begin() as session:
    user1 = Userdata(user_id="xx", user_data="Data from Phil", time_stamp=datetime.now())
    user2 = Userdata(user_id="yy", user_data="More Karen data", time_stamp=datetime.now())
    user3 = Userdata(user_id="ss", user_data="Gary data", time_stamp=datetime.now())
    session.add_all([user1, user2, user3])

experiment with flush

In [None]:
session3.close()
from sqlalchemy import select

session3 = Session(engine)
session3.autoflush=False

user = Userdata(user_id="Flusher6", user_data="Flushed6")
session3.add(user)


In [None]:
session3.autoflush=True
f = session3.get(Userdata, "Flusher6")

In [None]:
print(f)
print(f.user_id, f.user_data, f.time_stamp)

In [None]:
stmt = select(Userdata)
results = session3.execute(stmt).fetchall()
print(results)
for x in results:
    print(type(x))
    # print(x.user_id, x.user_data, x.time_stamp)

In [None]:
stmt = select(Userdata)
results = session3.execute(stmt).scalars()
for x in results:
    print(x.user_id, x.user_data, x.time_stamp)

In [None]:
session3.commit()

In [None]:
d1 = session3.execute(select(Userdata).where(Userdata.user_id=="Flusher")).scalar()
d2 = session3.execute(select(Userdata).where(Userdata.user_id=="Flusher3")).scalar()
d3 = session3.execute(select(Userdata).where(Userdata.user_id=="Flusher6")).scalar()
session3.delete(d1)
session3.delete(d2)
session3.delete(d3)
session3.commit()

In [None]:
from sqlalchemy import select

session = Session(engine)
stmt = select(Userdata).where(Userdata.user_id == "Brad")
brad = session.execute(stmt).scalar()
brad.user_data = "Changed2 data"
session.commit()

In [None]:
from sqlalchemy import update

session.execute(
    update(Userdata)
    .where(Userdata.user_id == "Brad")
    .values(user_data="Changed again")
)

In [None]:
session.commit()

In [None]:
id = "Brian"
brad = session.execute(
    select(Userdata).where(Userdata.user_id == id)
    ).scalar()
if brad:
    brad.user_data = "More changes data"
    session.commit()
else:
    print(f"User '{id}' does not exist")
session.close()

In [None]:
id = "Brad"
brad = session.execute(
    select(Userdata).where(Userdata.user_id == id)
    ).scalar()
if brad:
    session.execute(
        update(Userdata)
        .where(Userdata.user_id == id)
        .values(user_data="Changed yet again")
    )
    session.commit()
else:
    print(f"User '{id}' does not exist")
session.close()

In [None]:
session.execute(update(Userdata).values(user_data="Changed again"))
session.commit()

In [None]:
change_list = [{"user_id": "Brad", "user_data": "bulk 32 data for Brad"},
               {"user_id": "Larry", "user_data": "Larry's85  bulk data"}]

session.execute(update(Userdata), change_list)
session.commit()

In [None]:
brad = session.execute(
    select(Userdata).where(Userdata.user_id == "Brad")
    ).scalar()

session.delete(brad)
session.commit()
session.close()

In [None]:
from sqlalchemy import delete

session.execute(delete(Userdata).where(Userdata.user_id == "Larry"))
session.commit()
session.close()

In [None]:
from sqlalchemy import insert

add_list = [{"user_id": "Barbra", "user_data": "Data for Barbra"},
            {"user_id": "Jessa", "user_data": "Jessa's data"}]

session.execute(insert(Userdata), add_list)
session.commit()


In [None]:
from sqlalchemy import select
from sqlalchemy.orm import Session

session = Session(engine)
stmt = select(Userdata)
results = session.execute(stmt).scalars()
for x in results:
    print(x)

End experiment with flush

In [None]:
from sqlalchemy import select

session = Session(engine)

user = Userdata(user_id="Flusher", user_data="Flushed?")
session.add(user)
stmt = select(Userdata).execution_options(autoflush=False)
results = session.execute(stmt).scalars()
for x in results:
    print(x.user_id, x.user_data, x.time_stamp)

In [None]:
from sqlalchemy import select

stmt = select(Userdata)
results = session.execute(stmt).scalars()
for x in results:
    print(x.user_id)

In [None]:
session.flush()

In [None]:
stmt = select(Userdata).execution_options(autoflush=False)
results = session.execute(stmt).fetchall()
for x in results:
    print(type(x[0]))

In [None]:
stmt = select(Userdata).execution_options(autoflush=False)
results = session.execute(stmt).scalars()
for x in results:
    print(x.user_data)

In [None]:
stmt = select(Userdata.user_data, Userdata.time_stamp).where(Userdata.user_id=="Brad")
results = session.execute(stmt).one()
print(results)
print(results.time_stamp)

In [None]:
session.close()