<a href="https://colab.research.google.com/github/acapodanno/inventory_management/blob/main/inventory_managment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""Repository git: https://github.com/acapodanno/inventory_management

Modularizzazione del Progetto

Ogni modulo ha un ruolo ben definito all’interno dell’architettura dell’applicazione.

Model: contiene la rappresentazione dei dati dell’applicazione.

Repository: è responsabile dell’accesso ai dati. Nel mio caso saranno operazioni su file csv

Service: contiene la logica di business dell’applicazione. Coordina i repository più varie logiche

Util: raccoglie metodi di utilità riutilizzabili e logiche indipendenti dal dominio dell’applicazione.

Constant: definizione di constanti per model nel mio caso ho definito gli status che mi servivano

GUI: definizione delle interfaccie utente"""

'Repository git: https://github.com/acapodanno/inventory_management\n\nModularizzazione del Progetto\n\nOgni modulo ha un ruolo ben definito all’interno dell’architettura dell’applicazione.\n\nModel: contiene la rappresentazione dei dati dell’applicazione.\n\nRepository: è responsabile dell’accesso ai dati. Nel mio caso saranno operazioni su file csv\n\nService: contiene la logica di business dell’applicazione. Coordina i repository più varie logiche\n\nUtil: raccoglie metodi di utilità riutilizzabili e logiche indipendenti dal dominio dell’applicazione.\n\nConstant: definizione di constanti per model nel mio caso ho definito gli status che mi servivano\n\nGUI: definizione delle interfaccie utente'

In [None]:
""" **Principi utilizzati per modularizzare**

La regola che ho utilizzato principalmente è ogni modulo/classe deve avere una sua resposabiltà (Single Responsability).
La dipendenza tra i moduli è questa:

*   GUI -> Service -> Repository -> Model"""

' **Principi utilizzati per modularizzare**\n\nLa regola che ho utilizzato principalmente è ogni modulo/classe deve avere una sua resposabiltà (Single Responsability).\nLa dipendenza tra i moduli è questa:\n\n*   GUI -> Service -> Repository -> Model'

In [None]:
"""**Model Layer**
Ho definito i seguenti model:


*   Product:

    Rappresenta il prodotto e ha i seguenti attributi:
    *   productCode
    *   name: nome descrittivo del prodotto
    *   category: categoria del prodotto
    *   initialStock: stock attuale
    *   reorderPoint: punto dello stock perpoter effettuare l'ordine a fornitore
    *   unitOfMeasure: unita di misura tipo pacchi(pack) etc...
    *   status:
        * Reorder: quando il prodotto/item é stato riordinato dal fornitore(supplier)
        * Active stato quando item è attivo
        * Inactive stato quando item non è attivo e non ordinabile
    *   maxStock: massimo approvigionamento possibile per evitare over stock

*   Order:

    Rappresenta ordine e ha i seguenti

    *  orderId
    *  orderDate: data ordino
    *  status:
        * Completed quando tutti gli item sono stati evasi
        * Partially completed quando solo alcuni item sono stati evasi o partialmente evasi
        * Pending: quando

    *  priority:1,....etc
    *  userId: corrisponde al customer id

*   OrderProducts(o OrderItem):

    Rappresenta gli item per ogni prodotto e ha i seguinti attributi

    * orderId
    * productCode
    * quantity_ordered: quantità ordinata
    * quantity_fulfilled: quantità disponibile evasa
    * status: gli stati sono
      * Fulfilled: completamente evaso
      * partially_fulfilled: parzialmente evaso
      * unfulfilled: non evaso

*   DailyReport:
    * numberOrders
    * numberOrderCompleted
    * numberOrderPending
    * productsMostFulfilled """

"**Model Layer**\nHo definito i seguenti model:\n\n\n*   Product:\n\n    Rappresenta il prodotto e ha i seguenti attributi:\n    *   productCode\n    *   name: nome descrittivo del prodotto\n    *   category: categoria del prodotto\n    *   initialStock: stock attuale\n    *   reorderPoint: punto dello stock perpoter effettuare l'ordine a fornitore\n    *   unitOfMeasure: unita di misura tipo pacchi(pack) etc...\n    *   status:\n        * Reorder: quando il prodotto/item é stato riordinato dal fornitore(supplier)\n        * Active stato quando item è attivo\n        * Inactive stato quando item non è attivo e non ordinabile\n    *   maxStock: massimo approvigionamento possibile per evitare over stock\n\n*   Order:\n\n    Rappresenta ordine e ha i seguenti\n\n    *  orderId\n    *  orderDate: data ordino\n    *  status:\n        * Completed quando tutti gli item sono stati evasi\n        * Partially completed quando solo alcuni item sono stati evasi o partialmente evasi\n        * Pend

In [None]:
class ProductModel:
    """Model representing a product."""
    def __init__(self,productCode,name,category,initialStock,reorderPoint,unitOfMeasure, status,maxStock):
        self.productCode = productCode
        self.name = name
        self.category = category
        self.initialStock = int(initialStock)
        self.reorderPoint = int(reorderPoint)
        self.maxStock = int(maxStock)
        self.unitOfMeasure = unitOfMeasure
        self.status = status

    def __str__(self):
        """ String representation for debugging."""
        return f"ProductModel({self.productCode}, {self.name}, {self.category}, {self.initialStock}, {self.reorderPoint}, {self.unitOfMeasure}), {self.status})"

    def __repr__(self):
        """ Representation method."""
        return self.__str__()

In [None]:
class OrderModel:
    """Model representing an order."""
    def __init__(self, orderId, orderDate, status,priority, userId):
        self.orderId = orderId
        self.orderDate = orderDate
        self.status = status
        self.priority = priority
        self.userId = userId

    def __str__(self):
        """ String representation for debugging."""
        return f"OrderModel({self.orderId}, {self.orderDate}, {self.status}, {self.priority}, {self.userId})"

    def __repr__(self):
        """ Representation method."""
        return self.__str__()

In [None]:
class OrderProductModel:
    """Model representing a product line within an order."""
    def __init__(self, orderId, productCode, quantityOrdered, quantityFulfilled, status):
        self.orderId = orderId
        self.productCode = productCode
        self.quantityOrdered = quantityOrdered
        self.quantityFulfilled = quantityFulfilled
        self.status = status

    def __str__(self):
        """ String representation for debugging."""
        return f"OrderProductModel({self.orderId}, {self.productCode}, {self.quantityOrdered}, {self.quantityFulfilled}, {self.status})"

    def __repr__(self):
        """ Representation method."""
        return self.__str__()

In [None]:
class DailyReportModel:
    """Model representing a daily report."""
    def __init__(self, numberOrders,numberOrderCompleted,numberOrderPending,productsMostFulfilled):
        self.numberOrders = numberOrders
        self.numberOrderCompleted = numberOrderCompleted
        self.numberOrderPending = numberOrderPending
        self.productsMostFulfilled = productsMostFulfilled



**Status dei vari moduli**
I status per permettere un maggiore utilizzi in tutte le parti del codice sono stati definiti all'interno del modulo constant:

In [None]:
""" Status dei vari moduli I status per permettere un maggiore utilizzi in tutte le parti del codice sono stati definiti all'interno del modulo constant"""

