In [6]:
pip install psycopg2

Collecting psycopg2
  Downloading psycopg2-2.9.10-cp311-cp311-win_amd64.whl.metadata (5.0 kB)
Downloading psycopg2-2.9.10-cp311-cp311-win_amd64.whl (1.2 MB)
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   --------- ------------------------------ 0.3/1.2 MB ? eta -:--:--
   ------------------ --------------------- 0.5/1.2 MB 1.7 MB/s eta 0:00:01
   ------------------------------------ --- 1.0/1.2 MB 1.8 MB/s eta 0:00:01
   ---------------------------------------- 1.2/1.2 MB 1.7 MB/s eta 0:00:00
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.10
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import uuid
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column
from sqlalchemy import create_engine, String, Integer, Float, DateTime, ForeignKey, Text
from sqlalchemy.sql import func
from pydantic import BaseModel
from typing import List
from datetime import datetime, date
import random
import string
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer
import logging

# Konfigurasi database
DATABASE_URL = "postgresql://user:samuel123@localhost:5432/slip_gaji_db"
engine = create_engine(DATABASE_URL)

# Pengaturan hashing kata sandi
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Pengaturan logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Basis SQLAlchemy
class Base(DeclarativeBase):
    pass

# Model Database
class Pengguna(Base):
    __tablename__ = "pengguna"
    id: Mapped[int] = mapped_column(primary_key=True)
    nama_pengguna: Mapped[str] = mapped_column(String(50), unique=True)
    hash_kata_sandi: Mapped[str] = mapped_column(String(255))
    peran: Mapped[str] = mapped_column(String(20))  # admin atau karyawan
    gaji: Mapped[float] = mapped_column(Float, nullable=True)
    dibuat_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    diperbarui_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
    dibuat_oleh: Mapped[int] = mapped_column(Integer, nullable=True)
    diperbarui_oleh: Mapped[int] = mapped_column(Integer, nullable=True)

class PeriodeAbsensi(Base):
    __tablename__ = "periode_absensi"
    id: Mapped[int] = mapped_column(primary_key=True)
    tanggal_mulai: Mapped[date] = mapped_column(DateTime)
    tanggal_selesai: Mapped[date] = mapped_column(DateTime)
    penggajian_selesai: Mapped[bool] = mapped_column(default=False)
    dibuat_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    diperbarui_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
    dibuat_oleh: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    diperbarui_oleh: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))

class Absensi(Base):
    __tablename__ = "absensi"
    id: Mapped[int] = mapped_column(primary_key=True)
    id_pengguna: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    id_periode: Mapped[int] = mapped_column(Integer, ForeignKey("periode_absensi.id"))
    tanggal: Mapped[date] = mapped_column(DateTime)
    dibuat_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    diperbarui_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
    dibuat_oleh: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    diperbarui_oleh: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    alamat_ip: Mapped[str] = mapped_column(String(45))

class Lembur(Base):
    __tablename__ = "lembur"
    id: Mapped[int] = mapped_column(primary_key=True)
    id_pengguna: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    id_periode: Mapped[int] = mapped_column(Integer, ForeignKey("periode_absensi.id"))
    tanggal: Mapped[date] = mapped_column(DateTime)
    jam: Mapped[float] = mapped_column(Float)
    dibuat_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    diperbarui_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
    dibuat_oleh: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    diperbarui_oleh: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    alamat_ip: Mapped[str] = mapped_column(String(45))

class Reimbursement(Base):
    __tablename__ = "reimbursement"
    id: Mapped[int] = mapped_column(primary_key=True)
    id_pengguna: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    id_periode: Mapped[int] = mapped_column(Integer, ForeignKey("periode_absensi.id"))
    jumlah: Mapped[float] = mapped_column(Float)
    deskripsi: Mapped[str] = mapped_column(Text)
    dibuat_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
    diperbarui_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
    dibuat_oleh: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    diperbarui_oleh: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    alamat_ip: Mapped[str] = mapped_column(String(45))

class LogAudit(Base):
    __tablename__ = recom
    id: Mapped[int] = mapped_column(primary_key=True)
    id_permintaan: Mapped[str] = mapped_column(String(36))
    id_pengguna: Mapped[int] = mapped_column(Integer, ForeignKey("pengguna.id"))
    tindakan: Mapped[str] = mapped_column(String(100))
    nama_tabel: Mapped[str] = mapped_column(String(100))
    id_rekord: Mapped[int] = mapped_column(Integer)
    alamat_ip: Mapped[str] = mapped_column(String(45))
    dibuat_pada: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

