In [None]:
from sqlalchemy.orm import declarative_base

# Model

In [None]:
import os
from pathlib import Path
#BASE_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = Path().resolve() #esto funciona en notebook
connection_string = "sqlite:///" + os.path.join(BASE_DIR, 'site.db')
connection_string = "sqlite:///:memory:"
print (connection_string)

In [None]:
from sqlalchemy import Column, Integer, String, DateTime, create_engine
from datetime import datetime

Base = declarative_base()

engine = create_engine(connection_string, echo=True)
"""
class User
    id int
    username str
    email str
    date_create datetime
"""

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(25), nullable=False, unique=True)
    email = Column(String(80), nullable=False, unique=True)
    date_created = Column(DateTime, default=datetime.utcnow())
    
    def __repr__ (self):
        return f"<User username={self.username} email={self.email}>"
    
new_user = User(id=1, username="jonathan", email = "jone@hui.com")
print (new_user)

In [None]:
#create database if not exists
Base.metadata.create_all(engine)

session sirve para hacer las transacciones (lazy¿?)

In [None]:
from sqlalchemy.orm import sessionmaker
Session = sessionmaker()
local_session = Session(bind=engine)

# Insert

In [None]:
new_user = User(username="frank", email = "frank@sinatra.com")
local_session.add(new_user)
local_session.commit()

In [None]:
users = [
    {"username":"jhonny",
    "email":"jh@company.com"},
    {"username":"jerry",
    "email":"je@company.com"},
    {"username":"lucas",
    "email":"lu@company.com"},
    {"username":"igor",
    "email":"ig@company.com"},
    {"username":"gaby",
    "email":"gb@company.com"},
    {"username":"tom",
    "email":"tm@company.com"},
]

In [None]:
for u in users:
    new_user = User(username = u["username"], email = u["email"])
    local_session.add(new_user)    
    local_session.commit()

# Query

In [None]:
users = local_session.query(User).all()
for user in users:
    print (user.username)

In [None]:
users = local_session.query(User).all()[:3]
for user in users:
    print (user.username)

# Update

In [None]:
#gaby
user = local_session.query(User).filter(User.username == "gaby").first()


In [None]:
print (user)

In [None]:
#updating

In [None]:
user

In [None]:
user.username="gabriela"
user.email = "gabriela@company.com"

In [None]:
local_session.commit()

# Delete

In [None]:
user_to_delete = local_session.query(User).filter(User.username == "gabriela").first()

In [None]:
local_session.delete(user_to_delete)

In [None]:
local_session.commit()

# Order

In [None]:
users = local_session.query(User).order_by(User.username).all()

In [None]:
users

In [None]:
from sqlalchemy import desc
users = local_session.query(User).order_by(desc(User.username)).all()
users

In [None]:
import sqlalchemy
sqlalchemy.__version__

In [None]:
%reset -f

Otro Ejemplo

In [None]:
from sqlalchemy import create_engine, ForeignKey, Column, String, Integer, CHAR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Persona(Base):
    __tablename__ = "personas"
    
    ssn = Column(Integer, primary_key=True)
    firstname = Column(String)
    lastname = Column(String)
    gender = Column(CHAR)
    age = Column(Integer)
    
    def __init__(self, ssn, firstname, lastname, gender, age):
        self.ssn = ssn
        self.firstname = firstname
        self.lastname = lastname
        self.gender = gender.upper()
        self.age = age
        
    def __repr__(self):
        return f"{self.ssn} {self.firstname} {self.lastname} ({self.gender}, {self.age})"

In [None]:
engine = create_engine("sqlite:///:memory:", echo=True)

In [None]:
Base.metadata.create_all(bind=engine)

In [None]:
Session = sessionmaker(bind=engine)
session = Session()

In [None]:
tom = Persona(21878, "Mike", "Smith", "m", 35)
session.add(tom)
session.commit()

In [None]:
p1 = Persona(8219821, "Jhon", "Smith", "m", 31)
p2 = Persona(209130938, "Lucas", "Row", "m", 23)
p3 = Persona(99999, "Claudia", "López", "f", 45)
session.add_all([p1,p2,p3])

In [None]:
session.commit()

In [None]:
    session.query(Persona).all()

In [None]:
resultados = session.query(Persona)
print ("==")
for r in resultados:
    print (r)

In [None]:
nuevos = resultados.filter(Persona.age < 40).all()
print (nuevos)

# Relaciones

In [None]:
%reset -f