" Status dei vari moduli I status per permettere un maggiore utilizzi in tutte le parti del codice sono stati definiti all'interno del modulo constant"

In [None]:
class ProductStatus:
    ACTIVE = "ACTIVE"
    INACTIVE = "INACTIVE"
    REORDERING = "REORDERING"

In [None]:
class OrderStatus:
    PENDING = "PENDING"
    PARTIALLY_COMPLETED = "PARTIALLY_COMPLETED"
    COMPLETED = "COMPLETED"

In [None]:
class OrderProductStatus:
    FULFILLED = "FULFILLED"
    UNFULFILLED = "UNFULFILLED"
    PARTIALLY_FULFILLED = "PARTIALLY_FULFILLED"

In [None]:
""" Util Layer

Sono state creare i segunti metodi per eseguire comportamenti mirati:

loggin_config: serve a far storare tutti i log allinterno di un file csv

validate_date: validazione della data nel formatto YYYY-MM-dd

generate_excel: responsabile della generazione del report giornaliero richiede istalliazione della libreai xlwt

csv_managemenet: sono presenti due metodi per gestire la presenza della file csv e della directory nel caso non esista crea."""

' Util Layer\n\nSono state creare i segunti metodi per eseguire comportamenti mirati:\n\nloggin_config: serve a far storare tutti i log allinterno di un file csv\n\nvalidate_date: validazione della data nel formatto YYYY-MM-dd\n\ngenerate_excel: responsabile della generazione del report giornaliero richiede istalliazione della libreai xlwt\n\ncsv_managemenet: sono presenti due metodi per gestire la presenza della file csv e della directory nel caso non esista crea.'

In [None]:
import os
import csv

def ensure_csv_file_exists(file_path, headers):
    """ Ensure that the specified CSV file exists with the given headers. """
    if not os.path.exists(file_path):
        with open(file_path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(headers)

def ensure_directory_exists(directory):
    """ Ensure that the specified directory exists. """
    if not os.path.exists(directory):
        os.makedirs(directory)

In [None]:
import logging

def setup_logging(log_file="dummy_data/logs.csv"):
    """ Set up logging configuration. """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s,%(levelname)s,%(name)s,%(message)s",
        handlers=[logging.FileHandler(log_file, encoding="utf-8")],
        datefmt="%Y-%m-%d %H:%M:%S")


In [None]:
"""pip install xlwt"""

'pip install xlwt'

In [None]:
import xlwt


def generate_excel(data, file_path):
    """ Generate an Excel file from the provided data dictionary. """
    workbook = xlwt.Workbook()
    sheet_daily_report = workbook.add_sheet("daily_report")
    sheet_product_most_fulfilled = workbook.add_sheet("product_most_fulfilled")

    headers_daily = {0: "numberOrders", 1: "numberOrderCompleted", 2: "numberOrderPending"}
    for col, header in enumerate(headers_daily):
        sheet_daily_report.write(0, col, headers_daily[header])

    headers_product_most_fulfilled = {0: "productCode", 1: "quantityFulfilled"}
    for col, header in enumerate(headers_product_most_fulfilled):
        sheet_product_most_fulfilled.write(0, col, headers_product_most_fulfilled[header])
    for row_idx, row_data in enumerate(data["productsMostFulfilled"], start=1):
        for col_idx, cell_data in enumerate(row_data):
            sheet_product_most_fulfilled.write(row_idx, col_idx, cell_data)
    for col_idx, row_data in enumerate(data["dailyReport"]):
        sheet_daily_report.write(1, col_idx, row_data)

    workbook.save(file_path)

ModuleNotFoundError: No module named 'xlwt'

In [None]:
"""Repository Layer

è stato definito un repository per ogni model tranne per Daily Report. Ogni repository prende input il path di dove è stato definito il csv. Ho definito i seguenti Repositoty:

ProductRepository
OrderRepository
OrderProductRepository
"""


In [None]:
class ProductRepository:
    """Repository layer for managing products in CSV storage."""
    def __init__(self, path):
        self.path = path

    def find_all(self):
        """ Retrieve all products."""
        products = {}
        with open(self.path, 'r') as file:
            next(file)
            for line in file:
                parts = line.strip().split(',')
                if len(parts) == 8:
                    productCode, name, category, initialStock, reorderPoint, unitOfMeasure, status, maxStock = parts
                    products[productCode] = ProductModel(
                        productCode,
                        name,
                        category,
                        int(initialStock),
                        int(reorderPoint),
                        unitOfMeasure,
                        status,
                        int(maxStock)
                    )
        return products

    def find_by_filter(self, **filters):
        """ Retrieve products by filter criteria."""
        all_products = self.find_all()
        filtered_products = {code: product for code, product in all_products.items() if self._match_filter(product, **filters)}
        return filtered_products

    def _match_filter(self, product, **filters):
        """ Helper method to match product against filters."""
        for key, value in filters.items():
            if not hasattr(product, key) or getattr(product, key) != value:
                return False
        return True

    def save(self, p):
        """ Add a new product."""
        with open(self.path, 'a') as file:
            line = f"{p.productCode},{p.name},{p.category},{p.initialStock},{p.reorderPoint},{p.unitOfMeasure},{p.status},{p.maxStock}\n"
            file.write(line)

    def update(self, product):
        """ Update an existing product."""
        products = self.find_all()
        products[product.productCode] = product
        with open(self.path, 'w') as file:
            file.write("productCode,name,category,initialStock,reorderPoint,unitOfMeasure,status,maxStock\n")
            for p in products.values():
                line = f"{p.productCode},{p.name},{p.category},{p.initialStock},{p.reorderPoint},{p.unitOfMeasure},{p.status},{p.maxStock}\n"
                file.write(line)

