In [17]:
import requests
import json
import datetime
import time
import os
import statistics
from dotenv import dotenv_values
from typing import Dict, List, Any, Optional
import pandas as pd 
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import joblib
import os

In [19]:
# Load the pest reference table: one row per plant/insect pair together with
# the temperature, humidity and rain levels listed for that insect.
# Alternative source kept for reference: "Datasets/pest_weather.csv"
pest_csv_path = "Datasets/pest_.csv"
PEST_DATA = pd.read_csv(pest_csv_path)
PEST_DATA.head()

Unnamed: 0,plant,insect_name,damage,min_temp,max_temp,avg_temp,humidity,rain
0,Apple,Codling Moth,Larvae bore into fruit,10,25,17.5,50-70,Low
1,Apple,Apple Maggot,Tunnels in fruit,15,30,22.5,60-80,Moderate
2,Apple,Woolly Apple Aphid,Sap sucking/woolly mass,10,25,17.5,50-70,Moderate
3,Apple,San Jose Scale,Bark/fruit damage,15,30,22.5,40-60,Low
4,Apple,Tent Caterpillar,Leaf defoliation,12,28,20.0,60-80,Moderate


In [20]:
# Translate the categorical rain level into a rough chance of rain (percent):
# Low -> 20, Moderate -> 50, High -> 80
rain_mapping = dict(Low=20, Moderate=50, High=80)

# Derive the numeric column; any label missing from rain_mapping becomes NaN
PEST_DATA['rain_percentage'] = PEST_DATA['rain'].map(rain_mapping)

# Preview the original label next to its numeric counterpart
PEST_DATA.loc[:, ['plant', 'rain', 'rain_percentage']].head()

Unnamed: 0,plant,rain,rain_percentage
0,Apple,Low,20
1,Apple,Moderate,50
2,Apple,Moderate,50
3,Apple,Low,20
4,Apple,Moderate,50