# Model Pydantic
class BuatPengguna(BaseModel):
    nama_pengguna: str
    kata_sandi: str
    peran: str
    gaji: float | None = None

class BuatPeriodeAbsensi(BaseModel):
    tanggal_mulai: date
    tanggal_selesai: date

class BuatAbsensi(BaseModel):
    tanggal: date
    id_periode: int

class BuatLembur(BaseModel):
    tanggal: date
    jam: float
    id_periode: int

class BuatReimbursement(BaseModel):
    jumlah: float
    deskripsi: str
    id_periode: int

class SlipGaji(BaseModel):
    id_pengguna: int
    id_periode: int
    hari_absensi: int
    jam_lembur: float
    total_reimbursement: float
    gaji_pokok: float
    bayaran_lembur: float
    total_gaji_bersih: float

# Aplikasi FastAPI
app = FastAPI()

# Dependensi
def get_db():
    db = Session(engine)
    try:
        yield db
    finally:
        db.close()

# Fungsi Pembantu
def get_pengguna_saat_ini(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    pengguna = db.query(Pengguna).filter(Pengguna.nama_pengguna == token).first()
    if not pengguna:
        raise HTTPException(status_code=401, detail="Kredensial tidak valid")
    return pengguna

def buat_id_permintaan():
    return str(uuid.uuid4())

# Mengisi data palsu
def buat_data_palsu(db: Session):
    admin = Pengguna(
        nama_pengguna="admin",
        hash_kata_sandi=pwd_context.hash("admin123"),
        peran="admin",
        dibuat_oleh=1,
        diperbarui_oleh=1
    )
    db.add(admin)
    
    for i in range(100):
        nama_pengguna = f"karyawan{i+1}"
        karyawan = Pengguna(
            nama_pengguna=nama_pengguna,
            hash_kata_sandi=pwd_context.hash("pass123"),
            peran="karyawan",
            gaji=random.uniform(3000000, 10000000),
            dibuat_oleh=1,
            diperbarui_oleh=1
        )
        db.add(karyawan)
    db.commit()

# Endpoint API
@app.post("/periode-absensi", response_model=BuatPeriodeAbsensi)
async def buat_periode_absensi(periode: BuatPeriodeAbsensi, pengguna: Pengguna = Depends(get_pengguna_saat_ini), db: Session = Depends(get_db)):
    if pengguna.peran != "admin":
        raise HTTPException(status_code=403, detail="Akses admin diperlukan")
    db_periode = PeriodeAbsensi(**periode.dict(), dibuat_oleh=pengguna.id, diperbarui_oleh=pengguna.id)
    db.add(db_periode)
    db.commit()
    db.refresh(db_periode)
    
    id_permintaan = buat_id_permintaan()
    db.add(LogAudit(
        id_permintaan=id_permintaan,
        id_pengguna=pengguna.id,
        tindakan="buat_periode_absensi",
        nama_tabel="periode_absensi",
        id_rekord=db_periode.id,
        alamat_ip="127.0.0.1"
    ))
    db.commit()
    return periode

@app.post("/absensi")
async def kirim_absensi(absensi: BuatAbsensi, pengguna: Pengguna = Depends(get_pengguna_saat_ini), db: Session = Depends(get_db)):
    if datetime.strptime(str(absensi.tanggal), "%Y-%m-%d").weekday() >= 5:
        raise HTTPException(status_code=400, detail="Tidak dapat mengirim absensi di akhir pekan")
    
    periode = db.query(PeriodeAbsensi).filter(PeriodeAbsensi.id == absensi.id_periode).first()
    if not periode or periode.penggajian_selesai:
        raise HTTPException(status_code=400, detail="Periode tidak valid atau telah ditutup")
    
    ada_absensi = db.query(Absensi).filter(
        Absensi.id_pengguna == pengguna.id,
        Absensi.tanggal == absensi.tanggal
    ).first()
    if ada_absensi:
        raise HTTPException(status_code=400, detail="Absensi untuk hari ini sudah dikirim")
    
    db_absensi = Absensi(
        id_pengguna=pengguna.id,
        id_periode=absensi.id_periode,
        tanggal=absensi.tanggal,
        dibuat_oleh=pengguna.id,
        diperbarui_oleh=pengguna.id,
        alamat_ip="127.0.0.1"
    )
    db.add(db_absensi)
    db.commit()
    
    id_permintaan = buat_id_permintaan()
    db.add(LogAudit(
        id_permintaan=id_permintaan,
        id_pengguna=pengguna.id,
        tindakan="kirim_absensi",
        nama_tabel="absensi",
        id_rekord=db_absensi.id,
        alamat_ip="127.0.0.1"
    ))
    db.commit()
    return {"pesan": "Absensi berhasil dikirim"}

@app.post("/lembur")
async def kirim_lembur(lembur: BuatLembur, pengguna: Pengguna = Depends(get_pengguna_saat_ini), db: Session = Depends(get_db)):
    if lembur.jam > 3:
        raise HTTPException(status_code=400, detail="Lembur tidak boleh lebih dari 3 jam")
    
    periode = db.query(PeriodeAbsensi).filter(PeriodeAbsensi.id == lembur.id_periode).first()
    if not periode or periode.penggajian_selesai:
        raise HTTPException(status_code=400, detail="Periode tidak valid atau telah ditutup")
    
    db_lembur = Lembur(
        id_pengguna=pengguna.id,
        id_periode=lembur.id_periode,
        tanggal=lembur.tanggal,
        jam=lembur.jam,
        dibuat_oleh=pengguna.id,
        diperbarui_oleh=pengguna.id,
        alamat_ip="127.0.0.1"
    )
    db.add(db_lembur)
    db.commit()
    
    id_permintaan = buat_id_permintaan()
    db.add(LogAudit(
        id_permintaan=id_permintaan,
        id_pengguna=pengguna.id,
        tindakan="kirim_lembur",
        nama_tabel="lembur",
        id_rekord=db_lembur.id,
        alamat_ip="127.0.0.1"
    ))
    db.commit()
    return {"pesan": "Lembur berhasil dikirim"}

@app.post("/reimbursement")
async def kirim_reimbursement(reimbursement: BuatReimbursement, pengguna: Pengguna = Depends(get_pengguna_saat_ini), db: Session = Depends(get_db)):
    periode = db.query(PeriodeAbsensi).filter(PeriodeAbsensi.id == reimbursement.id_periode).first()
    if not periode or periode.penggajian_selesai:
        raise HTTPException(status_code=400, detail="Periode tidak valid atau telah ditutup")
    
    db_reimbursement = Reimbursement(
        id_pengguna=pengguna.id,
        id_periode=reimbursement.id_periode,
        jumlah=reimbursement.jumlah,
        deskripsi=reimbursement.deskripsi,
        dibuat_oleh=pengguna.id,
        diperbarui_oleh=pengguna.id,
        alamat_ip="127.0.0.1"
    )
    db.add(db_reimbursement)
    db.commit()
    
    id_permintaan = buat_id_permintaan()
    db.add(LogAudit(
        id_permintaan=id_permintaan,
        id_pengguna=pengguna.id,
        tindakan="kirim_reimbursement",
        nama_tabel="reimbursement",
        id_rekord=db_reimbursement.id,
        alamat_ip="127.0.0.1"
    ))
    db.commit()
    return {"pesan": "Reimbursement berhasil dikirim"}

@app.post("/jalankan-penggajian/{id_periode}")
async def jalankan_penggajian(id_periode: int, pengguna: Pengguna = Depends(get_pengguna_saat_ini), db: Session = Depends(get_db)):
    if pengguna.peran != "admin":
        raise HTTPException(status_code=403, detail="Akses admin diperlukan")
    
    periode = db.query(PeriodeAbsensi).filter(PeriodeAbsensi.id == id_periode).first()
    if not periode:
        raise HTTPException(status_code=404, detail="Periode tidak ditemukan")
    if periode.penggajian_selesai:
        raise HTTPException(status_code=400, detail="Penggajian sudah dijalankan untuk periode ini")
    
    periode.penggajian_selesai = True
    db.commit()
    
    id_permintaan = buat_id_permintaan()
    db.add(LogAudit(
        id_permintaan=id_permintaan,
        id_pengguna=pengguna.id,
        tindakan="jalankan_penggajian",
        nama_tabel="periode_absensi",
        id_rekord=id_periode,
        alamat_ip="127.0.0.1"
    ))
    db.commit()
    return {"pesan": "Penggajian berhasil diproses"}

@app.get("/slip-gaji/{id_periode}", response_model=SlipGaji)
async def buat_slip_gaji(id_periode: int, pengguna: Pengguna = Depends(get_pengguna_saat_ini), db: Session = Depends(get_db)):
    periode = db.query(PeriodeAbsensi).filter(PeriodeAbsensi.id == id_periode).first()
    if not periode or not periode.penggajian_selesai:
        raise HTTPException(status_code=400, detail="Periode tidak valid atau belum diproses")
    
    hari_absensi = db.query(Absensi).filter(
        Absensi.id_pengguna == pengguna.id,
        Absensi.id_periode == id_periode
    ).count()
    
    jam_lembur = db.query(func.sum(Lembur.jam)).filter(
        Lembur.id_pengguna == pengguna.id,
        Lembur.id_periode == id_periode
    ).scalar() or 0
    
    total_reimbursement = db.query(func.sum(Reimbursement.jumlah)).filter(
        Reimbursement.id_pengguna == pengguna.id,
        Reimbursement.id_periode == id_periode
    ).scalar() or 0
    
    gaji_pokok = pengguna.gaji or 0
    tarif_harian = gaji_pokok / 20  # Asumsi 20 hari kerja per bulan
    bayaran_absensi = hari_absensi * tarif_harian
    bayaran_lembur = jam_lembur * (tarif_harian / 8 * 2)  # Tarif 2x untuk lembur
    total_gaji_bersih = bayaran_absensi + bayaran_lembur + total_reimbursement
    
    return SlipGaji(
        id_pengguna=pengguna.id,
        id_periode=id_periode,
        hari_absensi=hari_absensi,
        jam_lembur=jam_lembur,
        total_reimbursement=total_reimbursement,
        gaji_pokok=bayaran_absensi,
        bayaran_lembur=bayaran_lembur,
        total_gaji_bersih=total_gaji_bersih
    )

@app.get("/ringkasan-penggajian/{id_periode}")
async def buat_ringkasan_penggajian(id_periode: int, pengguna: Pengguna = Depends(get_pengguna_saat_ini), db: Session = Depends(get_db)):
    if pengguna.peran != "admin":
        raise HTTPException(status_code=403, detail="Akses admin diperlukan")
    
    periode = db.query(PeriodeAbsensi).filter(PeriodeAbsensi.id == id_periode).first()
    if not periode or not periode.penggajian_selesai:
        raise HTTPException(status_code=400, detail="Periode tidak valid atau belum diproses")
    
    karyawan = db.query(Pengguna).filter(Pengguna.peran == "karyawan").all()
    ringkasan = []
    total_gaji = 0
    
    for emp in karyawan:
        hari_absensi = db.query(Absensi).filter(
            Absensi.id_pengguna == emp.id,
            Absensi.id_periode == id_periode
        ).count()
        
        jam_lembur = db.query(func.sum(Lembur.jam)).filter(
            Lembur.id_pengguna == emp.id,
            Lembur.id_periode == id_periode
        ).scalar() or 0
        
        total_reimbursement = db.query(func.sum(Reimbursement.jumlah)).filter(
            Reimbursement.id_pengguna == emp.id,
            Reimbursement.id_periode == id_periode
        ).scalar() or 0
        
        tarif_harian = (emp.gaji or 0) / 20
        bayaran_absensi = hari_absensi * tarif_harian
        bayaran_lembur = jam_lembur * (tarif_harian / 8 * 2)
        total_gaji_bersih = bayaran_absensi + bayaran_lembur + total_reimbursement
        total_gaji += total_gaji_bersih
        
        ringkasan.append({
            "id_karyawan": emp.id,
            "nama_pengguna": emp.nama_pengguna,
            "total_gaji_bersih": total_gaji_bersih
        })
    
    return {"karyawan": ringkasan, "total_gaji": total_gaji}

# Contoh Tes Unit
import pytest
from fastapi.testclient import TestClient

@pytest.fixture
def client():
    Base.metadata.create_all(bind=engine)
    buat_data_palsu(Session(engine))
    return TestClient(app)

def test_kirim_absensi_akhir_pekan(client):
    response = client.post("/absensi", json={"tanggal": "2025-06-28", "id_periode": 1})
    assert response.status_code == 400
    assert "akhir pekan" in response.json()["detail"]
