adding the airflow again fixes #160 #366

Merged
chana-rn merged 1 commit into main from chanarn/fruit_airflow_3!!
Nov 12, 2025

Conversation

@chana-rn
Collaborator

No description provided.

@greptile-apps

greptile-apps Bot commented Nov 12, 2025

Greptile Overview

Greptile Summary

This PR implements a comprehensive Airflow-based fruit orchestration system that adds machine learning capabilities for fruit classification and ripeness detection to the AgCloud platform. The changes introduce multiple microservices including a fruit classifier service using PyTorch models (MobileNet v3), a ripeness detection API with conditional learning architecture, and an alerting system that monitors fruit ripeness thresholds via Kafka messaging.

The implementation follows a microservices architecture with Docker containerization, where each service (classifier, ripeness detection, alerting) has its own configuration, requirements, and docker-compose setup. The system integrates with existing AgCloud infrastructure including MinIO for image storage, PostgreSQL for data persistence, and Kafka for messaging. An Airflow DAG orchestrates the entire pipeline with daily fruit classification jobs and weekly ripeness monitoring workflows, using timezone-aware scheduling for the Israel timezone.

The fruit classification service processes images from MinIO storage and logs results to PostgreSQL, while the ripeness service uses a conditional model architecture that considers both visual features and fruit type for more accurate ripeness predictions. The alerting service monitors weekly rollup data and generates notifications when ripeness percentages exceed configured thresholds. All services include comprehensive documentation, health checks, and proper error handling for production deployment.

Important Files Changed

| Filename | Score | Overview |
|---|---|---|
| services/fruit-orchestration/dags/ag_compose_scheduler.py | 4/5 | New Airflow DAG orchestrating daily fruit classification and weekly ripeness monitoring workflows |
| services/fruit-orchestration/services/fruit_ripeness_alert/app.py | 2/5 | New alerting service with critical hardcoded device_id bug and missing error handling |
| services/fruit-orchestration/services/classifier/inference/infer_minio_batch.py | 3/5 | New batch inference script with duplicate classification calls causing inefficient resource usage |
| services/fruit-orchestration/services/ripeness/api/ripeness_api.py | 3/5 | New FastAPI ripeness prediction service with database connection management issues |
| services/fruit-orchestration/services/classifier/metrics_db/db.py | 3/5 | New database interface with hardcoded default credentials and missing error handling |
| services/fruit-orchestration/services/ripeness/jobs/weekly_ripeness_job.py | 3/5 | New weekly batch job for ripeness prediction with path manipulation and database transaction concerns |
| services/fruit-orchestration/services/fruit_ripeness_alert/secret/db_api_token | 2/5 | API token hardcoded in version control creating significant security risk |
| services/fruit-orchestration/services/ripeness/README.md | 3/5 | Comprehensive documentation with path inconsistencies and formatting issues |
| services/fruit-orchestration/services/classifier/inference/service.py | 3/5 | New FastAPI inference service with environment variable validation issues |
| services/fruit-orchestration/services/ripeness/tools/data_prep/prepare_from_minio.py | 3/5 | Data preparation script with duplicate MinIO client initialization and ambiguous logic |

Confidence score: 3/5

  • This PR requires careful review due to several critical bugs and security issues that could cause production problems
  • Score lowered due to hardcoded API tokens in version control, critical bugs like hardcoded device_id in alerting service, duplicate resource usage in batch processing, database connection management issues, and missing error handling in key components
  • Pay close attention to the fruit_ripeness_alert service (device_id bug), security token management, database connection handling across all services, and the batch inference duplicate processing logic
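
One possible way to avoid the hardcoded `device_id` called out above is to take the identifier from each weekly rollup row. The sketch below is illustrative only: the `build_alerts` helper and the `device_id` / `ripe_pct` field names are assumptions, not code from `app.py`, and the 0.8 default simply mirrors the threshold shown in the sequence diagram.

```python
from typing import Iterable

DEFAULT_THRESHOLD = 0.8  # default value shown in the review's sequence diagram


def build_alerts(rollups: Iterable[dict], threshold: float = DEFAULT_THRESHOLD) -> list[dict]:
    """Return one alert payload per rollup row at or above the threshold."""
    alerts = []
    for row in rollups:
        if row["ripe_pct"] >= threshold:
            alerts.append({
                "device_id": row["device_id"],  # read from the row, never hardcoded
                "ripe_pct": row["ripe_pct"],
                "threshold": threshold,
            })
    return alerts


# Hypothetical rollup rows; real rows would come from PostgreSQL.
rollups = [
    {"device_id": "dev-a", "ripe_pct": 0.92},
    {"device_id": "dev-b", "ripe_pct": 0.41},
    {"device_id": "dev-c", "ripe_pct": 0.80},
]
alerts = build_alerts(rollups)
```

Each payload would then be published to the `alerts` Kafka topic; the comparison is `>=` to match the diagram's "Ripeness percentage >= threshold" branch.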

Sequence Diagram

sequenceDiagram
    participant User
    participant Airflow_Scheduler
    participant Fruit_Classifier
    participant MinIO
    participant PostgreSQL
    participant Ripeness_API
    participant Fruit_Alerts
    participant Kafka

    User->>Airflow_Scheduler: "Schedule daily fruit processing"
    Note over Airflow_Scheduler: "Daily at scheduled time"
    
    Airflow_Scheduler->>Fruit_Classifier: "Run classifier compose"
    Fruit_Classifier->>MinIO: "Fetch images from s3://classification/samples/"
    MinIO-->>Fruit_Classifier: "Return image data"
    Fruit_Classifier->>Fruit_Classifier: "Perform batch inference"
    Fruit_Classifier->>PostgreSQL: "Insert inference results into inference_logs"
    Fruit_Classifier-->>Airflow_Scheduler: "Classifier complete"
    
    Note over Airflow_Scheduler: "Weekly gate check (Tuesday by default)"
    Airflow_Scheduler->>Airflow_Scheduler: "Check if today is target weekday"
    
    alt Weekly processing day
        Airflow_Scheduler->>Ripeness_API: "Start ripeness service"
        Airflow_Scheduler->>Ripeness_API: "POST /predict-last-week"
        Ripeness_API->>PostgreSQL: "Query inference_logs for last 7 days"
        PostgreSQL-->>Ripeness_API: "Return unprocessed inference records"
        
        loop For each image
            Ripeness_API->>MinIO: "Fetch image from URL"
            MinIO-->>Ripeness_API: "Return image bytes"
            Ripeness_API->>Ripeness_API: "Run ripeness prediction model"
            Ripeness_API->>PostgreSQL: "Insert into ripeness_predictions"
        end
        
        Ripeness_API->>PostgreSQL: "Create weekly rollup snapshot"
        Ripeness_API-->>Airflow_Scheduler: "Prediction and rollup complete"
        Airflow_Scheduler->>Ripeness_API: "Stop ripeness service"
        
        Airflow_Scheduler->>Fruit_Alerts: "Run fruit ripeness alert batch job"
        Fruit_Alerts->>PostgreSQL: "Get ripeness threshold configuration"
        PostgreSQL-->>Fruit_Alerts: "Return threshold (default 0.8)"
        Fruit_Alerts->>PostgreSQL: "Query weekly rollups for last 7 days"
        PostgreSQL-->>Fruit_Alerts: "Return rollup data"
        
        loop For each device with high ripeness
            alt Ripeness percentage >= threshold
                Fruit_Alerts->>Kafka: "Send alert to 'alerts' topic"
            end
        end
        
        Fruit_Alerts-->>Airflow_Scheduler: "Alert processing complete"
    else Skip weekly processing
        Airflow_Scheduler->>Airflow_Scheduler: "Skip weekly tasks"
    end
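
The weekly gate check in the diagram can be sketched as a timezone-aware weekday comparison. This is a minimal illustration, not the DAG's actual code: `is_weekly_run_day`, `LOCAL_TZ`, and `TARGET_WEEKDAY` are hypothetical names, and the use of `Asia/Jerusalem` is an assumption based on the summary's mention of the Israel timezone.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

LOCAL_TZ = ZoneInfo("Asia/Jerusalem")  # assumed zone for the Israel-time schedule
TARGET_WEEKDAY = 1  # Monday is 0, so 1 is Tuesday (the default in the diagram)


def is_weekly_run_day(now: datetime, target_weekday: int = TARGET_WEEKDAY) -> bool:
    """True when `now`, converted to local time, falls on the target weekday."""
    if now.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return now.astimezone(LOCAL_TZ).weekday() == target_weekday


# 22:30 UTC on Monday 2025-11-10 is already Tuesday 00:30 in Israel.
late_monday_utc = datetime(2025, 11, 10, 22, 30, tzinfo=timezone.utc)
# 22:30 UTC on Tuesday 2025-11-11 is already Wednesday 00:30 in Israel.
late_tuesday_utc = datetime(2025, 11, 11, 22, 30, tzinfo=timezone.utc)
```

Converting to local time before comparing matters near midnight: a naive UTC weekday check would give the opposite answer for both timestamps above.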

@greptile-apps greptile-apps Bot left a comment

50 files reviewed, 37 comments

model, ltr, lva, device,
cfg["epochs_frozen"], cfg["lr"], cfg["weight_decay"],
cfg["checkpoint_dir"], tag="frozen", ce=ce, patience=2

style: Remove trailing whitespace

Suggested change

cfg["checkpoint_dir"], tag="unfrozen", ce=ce, patience=2
)

os.makedirs(cfg["checkpoint_dir"], exist_ok=True)

style: Redundant directory creation - already done inside train_phase function

Comment on lines +12 to +13
services/ripeness-ml/
├─ api/

logic: Path inconsistency: documentation shows services/ripeness-ml/ but actual service is at services/fruit-orchestration/services/ripeness/

Suggested change
- services/ripeness-ml/
- ├─ api/
+ services/fruit-orchestration/services/ripeness/
+ ├─ api/


## 🐳 Build & Run (Docker)

From `services/ripeness-ml/`:

logic: Incorrect path reference - should be services/fruit-orchestration/services/ripeness/ not services/ripeness-ml/

Suggested change
- From `services/ripeness-ml/`:
+ From `services/fruit-orchestration/services/ripeness/`:

@@ -0,0 +1,16 @@
version: "3.8"

syntax: Unicode BOM character detected at beginning of file. This can cause parsing issues with some tools.
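
A UTF-8 BOM like the one flagged here can be detected and stripped with the standard library. The sketch below is one possible approach under stated assumptions: `strip_utf8_bom` is a hypothetical helper, and the `docker-compose.yml` file name is used only for illustration because the review does not name the file.

```python
import codecs
import tempfile
from pathlib import Path


def strip_utf8_bom(path: Path) -> bool:
    """Remove a leading UTF-8 BOM in place. Returns True if one was removed."""
    data = path.read_bytes()
    if not data.startswith(codecs.BOM_UTF8):
        return False
    path.write_bytes(data[len(codecs.BOM_UTF8):])
    return True


# Demonstration on a throwaway file with an illustrative name.
with tempfile.TemporaryDirectory() as tmp:
    compose = Path(tmp) / "docker-compose.yml"
    compose.write_bytes(codecs.BOM_UTF8 + b'version: "3.8"\n')
    removed_first = strip_utf8_bom(compose)   # BOM present on the first pass
    removed_second = strip_utf8_bom(compose)  # nothing left to strip
    cleaned = compose.read_bytes()
```

Working on raw bytes avoids any decoding step, so the rest of the file is left untouched.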

@@ -0,0 +1,167 @@
# file: services/weekly_ripeness_job.py

style: filename comment is incorrect - should be 'weekly_ripeness_job.py' not 'services/weekly_ripeness_job.py'

Suggested change
- # file: services/weekly_ripeness_job.py
+ # file: weekly_ripeness_job.py

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


def load_image_for_model(img_bytes):
im = Image.open(io.BytesIO(img_bytes)).convert("RGB")
from torchvision import transforms

style: importing transforms inside function on every call is inefficient - move to module level

Comment on lines +154 to +158
t0 = perf_counter()
cls, score = classify_bytes(model, tfms, idx_to_class, data)
t_ms = (perf_counter() - t0) * 1000.0

cls, score = classify_bytes(model, tfms, idx_to_class, data)

logic: duplicate classification calls - the second call on line 158 repeats the inference already timed on lines 154-156 and overwrites `cls` and `score`, wasting compute resources

Suggested change
- t0 = perf_counter()
- cls, score = classify_bytes(model, tfms, idx_to_class, data)
- t_ms = (perf_counter() - t0) * 1000.0
- cls, score = classify_bytes(model, tfms, idx_to_class, data)
+ t0 = perf_counter()
+ cls, score = classify_bytes(model, tfms, idx_to_class, data)
+ t_ms = (perf_counter() - t0) * 1000.0
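
One way to make the "classify once, time once" intent harder to break is a small timing wrapper. This is a sketch, not the script's code: `timed_call` is a hypothetical helper, and `fake_classify` is a stand-in for `classify_bytes` that only counts invocations.

```python
from time import perf_counter
from typing import Any, Callable


def timed_call(fn: Callable[..., Any], *args: Any) -> tuple[Any, float]:
    """Call fn exactly once and return (result, elapsed milliseconds)."""
    t0 = perf_counter()
    result = fn(*args)
    return result, (perf_counter() - t0) * 1000.0


calls = 0


def fake_classify(data: bytes) -> tuple[str, float]:
    """Stand-in for classify_bytes; counts how often it is invoked."""
    global calls
    calls += 1
    return "apple", 0.97


# The result and its latency come from the same single invocation.
(cls, score), t_ms = timed_call(fake_classify, b"image-bytes")
```

Because the result and the latency are returned together, the logged `latency_ms` always describes the call that produced the logged `fruit_type` and `score`.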

image_size=cfg.get("image_size", 224),
fruit_type=str(cls),
score=float(score),
latency_ms=float(t_ms), # <<< לא None

style: Hebrew comment ("לא None", meaning "not None") should be in English or removed

MODEL_NAME=best_conditional \
BATCH_LIMIT=500

CMD ["uvicorn", "api.ripeness_api:app", "--host", "0.0.0.0", "--port", "8088", "--reload"]

style: Using --reload flag in production containers can cause issues and performance overhead. Consider removing it for production builds.

Suggested change
- CMD ["uvicorn", "api.ripeness_api:app", "--host", "0.0.0.0", "--port", "8088", "--reload"]
+ CMD ["uvicorn", "api.ripeness_api:app", "--host", "0.0.0.0", "--port", "8088"]
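
If hot reload is still wanted during local development, the flag could be made opt-in instead of baked into the image. The sketch below assumes a small Python entrypoint that builds the command line; `uvicorn_command` and the `DEV_RELOAD` variable are hypothetical names, not part of this PR.

```python
def uvicorn_command(env: dict[str, str]) -> list[str]:
    """Build the uvicorn argv, adding --reload only when DEV_RELOAD is "1"."""
    cmd = ["uvicorn", "api.ripeness_api:app", "--host", "0.0.0.0", "--port", "8088"]
    if env.get("DEV_RELOAD") == "1":  # DEV_RELOAD is an assumed, illustrative variable
        cmd.append("--reload")
    return cmd


prod_cmd = uvicorn_command({})                   # production default: no reload
dev_cmd = uvicorn_command({"DEV_RELOAD": "1"})   # explicit opt-in for local work
```

An entrypoint could pass `dict(os.environ)` and hand the result to `os.execvp`, so production containers never watch the filesystem unless the variable is set explicitly.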

@@ -0,0 +1 @@
cf7bba69-678a-4708-829b-cb6e01c9b454 No newline at end of file
Owner

You can't put token here
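
A common alternative to committing the token is to resolve it at runtime from the environment or from a mounted secret file. The sketch below is a hedged illustration only: `load_api_token` and the `DB_API_TOKEN` / `DB_API_TOKEN_FILE` variable names are assumptions, not names used by the alerting service.

```python
import tempfile
from pathlib import Path
from typing import Mapping


def load_api_token(env: Mapping[str, str]) -> str:
    """Resolve the token from DB_API_TOKEN, else from the file named by DB_API_TOKEN_FILE."""
    token = env.get("DB_API_TOKEN", "").strip()
    if token:
        return token
    token_file = env.get("DB_API_TOKEN_FILE")
    if token_file and Path(token_file).is_file():
        return Path(token_file).read_text(encoding="utf-8").strip()
    raise RuntimeError("no API token configured; set DB_API_TOKEN or DB_API_TOKEN_FILE")


# Demonstration with a throwaway secret file and a placeholder value.
with tempfile.TemporaryDirectory() as tmp:
    secret = Path(tmp) / "db_api_token"
    secret.write_text("example-token\n", encoding="utf-8")
    from_file = load_api_token({"DB_API_TOKEN_FILE": str(secret)})

from_env = load_api_token({"DB_API_TOKEN": "env-token"})
```

In the service this would be called with `os.environ`, with the secret file mounted at deploy time and excluded from version control. A token that has already been committed should also be rotated, since it remains in the repository history.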

@chana-rn chana-rn merged commit 2d78217 into main Nov 12, 2025
PniniKlein pushed a commit that referenced this pull request Nov 13, 2025
KamaTechOrg pushed a commit that referenced this pull request Nov 26, 2025
KamaTechOrg pushed a commit that referenced this pull request Nov 26, 2025