This repository contains a weather data ingestion pipeline and a REST API built using Flask and PostgreSQL. It ingests raw weather data from text files, stores it in a PostgreSQL database, and exposes a set of endpoints to retrieve and analyze the data.
- Data Ingestion: Reads weather data from text files and stores it in PostgreSQL.
- Checkpointing: Ensures each data file is ingested only once.
- REST API: Allows querying weather data and calculating yearly statistics for stations.
- Kubernetes: Ready to deploy on EKS with Kubernetes manifests.
- Automated CI/CD: Includes a Jenkins pipeline for automating the deployment and ingestion.
# Clone the repository:

    git clone https://github.com/your-username/your-repo.git
    cd your-repo

# Install dependencies:

    pip install -r requirements.txt

# Set up PostgreSQL database:
Make sure PostgreSQL is running and the following tables are created:
- `weather_data`
- `weather_yearly_stats`

You can find SQL scripts to create these tables in the `migrations/` directory.

# Run the Flask API:

    python scripts/api.py

This will start the Flask API on http://localhost:5000.
# Ingest all files from wx_data folder:

    python scripts/weather_ingestion.py

This will ingest all the weather data files in the `wx_data/` folder.
# Configuration:
- **PostgreSQL Connection**: Modify the PostgreSQL connection string in `src/config/connectors.py` or use the `DATABASE_URL` environment variable.
- **API Port**: The API runs on port `5000` by default. You can change this in `scripts/api.py` if needed.

The API exposes two endpoints for querying weather data and statistics.
Fetches weather data filtered by `station_id` and `date`. Supports pagination with `limit` and `offset`.

- `station_id`: The ID of the weather station (optional).
- `date`: The date for which you want the weather data (optional).
- `limit`: The number of records to return (default: 10).
- `offset`: The starting point for pagination (default: 0).

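
As a rough illustration of how these parameters combine, the following Python sketch builds a request URL for this endpoint using only the standard library. The helper name `build_weather_url` and its default base URL are assumptions made for this example; they are not part of this repository.

```python
from urllib.parse import urlencode


def build_weather_url(base_url="http://localhost:5000", station_id=None,
                      date=None, limit=10, offset=0):
    """Build a /api/weather URL, leaving out optional filters that are unset.

    Hypothetical helper for illustration; not part of this repository.
    """
    params = {"station_id": station_id, "date": date,
              "limit": limit, "offset": offset}
    # Skip filters the caller did not supply; limit and offset are always sent.
    query = urlencode({k: v for k, v in params.items() if v is not None})
    return f"{base_url}/api/weather?{query}"


print(build_weather_url(station_id="station1", date="2024-09-15", limit=5))
```

To page through results, a client would keep `limit` fixed and increase `offset` by `limit` on each request until fewer than `limit` records come back.
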

    GET /api/weather?station_id=station1&date=2024-09-15&limit=5

    [
      {
        "station_id": "station1",
        "date": "2024-09-15",
        "max_temp": 25.5,
        "min_temp": 15.0,
        "precipitation": 12.3
      }
    ]

Fetches weather statistics (average max/min temperatures and total precipitation) for a weather station and year. Supports pagination.

- `station_id`: The ID of the weather station (optional).
- `year`: The year for which you want statistics (optional).
- `limit`: The number of records to return (default: 10).
- `offset`: The starting point for pagination (default: 0).

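
To make these statistics concrete, here is a minimal sketch of how per-station yearly averages and totals could be derived from daily records shaped like the `/api/weather` response. It is an assumption about the calculation, not the repository's implementation (which may well do this in SQL); the function name `yearly_stats` and the sample values are hypothetical.

```python
from collections import defaultdict
from statistics import mean


def yearly_stats(records):
    """Group daily records by (station_id, year) and aggregate each group."""
    groups = defaultdict(list)
    for rec in records:
        year = int(rec["date"][:4])  # assumes ISO dates: YYYY-MM-DD
        groups[(rec["station_id"], year)].append(rec)

    stats = []
    for (station_id, year), rows in sorted(groups.items()):
        stats.append({
            "station_id": station_id,
            "year": year,
            "avg_max_temp": mean(r["max_temp"] for r in rows),
            "avg_min_temp": mean(r["min_temp"] for r in rows),
            "total_precipitation": sum(r["precipitation"] for r in rows),
        })
    return stats


# Made-up sample data, for illustration only.
sample = [
    {"station_id": "station1", "date": "2024-09-15",
     "max_temp": 25.0, "min_temp": 15.0, "precipitation": 12.0},
    {"station_id": "station1", "date": "2024-09-16",
     "max_temp": 27.0, "min_temp": 17.0, "precipitation": 0.5},
]
print(yearly_stats(sample))
```
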

    GET /api/weather/stats?station_id=station1&year=2024&limit=5

    [
      {
        "station_id": "station1",
        "year": 2024,
        "avg_max_temp": 25.3,
        "avg_min_temp": 15.1,
        "total_precipitation": 230.5
      }
    ]

The ingestion pipeline reads raw weather data files from the `wx_data/` folder, processes them, and inserts them into the PostgreSQL database. The system uses checkpointing to ensure that each file is only processed once.
# Run the ingestion manually:

    python scripts/weather_ingestion.py

- **Ingestion Logic**: The ingestion logic is implemented in the `Ingestor` class located in `src/services/ingestor.py`.
- **Checkpointing**: Files that are processed are logged in the `checkpoints` table, ensuring they are not processed again.

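
The checkpointing behaviour can be sketched as follows. This is a minimal illustration, not the `Ingestor` implementation: it uses an in-memory SQLite database as a stand-in for PostgreSQL so that it runs without any setup, and the `filename` column as well as the names `ingest_once` and `process_file` are assumptions.

```python
import sqlite3


def ingest_once(conn, filename, process_file):
    """Run process_file(filename) unless filename already has a checkpoint.

    Returns True if the file was processed, False if it was skipped.
    """
    seen = conn.execute(
        "SELECT 1 FROM checkpoints WHERE filename = ?", (filename,)
    ).fetchone()
    if seen:
        return False
    with conn:  # commit on success, roll back if processing raises
        process_file(filename)
        conn.execute("INSERT INTO checkpoints (filename) VALUES (?)", (filename,))
    return True


conn = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL database
conn.execute("CREATE TABLE checkpoints (filename TEXT PRIMARY KEY)")

processed = []
for name in ["a.txt", "b.txt", "a.txt"]:  # hypothetical file names
    ingest_once(conn, name, processed.append)

print(processed)
```

The second `a.txt` is skipped because its checkpoint already exists. Recording the checkpoint in the same transaction as the processing step means a failed run leaves no checkpoint behind, so the file is retried next time.
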
The repository includes configurations for deploying the Flask API and ingestion pipeline to AWS using EKS and Jenkins.
# Dockerfile to containerize Flask API:

A Dockerfile is provided to containerize the Flask API and ingestion jobs.

The API includes Swagger documentation to provide a visual interface for testing the API.
# Swagger URL:
Visit `http://localhost:5000/swagger` to access the Swagger UI. The OpenAPI spec is located at `static/swagger.json`.