In [21]:
class WeatherAnalyzer:
    """
    Fetch, persist and summarise OpenWeather data for a single city.

    Every successful current-weather fetch is appended to a per-city JSON
    history file inside ``data_dir``; that history, together with the forecast,
    feeds ``generate_analysis_report``.
    """

    # Root of the OpenWeather 2.5 REST endpoints used by this class
    API_BASE_URL = "https://api.openweathermap.org/data/2.5"
    # Seconds to wait for the API; without a timeout requests.get can block forever
    REQUEST_TIMEOUT = 10

    def __init__(self, api_key: str, city: str, data_dir: str = "weather_data"):
        """
        Initialize the Weather Analyzer with API key and city

        Parameters:
        - api_key: OpenWeather API key
        - city: City name to analyze weather for
        - data_dir: Directory to store historical weather data
        """
        self.api_key = api_key
        self.city = city
        self.data_dir = data_dir
        self.current_data = None      # raw JSON of the last current-weather response
        self.forecast_data = None     # raw JSON of the last forecast response
        self.historical_data = []     # cleaned current-weather records, oldest first
        self._latest_current = None   # cleaned record produced by the last successful fetch

        # Create data directory if it doesn't exist (no check-then-create race)
        os.makedirs(data_dir, exist_ok=True)

        # Load existing historical data if available
        self._load_historical_data()

    def _history_path(self) -> str:
        """Return the path of the JSON history file for this city."""
        return f"{self.data_dir}/{self.city.lower()}_history.json"

    def _get_json(self, endpoint: str) -> Dict[str, Any]:
        """
        GET one OpenWeather endpoint for the configured city and decode the JSON body.

        Query parameters are passed via ``params`` so that city names containing
        spaces or other special characters are URL-encoded correctly.

        Raises:
        - requests.exceptions.RequestException on network errors, timeouts or
          non-2xx responses (callers handle it).
        """
        response = requests.get(
            f"{self.API_BASE_URL}/{endpoint}",
            params={"q": self.city, "appid": self.api_key, "units": "metric"},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def calculate_rain_chance(self, weather_data: Dict[str, Any]) -> int:
        """
        Calculate chance of rain based on weather parameters

        This is a hand-tuned heuristic, not a meteorological model: points are
        awarded for the weather condition, humidity, cloud cover and low
        pressure, then clamped to 0-100.

        Parameters:
        - weather_data: Weather data dictionary; the keys read are
          "weather_main", "humidity", "clouds" and "pressure" (all optional)

        Returns:
        - Integer from 0-100 representing rain probability
        """
        rain_chance = 0

        # Check weather condition (missing/None is treated as "no information")
        weather_type = (weather_data.get("weather_main") or "").lower()
        if "rain" in weather_type or "drizzle" in weather_type:
            rain_chance += 70
        elif "shower" in weather_type:
            rain_chance += 60
        elif "thunderstorm" in weather_type:
            rain_chance += 80
        elif "clouds" in weather_type:
            rain_chance += 30

        # Factor in humidity (higher humidity increases rain chance)
        humidity = weather_data.get("humidity") or 0
        rain_chance += min(humidity / 5, 20)  # Up to 20 points for humidity

        # Factor in cloud coverage
        clouds = weather_data.get("clouds") or 0
        rain_chance += clouds / 5  # Up to 20 points for full cloud coverage

        # Pressure factor (lower pressure often means higher rain chance)
        # Standard pressure is around 1013 hPa
        pressure = weather_data.get("pressure") or 1013
        # The stricter threshold must be tested first; otherwise "< 1005"
        # swallows every value below 1000 and the larger bonus is unreachable.
        if pressure < 1000:
            rain_chance += 15
        elif pressure < 1005:
            rain_chance += 10

        # Ensure we stay within 0-100 range
        return max(0, min(100, int(rain_chance)))

    def fetch_current_weather(self) -> Dict[str, Any]:
        """
        Fetch current weather data from OpenWeather API

        On success the raw response is kept in ``self.current_data``, the cleaned
        record is appended to the history (memory and file) and returned.

        Returns:
        - Cleaned weather dictionary, or {} if the request failed
        """
        try:
            self.current_data = self._get_json("weather")
        except requests.exceptions.RequestException as e:
            print(f"Error fetching current weather: {e}")
            return {}

        main = self.current_data["main"]
        wind = self.current_data["wind"]
        condition = self.current_data["weather"][0]

        # Clean and format the data
        cleaned_data = {
            "timestamp": datetime.datetime.now().isoformat(),
            "fetch_time": int(time.time()),
            "temp": main["temp"],
            "feels_like": main["feels_like"],
            "temp_min": main["temp_min"],
            "temp_max": main["temp_max"],
            "pressure": main["pressure"],
            "humidity": main["humidity"],
            "wind_speed": wind["speed"],
            # NOTE(review): direction is read leniently on the assumption that the
            # API may omit it (e.g. calm wind) - confirm against the API docs.
            "wind_direction": wind.get("deg"),
            "weather_main": condition["main"],
            "weather_description": condition["description"],
            "clouds": self.current_data["clouds"]["all"],
            "city": self.city
        }

        # Calculate and add rain chance
        cleaned_data["rain_chance"] = self.calculate_rain_chance(cleaned_data)

        # Save this data to historical records and remember it for reports
        self._save_weather_data(cleaned_data)
        self._latest_current = cleaned_data

        return cleaned_data

    def fetch_forecast(self, days: int = 5) -> Dict[str, Any]:
        """
        Fetch weather forecast data from OpenWeather API

        The full raw response is kept in ``self.forecast_data``; the returned
        cleaned forecast is truncated to ``days`` days.

        Parameters:
        - days: Number of forecast days to keep in the returned structure

        Returns:
        - Dictionary with "timestamp", "city" and a "forecast" list, or {} if
          the request failed
        """
        try:
            self.forecast_data = self._get_json("forecast")
        except requests.exceptions.RequestException as e:
            print(f"Error fetching forecast: {e}")
            return {}

        forecast_entries = []
        for item in self.forecast_data["list"]:
            main = item["main"]
            wind = item["wind"]
            condition = item["weather"][0]
            forecast_item = {
                "dt": item["dt"],
                "dt_txt": item["dt_txt"],
                "temp": main["temp"],
                "feels_like": main["feels_like"],
                "temp_min": main["temp_min"],
                "temp_max": main["temp_max"],
                "pressure": main["pressure"],
                "humidity": main["humidity"],
                "weather_main": condition["main"],
                "weather_description": condition["description"],
                "clouds": item["clouds"]["all"],
                "wind_speed": wind["speed"],
                "wind_direction": wind.get("deg")
            }

            # Calculate and add rain chance
            forecast_item["rain_chance"] = self.calculate_rain_chance(forecast_item)
            forecast_entries.append(forecast_item)

        # Keep only the first X days worth of forecast data
        forecasts_per_day = 8  # OpenWeather provides 8 forecasts per day (every 3 hours)
        limit = days * forecasts_per_day

        return {
            "timestamp": datetime.datetime.now().isoformat(),
            "city": self.city,
            "forecast": forecast_entries[:limit]
        }

    def _save_weather_data(self, data: Dict[str, Any]) -> None:
        """
        Save weather data to historical records

        The record is appended to the in-memory list and to the JSON history
        file. File errors are reported and swallowed (best effort).
        """
        # Add to in-memory historical data
        self.historical_data.append(data)

        # Save to file
        filename = self._history_path()

        try:
            if os.path.exists(filename):
                with open(filename, 'r') as f:
                    existing_data = json.load(f)
            else:
                existing_data = []

            existing_data.append(data)

            with open(filename, 'w') as f:
                json.dump(existing_data, f, indent=2)

        except Exception as e:
            print(f"Error saving historical data: {e}")

    def _load_historical_data(self) -> None:
        """Load historical weather data from file (empty list if missing or unreadable)"""
        filename = self._history_path()

        if not os.path.exists(filename):
            self.historical_data = []
            return

        try:
            with open(filename, 'r') as f:
                self.historical_data = json.load(f)
        except Exception as e:
            print(f"Error loading historical data: {e}")
            self.historical_data = []

    def get_historical_data(self, days: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Get historical weather data

        Parameters:
        - days: Number of days back to retrieve data (None or 0 for all data)

        Returns:
        - List of historical weather data points
        """
        if not days:
            return self.historical_data

        # Calculate timestamp for 'days' ago
        cutoff_time = int(time.time()) - (days * 24 * 60 * 60)

        return [data for data in self.historical_data if data.get("fetch_time", 0) >= cutoff_time]

    @staticmethod
    def _present_values(records: List[Dict[str, Any]], key: str) -> List[Any]:
        """Return the values stored under ``key``, skipping records where it is missing or None."""
        return [record[key] for record in records if record.get(key) is not None]

    @staticmethod
    def _most_common(labels: List[Any]) -> Any:
        """Return the most frequent label (the first one encountered wins ties)."""
        counts = {}
        for label in labels:
            counts[label] = counts.get(label, 0) + 1
        return max(counts, key=counts.get)

    def _summarize_history(self, historical: List[Dict[str, Any]], days_historical: int) -> Dict[str, Any]:
        """Build the "historical_analysis" section from cleaned history records."""
        analysis = {}

        # Temperature analysis
        temps = self._present_values(historical, "temp")
        if temps:
            analysis["temperature"] = {
                "average": round(statistics.mean(temps), 2),
                "median": round(statistics.median(temps), 2),
                "min": round(min(temps), 2),
                "max": round(max(temps), 2),
                "range": round(max(temps) - min(temps), 2),
                "days_analyzed": days_historical
            }

        # Humidity analysis
        humidity = self._present_values(historical, "humidity")
        if humidity:
            analysis["humidity"] = {
                "average": round(statistics.mean(humidity), 2),
                "median": round(statistics.median(humidity), 2),
                "min": min(humidity),
                "max": max(humidity)
            }

        # Wind analysis
        wind_speeds = self._present_values(historical, "wind_speed")
        if wind_speeds:
            analysis["wind"] = {
                "average_speed": round(statistics.mean(wind_speeds), 2),
                "max_speed": round(max(wind_speeds), 2)
            }

        # Weather conditions summary
        weather_counts = {}
        for weather_type in self._present_values(historical, "weather_main"):
            weather_counts[weather_type] = weather_counts.get(weather_type, 0) + 1

        if weather_counts:
            total = sum(weather_counts.values())
            analysis["weather_conditions"] = {
                k: {"count": v, "percentage": round((v / total) * 100, 2)}
                for k, v in weather_counts.items()
            }
            # Determine predominant weather
            analysis["predominant_weather"] = max(weather_counts, key=weather_counts.get)

        return analysis

    def _summarize_forecast(self, forecast_items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build the "forecast_summary" section from raw (non-empty) forecast entries."""
        forecast_temps = [item["main"]["temp"] for item in forecast_items]

        summary = {
            "avg_temp": round(statistics.mean(forecast_temps), 2),
            "min_temp": round(min(forecast_temps), 2),
            "max_temp": round(max(forecast_temps), 2),
            "predominant_weather": self._most_common(
                [item["weather"][0]["main"] for item in forecast_items]
            ),
            # Key name kept for compatibility: this is the number of forecast
            # entries, not a number of hours.
            "forecast_hours": len(forecast_items)
        }

        # Group entries by calendar date ("YYYY-MM-DD" prefix of dt_txt)
        daily_forecasts = {}
        for item in forecast_items:
            date = item["dt_txt"].split(" ")[0]
            bucket = daily_forecasts.setdefault(date, {"temps": [], "weather_types": []})
            bucket["temps"].append(item["main"]["temp"])
            bucket["weather_types"].append(item["weather"][0]["main"])

        # Add daily forecast summaries
        summary["daily"] = {
            date: {
                "avg_temp": round(statistics.mean(data["temps"]), 2),
                "min_temp": round(min(data["temps"]), 2),
                "max_temp": round(max(data["temps"]), 2),
                "predominant_weather": self._most_common(data["weather_types"])
            }
            for date, data in daily_forecasts.items()
        }

        return summary

    @staticmethod
    def _analyze_trend(historical: List[Dict[str, Any]], first_forecast_temp: float) -> Dict[str, Any]:
        """
        Compare the three most recent recorded temperatures with the first forecast value.

        Returns {} when fewer than two history records carry a temperature.
        """
        # Skip records without a temperature instead of failing with KeyError
        with_temp = [item for item in historical if item.get("temp") is not None]
        if len(with_temp) < 2:
            return {}

        ordered = sorted(with_temp, key=lambda x: x.get("fetch_time", 0))
        recent_temps = [item["temp"] for item in ordered[-3:]]
        recent_avg = statistics.mean(recent_temps)

        # Provide a simple trend interpretation
        if first_forecast_temp > recent_avg:
            outlook = "warming"
        elif first_forecast_temp < recent_avg:
            outlook = "cooling"
        else:
            outlook = "stable"

        return {
            "recent_temp_trend": round(recent_temps[-1] - recent_temps[0], 2),
            "forecast_vs_recent": round(first_forecast_temp - recent_avg, 2),
            "temperature_outlook": outlook
        }

    def generate_analysis_report(self, days_historical: int = 7, include_forecast: bool = True) -> Dict[str, Any]:
        """
        Generate comprehensive weather analysis report

        Current weather is fetched only when no cleaned record is cached yet, so
        building a report no longer triggers an extra API call or writes a
        duplicate entry to the history file.

        Parameters:
        - days_historical: Days of historical data to include in analysis
        - include_forecast: Whether to include forecast data

        Returns:
        - Dictionary containing analysis results ("trend_analysis" is present
          only when both history and forecast data are available)
        """
        # Get current weather if we don't have it (must precede the history
        # lookup so that a freshly fetched record is part of the analysis)
        current = getattr(self, "_latest_current", None)
        if not current:
            current = self.fetch_current_weather()

        # Get forecast if requested and we don't have it
        if include_forecast and not self.forecast_data:
            self.fetch_forecast()

        # Get historical data
        historical = self.get_historical_data(days_historical)

        # Initialize report dictionary (copy so callers cannot mutate the cache)
        report = {
            "city": self.city,
            "report_generated": datetime.datetime.now().isoformat(),
            "current_weather": dict(current) if current else {},
            "historical_analysis": {},
            "forecast_summary": {}
        }

        # Add historical analysis if we have data
        if historical:
            report["historical_analysis"] = self._summarize_history(historical, days_historical)

        # Add forecast summary if requested
        forecast_items = []
        if include_forecast and self.forecast_data:
            forecast_items = self.forecast_data.get("list") or []
        if forecast_items:
            report["forecast_summary"] = self._summarize_forecast(forecast_items)

        # Add trend analysis if we have enough historical data and forecast
        if historical and forecast_items:
            trend = self._analyze_trend(historical, forecast_items[0]["main"]["temp"])
            if trend:
                report["trend_analysis"] = trend

        return report


In [22]:
# Read the OpenWeather key from .env, falling back to the process environment.
API_KEY = dotenv_values(".env").get("API_KEY") or os.environ.get("API_KEY")
if not API_KEY:
    # Fail here with a clear message; otherwise the API request is rejected,
    # fetch_current_weather() returns {} and a later cell dies on current['temp'].
    raise RuntimeError(
        "API_KEY is not set: add it to the .env file or export it as an environment variable."
    )

city = "Coimbatore"

analyzer = WeatherAnalyzer(API_KEY, city)

# Fetch current weather (this also appends the record to the city's history file)
current = analyzer.fetch_current_weather()

print(current)

{'timestamp': '2025-03-24T15:59:36.354960', 'fetch_time': 1742812176, 'temp': 33.88, 'feels_like': 36.01, 'temp_min': 33.88, 'temp_max': 33.88, 'pressure': 1008, 'humidity': 43, 'wind_speed': 3.09, 'wind_direction': 90, 'weather_main': 'Clouds', 'weather_description': 'scattered clouds', 'clouds': 40, 'city': 'Coimbatore', 'rain_chance': 46}


<h1>RandomForestClassifier Model</h1>

In [None]:
# Process the humidity data (convert ranges like "40-60", or single values like "55", to one number)
def extract_humidity_avg(humidity_range):
    """
    Convert a humidity specification to a single representative value.

    Parameters:
    - humidity_range: either a "low-high" string such as "40-60" (surrounding
      spaces and a trailing "%" are tolerated), a single value such as "55",
      or an already numeric value

    Returns:
    - Float: the midpoint of the range, or the value itself when no range is given

    Raises:
    - ValueError if the text is not one number or two numbers separated by "-"
    """
    # Numeric cells need no parsing
    if isinstance(humidity_range, (int, float)):
        return float(humidity_range)

    parts = [part.strip() for part in str(humidity_range).replace('%', '').split('-')]

    if len(parts) == 1:
        # A single value previously crashed on tuple unpacking
        return float(parts[0])
    if len(parts) == 2:
        low, high = (float(part) for part in parts)
        return (low + high) / 2

    raise ValueError(f"Unrecognised humidity value: {humidity_range!r}")

# Preprocess the data (work on a copy so PEST_DATA itself stays untouched)
pest_data_processed = PEST_DATA.copy()
pest_data_processed['humidity_avg'] = pest_data_processed['humidity'].apply(extract_humidity_avg)

# Create features (X) and target (y)
X = pest_data_processed[['plant', 'avg_temp', 'humidity_avg', 'rain_percentage']]
y = pest_data_processed['insect_name']

# One-hot encode the plant names
# handle_unknown='ignore' encodes plants unseen during fit as an all-zero row
encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
plant_encoded = encoder.fit_transform(X[['plant']])
plant_feature_names = encoder.get_feature_names_out(['plant'])

# Combine numerical features with one-hot encoded features
# Column order (plant one-hot columns, then temp, humidity, rain) must match
# the order used when building inputs at prediction time.
X_numerical = X[['avg_temp', 'humidity_avg', 'rain_percentage']].values
X_processed = np.hstack([plant_encoded, X_numerical])

# Split data for training and testing
X_train, X_test, y_train, y_test = train_test_split(X_processed, y, test_size=0.2, random_state=42)

# Train the model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate the model
# NOTE(review): the target is the insect name, so many classes probably have
# very few rows and held-out insects may never appear in the training split;
# treat this accuracy as indicative only - confirm class counts.
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Model accuracy: {accuracy*100:.2f}")

# save the model file
# Create models directory if it doesn't exist (single call, no check-then-create race)
os.makedirs('models', exist_ok=True)

# Save the model to a file (note: this is the model fit on the training split only)
joblib.dump(model, 'models/pest_prediction_model.joblib')
print("Model saved to models/pest_prediction_model.joblib")

Model accuracy: 12.50
Model saved to models/pest_prediction_model.joblib


In [1]:
def load_model():
    """
    Load the trained pest prediction model and print a short summary of it.

    Loading and reporting are kept separate: a problem while printing the
    summary (for example ``plant_feature_names`` not being defined because the
    training cell has not run in this kernel) no longer discards a model that
    was loaded successfully.

    Returns:
    - The loaded model, or None if the file is missing or cannot be read
    """
    try:
        # Load the model from file
        # NOTE: joblib files are pickle-based; only load files you created yourself.
        model = joblib.load('models/pest_prediction_model.joblib')
    except FileNotFoundError:
        print("Error: Model file not found. Please run the training cell first.")
        return None
    except Exception as e:
        print(f"Error loading model: {str(e)}")
        return None

    print("Pest prediction model loaded successfully")

    try:
        # Display model information
        print(f"Model type: {type(model).__name__}")
        print(f"Number of insect classes: {len(model.classes_)}")
        print(f"Insect classes: {', '.join(map(str, model.classes_[:5]))}...")

        # The plant list comes from the encoder fitted in the training cell,
        # which only exists if that cell ran in the current kernel.
        feature_names = globals().get('plant_feature_names')
        if feature_names is None:
            print("\nSupported plants: unavailable (run the training cell to define plant_feature_names)")
        else:
            prefix = 'plant_'
            supported_plants = [
                str(name)[len(prefix):] if str(name).startswith(prefix) else str(name)
                for name in feature_names
            ]
            print(f"\nSupported plants ({len(supported_plants)}): {', '.join(supported_plants)}")
    except Exception as e:
        # The summary is informational only; keep the usable model
        print(f"Warning: could not display model summary: {str(e)}")

    return model

In [25]:
# Function to predict insects for a given plant based on current weather
def predict_pest_risk(plant_name, current_weather):
    """
    Rank the most likely pests for a plant under the given weather.

    Relies on names created by the training cell: ``plant_feature_names`` (the
    one-hot column names) and ``pest_data_processed`` (for damage descriptions).
    The model is re-read from disk on every call via ``load_model``.

    Parameters:
    - plant_name: Plant name exactly as it appears in the training data
      (an unknown plant triggers a warning and is encoded as all zeros)
    - current_weather: Dictionary with 'temp', 'humidity' and 'rain_chance';
      missing keys default to 0

    Returns:
    - List of up to three dicts with 'insect', 'probability' (percent) and
      'damage', ordered from most to least likely

    Raises:
    - FileNotFoundError if the model could not be loaded
    """
    model = load_model()

    # Compare against None explicitly rather than relying on the estimator's truthiness
    if model is None:
        raise FileNotFoundError("Model not Found!")

    # Extract relevant weather info
    temp = current_weather.get('temp', 0)
    humidity = current_weather.get('humidity', 0)
    rain_chance = current_weather.get('rain_chance', 0)

    # Prepare input for prediction: one-hot plant columns followed by the
    # weather values, in the same order as the training features
    feature_names = np.asarray(plant_feature_names)
    plant_input = np.zeros((1, len(feature_names)))
    matches = np.flatnonzero(feature_names == f'plant_{plant_name}')
    if matches.size:
        plant_input[0, matches[0]] = 1
    else:
        print(f"Warning: Plant '{plant_name}' not found in training data")

    weather_input = np.array([[temp, humidity, rain_chance]])
    input_data = np.hstack([plant_input, weather_input])

    # Get probabilities for each insect
    probas = model.predict_proba(input_data)[0]

    # Get top 3 insects with highest probabilities
    top_indices = probas.argsort()[-3:][::-1]
    top_insects = [(model.classes_[i], probas[i]) for i in top_indices]

    # Get damage descriptions for these insects
    results = []
    for insect, prob in top_insects:
        damage_rows = pest_data_processed.loc[pest_data_processed['insect_name'] == insect, 'damage']
        # The table may differ from the data the model was trained on; do not crash on a miss
        damage = damage_rows.iloc[0] if not damage_rows.empty else "Unknown"
        results.append({
            'insect': insect,
            'probability': prob * 100,  # Convert to percentage
            'damage': damage
        })

    return results

In [26]:
# Demo: rank the pest risks for one crop under the weather fetched earlier
plant_name = "Tomato"  # Crop to analyse
risk_results = predict_pest_risk(plant_name, current)

# Header: location and the weather values that were fed to the model
print(f"\nPest risk analysis for {plant_name} based on current weather in {city}:")
print(f"Temperature: {current['temp']}°C, Humidity: {current['humidity']}%, Rain chance: {current['rain_chance']}%")
print("\nPotential pest risks:")

# One numbered entry per predicted pest, most likely first
for rank, entry in enumerate(risk_results, start=1):
    print(
        f"{rank}. {entry['insect']} (Risk: {entry['probability']:.1f}%)",
        f"   Potential damage: {entry['damage']}",
        sep="\n",
    )

Pest prediction model loaded successfully
Model type: RandomForestClassifier
Number of insect classes: 42
Insect classes: Aphids, Apple Maggot, Bihar Hairy Caterpillar, Black Cherry Aphid, Broad Mite...

Supported plants (12): Apple, Blueberry, Cherry, Corn, Grape, Peach, Pepper, Potato, Raspberry, Soybean, Strawberry, Tomato

Pest risk analysis for Tomato based on current weather in Coimbatore:
Temperature: 33.88°C, Humidity: 43%, Rain chance: 46%

Potential pest risks:
1. Red Spider Mite (Risk: 22.0%)
   Potential damage: Leaf yellowing
2. Whitefly (Risk: 18.0%)
   Potential damage: Sap sucking/mold
3. Aphids (Risk: 13.9%)
   Potential damage: Sap sucking/stunting
