In [None]:
# Base model pipeline
# model = Pipeline(steps=[
#     ('preprocessor', preprocessor),
#     ('classifier', xgb.XGBClassifier(
#         objective='binary:logistic',
#         eval_metric='logloss',
#         use_label_encoder=False,
#         random_state=42
#     ))
# ])


In [None]:

# Modify XGBoost to weight the minority class
# model = xgb.XGBClassifier(
#     scale_pos_weight=len(y_train[y_train == 0]) / len(y_train[y_train == 1]),  # ~3.5x
#     objective='binary:logistic',
#     enable_categorical=True
# )

# Hyperparameter grid
# param_grid = {
#     'classifier__n_estimators': [100, 200, 300],
#     'classifier__max_depth': [3, 5, 7],
#     'classifier__learning_rate': [0.01, 0.1, 0.2],
#     'classifier__subsample': [0.8, 0.9, 1.0],
#     'classifier__colsample_bytree': [0.8, 0.9, 1.0],
#     'classifier__gamma': [0, 0.1, 0.2]
# }


In [None]:

# X_train["landslide_size"] = X_train["landslide_size"].astype("category")
# X_test["landslide_size"] = X_test["landslide_size"].astype("category")



In [None]:
# ==============================================
# Prediction Function for New Data
# ==============================================

# def predict_landslide_risk(country, admin_division, month):
#     """
#     Predict landslide risk for a given location and time
    
#     Args:
#         country (str): Country name
#         admin_division (str): Administrative division (state/region)
#         month (int): Month (1-12)
    
#     Returns:
#         dict: Prediction results with probabilities
#     """
#     # Geocoding to get coordinates
#     geolocator = Nominatim(user_agent="landslide_pred")
#     location = geolocator.geocode(f"{admin_division}, {country}")
    
#     if not location:
#         return {"error": "Location not found"}
    
#     # Prepare input features
#     input_data = pd.DataFrame({
#         'latitude': [location.latitude],
#         'longitude': [location.longitude],
#         'month': [month],
#         'season': [(month % 12 + 3) // 3],
#         'heavy_rain': [1 if month in [5,6,7,8,9] else 0],  # Simple seasonal rain assumption
#         'mining_area': [0],  # Default to 0 (can be modified)
#         'landslide_size': ['medium'],  # Default value
#         'admin_division_population': [data[data['admin_division_name'] == admin_division]['admin_division_population'].mean() or 100000]
#     })
    
#     # Make prediction
#     proba = best_model.predict_proba(input_data)[0]
#     prediction = best_model.predict(input_data)[0]
    
#     return {
#         'location': {
#             'country': country,
#             'admin_division': admin_division,
#             'latitude': location.latitude,
#             'longitude': location.longitude
#         },
#         'month': month,
#         'prediction': 'High risk' if prediction == 1 else 'Low risk',
#         'probability_high_risk': float(proba[1]),
#         'probability_low_risk': float(proba[0])
#     }




In [None]:
import requests
import pandas as pd