In [None]:
from sqlalchemy import create_engine, ForeignKey, Column, String, Integer, CHAR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Persona(Base):
    __tablename__ = "personas"
    
    ssn = Column(Integer, primary_key=True)
    firstname = Column(String)
    lastname = Column(String)
    gender = Column(CHAR)
    age = Column(Integer)
    
    def __init__(self, ssn, firstname, lastname, gender, age):
        self.ssn = ssn
        self.firstname = firstname
        self.lastname = lastname
        self.gender = gender.upper()
        self.age = age
        
    def __repr__(self):
        return f"{self.ssn} {self.firstname} {self.lastname} ({self.gender}, {self.age})"
    
class Cosa(Base):
    __tablename__ = "cosas"
    
    tid = Column(Integer, primary_key=True)
    description = Column(String)
    owner = Column(Integer, ForeignKey("personas.ssn"))
    
    def __repr__(self):
        return f"{self.tid} {self.description} owned by {self.owner}"
    
engine = create_engine("sqlite:///:memory:", echo=True)
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()


In [None]:
p0 = Persona(21878, "Mike", "Smith", "m", 35)
p1 = Persona(8219821, "Jhon", "Smith", "m", 31)
p2 = Persona(209130938, "Lucas", "Row", "m", 23)
p3 = Persona(99999, "Claudia", "López", "f", 45)
session.add_all([p0, p1,p2,p3])
session.commit()

In [None]:
t1 = Cosa(tid= 23893232, description = "Una cosa", owner = p0.ssn)

In [None]:
session.add(t1)
session.commit()

In [None]:
t2 = Cosa(description = "PS5", owner = p1.ssn)
t3 = Cosa(description = "Auto", owner = p0.ssn)
session.add_all([t2,t3])
session.commit()

In [None]:
t3

In [None]:
results = session.query(Cosa, Persona).filter(Cosa.owner == Persona.ssn).filter(Persona.firstname == "Mike")

In [None]:
for r in results:
    print (r)

In [None]:
%reset -f

# Otro Ejemplo 
### 1. Partimos con DBAPI

In [None]:
import sqlite3

db_file = "local.db"
with sqlite3.connect(db_file) as conn:
    conn.execute(
        """CREATE TABLE IF NOT EXISTS test (
            col1 integer,
            col2 string
            )
            """
    )

In [None]:
with sqlite3.connect(db_file) as conn:
    conn.execute(
    """INSERT INTO test VALUES (:val_1, :val_2)""",
    {"val_1": 1, "val_2":2}
    )
    result = conn.execute("SELECT * FROM test").fetchall()
result

El mecanismo de arriba es para evitar una inyección de SQL

In [None]:
type(result[0])

Igual es un sistema complicado, porque las queries son texto y si cambio el motor de BBDD tengo que cambiar el código y no ocupa características de Python

### 2. Usando SqlAlchemy

In [None]:
import sqlalchemy as sa

Metadata es el registro de las tablas y permite a SQLAlchemy entender como se conectan. Debe ser un objeto global y todas las tablas deben usar el mismo objeto metadata

In [None]:
meta = sa.MetaData()

Interpretamos la tabla en Python, se puede usar:

In [None]:
test_table = sa.Table("test", 
                   meta,
                   sa.Column("col1", sa.Integer),
                   sa.Column("col2", sa.String))

Todavía la tabla no tiene idea de la base de datos, para eso se usa el **engine**

Al crear el **engine** no se conecta a la DB, simplemente hace validación de la dirección (path, url) y prepara los dialectos para el motor, también se pueden poder muchas opciones de conexión.

In [None]:
#future da uso de las opciones de SQLAlchemy 2.0 que todavía está en beta
#echo escupe el SQL

engine = sa.create_engine("sqlite:///local.db", future=True)


Ahora podemos hacer consultas usando SA

In [None]:
sql = sa.select(test_table)

In [None]:
sql

In [None]:
print(sql)

In [None]:
test_table.c.col1 == "test"

In [None]:
print(test_table.c.col1 == "test")


In [None]:
print(sql.where(test_table.c.col1 == "test"))


Para correr el SQL debemos establecer una conexión, es la primera vez que hacemos algo fuera de python /en esta vuelta)

In [None]:
with engine.connect() as conn:
    result = conn.execute(sql).all()
    
result

In [None]:
type(result[0])

In [None]:
result[0].col1

In [None]:
dict(result[0])

Se parece a lo de antes, pero mejorado