In [None]:
class OrderRepository:
    """Repository layer for managing orders in CSV storage."""
    def __init__(self, path):
        self.path = path

    def find_all(self):
        """ Retrieve all orders."""
        orders = {}
        with open(self.path, 'r') as file:
            next(file)
            for line in file:
                parts = line.strip().split(',')
                if len(parts) == 5:
                    orderId, orderDate, status, priority, userId = parts
                    orders[orderId] = OrderModel(
                        orderId,
                        orderDate,
                        status,
                        priority,
                        userId
                    )
        return orders

    def save(self, order):
        """ Save a new order."""
        with open(self.path, 'a') as file:
            line = f"{order.orderId},{order.orderDate},{order.status},{order.priority},{order.userId}\n"
            file.write(line)

In [None]:
class OrderProductRepository:
    """Repository layer for managing order products in CSV storage."""
    def __init__(self, path):
        self.path = path
    def find_all(self):
        """ Retrieve all order products."""
        order_products = {}
        with open(self.path, 'r') as file:
            next(file)
            for line in file:
                parts = line.strip().split(',')
                if len(parts) == 5:
                    orderId, productCode, quantityOrdered, quantityFulfilled, status = parts
                    order_products_key = (orderId, productCode)
                    order_products[order_products_key] = OrderProductModel(
                        orderId,
                        productCode,
                        quantityOrdered,
                        quantityFulfilled,
                        status
                    )
        return order_products

    def save(self, order_product):
        """ Save a new order product."""
        with open(self.path, 'a') as file:
            file.write("orderId,productCode,quantityOrdered,quantityFulfilled,status\n")
            line = f"{order_product.orderId},{order_product.productCode},{order_product.quantityOrdered},{order_product.quantityFulfilled},{order_product.status}\n"
            file.write(line)

    def update(self, order_product):
        """ Update an existing order product."""
        order_products = self.find_all()
        order_products_key = (order_product.orderId, order_product.productCode)
        order_products[order_products_key] = order_product
        with open(self.path, 'w') as file:
            file.write("orderId,productCode,quantityOrdered,quantityFulfilled,status\n")
            for op in order_products.values():
                line = f"{op.orderId},{op.productCode},{op.quantityOrdered},{op.quantityFulfilled},{op.status}\n"
                file.write(line)

In [None]:
""" **Service Layer**

I componenti dei service sono i seguenti:
 * product_service: si occupano
    * Gestione di tutte le operazioni crud del repository
    * Ricerca per filtro
 * order_service:
    * ottenere tuttti gli ordini
    * aggiunta di un nuovo ordine mediante repository solo salvataggio del entita sul csv
 * order_product_service:
    * ottenere tutti i order_product per un dato ordine
    * varie operazioni di crud del repositoryù
 * send_order:
    * logica per la creazione del ordine
      * vengono fatte per ogni item del ordine vengono fatti i seguenti:
          * se il prodotto non viene trovato lancia eccezione all utente
          * se il prodotto non è attivo lancia eccezione feed back all utente
          * se la quantita ordinata è minore o ugale di quella presente nello stock del prodotto l' order_product ha status fulfilled
          * se la quantita ordinata è maggiore di quella ordinata di quella presente allo stock del prodotto allora l'order_product ha status partially fulfilled
          * se lo stock del product è 0 allora order_product ha status unfulfilled
          * decremento la quantity dallo stock del product
          * se il prodotto ha lo stock minore o uguale al punto di riordino e non è in stato di riordino viene fatto riordino al fornitore e cambia  lo stato product a Reordering altrimenti non fa nulla(potrebbe anche essere migliorato questo punto)
       * determinare lo stato del ordine:
          * se tutti gli item sono fulfilled allora ordine è completo
          * se tutti gli stati sono unfulfilled allora ordine è pending
          * altrimenti lo stato del ordine è partially_completed
       * crea ordine sul csv con gli order item

  * report_service:
    * genera i dati per visualizzare i dati del report del giorno corrente
    * somma di tutti gli ordini del giorno
    * somma di tutti gli oridini pending
    * somma di tutti gli oridini completed
    * prendi i 5 order_product con maggiore quantita evasa (quantity_fulfilled)

"""

In [None]:
import logging
class OrderService:
    """Service layer for managing orders."""
    def __init__(self, order_repository):
        self.order_repository = order_repository
        self.logger = logging.getLogger(__name__)
    """CRUD operations for orders."""

    def get_all_orders(self):
        """ Retrieve all orders."""
        self.logger.info("Fetching all orders")
        return self.order_repository.find_all()

    def add_order(self, order):
        """ Create a new order."""
        self.logger.info(f"Adding new order: {order}")
        return self.order_repository.save(order)


In [None]:
class OrderProductService:
    """Service layer for managing order products."""
    def __init__(self, order_product_repository):
        self.order_product_repository = order_product_repository
    """CRUD operations for order products."""
    def get_all_order_products(self):
        """ Retrieve all order products."""
        return self.order_product_repository.find_all()

    def add_order_product(self, order_product):
        """ Add a new order product."""
        return self.order_product_repository.save(order_product)

    def get_order_products_by_order_id(self, orderId):
        """ Retrieve order products by order ID."""
        all_order_products = self.order_product_repository.find_all()
        filtered_order_products = {
            key: op for key, op in all_order_products.items() if op.orderId == orderId
        }
        return filtered_order_products

    def save_order_product(self, order_product):
        """ Save an order product by ID."""
        return self.order_product_repository.save(order_product)

    def update_order_product(self, order_product):
        """ Update an existing order product."""
        return self.order_product_repository.update(order_product)

