In [43]:
from sqlalchemy import Column, Integer, String, BigInteger, Sequence, Table, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base

BaseModel = declarative_base()


class publicusers(BaseModel):
    """ORM model mapped to the `users` table (integer primary key `id`, string `name`)."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String)

    # Paired with publicusergroups.usersRelation through back_populates.
    usersRel = relationship("publicusergroups", back_populates="usersRelation")

class publicusergroups(BaseModel):
    """ORM model mapped to the `usergroups` table, referencing `users.id` and `groups.id`."""

    __tablename__ = "usergroups"

    id = Column(Integer, primary_key=True)
    user_id = Column(
        Integer,
        ForeignKey("users.id", onupdate="NO ACTION", ondelete="NO ACTION"),
    )
    group_id = Column(
        Integer,
        ForeignKey("groups.id", onupdate="NO ACTION", ondelete="NO ACTION"),
    )

    # NOTE(review): uselist=True makes both attributes list-valued even though a row
    # stores a single user_id / group_id -- confirm this is intended.
    usersRelation = relationship("publicusers", uselist=True, back_populates="usersRel")
    groupsRelation = relationship("publicgroups", uselist=True, back_populates="groupsRel")


class publicgroups(BaseModel):
    """ORM model mapped to the `groups` table (integer primary key `id`, string `name`)."""

    __tablename__ = "groups"

    id = Column(Integer, primary_key=True)
    name = Column(String)

    # Paired with publicusergroups.groupsRelation through back_populates.
    groupsRel = relationship("publicusergroups", back_populates="groupsRelation")


class publicuzivatel(BaseModel):
    """ORM model mapped to the `uzivatel` table.

    The primary key `uzivatele_id` is declared with the sequence
    `uzivatel_uzivatele_id_seq`; `jmeno` and `prijmeni` are strings limited to
    60 characters, `datum_narozeni` and `pocet_clanku` are integers.
    """

    __tablename__ = "uzivatel"

    uzivatele_id = Column(
        Integer,
        Sequence("uzivatel_uzivatele_id_seq"),
        primary_key=True,
    )
    jmeno = Column(String(60))
    prijmeni = Column(String(60))
    datum_narozeni = Column(Integer)
    pocet_clanku = Column(Integer)


In [44]:
from typing import List, Optional
from pydantic import BaseModel as BaseSchema


class usersCreateSchema(BaseSchema):
    """Pydantic schema with the `id` and `name` fields of a `users` row.

    Declared as the request body of the POST /users/{id}/{edname} endpoint.
    """

    id: int
    name: str

class usersGetSchema(BaseSchema):
    """Pydantic schema (`id`, `name`) used as the response model for `users` rows."""

    id: int
    name: str

    class Config:
        # Allow building the schema from an ORM object's attributes, not only from a dict.
        orm_mode = True

class usersUpdateSchema(BaseSchema):
    """Pydantic schema whose fields (`id`, `name`) mirror the columns of the `users` table."""

    id: int
    name: str

class usersDeleteSchema(BaseSchema):
    """Pydantic schema with the integer `id` and string `name` of a `users` row."""

    id: int
    name: str

class usergroupsCreateSchema(BaseSchema):
    """Pydantic schema with the integer fields `id`, `user_id` and `group_id`.

    The fields mirror the columns of the `usergroups` table.
    """

    id: int
    user_id: int
    group_id: int

class usergroupsGetSchema(BaseSchema):
    """Pydantic schema (`id`, `user_id`, `group_id`) readable from ORM objects."""

    id: int
    user_id: int
    group_id: int

    class Config:
        # Allow building the schema from an ORM object's attributes, not only from a dict.
        orm_mode = True

class usergroupsUpdateSchema(BaseSchema):
    """Pydantic schema whose integer fields mirror the `usergroups` table columns."""

    id: int
    user_id: int
    group_id: int

class usergroupsDeleteSchema(BaseSchema):
    """Pydantic schema holding `id`, `user_id` and `group_id` of a `usergroups` row."""

    id: int
    user_id: int
    group_id: int

class groupsCreateSchema(BaseSchema):
    """Pydantic schema with the `id` and `name` fields of a `groups` row."""

    id: int
    name: str

class groupsGetSchema(BaseSchema):
    """Pydantic schema (`id`, `name`) for `groups` rows, readable from ORM objects."""

    id: int
    name: str

    class Config:
        # Allow building the schema from an ORM object's attributes, not only from a dict.
        orm_mode = True

class groupsUpdateSchema(BaseSchema):
    """Pydantic schema whose fields (`id`, `name`) mirror the columns of the `groups` table."""

    id: int
    name: str

class groupsDeleteSchema(BaseSchema):
    """Pydantic schema with the integer `id` and string `name` of a `groups` row."""

    id: int
    name: str

class uzivatelCreateSchema(BaseSchema):
    """Pydantic schema whose fields mirror the columns of the `uzivatel` table.

    `uzivatele_id`, `datum_narozeni` and `pocet_clanku` are integers;
    `jmeno` and `prijmeni` are strings.
    """

    uzivatele_id: int
    jmeno: str
    prijmeni: str
    datum_narozeni: int
    pocet_clanku: int

class uzivatelGetSchema(BaseSchema):
    """Pydantic schema for `uzivatel` rows that can be populated from ORM objects."""

    uzivatele_id: int
    jmeno: str
    prijmeni: str
    datum_narozeni: int
    pocet_clanku: int

    class Config:
        # Read the data even when the source is an ORM model instead of a dict.
        orm_mode = True

class uzivatelUpdateSchema(BaseSchema):
    """Pydantic schema carrying all five `uzivatel` columns (three integers, two strings)."""

    uzivatele_id: int
    jmeno: str
    prijmeni: str
    datum_narozeni: int
    pocet_clanku: int

class uzivatelDeleteSchema(BaseSchema):
    """Pydantic schema with the same five fields as the `uzivatel` table columns."""

    uzivatele_id: int
    jmeno: str
    prijmeni: str
    datum_narozeni: int
    pocet_clanku: int

In [45]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session
import uvicorn
from fastapi import FastAPI
from fastapi import Depends
from fastapi import HTTPException

#import uvicorn

app = FastAPI()


def run():
    """Launch function: serve `app` with uvicorn on host 0.0.0.0, port 9992."""
    # The host argument is a REQUIRED parameter here (original author's note).
    uvicorn.run(app, host='0.0.0.0', port=9992, root_path='')

# Connection URL format:
#   postgresql+psycopg2://user:password@hostname/database_name
# NOTE(review): the credentials are hard-coded in this URL; consider supplying
# them through the environment before sharing the notebook.
KnownDatabase = 'postgresql+psycopg2://postgres:example@postgres/testzhodiny'

engine = create_engine(KnownDatabase)
# Session factory bound to the engine above.
SessionLocal = sessionmaker(bind=engine)
# Create the module-level session from the bound factory. The bare
# `Session()` used before had no engine attached, so any query issued
# through it could not reach the database.
session = SessionLocal()
#BaseModel.metadata.drop_all(engine)    # drop the whole database schema
#BaseModel.metadata.create_all(engine)  # recreate it (with the required structure)

"""
async def connecttoDatabase(KnownDatabase):
	engine = create_engine(KnownDatabase)
	Session = sessionmaker(bind=engine)
	session = Session()
    
    result = await session
    
    BaseModel.metadata.drop_all(engine)    #vymazeme celou databazy
    BaseModel.metadata.create_all(engine)  #vytvorime znovu (s pozadovanou strukturou)
	return result
