# Tests connecteur SQLAlchemy

Pour pouvoir se connecter à la base de données, il faut d'abord lancer le conteneur docker correspondant créé par l'équipe backend, en utilisant docker-compose 

(voir le README: https://github.com/dataforgoodfr/13_brigade_coupes_rases/tree/e81f4c40aa5179878357bfb3809b928b5887b10e/docker)

In [None]:
# imports
import os
from dotenv import load_dotenv, find_dotenv
from db_connector import engine, get_session
from sqlalchemy import Column, Integer, String, DateTime, Float
from datetime import datetime
from sqlalchemy.orm import declarative_base

Si besoin est, voici les variables d'environnement que j'ai utilisé dans mon .env (remplacer par les bonnes valeurs):

DB_USER_NAME=abc

DB_PSWD=abc

DB_HOST=abc

DB_NAME=abc


In [None]:
# Pour vérifier que les variables d'environnement se sont bien chargées:
env_path = find_dotenv()
env_vars = load_dotenv(env_path)

db_user_name = os.environ.get('DB_USER_NAME')
db_pswd = os.environ.get('DB_PSWD')
db_host = os.environ.get('DB_HOST')
db_name = os.environ.get('DB_NAME')

DATABASE_URL = f"postgresql+psycopg2://{db_user_name}:{db_pswd}@{db_host}/{db_name}"
print(DATABASE_URL)


On crée des tables temporaires inspirées des tables de backend.models (simplifiées):

In [21]:

Base = declarative_base()

class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(String, nullable=True)


class User(Base):
    __tablename__ = "users"

    ROLES = ["admin", "viewer", "volunteer"]

    id = Column(Integer, primary_key=True, index=True)
    firstname = Column(String, index=True)
    lastname = Column(String, index=True)
    role = Column(String, index=True)


class Department(Base):
    __tablename__ = "departments"

    id = Column(Integer, primary_key=True, index=True)
    code = Column(Integer, index=True)


class ClearCut(Base):
    __tablename__ = "clear_cuts"

    STATUSES = ["pending", "validated"]

    id = Column(Integer, primary_key=True, index=True)
    cut_date = Column(DateTime, index=True)
    slope_percentage = Column(Float, index=True)
    lat = Column(Float, index=True)
    lon = Column(Float, index=True)
    status = Column(String, index=True)
    department_id = Column(Integer, index=True)
    user_id = Column(Integer, index=True)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now)


# on crée les différentes tables
Base.metadata.create_all(engine)


In [22]:
# on se connecte à la DB pour pouvoir ajouter des données
session = get_session()

# on crée quelques éléments pour populer les tables
item_1 = Item(name='foo', description='abc')
item_2 = Item(name='bar', description='def')
item_3 = Item(name='baz', description='ghi')
items = [item_1, item_2, item_3]

user_1 = User(firstname='john', lastname='wick', role='hitman')
user_2 = User(firstname='will', lastname='smith', role='actor')
users = [user_1, user_2]

department_1 = Department(code=75)
department_2 = Department(code=93)
department_3 = Department(code=13)
departments = [department_1, department_2, department_3]

session.add_all(items)
session.add_all(users)
session.add_all(departments)
session.commit()

In [23]:
# Tests de lecture :

user_results = session.query(User).all()
for user in user_results:
    print(f'{user.firstname} {user.lastname}: {user.role}')

department_results = session.query(Department).all()
for dep in department_results:
    print(f'department #{dep.id}: {dep.code}')

john wick: hitman
will smith: actor
department #1: 75
department #2: 93
department #3: 13