In [None]:
import logging
class ProductService:
    """Service layer for managing products."""
    def __init__(self, repository):
        self.repository = repository
        self.logger = logging.getLogger(__name__)

    """CRUD operations for products."""

    def get_all_products(self):
        """ Retrieve all products."""
        self.logger.info("Fetching all products")
        return self.repository.find_all()

    def get_products_by_filter(self, **filters):
        """ Retrieve products by filter criteria."""
        self.logger.info(f"Fetching products with filters: {filters}")
        return self.repository.find_by_filter(**filters)

    def add_product(self, product):
        """ Add a new product."""
        self.logger.info(f"Adding new product: {product}")
        return self.repository.save(product)

    def update_product(self, product):
        """ Update an existing product."""
        self.logger.info(f"Updating product: {product}")
        return self.repository.update(product)

In [None]:
from datetime import datetime
import logging
class ReportService:
    """Service layer for generating reports."""
    def __init__(self, order_service, product_service,order_product_service):
        self.order_service = order_service
        self.product_service = product_service
        self.order_product_service = order_product_service
        self.logger = logging.getLogger(__name__)

    def generated_daily_report(self):
        """ Generate daily report."""
        self.logger.info("Generating daily report.")
        orders = self.order_service.get_all_orders()
        products = self.product_service.get_all_products()
        order_products = self.order_product_service.get_all_order_products()
        today = datetime.now().date()
        report = DailyReportModel(0,0,0,[])
        daily_orders = [order for order in orders.values() if datetime.strptime(order.orderDate, '%Y-%m-%d').date() == today]
        report.numberOrders = len(daily_orders)
        report.numberOrderCompleted = self._sum_order_for_status(daily_orders, OrderStatus.COMPLETED)
        report.numberOrderPending = self._sum_order_for_status(daily_orders, OrderStatus.PENDING)
        product_fulfillment = {}
        daily_order_ids = {order.orderId for order in daily_orders}
        order_products_result = [order_product for order_product in order_products.values() if order_product.orderId in daily_order_ids ]
        for op in order_products_result:
            if op.productCode not in product_fulfillment:
                product_fulfillment[op.productCode] = 0
            product_fulfillment[op.productCode] += int(op.quantityFulfilled)
        most_fulfilled_products = sorted(product_fulfillment.items())[:5]
        report.productsMostFulfilled = [(products[code].name, qty) for code, qty in most_fulfilled_products if code in products]
        return report

    def _sum_order_for_status(self, orders, status):
        """ Helper method to sum orders by status."""
        return sum(1 for order in orders if order.status == status)

In [None]:
import logging
class SendOrder:
    """Service layer for sending orders and processing product lines."""
    def __init__(self, order_service: OrderService, order_product_service: OrderProductService,product_service: ProductService):
        self.order_service = order_service
        self.order_product_service = order_product_service
        self.product_service = product_service
        self.logger = logging.getLogger(__name__)

    def send_order(self, order, product_lines):
        """ Process and send an order with its product lines."""
        self.logger.info(f"Processing order {order.orderId} with product lines: {product_lines}")
        order_products = []
        for line in product_lines:
            productCode = line['productCode']
            quantity = line['quantity']
            order_product = self._process_product_line(productCode, quantity,order.orderId)
            order_products.append(order_product)
        order.status = self._determinate_order_status_from_product_lines(order_products)
        self.order_service.add_order(order)

    def _determinate_order_status_from_product_lines(self, order_products):
        """ Determine overall order status based on product line statuses."""
        all_fulfilled = all(op.status == OrderProductStatus.FULFILLED for op in order_products)
        all_unfulfilled = all(op.status == OrderProductStatus.UNFULFILLED for op in order_products)
        if all_fulfilled:
            return OrderStatus.COMPLETED
        elif all_unfulfilled:
            return OrderStatus.PENDING
        else:
            return OrderStatus.PARTIALLY_COMPLETED

    def _process_product_line(self, productCode, quantity, orderId):
        """ Process an individual product line within an order."""
        products = self.product_service.get_all_products()
        product = products.get(productCode)
        if not product:
            self.logger.warning(f"Product {productCode} not found.")
            raise Exception(f"Product not found.")
        if ProductStatus.INACTIVE == product.status:
            self.logger.warning(f"Product {productCode} is inactive.")
            raise Exception(f"Product is inactive.")

        available = product.initialStock
        fulfilled = min(available, quantity)

        if fulfilled == 0:
            status = OrderProductStatus.UNFULFILLED
        elif fulfilled < quantity:
            status = OrderProductStatus.PARTIALLY_FULFILLED
        else:
            status = OrderProductStatus.FULFILLED

        order_product = OrderProductModel(
            orderId=orderId,
            productCode=productCode,
            quantityOrdered=quantity,
            quantityFulfilled=fulfilled,
            status=status
        )
        self.order_product_service.add_order_product(order_product)
        if fulfilled > 0:
            product.initialStock -= fulfilled
        if product.initialStock <= product.reorderPoint and product.status != ProductStatus.REORDERING:
            restock_quantity = product.maxStock - product.initialStock
            self.product_service.update_product(product)
            self._reorder_stock_to_supplier(productCode, restock_quantity,product)
            return order_product

        self.product_service.update_product(product)
        return order_product

    def _reorder_stock_to_supplier(self, productCode, quantity,product):
        """" Reorder stock for a product from the supplier. simulated action."""
        self.logger.info(f"Reordering {quantity} of product {productCode} from supplier.")
        product.status=ProductStatus.REORDERING
        self.product_service.update_product(product)
        return True

In [None]:
""" **Testing**
Prima di procedere alla ui ho fatto testing dei vari componenti partendo dal layer piu alto service."""

