Skip to content

MatSouesme/Project-Automation

Repository files navigation

Project Automation — Agro-meteorological Data Pipeline

This repository demonstrates a complete agro-meteorological data pipeline, designed for real-time environmental monitoring and water optimization. It simulates sensor data (soil moisture, temperature, rainfall, humidity, wind, solar radiation, VPD, ET₀), ingests them via FastAPI, stores them in InfluxDB v2, orchestrates ETL to Snowflake with Airflow, and visualizes results with Grafana and Streamlit. All components run locally, securely, and free of charge (no external cloud dependencies).

Quick start (Windows PowerShell):

  1. Copy .env.example to .env and fill values (especially DOCKER_INFLUXDB_INIT_ADMIN_TOKEN).

  2. Start the stack:

Start the kubernetes cluster.

docker-compose up --build
  1. Wait for InfluxDB to initialise (check logs). The API is exposed on http://localhost:8000 and Grafana on http://localhost:3000.

  2. Verify in Grafana: add InfluxDB v2 datasource with URL http://influxdb:8086, Organization and Token from the .env (use admin/admin for login if Grafana prompts).

Simulator Configuration

The simulator generates realistic agro-meteorological data with the following variables:

  • soil_moisture (0-100%)
  • temperature_c (-50 to 80°C)
  • rain_mm (≥0 mm)
  • air_humidity_pct (30-100%)
  • wind_speed_ms (0-12 m/s)
  • solar_radiation_wm2 (0-900 W/m²)
  • vpd_kpa (≥0 kPa) - Vapor Pressure Deficit
  • et0_mm (≥0 mm) - Reference Evapotranspiration

Environment Variables

Configure the simulator via environment variables in docker-compose.yml:

  • API_URL - API endpoint (default: http://api:8000/ingest)
  • SIMULATOR_RATE_SECONDS - Time between sensor readings (default: 2)
  • SIMULATOR_SENSORS - Total number of sensors to simulate (default: 50)
  • SIMULATOR_PLOTS - Number of plots for synthetic generation (default: 2)
  • PLOTS_CSV_PATH - Path to plots CSV file (default: /app/data/plots.csv)
  • SENSORS_PER_PLOT - Fixed sensors per plot (overrides SIMULATOR_SENSORS)
  • SEED - Random seed for reproducibility (default: 42)

Expected Throughput

The simulator writes at a rate of: SIMULATOR_SENSORS / SIMULATOR_RATE_SECONDS writes/second

Data Files

data/plots.csv

Master reference for plot metadata. Contains 50 plots with diverse characteristics:

Schema:

plot_id,name,soil_type,slope_deg,orientation_deg,size_ha,crop,lat,lon,elevation_m,irrigation_system,field_capacity_pct,wilting_point_pct

Soil types: sandy, loam, clay, silt, sandy_loam, clay_loam
Crops: wheat, corn, sunflower, soybean, barley, grape, potato
Irrigation: none, drip, sprinkler

This file is used by:

  • Simulator - Maps sensors to plots
  • Streamlit - Displays plot details and maps

data/soil_thresholds.csv

Irrigation recommendation thresholds by soil type.

Schema:

soil_type,moisture_reco_threshold_pct,stress_vpd_kpa

Used by Airflow DAGs to generate irrigation recommendations in FCT_DAILY_AGR.

Airflow Orchestration and Data Pipelines

Apache Airflow is used in this project to automate and orchestrate all data movements between InfluxDB (real-time store) and Snowflake (analytical warehouse).

Airflow runs as a containerized service inside Kubernetes and executes a DAG (Directed Acyclic Graph) that defines the end-to-end ETL flow.

DAG Workflow

The DAG runs hourly and contains three main tasks:

  • Extract from InfluxDB
    • Queries the last hour of measurements from the sensors bucket in InfluxDB using the Flux query language.
    • Exports the data as a temporary CSV inside the Airflow container.
  • Transform data
    • Cleans and aggregates data every 10 minutes, by sensor_id and plot_id. Handles missing values and converts timestamps to UTC.
    • Produces a standardized dataset ready for loading into the warehouse.
  • Load to Snowflake
    • Creates the target table SENSOR_DATA if it does not exist.
    • Loads the aggregated dataset into Snowflake using the Snowflake Python connector.
    • Commits transactions and closes the connection securely.

Scheduling and Dependencies:

Schedule: 0 * * * * → runs every hour on the hour.

Dependencies: extract_influx → transform_data → load_to_snowflake Each task must complete successfully before the next one starts.

Configuration : All credentials (InfluxDB URL/token, Snowflake account, user, password, warehouse, schema, etc.) are injected through environment variables defined in the Airflow container. No secrets are stored in the DAG file, a .env file or Kubernetes Secret should be used instead.

Airflow ensures:

Automation: no manual ETL execution. Reliability: each step is retried and logged. Traceability: all runs are visible in the Airflow UI (http://localhost:8080). Scalability: tasks can run in parallel or be extended with new pipelines (e.g., weather forecasts or daily reports).

Open the UI: URL: http://localhost:8080 User: airflow Password: defined in your .env file or default credentials.

❄️ Snowflake — Data Warehouse

Snowflake serves as the analytical data warehouse of the pipeline. It stores all aggregated and historical sensor data loaded hourly by Airflow. Its separation of storage and compute ensures scalability and high query performance. Data is structured into dimension and fact tables. It provides a single, reliable source of truth for analytics and irrigation recommendations.

Streamlit — Business Dashboard

Streamlit is the user-facing dashboard built for agronomists and analysts. It connects directly to Snowflake to visualize sensor trends and irrigation alerts. Users can filter data by plot, sensor, and time period in real time. The dashboard displays soil moisture evolution, VPD, ET0, and irrigation recommendations. It offers an intuitive, web-based interface to make data-driven farm decisions.

Grafana — Monitoring and Observability

Grafana provides real-time monitoring for both infrastructure and business metrics. It connects to Prometheus to visualize CPU, memory, and pod health within the Kubernetes cluster. It also reads directly from InfluxDB and Snowflake to track data ingestion rates and latency. Custom dashboards show the end-to-end pipeline status and sensor data flow in real time. This unified observability layer ensures transparency, reliability, and rapid incident detection.

Architecture

Simulator → FastAPI → InfluxDB → Airflow → Snowflake → Streamlit
                         ↓
                      Grafana <- Prometheus <- Node Exporter

Notes:

  • The API expects timestamps with timezone (UTC). The simulator sends ISO8601 UTC timestamps.
  • If you run the simulator from your host and the API is in Docker, the simulator posts to http://localhost:8000/ingest which is exposed by compose.
  • The data/ directory is mounted read-only into the simulator container.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 3

  •  
  •  
  •