"""
"""
def connectAllEndpoints(app):

	@app.post("/users")
	async def createusers(db: Depends(connecttoDatabase(KnownDatabase)), users: usersCreateSchema):
		usersRow = publicusers(name=users.name)
		db.add(usersRow)
		db.commit()
		db.refresh(usersRow)
		return await (usersRow)

	@app.get("/users")
	async def getusers(db: Depends(connecttoDatabase(KnownDatabase)), users: usersGetSchema):
		return await (db.query(publicusers).first())
	async def getAllusers(db: Depends(connecttoDatabase(KnownDatabase)), skip: int = 0, limit: int = 100):
		return await (db.query(publicusers).offset(skip).limit(limit).all())

	@app.put("/users")
	async def updateusers(db: Depends(connecttoDatabase(KnownDatabase)), users: usersUpdateSchema):
		usersToUpdate = db.query(publicusers).first()
		db.commit()
		db.refresh(usersToUpdate)
		return await (usersToUpdate)

	@app.delete("/users")
	async def deleteusers(db: Depends(connecttoDatabase(KnownDatabase)), users: usersDeleteSchema):
		users_to_delete = db.query(publicusers).filter(publicusers == users).first()
		db.delete(users_to_delete)
		db.comit()
		db.refresh(users_to_delete)
		return await (users_to_delete)

	@app.post("/usergroups")
	async def createusergroups(db: Depends(connecttoDatabase(KnownDatabase)), usergroups: usergroupsCreateSchema):
		usergroupsRow = publicusergroups(user_id=usergroups.user_id)
		usergroupsRow = publicusergroups(group_id=usergroups.group_id)
		db.add(usergroupsRow)
		db.commit()
		db.refresh(usergroupsRow)
		return await (usergroupsRow)

	@app.get("/usergroups")
	async def getusergroups(db: Depends(connecttoDatabase(KnownDatabase)), usergroups: usergroupsGetSchema):
		return await (db.query(publicusergroups).first())
	async def getAllusergroups(db: Depends(connecttoDatabase(KnownDatabase)), skip: int = 0, limit: int = 100):
		return await (db.query(publicusergroups).offset(skip).limit(limit).all())

	@app.put("/usergroups")
	async def updateusergroups(db: Depends(connecttoDatabase(KnownDatabase)), usergroups: usergroupsUpdateSchema):
		usergroupsToUpdate = db.query(publicusergroups).first()
		db.commit()
		db.refresh(usergroupsToUpdate)
		return await (usergroupsToUpdate)

	@app.delete("/usergroups")
	async def deleteusergroups(db: Depends(connecttoDatabase(KnownDatabase)), usergroups: usergroupsDeleteSchema):
		usergroups_to_delete = db.query(publicusergroups).filter(publicusergroups == usergroups).first()
		db.delete(usergroups_to_delete)
		db.comit()
		db.refresh(usergroups_to_delete)
		return await (usergroups_to_delete)

	@app.post("/groups")
	async def creategroups(db: Depends(connecttoDatabase(KnownDatabase)), groups: groupsCreateSchema):
		groupsRow = publicgroups(name=groups.name)
		db.add(groupsRow)
		db.commit()
		db.refresh(groupsRow)
		return await (groupsRow)

	@app.get("/groups")
	async def getgroups(db: Depends(connecttoDatabase(KnownDatabase)), groups: groupsGetSchema):
		return await (db.query(publicgroups).first())
	async def getAllgroups(db: Depends(connecttoDatabase(KnownDatabase)), skip: int = 0, limit: int = 100):
		return await (db.query(publicgroups).offset(skip).limit(limit).all())

	@app.put("/groups")
	async def updategroups(db: Depends(connecttoDatabase(KnownDatabase)), groups: groupsUpdateSchema):
		groupsToUpdate = db.query(publicgroups).first()
		db.commit()
		db.refresh(groupsToUpdate)
		return await (groupsToUpdate)

	@app.delete("/groups")
	async def deletegroups(db: Depends(connecttoDatabase(KnownDatabase)), groups: groupsDeleteSchema):
		groups_to_delete = db.query(publicgroups).filter(publicgroups == groups).first()
		db.delete(groups_to_delete)
		db.comit()
		db.refresh(groups_to_delete)
		return await (groups_to_delete)

	@app.post("/uzivatel")
	async def createuzivatel(db: Depends(connecttoDatabase(KnownDatabase)), uzivatel: uzivatelCreateSchema):
		uzivatelRow = publicuzivatel(jmeno=uzivatel.jmeno)
		uzivatelRow = publicuzivatel(prijmeni=uzivatel.prijmeni)
		uzivatelRow = publicuzivatel(datum_narozeni=uzivatel.datum_narozeni)
		uzivatelRow = publicuzivatel(pocet_clanku=uzivatel.pocet_clanku)
		db.add(uzivatelRow)
		db.commit()
		db.refresh(uzivatelRow)
		return await (uzivatelRow)

	@app.get("/uzivatel")
	async def getuzivatel(db: Depends(connecttoDatabase(KnownDatabase)), uzivatel: uzivatelGetSchema):
		return await (db.query(publicuzivatel).first())
	async def getAlluzivatel(db: Depends(connecttoDatabase(KnownDatabase)), skip: int = 0, limit: int = 100):
		return await (db.query(publicuzivatel).offset(skip).limit(limit).all())

	@app.put("/uzivatel")
	async def updateuzivatel(db: Depends(connecttoDatabase(KnownDatabase)), uzivatel: uzivatelUpdateSchema):
		uzivatelToUpdate = db.query(publicuzivatel).first()
		db.commit()
		db.refresh(uzivatelToUpdate)
		return await (uzivatelToUpdate)

	@app.delete("/uzivatel")
	async def deleteuzivatel(db: Depends(connecttoDatabase(KnownDatabase)), uzivatel: uzivatelDeleteSchema):
		uzivatel_to_delete = db.query(publicuzivatel).filter(publicuzivatel == uzivatel).first()
		db.delete(uzivatel_to_delete)
		db.comit()
		db.refresh(uzivatel_to_delete)
		return await (uzivatel_to_delete)