In [None]:
def _intiialize_cvs_files():
    ensure_directory_exists('dummy_data')
    csv_headers = {
        'dummy_data/products.csv': ['productCode', 'name', 'category', 'initialStock', 'reorderPoint', 'unitOfMeasure', 'status', 'maxStock'],
        'dummy_data/orders.csv': ['orderId', 'orderDate', 'status', 'priority', 'userId'],
        'dummy_data/order_products.csv': ['orderId', 'productCode', 'quantity_ordered', 'quantity_fulfilled', 'status'],
        'dummy_data/logs.csv': ['timestamp', 'level', 'name', 'message']
    }
    for file_path, headers in csv_headers.items():
        ensure_csv_file_exists(file_path, headers)

In [None]:
_intiialize_cvs_files()

In [None]:
setup_logging()
productService = ProductService(ProductRepository('dummy_data/products.csv'))
data = [
    ("P001","Mechanical Keyboard","Hardware",0,30,"pcs","REORDERING",100),
    ("P002","Wireless Mouse","Hardware",84,25,"pcs","ACTIVE",150),
    ("P003","USB-C Cable 1m","Accessories",300,60,"pcs","ACTIVE",500),
    ("P004","USB-C Cable 2m","Accessories",180,50,"pcs","ACTIVE",350),
    ("P005","27inch Monitor","Hardware",40,10,"pcs","ACTIVE",80),
    ("P006","Laptop Stand Aluminum","Accessories",75,20,"pcs","ACTIVE",100),
    ("P007","Notebook A4","Stationery",250,50,"pcs","ACTIVE",300),
    ("P008","Blue Ballpoint Pen","Stationery",500,100,"pcs","ACTIVE",500),
    ("P009","Office Chair","Office",15,5,"pcs","ACTIVE",30),
    ("P010","Desk Lamp LED","Office",28,10,"pcs","ACTIVE",30),
    ("P011","External SSD 1TB","Hardware",22,10,"pcs","ACTIVE",30),
    ("P012","HDMI Cable","Accessories",25,30,"pcs","INACTIVE",30),
    ("P013","Printer Paper A4","Stationery",400,80,"pcs","ACTIVE",400),
    ("P014","Webcam HD","Hardware",18,10,"pcs","ACTIVE",20),
    ("P015","Headphones Wireless","Hardware",55,20,"pcs","ACTIVE",60),
    ("P016","Smartphone Stand","Accessories",95,25,"pcs","ACTIVE",100),
    ("P017","Whiteboard Marker","Stationery",60,40,"pcs","ACTIVE",80),
    ("P018","Router WiFi","Hardware",12,8,"pcs","ACTIVE",15),
    ("P019","Power Strip 6 sockets","Office",33,15,"pcs","ACTIVE",40),
    ("P020","Desk Organizer","Office",70,20,"pcs","ACTIVE",100),
]

products = [ProductModel(*row) for row in data]

for product in products:
    productService.add_product(product)
print(f"{productService.get_all_products()}\n")
print(productService.get_products_by_filter(category='Hardware'))

import uuid
orderId=uuid.uuid4().__str__()
orderService = OrderService(OrderRepository('dummy_data/orders.csv'))
orderService.add_order(OrderModel(orderId,"2026-01-24",OrderStatus.COMPLETED,1,"1234"))
print(f"{orderService.get_all_orders()}")



In [None]:
order_products_data = {
    "1": OrderProductModel(orderId, "P001", 10, 0, OrderProductStatus.UNFULFILLED),
    "2": OrderProductModel(orderId, "P002", 5, 2, OrderProductStatus.PARTIALLY_FULFILLED),
    "3": OrderProductModel(orderId, "P003", 7, 7, OrderProductStatus.FULFILLED),
}

orderProductService = OrderProductService(OrderProductRepository('dummy_data/order_products.csv'))
orderProductService.add_order_product(OrderProductModel(orderId, "P010", 3, 0, OrderProductStatus.UNFULFILLED))
print(orderProductService.get_all_order_products())
print(orderProductService.get_order_products_by_order_id(orderId))
orderProductService.save_order_product(OrderProductModel(orderId, "P020", 12, 0, OrderProductStatus.UNFULFILLED))
orderProductService.update_order_product(OrderProductModel(orderId, "P002", 5, 5, OrderProductStatus.FULFILLED))
print(orderProductService.get_all_order_products())



In [None]:
reportService = ReportService(orderService, productService, orderProductService)
daily_report=reportService.generated_daily_report()
generate_excel({
        "dailyReport": [daily_report.numberOrders,
                        daily_report.numberOrderCompleted,
                        daily_report.numberOrderPending],
        "productsMostFulfilled": daily_report.productsMostFulfilled
    }, "daily_report.xls")

In [None]:
""" GUI Le ui è stata fatta con questa libreria PySide6 La ui è stata splittata in questa maniera:

product_ui presente un filtro di rirca e la relativa tabella per visualizzare gli item
order_ui: ce una sezione principale per creare ordine aggiunta del form con input:
CustomerId
data formato yyyy-mm-dd
priority
tabella per aggiungere/rimuovere gli order item
Visualizza tabella degli order item e ogni click di riga vengo visualizzati gli order item
report_ui
main_windows: la ui si occupa principale di leggare tutte e tre le ui con un tab in modo da facilitare lo switch
shared: componenti condivisi tipo la custom popup"""

In [None]:
""" pip install PySide6 """

In [None]:
class CustomPopupLevel:
    """ Levels for custom popup messages. """
    WARNING="Warning"
    ERROR="Error"
    INFO="Info"

In [None]:
from PySide6.QtWidgets import (
    QDialog,QDialogButtonBox,QVBoxLayout,QLabel
)
class CustomPopup(QDialog):
    """ Custom popup dialog for displaying messages. """
    def __init__(self,title,label,parent=None):
        super().__init__(parent)
        self.setWindowTitle(title)
        layout = QVBoxLayout(self)
        layout.addWidget(QLabel(label))
        btn_pk= QDialogButtonBox(QDialogButtonBox.Ok)
        btn_pk.accepted.connect(self.accept)
        layout.addWidget(btn_pk)