Usamos `.all()` para dar todos, pero se puede usar:
*   `all` todos los resultados
*   `one` solo uno y da excepción si no hay exactamente uno
*   `one_or_none` da uno y da excepción si hay más de uno
*   `first` el primero
*   `partitions(size)` en chunks de tamaño size
*   `yield_per(num)` solo si el motor soporta stream de resultados

### 3. SQLAlchemy Core
Vamos a crear las tablas, que recordemos no están unidas a ninguna base

Vamos a usar unos datos daneses de estacionamientos, porque podemos hacerlo
https://www.opendata.dk/city-of-copenhagen/parkeringstaelling-i-zoner

In [None]:
import sqlalchemy as sa

meta = sa.MetaData()

fkt_parking = sa.Table("fkt_parking", 
                       meta, 
                       sa.Column("id", sa.Integer, primary_key=True),
                       sa.Column("area_id", sa.Integer, sa.ForeignKey("dim_area.area_id")), 
                       sa.Column("year_month", sa.VARCHAR(20)),
                       sa.Column("count_type", sa.Integer, sa.ForeignKey("dim_parking_types.type_id")),
                       sa.Column("count", sa.Integer),
                       sa.Column("hour", sa.Integer)
                      )

dim_area = sa.Table("dim_area", 
                    meta,
                    sa.Column("area_id", sa.Integer, primary_key=True),
                    sa.Column("city", sa.VARCHAR(50)),
                    sa.Column("street_name", sa.VARCHAR(200)),
                    sa.Column("postnr", sa.VARCHAR(4)),
                    sa.Column("nr", sa.Integer)
                   )
                       
dim_parking_types = sa.Table("dim_parking_types",
                             meta,
                             sa.Column("type_id", sa.Integer, primary_key=True),
                             sa.Column("name", sa.VARCHAR(50), unique=True)
                            )

In [None]:
from pathlib import Path
db_name = "parking.db"
#esto es para poder correr más de una vez esto, borra la BD si existe
if Path(db_name).exists():
    Path(db_name).unlink()
conn_string = f"sqlite:///{db_name}"


In [None]:
meta.tables


In [None]:
engine = sa.create_engine(conn_string, future=True, echo=True)


Como metadata sabe todo de nuestras tablas, le podemos pedir que las cree, conoce las dependencias, las relaciones y lo hace todo mágicamente

In [None]:
meta.create_all(engine)

#### 3.1 Insert

In [None]:
insert_sql = sa.insert(dim_parking_types).values(type_id=0, name="legal")


In [None]:
print(insert_sql)

Acá no hay espacio para la inyección y lo hace solo.

Esto es sólo una instrucción, hay que conectarse a la base de datos para que sirva de algo útil

In [None]:
with engine.connect() as conn:
    conn.execute(insert_sql)

Si miramos la base de datos, nada ha pasado.... esto es porque si no hacemos commit los cambios se van a deshacer al salir del context manager

Se ve en la última linea que hace eso

In [None]:
with engine.connect() as conn:
    conn.execute(insert_sql)
    conn.commit()

Ahora borremos ese dato

In [None]:
delete_sql = dim_parking_types.delete().where(dim_parking_types.c.type_id == 0)
print (delete_sql)

In [None]:
with engine.connect() as conn:
    conn.execute(delete_sql)
    conn.commit()

In [None]:
with engine.connect() as conn:
    conn.execute(insert_sql)
    conn.commit()
    conn.execute(delete_sql)
    conn.rollback()
    
#rollback hecha marcha atrás, lo que está con commit queda

#### 3.2 Unit of Work

SA usa unit of work , esto quiere decir que espera que preparemos todos los cambios y de ahí lo enviemos a la BD de una, al mismo tiempo. Esto ayuda a SA a optimizar el resultado y la comunicación con la BD. 

El usar commit es parte de esto y ayuda a tener más control. 

Esto también significa que SA está generalmente trabajando con transacciones, las que podemos manejar explícitamente

In [None]:
with engine.connect() as conn:
    conn.execute(delete_sql)
    conn.commit()
    transaction = conn.begin()
    conn.execute(insert_sql)
    transaction.rollback()

Se puede hacer así también

In [None]:
with engine.begin() as conn:
    conn.execute(insert_sql)
    conn.execute(delete_sql)

Acá hace el commit solo al salir del context manager. Si hay un error entonces la transacción hace rollback

In [None]:
try:
    with engine.begin() as conn:
        conn.execute(insert_sql)
        raise Exception("Algo malo pasó")