"""    
# the schemas have () because of an error ----> but it should NOT be there!
    
    


try:
    # Both setup calls are disabled, so this cell only reports completion.
    #connecttoDatabase(KnownDatabase)
    #connectAllEndpoints(app)
    print("FINISHED")
except Exception as e:
    # Show the exception, then propagate it to the notebook.
    print("\nVyjimka: ", e, "\n\n")
    raise e
#connectAllEndpoints(app)
#print("FINISHED")

FINISHED


In [46]:

from multiprocessing import Process
from wait4it import wait_for

# Handle of the child process running the API server; stays None until start_api() launches one.
_api_process = None

def get_db():
    """Yield a session created by `SessionLocal` and close it afterwards.

    Written as a generator so it can be used with `Depends`; the `finally`
    clause closes the session even when the consumer raises.
    """
    session_for_request = SessionLocal()
    try:
        yield session_for_request
    finally:
        session_for_request.close()


@app.get("/user/{id}", response_model=usersGetSchema)
def read_it(id: int, db: Session=Depends(get_db)):
    """Return the `users` row whose primary key equals `id`.

    Raises:
        HTTPException: status 404 when no row with that id exists.
    """
    matching_rows = db.query(publicusers).filter(publicusers.id == id)
    user_row = matching_rows.first()
    if user_row is not None:
        return user_row
    raise HTTPException(status_code=404, detail="None user")

@app.get("/allusers", response_model=List[usersGetSchema])
def getAllusers(skip: int = 0, limit: int = 100, db: Session=Depends(get_db)):
    """Return one page of `users` rows, ordered by id.

    Args:
        skip: number of rows to skip (OFFSET); must not be negative.
        limit: maximum number of rows to return (LIMIT); must not be negative.
        db: database session supplied by `get_db`.

    Returns:
        A list of rows; the list is empty when the page holds no rows.

    Raises:
        HTTPException: status 400 when `skip` or `limit` is negative.
    """
    # Reject negative paging values here instead of letting the database fail on them.
    if skip < 0 or limit < 0:
        raise HTTPException(status_code=400, detail="skip and limit must not be negative")

    # `.all()` always returns a list (never None), so the former `is None` -> 404
    # branch could not trigger; an empty page is returned as an empty list.
    # Ordering by the primary key keeps OFFSET/LIMIT pages stable between calls.
    return (
        db.query(publicusers)
        .order_by(publicusers.id)
        .offset(skip)
        .limit(limit)
        .all()
    )

""" 
def get_user_by_id(db: Session, id: int):
    return db.query(publicusers).filter(publicusers.id == id).first()