In [None]:
from PySide6.QtWidgets import (
    QWidget, QVBoxLayout, QHBoxLayout, QFormLayout,
    QLineEdit, QSpinBox, QPushButton, QLabel,
    QTableWidget, QTableWidgetItem, QGroupBox
)

import uuid
from datetime import datetime
class OrderUI(QWidget):
    """UI component for managing orders."""
    def __init__(self, send_order, order_service, order_product_service, parent=None):
        super().__init__(parent)
        self.send_order = send_order
        self.order_service = order_service
        self.order_product_service = order_product_service

        root = QVBoxLayout(self)

        form_box = QGroupBox("Create Order")
        form_layout = QVBoxLayout(form_box)

        fields = QFormLayout()
        form_layout.addLayout(fields)

        self.customer_input = QLineEdit()
        self.customer_input.setPlaceholderText("e.g. C001")
        fields.addRow("Customer ID:", self.customer_input)

        self.date_input = QLineEdit()
        self.date_input.setText(datetime.today().strftime("%Y-%m-%d"))
        self.date_input.setPlaceholderText("YYYY-MM-DD")
        fields.addRow("Order Date:", self.date_input)

        self.priority_input = QSpinBox()
        self.priority_input.setRange(1, 10)
        self.priority_input.setValue(1)
        fields.addRow("Priority:", self.priority_input)

        self.lines_table = QTableWidget(0, 2)
        self.lines_table.setHorizontalHeaderLabels(["productCode", "quantity"])
        form_layout.addWidget(QLabel("Order Items:"))
        form_layout.addWidget(self.lines_table)

        btns = QHBoxLayout()
        form_layout.addLayout(btns)

        self.add_line_btn = QPushButton("Add line")
        self.remove_line_btn = QPushButton("Remove selected line")
        self.create_order_btn = QPushButton("Send order")
        btns.addWidget(self.add_line_btn)
        btns.addWidget(self.remove_line_btn)
        btns.addStretch(1)
        btns.addWidget(self.create_order_btn)

        root.addWidget(form_box)

        orders_box = QGroupBox("Orders")
        orders_layout = QVBoxLayout(orders_box)

        self.refresh_orders_btn = QPushButton("Refresh orders")
        orders_layout.addWidget(self.refresh_orders_btn)

        self.orders_table = QTableWidget(0, 5)
        self.orders_table.setHorizontalHeaderLabels(["orderId", "customerId", "orderDate", "priority", "status"])
        self.orders_table.setSortingEnabled(True)
        orders_layout.addWidget(self.orders_table)

        root.addWidget(orders_box)

        items_box = QGroupBox("Order Items (selected order)")
        items_layout = QVBoxLayout(items_box)

        self.items_table = QTableWidget(0, 5)
        self.items_table.setHorizontalHeaderLabels(["orderId", "productCode", "qtyOrdered", "qtyFulfilled", "status"])
        self.items_table.setSortingEnabled(True)
        items_layout.addWidget(self.items_table)

        root.addWidget(items_box)

        self.add_line_btn.clicked.connect(self._add_empty_line)
        self.remove_line_btn.clicked.connect(self.remove_selected_line)
        self.create_order_btn.clicked.connect(self._on_send_order)

        self.refresh_orders_btn.clicked.connect(self.reload_orders)
        self.orders_table.itemSelectionChanged.connect(self._on_order_selected)

        self.reload_orders()

    def _add_empty_line(self):
        """ Add an empty line to the order items table."""
        r = self.lines_table.rowCount()
        self.lines_table.insertRow(r)
        self.lines_table.setItem(r, 0, QTableWidgetItem(""))  # productCode
        self.lines_table.setItem(r, 1, QTableWidgetItem("1"))  # quantity

    def remove_selected_line(self):
        """ Remove the selected line from the order items table."""
        r = self.lines_table.currentRow()
        if r >= 0:
            self.lines_table.removeRow(r)

    def _read_lines(self):
        """ Read product lines from the order items table."""
        lines = []
        for r in range(self.lines_table.rowCount()):
            code_item = self.lines_table.item(r, 0)
            qty_item = self.lines_table.item(r, 1)

            code = (code_item.text().strip() if code_item else "")
            qty_raw = (qty_item.text().strip() if qty_item else "")

            if not code:
                continue

            try:
                qty = int(qty_raw)
            except ValueError:
                qty = 0

            if qty <= 0:
                continue

            lines.append({"productCode": code, "quantity": qty})

        return lines

    def _on_send_order(self):
        """ Handle the send order action."""
        customer_id = self.customer_input.text().strip()
        order_date = self.date_input.text().strip()
        priority = int(self.priority_input.value())
        product_lines = self._read_lines()

        if not customer_id:
            dlg = CustomPopup(CustomPopupLevel.WARNING,"Customer ID is not must empty!")
            dlg.exec()
            return
        elif not order_date and validate_date(order_date):
            dlg = CustomPopup(CustomPopupLevel.WARNING,"Order Date is not must empty!")
            dlg.exec()
            return
        elif not product_lines:
            dlg = CustomPopup(CustomPopupLevel.WARNING,"Product Line is not must empty!")
            dlg.exec()
            return

        order = OrderModel(
            orderId=uuid.uuid4().__str__(),
            userId=customer_id,
            orderDate=order_date,
            priority=priority,
            status=None
        )
        try:
            self.send_order.send_order(order, product_lines)
            self.reload_orders()
            print(f"Send Order Success:{order}")
            dlg = CustomPopup(CustomPopupLevel.INFO,f"Success Send Order with id: {order.orderId}")
            dlg.exec()
        except Exception as e:
            print(f"Error sending order: {str(e)}")
            dlg = CustomPopup(CustomPopupLevel.ERROR,f"Failed to send order: {str(e)}")
            dlg.exec()


    def reload_orders(self):
        """ Reload and display all orders in the orders table."""
        self.date_input.setText("")
        self.customer_input.setText("")
        data = self.order_service.get_all_orders()
        orders = data.values()
        self.orders_table.setRowCount(len(orders))
        self.orders_table.setSortingEnabled(False)
        self.orders_table.clearContents()
        for r, o in enumerate(orders):
            self.orders_table.setItem(r, 0, QTableWidgetItem(str(o.orderId)))
            self.orders_table.setItem(r, 1, QTableWidgetItem(str(getattr(o, "userId", ""))))
            self.orders_table.setItem(r, 2, QTableWidgetItem(str(getattr(o, "orderDate", ""))))
            self.orders_table.setItem(r, 3, QTableWidgetItem(str(getattr(o, "priority", ""))))
            self.orders_table.setItem(r, 4, QTableWidgetItem(str(getattr(o, "status", ""))))
        self.orders_table.resizeColumnsToContents()
        self.orders_table.resizeRowsToContents()
        self.orders_table.viewport().update()
        self.orders_table.repaint()
        self.items_table.setRowCount(0)

    def _on_order_selected(self):
        """ Handle order selection to display its items."""
        selected = self.orders_table.selectedItems()
        if not selected:
            return

        order_id = self.orders_table.item(self.orders_table.currentRow(), 0).text().strip()
        if not order_id:
            return

        items = self.order_product_service.get_order_products_by_order_id(order_id)
        self._render_items(items.values())

    def _render_items(self, items):
        """ Render order items in the items table."""
        self.items_table.setRowCount(len(items))
        self.items_table.setSortingEnabled(False)
        self.items_table.clearContents()
        for r, it in enumerate(items):
            self.items_table.setItem(r, 0, QTableWidgetItem(getattr(it, "orderId", "")))
            self.items_table.setItem(r, 1, QTableWidgetItem(getattr(it, "productCode", "")))
            self.items_table.setItem(r, 2, QTableWidgetItem(str(getattr(it, "quantityOrdered", ""))))
            self.items_table.setItem(r, 3, QTableWidgetItem(str(getattr(it, "quantityFulfilled", ""))))
            self.items_table.setItem(r, 4, QTableWidgetItem(getattr(it, "status", "")))
        self.items_table.resizeColumnsToContents()
        self.items_table.resizeRowsToContents()
        self.items_table.viewport().update()
        self.items_table.repaint()