def get_simple_precipitation(zip_code, date, token=None):
    """
    Fetch the NOAA CDO precipitation value for one ZIP code and one date.

    Queries the ``GHCNDMS`` dataset with ``date`` as both start and end date
    and returns the first ``TPCP`` record found, converted to millimetres.

    NOTE(review): ``GHCNDMS`` appears to be NOAA's *monthly* summaries
    dataset, so the value may be a monthly rather than a daily total.
    Confirm against the NOAA dataset documentation.

    Args:
        zip_code (str): ZIP code, sent as ``locationid=ZIP:<zip_code>``.
        date (str): Date string used as both ``startdate`` and ``enddate``
            (the example call in this notebook uses ``YYYY-MM-DD``).
        token (str, optional): NOAA CDO API token. When omitted, the
            ``NOAA_CDO_TOKEN`` environment variable is used, so the secret
            never has to be stored in the notebook.

    Returns:
        dict | None: ``{'precip_mm': float | None, 'raw_data': <parsed JSON>}``
        on success (``precip_mm`` is None when the response contains no usable
        TPCP record). None when no token is available, the request fails, or
        the response is not valid JSON; a message is printed in those cases.
    """
    import os  # local import keeps this function self-contained in its cell

    token = token or os.environ.get('NOAA_CDO_TOKEN')
    if not token:
        print("API Error: no NOAA token provided (pass token= or set NOAA_CDO_TOKEN)")
        return None

    # HTTPS so the token header is not sent in clear text; `params=` lets
    # requests URL-encode the caller-supplied values instead of splicing them
    # into the query string by hand.
    url = "https://www.ncdc.noaa.gov/cdo-web/api/v2/data"
    params = {
        'datasetid': 'GHCNDMS',
        'locationid': f"ZIP:{zip_code}",
        'startdate': date,  # single-day window: start and end are the same
        'enddate': date,
    }

    try:
        response = requests.get(url, headers={'token': token}, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # Network/HTTP failures and undecodable JSON are expected failure
        # modes; anything else is a programming error and should surface.
        print(f"API Error: {str(e)}")
        return None

    # Extract precipitation (TPCP) and convert to mm; the first usable record wins.
    results = data.get('results', []) if isinstance(data, dict) else []
    precip_mm = None
    for record in results:
        if record.get('datatype') == 'TPCP' and record.get('value') is not None:
            precip_mm = record['value'] / 10  # Convert tenths of mm to mm
            break

    return {
        'precip_mm': precip_mm,
        'raw_data': data  # Return full response for debugging
    }

# Landslide-specific processor
def get_landslide_risk_data(zip_code, date):
    """
    Look up precipitation for a ZIP code/date and derive rain-threshold flags.

    Delegates the lookup to ``get_simple_precipitation``. When that call
    yields nothing, or yields no ``precip_mm`` value, a message is printed and
    None is returned.

    Returns:
        dict | None: ``extreme_rain`` / ``heavy_rain`` / ``moderate_rain``
        booleans (strictly greater than 100 / 50 / 20 mm), followed by
        ``precip_mm``, ``date`` and ``zip_code``; None when no data is available.
    """
    lookup = get_simple_precipitation(zip_code, date)
    rainfall_mm = lookup['precip_mm'] if lookup else None

    if rainfall_mm is None:
        print(f"No precipitation data for ZIP {zip_code} on {date}")
        return None

    # (flag name, exclusive lower bound in mm); customize thresholds as needed.
    # Tuple order fixes the key order of the returned dict.
    cutoffs = (
        ('extreme_rain', 100),
        ('heavy_rain', 50),
        ('moderate_rain', 20),
    )
    report = {flag: rainfall_mm > limit for flag, limit in cutoffs}
    report.update(precip_mm=rainfall_mm, date=date, zip_code=zip_code)
    return report

# Example usage: look up ZIP 28801 for 2000-01-01 and print the resulting
# risk-factor dict. This triggers a live HTTP request; None is printed when
# the lookup fails or returns no precipitation value.
risk_data = get_landslide_risk_data("28801", "2000-01-01")
print(risk_data)

In [None]:
import requests
from datetime import datetime, timedelta

class LandslideRiskPredictor:
    """
    Rainfall-threshold landslide risk heuristics backed by the NOAA CDO API.

    For a ZIP code and date the predictor fetches that date's precipitation
    record plus the records for the preceding days, classifies each against
    fixed thresholds, and combines the two classifications into one label.

    NOTE(review): the ``GHCNDMS`` dataset queried here appears to be NOAA's
    *monthly* summaries dataset, while the thresholds are described as
    mm/day. Confirm the dataset choice against the NOAA documentation.
    """

    # HTTPS so the token header is never sent in clear text.
    API_URL = "https://www.ncdc.noaa.gov/cdo-web/api/v2/data"

    def __init__(self, token):
        """
        Args:
            token (str): NOAA CDO API token, sent as the ``token`` request header.
        """
        self.token = token
        self.headers = {'token': token}
        self.risk_thresholds = {
            'extreme': 100,  # mm/day
            'heavy': 50,
            'moderate': 20,
            'antecedent_7day': 150  # 7-day cumulative threshold
        }

    def get_precipitation(self, zip_code, date):
        """
        Request the GHCNDMS records for one ZIP code and one date.

        Args:
            zip_code (str): ZIP code, sent as ``locationid=ZIP:<zip_code>``.
            date (str): Date used as both ``startdate`` and ``enddate``.

        Returns:
            The parsed JSON response, or None when the request fails or the
            body is not valid JSON (a message is printed in that case).
        """
        # `params=` lets requests URL-encode caller-supplied values instead of
        # splicing them into the query string by hand.
        params = {
            'datasetid': 'GHCNDMS',
            'locationid': f"ZIP:{zip_code}",
            'startdate': date,
            'enddate': date,
        }
        try:
            response = requests.get(
                self.API_URL, headers=self.headers, params=params, timeout=10
            )
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # Network/HTTP failures and undecodable JSON are expected failure
            # modes; anything else is a programming error and should surface.
            print(f"API Error: {str(e)}")
            return None

    def process_rainfall(self, data):
        """
        Extract precipitation and snowfall values from an API response.

        Args:
            data (dict): Parsed response; records are read from ``data['results']``.

        Returns:
            tuple: ``(precip_mm, snow_mm)``. ``precip_mm`` is the ``TPCP`` value
            divided by 10; ``snow_mm`` is the ``TSNW`` value exactly as reported
            (no conversion is applied). Either is None when no usable record of
            that type is present. If several records share a type, the last
            one wins.
        """
        precip_mm = None
        snow_mm = None

        for record in (data or {}).get('results', []):
            value = record.get('value')
            if value is None:
                continue  # malformed record: nothing to read
            datatype = record.get('datatype')
            if datatype == 'TPCP':
                precip_mm = value / 10  # Convert to mm
            elif datatype == 'TSNW':
                snow_mm = value

        return precip_mm, snow_mm

    def get_antecedent_rainfall(self, zip_code, target_date, days=7):
        """
        Sum the precipitation reported for the ``days`` days before ``target_date``.

        One API request is issued per day. Days whose request fails or that
        carry no TPCP value are skipped.

        Args:
            zip_code (str): ZIP code to query.
            target_date (str): ``YYYY-MM-DD`` date; the day itself is excluded.
            days (int): Number of preceding days to include.

        Returns:
            float | None: Cumulative precipitation in mm, which may
            legitimately be 0, or None when no day produced a value.

        Raises:
            ValueError: If ``target_date`` is not in ``YYYY-MM-DD`` format.
        """
        target_date = datetime.strptime(target_date, "%Y-%m-%d")
        total = 0
        valid_days = 0

        for i in range(1, days + 1):
            date = (target_date - timedelta(days=i)).strftime("%Y-%m-%d")
            data = self.get_precipitation(zip_code, date)
            if data:
                precip_mm, _ = self.process_rainfall(data)
                if precip_mm is not None:
                    total += precip_mm
                    valid_days += 1

        return total if valid_days > 0 else None

    def assess_landslide_risk(self, zip_code, date):
        """
        Build the full risk assessment for a ZIP code and date.

        Returns:
            dict | None: None when no precipitation value is available for
            ``date``. Otherwise a dict with ``immediate_risk``,
            ``antecedent_risk``, ``combined_risk``, ``daily_precip_mm``,
            ``antecedent_7day_mm``, ``snow_mm``, ``date`` and ``zip_code``.
            ``antecedent_risk`` and ``combined_risk`` are None only when no
            antecedent rainfall data could be retrieved.
        """
        # Get daily rainfall
        daily_data = self.get_precipitation(zip_code, date)
        if not daily_data:
            return None

        daily_precip, snow = self.process_rainfall(daily_data)
        if daily_precip is None:
            return None

        # Get antecedent rainfall
        antecedent_7day = self.get_antecedent_rainfall(zip_code, date)
        # A 0.0 mm total is valid data (a dry week), so test against None
        # rather than truthiness.
        has_antecedent = antecedent_7day is not None

        # Calculate risk factors
        risk_factors = {
            'immediate_risk': self._assess_immediate_risk(daily_precip),
            'antecedent_risk': self._assess_antecedent_risk(antecedent_7day) if has_antecedent else None,
            'combined_risk': None,
            'daily_precip_mm': daily_precip,
            'antecedent_7day_mm': antecedent_7day,
            'snow_mm': snow,
            'date': date,
            'zip_code': zip_code
        }

        # Calculate combined risk
        if has_antecedent:
            risk_factors['combined_risk'] = self._combine_risks(
                risk_factors['immediate_risk'],
                risk_factors['antecedent_risk']
            )

        return risk_factors

    def _assess_immediate_risk(self, precip):
        """Classify one precipitation value as 'extreme', 'high', 'moderate' or 'low'."""
        if precip >= self.risk_thresholds['extreme']:
            return 'extreme'
        elif precip >= self.risk_thresholds['heavy']:
            return 'high'
        elif precip >= self.risk_thresholds['moderate']:
            return 'moderate'
        return 'low'

    def _assess_antecedent_risk(self, antecedent):
        """Classify cumulative antecedent rainfall as 'high' or 'low'."""
        if antecedent >= self.risk_thresholds['antecedent_7day']:
            return 'high'
        return 'low'

    def _combine_risks(self, immediate, antecedent):
        """
        Merge the immediate and antecedent labels into one overall label.

        Extreme immediate rain or high antecedent rain gives 'very_high';
        otherwise the immediate label decides ('high', 'moderate' or 'low').
        """
        if immediate == 'extreme' or antecedent == 'high':
            return 'very_high'
        if immediate == 'high':
            return 'high'
        if immediate == 'moderate' and antecedent == 'low':
            return 'moderate'
        return 'low'

# Example usage. The NOAA CDO token is read from the NOAA_CDO_TOKEN environment
# variable so the secret is not stored in the notebook. If the variable is
# unset, the request is rejected, an "API Error" message is printed and the
# assessment is None.
import os

predictor = LandslideRiskPredictor(token=os.environ.get("NOAA_CDO_TOKEN", ""))
risk_assessment = predictor.assess_landslide_risk("28801", "2000-01-01")
print(risk_assessment)