def crud_user(db: Session, users: usersCreateSchema):
    create_name = users.name + "jmeno"
    db_users = publicusers(id=users.id, name=create_name)
    db.add(db_users)
    db.commit()
    db.refresh(db_users)
    return db_users
"""

@app.post("/users/{id}/{edname}", response_model=usersGetSchema)
def create_user(id: int, edname: str, users: usersCreateSchema, db: Session = Depends(get_db)):
    """Insert a `users` row built from the path parameters `id` and `edname`.

    Args:
        id: primary key of the new row (path parameter).
        edname: name of the new row (path parameter).
        users: request body; required by the route signature but not read here.
        db: database session supplied by `get_db`.

    Returns:
        The freshly inserted row, refreshed from the database.

    Raises:
        HTTPException: status 400 when the id or the name is already present.
    """
    # Check the id first and stop immediately; the name lookup is only issued
    # when it can still influence the outcome.
    id_taken = db.query(publicusers).filter(publicusers.id == id).first() is not None
    if id_taken:
        raise HTTPException(status_code=400, detail="zadane ID jiz existuje")

    name_taken = db.query(publicusers).filter(publicusers.name == edname).first() is not None
    if name_taken:
        raise HTTPException(status_code=400, detail="Uzivatelske jmeno jiz existuje")

    new_user = publicusers(id=id, name=edname)
    db.add(new_user)
    db.commit()
    db.refresh(new_user)  # reload the row state after the commit
    return new_user

In [47]:

def start_api(runNew=True):
    """Stop the API if running; optionally start it and wait until its port is reachable.

    Args:
        runNew: when True (default) a new daemon process running `run` is
            started and the call waits for port 9992; when False the function
            only stops a previously started process.
    """
    global _api_process
    if _api_process:
        _api_process.terminate()
        _api_process.join()
        # Forget the stopped process so later `if _api_process` checks reflect
        # that nothing is running any more.
        _api_process = None

    if runNew:
        _api_process = Process(target=run, daemon=True)
        _api_process.start()
        wait_for(port=9992)

def delete_route(method: str, path: str):
    """Delete the given route from the API. This must be called on cells that re-define a route.

    Every route in `app.routes` whose path equals `path` and whose methods
    contain `method` is removed. Routes without a `methods` attribute, or with
    `methods` set to None, are left untouched.
    """
    # Iterate over a snapshot: removing from the list being iterated would skip
    # the element that follows each removed route.
    for route in list(app.routes):
        route_methods = getattr(route, "methods", None) or ()
        if getattr(route, "path", None) == path and method in route_methods:
            app.routes.remove(route)
    
def delete_all_routes():
    """Remove every route currently registered in `app.routes`."""
    # Walk a snapshot so the live list can shrink while we loop.
    for registered in tuple(app.routes):
        app.routes.remove(registered)


#connecttoDatabase('postgresql+psycopg2://postgres:example@postgres/postgres')
try:
    start_api()
    print("FastAPI spusteno")
except Exception as e:
    start_api(False)  # stop the API process
    print("Neco se nepovedlo")
    # Report the cause instead of discarding it, so a failed start can be diagnosed.
    print("\nVyjimka: ", e, "\n\n")

INFO:     Started server process [329]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:9992 (Press CTRL+C to quit)


FastAPI spusteno
INFO:     172.18.0.1:41182 - "GET /docs HTTP/1.1" 200 OK
INFO:     172.18.0.1:41182 - "GET /openapi.json HTTP/1.1" 200 OK
INFO:     172.18.0.1:41180 - "POST /users/53/trtrjmeno HTTP/1.1" 400 Bad Request
INFO:     172.18.0.1:41202 - "GET /users/53/KarelHalok HTTP/1.1" 405 Method Not Allowed
INFO:     172.18.0.1:41198 - "POST /users/7/KarelHalok HTTP/1.1" 400 Bad Request
INFO:     172.18.0.1:41222 - "POST /users/77/KarelHalok HTTP/1.1" 400 Bad Request
INFO:     172.18.0.1:41232 - "POST /users/78/KarelHalok HTTP/1.1" 200 OK
INFO:     172.18.0.1:41240 - "GET /allusers?skip=0&limit=100 HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [329]


In [49]:
# start_api(False) terminates and joins a running API process without starting
# a new one, so no separate terminate/join is needed before calling it.
start_api(False)