In [None]:
from PySide6.QtWidgets import (
    QWidget, QVBoxLayout, QHBoxLayout,
    QLineEdit, QComboBox, QPushButton, QLabel,
    QTableWidget, QTableWidgetItem
)
class ProductUI(QWidget):
    """UI component for displaying and filtering products."""
    def __init__(self, product_service, parent=None):
        super().__init__(parent)
        self.product_service = product_service

        root = QVBoxLayout(self)
        filters = QHBoxLayout()
        root.addLayout(filters)

        self.code_input = QLineEdit()
        self.code_input.setPlaceholderText("e.g. P001")
        filters.addWidget(QLabel("Product code:"))
        filters.addWidget(self.code_input)

        self.category_combo = QComboBox()
        self.category_combo.addItems(["All"])
        filters.addWidget(QLabel("Category:"))
        filters.addWidget(self.category_combo)

        self.stock_threshold = QLineEdit()
        self.stock_threshold.setPlaceholderText("e.g. 50")
        filters.addWidget(QLabel("Stock ≤"))
        filters.addWidget(self.stock_threshold)

        self.apply_btn = QPushButton("Apply")
        self.refresh_btn = QPushButton("Refresh")
        filters.addWidget(self.apply_btn)
        filters.addWidget(self.refresh_btn)
        self.table = QTableWidget(0, 7)
        self.table.setHorizontalHeaderLabels([
            "product_code", "name", "category",
            "initial_stock", "reorder_point",
            "unit_of_measure", "status"
        ])
        self.table.setSortingEnabled(True)
        root.addWidget(self.table)

        self.apply_btn.clicked.connect(self.apply_filters)
        self.refresh_btn.clicked.connect(self.reload_data)

        self.products = {}
        self.reload_data()

    def reload_data(self):
        """ Reload product data from the service and update the UI."""
        self.products = self.product_service.get_all_products()
        self._reload_categories()
        self.render_table(self.products.values())
    def _reload_categories(self):
        """ Reload product categories for the filter dropdown."""
        cats = sorted({p.category for p in self.products.values()})
        self.category_combo.blockSignals(True)
        self.category_combo.clear()
        self.category_combo.addItem("All")
        for c in cats:
            self.category_combo.addItem(c)
        self.category_combo.blockSignals(False)
    def apply_filters(self):
        """ Apply filters based on user input and update the table."""
        code = self.code_input.text().strip()
        cat = self.category_combo.currentText()
        thr_raw = self.stock_threshold.text().strip()

        thr = None
        if thr_raw:
            try:
                thr = int(thr_raw)
            except ValueError:
                thr = None

        filtered = []
        for p in self.products.values():
            if code and p.productCode != code:
                continue
            if cat != "All" and p.category != cat:
                continue
            if thr is not None and int(p.initialStock) > thr:
                continue
            filtered.append(p)
        if len(filtered) == 0:
            dlg = CustomPopup(CustomPopupLevel.INFO,"No products found!")
            dlg.exec()

        self.render_table(filtered)

    def render_table(self, rows):
        """ Render the given product rows in the table."""
        self.table.setRowCount(len(rows))
        self.table.setSortingEnabled(False)
        self.table.clearContents()
        for r, p in enumerate(rows):
            self.table.setItem(r, 0, QTableWidgetItem(getattr(p, "productCode", "")))
            self.table.setItem(r, 1, QTableWidgetItem(getattr(p, "name", "")))
            self.table.setItem(r, 2, QTableWidgetItem(getattr(p, "category", "")))
            self.table.setItem(r, 3, QTableWidgetItem(str(getattr(p, "initialStock", ""))))
            self.table.setItem(r, 4, QTableWidgetItem(str(getattr(p, "reorderPoint", ""))))
            self.table.setItem(r, 5, QTableWidgetItem(getattr(p, "unitOfMeasure", "")))
            self.table.setItem(r, 6, QTableWidgetItem(getattr(p, "status", "")))
        self.table.resizeColumnsToContents()
        self.table.resizeRowsToContents()
        self.table.viewport().update()
        self.table.repaint()