except Exception as e:
    print ("Este es un error medio obvio")
    print (e)

#### 3.3 Insertar muchas filas

In [None]:
import csv
import pathlib

In [None]:
data = [(pathlib.Path("data/dim_parking_types.csv"), dim_parking_types),
        (pathlib.Path("data/dim_area.csv"), dim_area),
        (pathlib.Path("data/fkt_parking.csv"), fkt_parking)]

In [None]:
data

In [None]:
#Corramos todo en una Unit of Work en una transacción
with engine.begin() as conn:
    for data_file, table in data:
        #leyendo CSV
        with data_file.open(encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
        sql = table.insert()
        conn.execute(sql, parameters=rows)

Cuando pasamos una lista de diccionarios a paramters, SA sabe usar el método `.executemany()`de la libreria DB-API library, que optimiza para ingresar múltiples filas

#### 3.4 Usando lógica

Busquemos solo en la ciudad de Gilleleje


In [None]:
sql = sa.select([fkt_parking.c.count, fkt_parking.c.hour, dim_area]).join(dim_area).where(dim_area.c.city == "Gilleleje")
print (sql)

In [None]:
with engine.connect() as conn:
    results = conn.execute(sql).all()

In [None]:
[dict(row) for row in results[:2]]

In [None]:
import pandas as pd
pd.DataFrame(results)

In [None]:
#alternativamente 
with engine.connect() as conn:
    df = pd.read_sql(sql, conn)
df

Ahora calculemos la suma de cuentas por hora por ciudad

`func` es para funciones de sql, en este caso la suma

In [None]:
sql = (
    sa.select([sa.func.sum(fkt_parking.c.count).label("total_count"), 
               dim_area.c.city,
               fkt_parking.c.hour])
    .join(dim_area)
    .group_by(dim_area.c.city, fkt_parking.c.hour)
    )


print(sql)

In [None]:
with engine.connect() as conn:
    result = conn.execute(sql)
    df = pd.DataFrame(result)
df

#### 3.5 Refactorizando

Está quedando todo muy largo, y con repeticiones. Como es python podemos crear funciones y variables para manipular el SQL

In [None]:
table = fkt_parking.join(dim_area)
print (table)

In [None]:
total_count = sa.func.sum(fkt_parking.c.count).label("total_count")
print (total_count)

In [None]:
dimension_cols = [dim_area.c.city, fkt_parking.c.hour]
print (dimension_cols)

In [None]:
sql = sa.select([total_count, *dimension_cols]).select_from(table).group_by(*dimension_cols)


Esto es el mismo código que antes, pero se ve más legible

    sql = (sa.select([sa.func.sum(fkt_parking.c.count).label("total_count"), 
                  dim_area.c.city,
                  fkt_parking.c.hour])
         .join(dim_area)
         .group_by(dim_area.c.city, fkt_parking.c.hour))

#### 3.6 cambios on-the-fly

In [None]:
#Agregar una columna adicional al groupby
print(sql.group_by(fkt_parking.c.count_type))

In [None]:
# Columna adicional al select
print(sql.add_columns(fkt_parking.c.count_type))

In [None]:
# Esto no alteró el SQL original
print(sql)

In [None]:
with engine.connect() as conn:
    # Dynamically add a limit statement
    result = conn.execute(sql.limit(10))
    df = pd.DataFrame(result)
df


Ahora calculemos la tasa de ocupación. Se podría definir una tabla intermedia *(CTE (Common Table Expression)* o una subquery

In [None]:
# Reusamos el select básico
base_select = sa.select([fkt_parking.c.count, fkt_parking.c.area_id, fkt_parking.c.year_month, fkt_parking.c.hour]).join(dim_parking_types)

In [None]:
print (base_select)

In [None]:
# Creamos un CTE que se llama available_spaces filtrado en  parking_types
available_spaces = base_select.where(dim_parking_types.c.name == "legal").cte("available_spaces")
print (available_spaces)

In [None]:
# Otro CTE llamado  occupied spaces filtrado en parking_types
counted_spaces = base_select.where(dim_parking_types.c.name == "counted").cte("occupied_spaces")
print (counted_spaces)

In [None]:
# Definimos la métrica
occupancy_rate = (100 * sa.cast(counted_spaces.c.count, sa.Float) / available_spaces.c.count).label("occupancy_rate")

In [None]:
print (occupancy_rate)

In [None]:
# Definimos el join, los CTE no tienen claves foraneas que SA pueda usar para inferir la relación
cte_join_condition = sa.and_(counted_spaces.c.area_id == available_spaces.c.area_id,
                        counted_spaces.c.year_month == available_spaces.c.year_month,
                        counted_spaces.c.hour == available_spaces.c.hour
                        )
print (cte_join_condition)

In [None]:
# Creamos el join
joined_ctes = available_spaces.join(counted_spaces, onclause=cte_join_condition)
print(joined_ctes)

In [None]:
sql = (sa.select([occupancy_rate, dim_area.c.city, available_spaces.c.year_month, available_spaces.c.hour])
       .select_from(joined_ctes)
       .join(dim_area) # SQLAlchemy puede inferir las claves foráneas a través del CTE
      )
print(sql)

In [None]:
with engine.connect() as conn:
    df = pd.read_sql(sql.order_by(occupancy_rate.desc()), conn)
df

Ejercicio: Filtrar las que tienen ocupación sobre 100

In [None]:
with engine.connect() as conn:
    df = pd.read_sql(sql.where(occupancy_rate > 100).order_by(occupancy_rate.desc()), conn)
df

Core SQL es parecido a SQL pero con las ventajas de un lenguaje de programación

### 4. SQLAlchemy ORM

ORM es Object Relational Mapper, y es una capa que mapea la BD a objetos de pythons. Esto puede ser a expensas de cierta flexibilidad y transparencia respect al SQL subyacente.

Como regla general
*   Core sirve mejor para queries analiticas donde se espera obtener muchas filas
*   ORM es mejor para aplicaciones donde solo necesitamos unas cuanas filas cada vez

#### 4.1 Definiendo tablas

In [None]:
import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

Base = declarative_base() 

class MyClass(Base):
    __tablename__ = "demo_table"
    
    #Hay que definir al menos una clave primaria
    class_id: int = sa.Column(sa.Integer, primary_key=True)
    name: str = sa.Column(sa.String)

Uno de los cambios en SA 2.0 es la habilidad de registrar clases con un decorador, lo que parece más en línea con clases basadas en `dataclass`y `attrs`


In [None]:
from sqlalchemy.orm import registry
import enum

mapper_registry = registry()

Dado el `registry` podemos definir clases que definen el modelo de dato. Los type hints son opcionales

Estas son clases regulares, entonces podemos agregar un  `__repr__` por ejemplo. 

In [None]:
@mapper_registry.mapped
class Address:
    __tablename__ = "addresses"
    
    address_id: int = sa.Column(sa.Integer, primary_key=True)
    street_name: str = sa.Column(sa.VARCHAR(50))
    street_number: int = sa.Column(sa.Integer)
    postnr: str = sa.Column(sa.VARCHAR(4))
    
    # Como es un simplemente una clase puedo poner un repr
    def __repr__(self):
        return f"{self.street_name} street_number={self.street_number} postnr={self.postnr}>"

La capa ORM genera una Tabla SA y la fija al atributo `__table__` , que es el mismo que vimos en Core



In [None]:
Address.__table__

No definimos un `__init__` porque SA genera uno automáticamente. Podríamos agregarlo si queremos y se puede agregar más lógica.

Vamos a agregar un objeto `Purchase` y otro `Customer`, con sus relaciones

In [None]:
import decimal
from sqlalchemy.orm import relationship

@mapper_registry.mapped
class Purchase:
    __tablename__ = "purchases"
    __table_args__ = {"extend_existing": True}
    
    purchase_id: int = sa.Column(sa.Integer, primary_key=True)
    item_name: str = sa.Column(sa.VARCHAR(200))
    price: decimal.Decimal = sa.Column(sa.Numeric(19, 4))
    user_id: int = sa.Column(sa.Integer, sa.ForeignKey("customers.customer_id"))
    
    def __repr__(self):
        return f"{self.item_name}>"

Podemos usar tipos nativos de python como enums y decimals, SA los va a convertir a los tipos que entienda la BD



In [None]:
class StatusEnum(str, enum.Enum):
    gold = "gold"
    silver = "silver"
    bronze = "bronze"
    
@mapper_registry.mapped
class Customer:
    __tablename__ = "customers"
    __table_args__ = {"extend_existing": True}
    
    customer_id: int = sa.Column(sa.Integer, primary_key=True)
    name: str = sa.Column(sa.VARCHAR(50), unique=True)
    status: str = sa.Column(sa.Enum(StatusEnum))
    address_id: int = sa.Column(sa.Integer, sa.ForeignKey("addresses.address_id"))
    
    # One-to-one relationship
    address: Address = relationship("Address", backref="customer")
    
    # One-to-many
    purchases: list[Purchase] = relationship("Purchase", backref="customer")
    
    def __repr__(self):
        return f"{self.name}>"

#### 4.2 Relaciones

In [None]:
conn_string = "sqlite:///parking_orm.db"
engine = sa.create_engine(conn_string, future=True, echo=True)


ORM está construido sobre **Core**, podemos usar el engine y la metadata como antes

In [None]:
mapper_registry.metadata.create_all(engine)


In [None]:
john = Customer(name="John", status=StatusEnum.gold)
jane = Customer(name="Jane", status=StatusEnum.bronze)

#### 4.3 Session
En ORM usamos Session en vez de conexión. La session sabe como trabajar con las clases de ORM, y sirve como mapa local de las instancias, manteniendo registro de que instancias han cambiado , cuales osn nuevas, etc.


In [None]:
from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(john)
    session.add(jane)
    #hay que hace commit
    session.commit()

Agreguemos una dirección a John

In [None]:
address = Address(street_name="Bogholder Allè", street_number=15, postnr=2720)
address

In [None]:
john.address = address

In [None]:
with Session(engine) as session:
    session.add(john)
    session.commit()

Ahora John va a comprar

In [None]:
potion = Purchase(item_name="Magic Potion", price=20.00, customer=john)
with Session(engine) as session:
    session.add(potion)
    session.commit()

In [None]:
magic_hat = Purchase(item_name="Magic Hat", price=100)

In [None]:
with Session(engine) as session:
    # Necesitamos conectar a john a esta session
    session.add(john)
    # purchases es relación una-a-varios, entonces SA lo representa como lista
    john.purchases.append(magic_hat)
    session.add(john)
    session.commit()

Se busca (select) igual que en Core

In [None]:
sql = sa.select(Customer).filter_by(name="Jane")
print(sql)


In [None]:
with Session(engine) as session:
    jane = session.execute(sql).one_or_none()

In [None]:
jane

In [None]:
type(jane)

El resultado de esta query es un objeto `Row`, lo mismo que en Core, pero en modo ORM a veces nos interesamos en resultados escalares, el valor de la primera columna para cada fila

SA soporta esto por el modificador de escalares y los helpers de escalares

In [None]:
with Session(engine) as session:
    jane = session.execute(sql).scalars().one_or_none()

In [None]:
jane

In [None]:
type(jane)

In [None]:
with Session(engine) as session:
    jane = session.execute(sql).scalar_one_or_none()

In [None]:
print (jane)
print (type(jane))

Si conocemos la clave primaria, SA provee un método eficiente para buscarla 

In [None]:
with Session(engine) as session:
    jane2 = session.get(Customer, jane.customer_id)

In [None]:
jane2

Ahora podemos preguntar por los atributos relacionados

In [None]:
with Session(engine) as session:
    session.add(john)
    print("Purchases:\t", john.purchases)
    print("Address:\t", john.address)

Si chequeamos un atributo no hay SQL emitido

In [None]:
john.status


#### 4.2 Carga de relaciones

Para acceder a los atributos de la relación, debemos estar dentro de una sesión, ya que, de forma predeterminada, las relaciones de SQLAlchemy son cargados de manera floja (*lazy-loading*).

Las consultas de *lazy-loading* generan consultas SQL adicionales cuando se accede a ellas para evitar cargar todos los datos relacionados en la memoria a la vez. La relación se puede configurar para que se cargue de [diferentes maneras](https://docs.sqlalchemy.org/en/14/orm/loading_relationships.html#relationship-loading-techniques), definidas en el constructor de relaciones o como opciones de `select`


In [None]:
from sqlalchemy.orm import joinedload, selectinload

with Session(engine) as session:
    sql = sa.select(Customer).options(joinedload(Customer.address), selectinload(Customer.purchases)).where(Customer.name == "John")
    john = session.execute(sql).unique().scalar_one()

In [None]:
john

Alternativamente, podemos definir la relación para que no sea `lazy`: agreguemos una tabla de `loyalty_points` que registre cuántos puntos de fidelidad tiene una compra determinada.

In [None]:
@mapper_registry.mapped
class LoyaltyPoints:
    __tablename__ = "loyalty_points"
    __table_args__ = {"extend_existing": True}
    
    loyalty_point_id: int = sa.Column(sa.Integer, primary_key=True)
    customer_id: int = sa.Column(sa.Integer, sa.ForeignKey("customers.customer_id"))
    purchase_id: int = sa.Column(sa.Integer, sa.ForeignKey("purchases.purchase_id"))
    total_points: int = sa.Column(sa.Integer)
    
    # One-to-one relationship
    purchase: Purchase = relationship("Purchase", backref="points", lazy="joined")
    
    # One-to-one
    customer: Customer = relationship("Customer", backref="points", lazy="selectin")

In [None]:

mapper_registry.metadata.create_all(engine)

Agreguemos unos loyalty points

In [None]:
with Session(engine) as session:
    sql = sa.select(Customer).filter_by(name="John")
    john = session.execute(sql).scalar_one()
    loyalty_purchase = john.purchases[0]
    loyalty_points = LoyaltyPoints(customer=john, purchase=loyalty_purchase, total_points=1000)
    session.add(loyalty_points)
    session.commit()

Veamos que pasa si seleccionamos a john nuevamente

In [None]:
with Session(engine) as session:
    sql = sa.select(LoyaltyPoints).where(LoyaltyPoints.customer.has(Customer.name == "John"))
    john_points = session.execute(sql).scalar_one()

In [None]:
john_points.purchase

La carga de relaciones es uno de los mayores beneficios de los ORM, pero también puede ser la forma más fácil de meter las patas. Hay que tener ojo en la estrategia de relaciones

#### 4.3 ORM son clases
Vamos a agregar `last_updated` y `created_at` columnas a todo el modelo

In [None]:
import datetime as dt

class CreatedMixin:
    last_updated: dt.datetime = sa.Column(sa.DateTime, default=sa.func.now(), onupdate=sa.func.now())
    created_at: dt.datetime = sa.Column(sa.DateTime, default=sa.func.now())

un Mixin es el nombre de un patrón que permite que las clases hereden la funcionalidad, pero solo se extiende en lugar de sobrescribirse.

In [None]:
@mapper_registry.mapped
class User(CreatedMixin):
    __tablename__ = "users"
    __table_args__ = {"extend_existing": True} 
    
    primary: int = sa.Column(sa.Integer, primary_key=True)
    name: str = sa.Column(sa.String)
    role: str = sa.Column(sa.String)
    purchases: int = sa.Column(sa.Integer, default=0)

In [None]:
list(User.__table__.columns)

Dado que `User` es una clase normal, podemos definir un `classmethod` para proporcionar un constructor alternativo, por ejemplo, desde una carga JSON.

In [None]:
@mapper_registry.mapped
class User(CreatedMixin):
    __tablename__ = "users"
    __table_args__ = {"extend_existing": True} 
    
    primary: int = sa.Column(sa.Integer, primary_key=True)
    name: str = sa.Column(sa.String)
    role: str = sa.Column(sa.String)
    purchases: int = sa.Column(sa.Integer, default=0) # Note that default here only applies to the table - not to the generated __init__ function
    
    @classmethod
    def from_dict(cls, data):
        return cls(name=data["UserName"], role="public", purchases=0)

In [None]:
user = User.from_dict({"UserName": "Jarvis"})

Podemos agregar propiedades

In [None]:
@mapper_registry.mapped
class User(CreatedMixin):
    __tablename__ = "users"
    __table_args__ = {"extend_existing": True} 
    
    primary: int = sa.Column(sa.Integer, primary_key=True)
    name: str = sa.Column(sa.String)
    role: str = sa.Column(sa.String)
    purchases: int = sa.Column(sa.Integer, default=0)
    
    @classmethod
    def from_dict(cls, data):
        return cls(name=data["UserName"], role="public", purchases=0)
    
    @property
    def is_admin(self):
        return self.role == "admin"

In [None]:
user = User(name="Jade", role="public")
user.is_admin

Si queremos, también podemos usar la propiedad en las consultas, definiéndola como una propiedad_híbrida. Esto nos permite escribir User.is_admin para generar una expresión SQL

In [None]:
from sqlalchemy.ext.hybrid import hybrid_property

@mapper_registry.mapped
class User(CreatedMixin):
    __tablename__ = "users"
    __table_args__ = {"extend_existing": True} 
    
    primary: int = sa.Column(sa.Integer, primary_key=True)
    name: str = sa.Column(sa.String)
    role: str = sa.Column(sa.String)
    purchases: int = sa.Column(sa.Integer, default=0)
    
    @classmethod
    def from_dict(cls, data):
        return cls(name=data["UserName"], role="public")
    
    @hybrid_property
    def is_admin(self):
        return self.role == "admin"

Creemos las tablas

In [None]:
mapper_registry.metadata.create_all(engine)

In [None]:
user = User(name="Jade", role="admin")

In [None]:
#también se puede usar la sesión sin context manager
session = Session(engine)
session.add(user)
session.commit()

In [None]:
sql = sa.select(User).where(User.is_admin)

In [None]:
print(sql)

In [None]:
admin_user = session.execute(sql).scalar_one_or_none()
print(f"Last updated: {admin_user.last_updated:%H:%M:%S}")

In [None]:
admin_user.name = "Jade Smith"
session.add(admin_user)
session.commit()
print(f"Last updated: {admin_user.last_updated:%H:%M:%S}")


A veces, la lógica de SQL y la lógica de Python difieren y deben escribirse de dos maneras diferentes. Cada propiedad_híbrida puede definir una expresión que se ejecutará cuando se use dentro de una instrucción SQL.

In [None]:
@mapper_registry.mapped
class User(CreatedMixin):
    __tablename__ = "users"
    __table_args__ = {"extend_existing": True} 
    
    primary: int = sa.Column(sa.Integer, primary_key=True)
    name: str = sa.Column(sa.String)
    role: str = sa.Column(sa.String)
    purchases: int = sa.Column(sa.Integer, default=0)
    
    @classmethod
    def from_dict(cls, data):
        return cls(name=data["UserName"], role="public")
    
    @hybrid_property
    def is_admin(self):
        return self.role == "admin"
    
    @hybrid_property
    def is_validated(self):
        return self.role in ["public", "admin"]
        
    @is_validated.expression # This provides the SQL definition of is_validated
    def is_validated(cls):
        return cls.role.in_(["public", "admin"])

In [None]:
user = User(name="Jade", role="public")

In [None]:
user.is_validated

In [None]:
sql = sa.select(User).where(User.is_validated)
print(sql)

In [None]:
validated_users = session.execute(sql).scalars().all()


In [None]:
validated_users

In [None]:
validated_users[0].name

Así que ahora tenemos la lógica de Python asignada tanto a SQL como a nuestra instancia local de Python. Hasta ahora, ha sido una propiedad simple, ¿qué pasa con la lógica?

In [None]:
from sqlalchemy.ext.hybrid import hybrid_method

@mapper_registry.mapped
class User(CreatedMixin):
    __tablename__ = "users"
    __table_args__ = {"extend_existing": True} 
    
    primary: int = sa.Column(sa.Integer, primary_key=True)
    name: str = sa.Column(sa.String)
    role: str = sa.Column(sa.String)
    purchases: int = sa.Column(sa.Integer, default=0)
    
    @classmethod
    def from_dict(cls, data):
        return cls(name=data["UserName"], role="public")
    
    @hybrid_property
    def is_admin(self):
        return self.role == "admin"
    
    @hybrid_property
    def is_validated(self):
        return self.role in ["public", "admin"]
        
    @is_validated.expression
    def is_validated(cls):
        return cls.role.in_(["public", "admin"])

    # We can define arbitrary methods - handy for logic encapsulation!
    def purchase(self, session: Session, item_cost: int) -> int:
        self.purchases += item_cost
        session.add(self)
        return self.purchases
    
    @hybrid_method # Define arbitrary methods
    def calculate_roi(self, total_cost: int) -> float:
        return (self.purchases - total_cost) / total_cost

In [None]:
admin_user = session.execute(sql).scalar_one_or_none()
print(f"Last updated: {admin_user.last_updated:%H:%M:%S}")

In [None]:
user = User(name="Jane", role="admin", purchases=0)

In [None]:
user.purchase(session, 2_000)
session.commit()

In [None]:
user.purchases


¿Qué pasa si queremos usar un cálculo dentro de nuestra consulta SQL? eso es lo que hace el método_híbrido. Sigue las mismas reglas que la propiedad, pero funciona con argumentos.

In [None]:
user.calculate_roi(total_cost=1000)

In [None]:
total_cost = 1000
sql = sa.select(User, User.calculate_roi(total_cost=total_cost)).where(User.calculate_roi(total_cost=total_cost) >= 1)

print(sql)

In [None]:
print([(user.name, user.calculate_roi(total_cost=total_cost)) for user in session.execute(sql).scalars()])


In [None]:
session.close()