In [None]:
from PySide6.QtWidgets import (
    QWidget, QVBoxLayout, QHBoxLayout,
    QLabel, QPushButton, QTableWidget, QTableWidgetItem, QGroupBox
)
from PySide6.QtCore import Qt
class ReportUI(QWidget):
    """UI component for generating and displaying daily reports."""
    _header_title = "Daily Report"
    _btn_generate_excel_title = "Generate Excel Report"
    _btn_generate_report_title = "Generate today report"
    def __init__(self, report_service, parent=None):
        super().__init__(parent)
        self.report_service = report_service

        root = QVBoxLayout(self)
        header = QHBoxLayout()
        self.title = QLabel(self._header_title)
        header.addWidget(self.title)

        header.addStretch(1)
        self.generate_excel_btn = QPushButton(self._btn_generate_excel_title)
        self.generate_btn = QPushButton(self._btn_generate_report_title)
        header.addWidget(self.generate_btn)
        header.addWidget(self.generate_excel_btn)
        root.addLayout(header)

        kpi_box = QGroupBox("Summary")
        kpi_layout = QHBoxLayout(kpi_box)

        self.total_orders_lbl = QLabel("Total orders: 0")
        self.completed_orders_lbl = QLabel("Completed: 0")
        self.pending_orders_lbl = QLabel("Pending: 0")

        for lbl in (self.total_orders_lbl, self.completed_orders_lbl, self.pending_orders_lbl):
            lbl.setAlignment(Qt.AlignCenter)

        kpi_layout.addWidget(self.total_orders_lbl)
        kpi_layout.addWidget(self.completed_orders_lbl)
        kpi_layout.addWidget(self.pending_orders_lbl)

        root.addWidget(kpi_box)

        table_box = QGroupBox("Most Fulfilled Products")
        table_layout = QVBoxLayout(table_box)

        self.products_table = QTableWidget(0, 2)
        self.products_table.setHorizontalHeaderLabels(["productCode", "quantityFulfilled"])
        self.products_table.setSortingEnabled(True)

        table_layout.addWidget(self.products_table)
        root.addWidget(table_box)


        self.generate_btn.clicked.connect(self._generate_report)
        self.generate_excel_btn.clicked.connect(self._generate_excel_report)
    def _generate_excel_report(self):
        """ Generate and save the daily report as an Excel file."""
        report = self.report_service.generated_daily_report()
        generate_excel({
            "dailyReport": [report.numberOrders,
                            report.numberOrderCompleted,
                            report.numberOrderPending],
            "productsMostFulfilled": report.productsMostFulfilled
        }, "daily_report.xls")
    def _generate_report(self):
        """ Generate and display the daily report in the UI."""
        report = self.report_service.generated_daily_report()
        self.total_orders_lbl.setText(f"Total orders: {report.numberOrders}")
        self.completed_orders_lbl.setText(f"Completed: {report.numberOrderCompleted}")
        self.pending_orders_lbl.setText(f"Pending: {report.numberOrderPending}")

        self._render_products(report.productsMostFulfilled)
    def _render_products(self, products):
        self.products_table.setSortingEnabled(False)
        self.products_table.clearContents()
        self.products_table.setRowCount(len(products))

        for r, (code, qty) in enumerate(products):
            self.products_table.setItem(r, 0, QTableWidgetItem(str(code)))
            self.products_table.setItem(r, 1, QTableWidgetItem(str(qty)))

        self.products_table.setSortingEnabled(True)
        self.products_table.resizeColumnsToContents()

    def reset_ui(self):
        self.total_orders_lbl.setText("Total orders: 0")
        self.completed_orders_lbl.setText("Completed: 0")
        self.pending_orders_lbl.setText("Pending: 0")
        self.products_table.setRowCount(0)

In [None]:
from PySide6.QtWidgets import (
     QMainWindow, QWidget, QVBoxLayout,QTabWidget,QLabel
)
class MainWindow(QMainWindow):
    """ Main application window containing tabs for products, orders, and reports. """
    def __init__(self,product_service, order_service, send_order, order_product_service, report_service, parent=None):
        super().__init__()
        self.setWindowTitle("LogiServe S.r.l. - Inventory Management")
        self.resize(900, 600)
        root = QWidget()
        self.setCentralWidget(root)
        main_layout = QVBoxLayout(root)
        self.tabs = QTabWidget()
        main_layout.addWidget(self.tabs)
        self.product_ui = ProductUI(product_service)
        self.order_ui = OrderUI(send_order, order_service, order_product_service)
        self.report_ui = ReportUI(report_service)
        products_tab = self._create_tab_widget(self.product_ui)
        orders_tab = self._create_tab_widget(self.order_ui)
        reports_tab = self._create_tab_widget(self.report_ui)
        self._add_tab([
            (orders_tab, "Orders"),
            (products_tab, "Products"),
            (reports_tab, "Reports")
        ])
        self.tabs.currentChanged.connect(self._on_tab_changed)

    def _on_tab_changed(self, index):
        """ Handle tab change events to refresh data as needed."""
        if index == 0:
            self.product_ui.reload_data()
        elif index == 1:
            self.order_ui.reload_orders()
        elif index == 2:
            self.report_ui.reset_ui()

    def _create_tab_widget(self, widget):
        """ Create a tab widget containing the given widget. """
        tab = QWidget()
        layout = QVBoxLayout(tab)
        layout.addWidget(widget)
        return tab

    def _add_tab(self, widgets):
        """ Add multiple tabs to the main tab widget."""
        for widget, title in widgets:
            self.tabs.addTab(widget, title)

In [None]:
import sys
from PySide6.QtWidgets import QApplication

def main():
    app = QApplication(sys.argv)
    sendOrderService = SendOrder(orderService, orderProductService, productService)
    w = MainWindow(product_service=productService, order_service=orderService, send_order=sendOrderService, order_product_service=orderProductService,report_service=reportService)
    w.show()
    sys.exit(app.exec())



In [None]:
main()