*****
## WFS Layer Extract
*****
Author: Mackenzie Rock

Date: June 3, 2025

Goal: The goal of this Jupyter notebook is to determine a foundation for extracting the relevant WFS layers identified and storing them in a suitable format for uploading to the postgreSQL database. In this section I will test:
- The code to extract
- I will visualize the data to ensure I have capture it correctly
- I will test several samples overtime
- Transformation into suitable format for load
- Examination of some of the datasets
- Framework for inserting into Google Cloud PostgreSQL database

### 1.0 Extraction Code

To be extracted from WFS:
- Fire Danger (PUBLIC:FDR_CURRENT_SHP)
- Fire Perimeter Estimate (PUBLIC:M3_POLYGONS_CURRENT)
- Fire M3 Hotspots (PUBLIC:HOTSPOTS_LAST24HRS)
- Season-to-date Hotspots (TBD)
- Active Fires (PUBLIC:ACTIVEFIRES_CURRENT)
- Forecast Weather Stations (PUBLIC:FIREWX_STNS & PUBLIC:FIREWX_STNS_CURRENT)
- Reporting Weather Stations (PUBLIC:FIREWX_SCRIBE & PUBLIC:FIREWX_SCRIBE_FCST)
- Fire History (PUBLIC:REPORTEDFIRES_2024 & PUBLIC:REPORTEDFIRES_YTD)
- Check on what PUBLIC:BASEMAP_INSIDE_BNDRY & PUBLIC:BASEMAP_LAND are

In [7]:
import geopandas as gpd
import folium
from geopandas import GeoDataFrame
from shapely.geometry import mapping
import os
import requests
from io import BytesIO
import pandas as pd

# WFS endpoint
WFS_URL = "https://cwfis.cfs.nrcan.gc.ca/geoserver/public/ows"
OUTPUT_DIR = "./wfs_layers"
MAP_OUTPUT_DIR = "wfs_maps"

# Ensure output dirs exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(MAP_OUTPUT_DIR, exist_ok=True)



def fetch_and_visualize_wfs_layer(layer_name: str, label: str = None, date: str = None, output_format: str = "GeoJSON"):
    """
    Fetches a layer from the CWFIS WFS endpoint, reprojects to EPSG:4326,
    saves to file, and visualizes using Folium.
    """
    # Build WFS URL
    params = {
        "services": "WFS",
        "version": "1.0.0",
        "request": "GetFeature",
        "typename": layer_name,
        "outputFormat": "application/json"
    }
    query_url = f"{WFS_URL}?{'&'.join([f'{k}={v}' for k, v in params.items()])}"

    print(f"Fetching {layer_name} from WFS...")


    # Read GeoDataFrame
    response = requests.get(query_url)
    response.raise_for_status()  # Fail early if bad response
    gdf = gpd.read_file(BytesIO(response.content))

    # Reproject if needed
    if gdf.crs and gdf.crs.to_string() != "EPSG:4326":
        gdf = gdf.to_crs("EPSG:4326")


    json_safe_gdf = gdf.copy()
    for col in json_safe_gdf.columns:
        if pd.api.types.is_datetime64_any_dtype(json_safe_gdf[col]) or isinstance(json_safe_gdf[col].iloc[0], pd.Timestamp):
            print(f"Converting datetime column '{col}' to string")
            json_safe_gdf[col] = json_safe_gdf[col].astype(str)
        elif isinstance(json_safe_gdf[col].iloc[0], (list, dict)):
            json_safe_gdf[col] = json_safe_gdf[col].astype(str)


    # Save GeoJSON
    label_safe = label.replace(" ", "_").lower() if label else layer_name.replace(":", "_").lower()
    filename = f"{label_safe}_{date or 'latest'}.{output_format.lower()}"
    filepath = os.path.join(OUTPUT_DIR, filename)
    json_safe_gdf.to_file(filepath, driver="GeoJSON")

    # Create map centered on centroid of geometry
    centroid = json_safe_gdf.geometry.unary_union.centroid
    fmap = folium.Map(location=[centroid.y, centroid.x], zoom_start=5)

    # Add layer as external file read
    with open(filepath, "r", encoding="utf-8") as f:
        folium.GeoJson(data=f.read(), name=label or layer_name).add_to(fmap)

    # Save map
    html_map_path = os.path.join(MAP_OUTPUT_DIR, f"{label_safe}_{date or 'latest'}.html")
    fmap.save(html_map_path)


In [8]:
#from wfs_utils import fetch_and_visualize_wfs_layer

map_output_dir = "wfs_maps"
layer_names = {
    "Fire_Danger": "public:fdr_current_shp",
    "Fire_Perimeter_Estimate": "public:m3_polygons_current",
    "M3_Hotspots": "public:hotspots_last24hrs",
    "Active_Fires": "public:activefires_current",
    "Forecast_Weather_Stations": "public:firewx_stns",
    "Forecast_Weather_Stations_Current": "public:firewx_stns_current",
    "Reporting_Weather_Stations": "public:firewx_scribe",
    "Reporting_Weather_Stations_Forecast": "public:firewx_scribe_fcst",
    "Fire_History_YTD": "public:reportedfires_ytd",
    "Fire_History_2024": "public:reportedfires_2024"
}
dates = ["2024-01-01"]


for date in dates:
    for label, type in layer_names.items():

        print(f"Attempting to process layer: {label}")
        fetch_and_visualize_wfs_layer(type, label, date)





Attempting to process layer: Fire_Danger
Fetching public:fdr_current_shp from WFS...


  centroid = json_safe_gdf.geometry.unary_union.centroid


Attempting to process layer: Fire_Perimeter_Estimate
Fetching public:m3_polygons_current from WFS...
Converting datetime column 'firstdate' to string
Converting datetime column 'lastdate' to string


  centroid = json_safe_gdf.geometry.unary_union.centroid


Attempting to process layer: M3_Hotspots
Fetching public:hotspots_last24hrs from WFS...
Converting datetime column 'rep_date' to string


  centroid = json_safe_gdf.geometry.unary_union.centroid


Attempting to process layer: Active_Fires
Fetching public:activefires_current from WFS...
Converting datetime column 'startdate' to string
Attempting to process layer: Forecast_Weather_Stations
Fetching public:firewx_stns from WFS...


  centroid = json_safe_gdf.geometry.unary_union.centroid


Converting datetime column 'rep_date' to string


  centroid = json_safe_gdf.geometry.unary_union.centroid


Attempting to process layer: Forecast_Weather_Stations_Current
Fetching public:firewx_stns_current from WFS...
Converting datetime column 'rep_date' to string
Attempting to process layer: Reporting_Weather_Stations
Fetching public:firewx_scribe from WFS...


  centroid = json_safe_gdf.geometry.unary_union.centroid


Converting datetime column 'rep_date' to string
Attempting to process layer: Reporting_Weather_Stations_Forecast
Fetching public:firewx_scribe_fcst from WFS...


  centroid = json_safe_gdf.geometry.unary_union.centroid


Converting datetime column 'rep_date' to string
Attempting to process layer: Fire_History_YTD
Fetching public:reportedfires_ytd from WFS...


  centroid = json_safe_gdf.geometry.unary_union.centroid


Converting datetime column 'startdate' to string
Attempting to process layer: Fire_History_2024
Fetching public:reportedfires_2024 from WFS...


  centroid = json_safe_gdf.geometry.unary_union.centroid


Converting datetime column 'startdate' to string


  centroid = json_safe_gdf.geometry.unary_union.centroid


### 1.1 Format for Stoage
- I will store the data in GPKG for storage reasons. This data will be extracted and transformed to GeoJSON for visualization

### 2.0 Assessing the reporting date within forecast weather stations

I'm intentionally assessing the reporting data as I want to understand whether this is fully historical or snapshot data like some of the other WFS layers that I am pulling. It shows that it is historical data and the current is snapshot.

Thus I will be inserting the historical and using current (the snapshot) to provide updates to my database daily.

In [3]:
import geopandas as gpd
import folium
from shapely.geometry import mapping
import os
import requests
from io import BytesIO
import pandas as pd

# WFS endpoint
WFS_URL = "https://cwfis.cfs.nrcan.gc.ca/geoserver/public/ows"
OUTPUT_DIR = "./wfs_layers"
MAP_OUTPUT_DIR = "wfs_maps"

# Ensure output dirs exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(MAP_OUTPUT_DIR, exist_ok=True)


def fetch_and_visualize_wfs_layer(layer_name: str, label: str = None, date: str = None, output_format: str = ""):
    """
    Fetches a layer from the CWFIS WFS endpoint, reprojects to EPSG:4326,
    saves to file, and visualizes using Folium.
    """
    # Build WFS URL
    params = {
        "services": "WFS",
        "version": "1.0.0",
        "request": "GetFeature",
        "typename": layer_name,
        "outputFormat": "application/json"
    }
    query_url = f"{WFS_URL}?{'&'.join([f'{k}={v}' for k, v in params.items()])}"

    print(f"Fetching {layer_name} from WFS...")

    # Read GeoDataFrame
    response = requests.get(query_url)
    response.raise_for_status()  # Fail early if bad response
    gdf = gpd.read_file(BytesIO(response.content))

    # Reproject if needed
    if gdf.crs and gdf.crs.to_string() != "EPSG:4326":
        gdf = gdf.to_crs("EPSG:4326")

    json_safe_gdf = gdf.copy()
    for col in json_safe_gdf.columns:
        if pd.api.types.is_datetime64_any_dtype(json_safe_gdf[col]) or isinstance(json_safe_gdf[col].iloc[0],
                                                                                  pd.Timestamp):
            print(f"Converting datetime column '{col}' to string")
            json_safe_gdf[col] = json_safe_gdf[col].astype(str)
        elif isinstance(json_safe_gdf[col].iloc[0], (list, dict)):
            json_safe_gdf[col] = json_safe_gdf[col].astype(str)

    # Save gpkg
    label_safe = label.replace(" ", "_").lower() if label else layer_name.replace(":", "_").lower()
    filename = f"{label_safe}_{date or 'latest'}.{output_format.lower()}"
    filepath = os.path.join(OUTPUT_DIR, filename)
    gdf.to_file(f"{filepath}gpkg", layer="fire_perimeter", driver="GPKG")


#from wfs_utils import fetch_and_visualize_wfs_layer

map_output_dir = "wfs_maps"
layer_names = {
    "Fire_Danger": "public:fdr_current_shp",
    "Fire_Perimeter_Estimate": "public:m3_polygons_current",
    "M3_Hotspots": "public:hotspots_last24hrs",
    "Active_Fires": "public:activefires_current",
    "Forecast_Weather_Stations": "public:firewx_stns",
    "Forecast_Weather_Stations_Current": "public:firewx_stns_current",
    "Reporting_Weather_Stations": "public:firewx_scribe",
    "Reporting_Weather_Stations_Forecast": "public:firewx_scribe_fcst",
    "Fire_History_YTD": "public:reportedfires_ytd",
    "Fire_History_2024": "public:reportedfires_2024"
}
dates = ["2024-01-01"]

for date in dates:
    for label, type in layer_names.items():
        print(f"Attempting to process layer: {label}")
        fetch_and_visualize_wfs_layer(type, label, date)




Attempting to process layer: Fire_Danger
Fetching public:fdr_current_shp from WFS...
Attempting to process layer: Fire_Perimeter_Estimate
Fetching public:m3_polygons_current from WFS...
Converting datetime column 'firstdate' to string
Converting datetime column 'lastdate' to string
Attempting to process layer: M3_Hotspots
Fetching public:hotspots_last24hrs from WFS...
Converting datetime column 'rep_date' to string
Attempting to process layer: Active_Fires
Fetching public:activefires_current from WFS...
Converting datetime column 'startdate' to string
Attempting to process layer: Forecast_Weather_Stations
Fetching public:firewx_stns from WFS...
Converting datetime column 'rep_date' to string
Attempting to process layer: Forecast_Weather_Stations_Current
Fetching public:firewx_stns_current from WFS...
Converting datetime column 'rep_date' to string
Attempting to process layer: Reporting_Weather_Stations
Fetching public:firewx_scribe from WFS...
Converting datetime column 'rep_date' to s

Quick check on the date ranges in forecast weather stations to see the data structure.

In [None]:
import json
from collections import defaultdict
from datetime import datetime

# Load your GeoJSON data
with open("wfs_layers/forecast_weather_stations_2025-06-01.geojson", "r") as f:
    data = json.load(f)

# Store min/max rep_date per station id
station_dates = defaultdict(lambda: {"min": None, "max": None})

for feature in data["features"]:
    station_id = feature["properties"]["id"]
    rep_date_str = feature["properties"]["rep_date"]
    rep_date = datetime.fromisoformat(rep_date_str.replace("Z", "+00:00"))

    if (station_dates[station_id]["min"] is None) or (rep_date < station_dates[station_id]["min"]):
        station_dates[station_id]["min"] = rep_date
    if (station_dates[station_id]["max"] is None) or (rep_date > station_dates[station_id]["max"]):
        station_dates[station_id]["max"] = rep_date

# Print summary
for station_id, dates in station_dates.items():
    print(f"{station_id}: from {dates['min']} to {dates['max']}")

In [7]:
import json
from collections import defaultdict
from datetime import datetime


# Load your GeoJSON data
with open("wfs_layers/forecast_weather_stations_current_2025-06-01.geojson", "r") as f:
    data = json.load(f)

# Store min/max rep_date per station id
station_dates = defaultdict(lambda: {"min": None, "max": None})

for feature in data["features"]:
    station_id = feature["properties"]["id"]
    rep_date_str = feature["properties"]["rep_date"]
    rep_date = datetime.fromisoformat(rep_date_str.replace("Z", "+00:00"))

    if (station_dates[station_id]["min"] is None) or (rep_date < station_dates[station_id]["min"]):
        station_dates[station_id]["min"] = rep_date
    if (station_dates[station_id]["max"] is None) or (rep_date > station_dates[station_id]["max"]):
        station_dates[station_id]["max"] = rep_date

# Print summary
current_count = 0

for station_id, dates in station_dates.items():
    if current_count > 5:
        break
    print(f"{station_id}: from {dates['min']} to {dates['max']}")
    current_count += 1

firewx_stns_current.fid--4699bdee_1973debb200_-48ae: from 2025-06-04 12:00:00+00:00 to 2025-06-04 12:00:00+00:00
firewx_stns_current.fid--4699bdee_1973debb200_-48ad: from 2025-06-04 12:00:00+00:00 to 2025-06-04 12:00:00+00:00
firewx_stns_current.fid--4699bdee_1973debb200_-48ac: from 2025-06-04 12:00:00+00:00 to 2025-06-04 12:00:00+00:00
firewx_stns_current.fid--4699bdee_1973debb200_-48ab: from 2025-06-04 12:00:00+00:00 to 2025-06-04 12:00:00+00:00
firewx_stns_current.fid--4699bdee_1973debb200_-48aa: from 2025-06-04 12:00:00+00:00 to 2025-06-04 12:00:00+00:00
firewx_stns_current.fid--4699bdee_1973debb200_-48a9: from 2025-06-04 12:00:00+00:00 to 2025-06-04 12:00:00+00:00


### 3.0 Framework for Inserting into postgreSQL

This module handles the ingestion of wildfire-related geospatial datasets by loading .gpkg files, applying dataset-specific transformations, validating the data against strict schemas, and uploading the results to a PostGIS database. Each dataset type (e.g., active fires, fire danger, weather stations) has its own transformation function to standardize formats, cast numeric types, and handle missing values. Validation is performed using pandera with typed schemas that ensure consistency and enforce that each input is a valid GeoDataFrame. A central registry maps table names to their corresponding validators, allowing the ETL process to remain generic and extensible. This structure keeps transformation, validation, and persistence concerns separate, and allows the pipeline to be easily extended with new datasets by adding entries to the transformation and validator mappings.

In [14]:
from pandera import DataFrameModel, Field, check_types
from pandera.typing import DataFrame, Series
from shapely.geometry import base
from datetime import datetime
import warnings
warnings.filterwarnings('ignore', module='pandera')

### Fire Danger Schema and Validator ###
class FireDangerSchema(DataFrameModel):  # ✅ correct
    id: Series[str]
    GRIDCODE: Series[int] = Field(nullable=True)
    acquisition_date: Series[datetime]

class FireDangerValidator:
    @staticmethod
    @check_types
    def validate (df: DataFrame[FireDangerSchema]) -> DataFrame[FireDangerSchema]:
        #geometry checks
        assert isinstance(df, gpd.GeoDataFrame), "Not a GeoDataFrame"
        return df

### Active Fires Schema and Validator ###
class ActiveFiresSchema(DataFrameModel):  # ✅ correct
    id: Series[str]
    firename: Series[str]
    acquisition_date: Series[datetime]
    startdate: Series[datetime]
    hectares: Series[float]
    lat: Series[float]
    lon: Series[float]
    agency: Series[str]
    stage_of_control: Series[str]
    response_type: Series[str]


class ActiveFiresValidator:
    @staticmethod
    @check_types
    def validate (df: DataFrame[ActiveFiresSchema]) -> DataFrame[ActiveFiresSchema]:
        #geometry checks
        assert isinstance(df, gpd.GeoDataFrame), "Not a GeoDataFrame"
        return df

### Fire History Schema and Validator ###
class FireHistorySchema(DataFrameModel):  # ✅ correct
    id: Series[str]
    firename: Series[str]
    startdate: Series[datetime]
    hectares: Series[float]
    lat: Series[float]
    lon: Series[float]
    agency: Series[str]
    stage_of_control: Series[str]
    response_type: Series[str]
    cause: Series[str]

class FireHistoryValidator:
    @staticmethod
    @check_types
    def validate (df: DataFrame[FireHistorySchema]) -> DataFrame[FireHistorySchema]:
        #geometry checks
        assert isinstance(df, gpd.GeoDataFrame), "Not a GeoDataFrame"
        return df

### Fire Perimeter Schema and Validator ###
class FirePerimeterSchema(DataFrameModel):  # ✅ correct
    id: Series[str]
    acquisition_date: Series[datetime]
    firstdate: Series[datetime]
    lastdate: Series[datetime]
    hcount: Series[int]
    area: Series[float] = Field(nullable=True)

class FirePerimeterValidator:
    @staticmethod
    @check_types
    def validate (df: DataFrame[FirePerimeterSchema]) -> DataFrame[FirePerimeterSchema]:
        #geometry checks
        assert isinstance(df, gpd.GeoDataFrame), "Not a GeoDataFrame"
        return df

### ForecastWeatherStations ###
class ForecastWeatherStationsSchema(DataFrameModel):
    id: Series[str]
    rep_date: Series[datetime]
    wmo: Series[int]
    name: Series[str]
    agency: Series[str]
    ua: Series[str]
    instr: Series[str]
    prov: Series[str]
    lat: Series[float]
    lon: Series[float]
    elev: Series[float]
    temp: Series[float]
    td: Series[float]
    rh: Series[float]
    ws: Series[float]
    wg: Series[float] = Field(nullable=True)
    wdir: Series[int]
    pres: Series[float]
    vis: Series[float]
    rndays: Series[int]
    precip: Series[float]
    sog: Series[float]
    ffmc: Series[float] = Field(nullable=True)
    dmc: Series[float] = Field(nullable=True)
    dc: Series[float] = Field(nullable=True)
    bui: Series[float] = Field(nullable=True)
    isi: Series[float] = Field(nullable=True)
    fwi: Series[float] = Field(nullable=True)
    dsr: Series[float] = Field(nullable=True)

class ForecastWeatherStationsValidator:
    @staticmethod
    @check_types
    def validate(df: DataFrame[ForecastWeatherStationsSchema]) -> DataFrame[ForecastWeatherStationsSchema]:
        assert isinstance(df, gpd.GeoDataFrame), "Not a GeoDataFrame"
        return df


### ReportingWeatherStations ###
class ReportingWeatherStationsSchema(DataFrameModel):
    id: Series[str]
    rep_date: Series[datetime]
    wmo: Series[int]
    name: Series[str]
    latitude: Series[float]
    longitude: Series[float]
    elevation: Series[float]
    temp: Series[float]
    rh: Series[float]
    ws: Series[float]
    wdir: Series[float]
    precip: Series[float]
    sog: Series[float]
    ffmc: Series[float]
    dmc: Series[float]
    dc: Series[float]
    isi: Series[float]
    bui: Series[float]
    fwi: Series[float]
    dsr: Series[float]
    wx: Series[float]
    wy: Series[float]
    timezone: Series[int]
    x: Series[float]
    y: Series[float]

class ReportingWeatherStationsValidator:
    @staticmethod
    @check_types
    def validate(df: DataFrame[ReportingWeatherStationsSchema]) -> DataFrame[ReportingWeatherStationsSchema]:
        assert isinstance(df, gpd.GeoDataFrame), "Not a GeoDataFrame"
        return df


### ForecastWeatherStationsForecast ###
class WeatherStationsForecastSchema(DataFrameModel):
    id: Series[str]
    rep_date: Series[datetime]
    wmo: Series[int]
    name: Series[str]
    latitude: Series[float]
    longitude: Series[float]
    elevation: Series[float]
    temp: Series[float]
    rh: Series[float]
    ws: Series[float]
    wdir: Series[float]
    precip: Series[float]
    sog: Series[float]
    ffmc: Series[float]
    dmc: Series[float]
    dc: Series[float]
    isi: Series[float]
    bui: Series[float]
    fwi: Series[float]
    dsr: Series[float]
    wx: Series[float]
    wy: Series[float]
    timezone: Series[int]
    x: Series[float]
    y: Series[float]

class WeatherStationsForecastValidator:
    @staticmethod
    @check_types
    def validate(df: DataFrame[WeatherStationsForecastSchema]) -> DataFrame[WeatherStationsForecastSchema]:
        assert isinstance(df, gpd.GeoDataFrame), "Not a GeoDataFrame"
        return df


### M3Hotspots ###
class M3HotspotsSchema(DataFrameModel):
    id: Series[str]
    rep_date: Series[datetime]
    lat: Series[float]
    lon: Series[float]
    source: Series[str]
    sensor: Series[str]
    satellite: Series[str] = Field(nullable=True)
    agency: Series[str]
    temp: Series[float]
    rh: Series[float]
    ws: Series[float]
    wd: Series[int]
    pcp: Series[float]
    elev: Series[float] = Field(nullable=True)
    ffmc: Series[float]
    dmc: Series[float]
    dc: Series[float]
    isi: Series[float]
    bui: Series[float]
    fwi: Series[float]
    fuel: Series[str] = Field(nullable=True)
    ros: Series[float]
    sfc: Series[float]
    tfc: Series[float]
    tfc0: Series[float]
    sfc0: Series[float]
    bfc: Series[float]
    hfi: Series[float]
    cfb: Series[float]
    cbh: Series[float] = Field(nullable=True)
    cfl: Series[float] = Field(nullable=True)
    pcuring: Series[float]
    pconif: Series[float]
    cfactor: Series[float] = Field(nullable=True)
    greenup: Series[int]
    ecozone: Series[str] = Field(nullable=True)
    ecozona2: Series[str] = Field(nullable=True)
    estarea: Series[float] = Field(nullable=True)
    estarea2: Series[float] = Field(nullable=True)
    estarea3: Series[float]
    polyid: Series[str] = Field(nullable=True)
    age: Series[int]
    sfl: Series[float] = Field(nullable=True)
    frp: Series[float]
    times_burned: Series[int] = Field(nullable=True)

class M3HotspotsValidator:
    @staticmethod
    @check_types
    def validate(df: DataFrame[M3HotspotsSchema]) -> DataFrame[M3HotspotsSchema]:
        assert isinstance(df, gpd.GeoDataFrame), "Not a GeoDataFrame"
        return df


# Schema registry
VALIDATOR_REGISTRY = {
    "fire_danger": FireDangerValidator,
    'active_fires': ActiveFiresValidator,
    'fire_history': FireHistoryValidator,
    'fire_perimeter_estimates': FirePerimeterValidator,
    'forecast_weather_stations': ForecastWeatherStationsValidator,
    'm3_hotspots': M3HotspotsValidator,
    'reporting_weather_stations': ReportingWeatherStationsValidator,
    'reporting_weather_stations_forecast': WeatherStationsForecastValidator
}



In [21]:
from sqlalchemy import create_engine
from geoalchemy2 import Geometry
import geopandas as gpd
from datetime import datetime
import pandas as pd

#validate table using registry
def validate_table(df, table_name):
    validator = VALIDATOR_REGISTRY.get(table_name)
    if validator is None:
        raise ValueError(f"No validator found for table {table_name}")
    return validator.validate(df)

#transform fire danger layer
def fire_danger_transform(df):
    df['GRIDCODE'] = df['GRIDCODE'].astype('int64')
    return df

#transform active fire layer
def active_fire_transform(df):
    df['startdate'] = pd.to_datetime(df['startdate']).dt.tz_localize(None)
    return df

#transform fire history layer
def fire_history_transform(df):
    df['startdate'] = pd.to_datetime(df['startdate']).dt.tz_localize(None)
    return df

#transform fire perimeter layer
def fire_perimeter_transform(df):
    df['firstdate'] = pd.to_datetime(df['firstdate']).dt.tz_localize(None)
    df['lastdate'] = pd.to_datetime(df['lastdate']).dt.tz_localize(None)
    df['hcount'] = df['hcount'].astype('int64')
    return df

#transform forecast weather layer
def forecast_weather_transform(df):
    df['rep_date'] = pd.to_datetime(df['rep_date']).dt.tz_localize(None)
    df['wmo'] = df['wmo'].astype('int64')
    df['wdir'] = df['wdir'].astype('int64')
    df['rndays'] = df['rndays'].astype('int64')
    df['elev'] = df['elev'].astype(float)
    return df

#transform M3 hotspot layer
def M3_transform(df):
    df['rep_date'] = pd.to_datetime(df['rep_date']).dt.tz_localize(None)
    df['wd'] = df['wd'].astype('int64')
    df['greenup'] = df['greenup'].astype('int64')
    df['times_burned'] = df['times_burned'].fillna(0).astype('int64')
    df['age'] = df['age'].astype('int64')
    float_columns = [
        "lat", "lon", "temp", "rh", "ws", "pcp", "elev", "ffmc", "dmc", "dc", "isi", "bui", "fwi",
        "ros", "sfc", "tfc", "tfc0", "sfc0", "bfc", "hfi", "cfb", "cbh", "cfl", "pcuring", "pconif",
        "cfactor", "estarea", "estarea2", "estarea3", "sfl", "frp"
    ]
    for col in float_columns:
        df[col] = df[col].astype(float)
    return df

#transform reporting weather layer
def reporting_weather_transform(df):
    df['rep_date'] = pd.to_datetime(df['rep_date']).dt.tz_localize(None)
    df['wmo'] = df['wmo'].astype('int64')
    df['timezone'] = df['timezone'].astype('int64')
    float_columns = [
    "latitude", "longitude", "elevation", "temp", "rh", "ws", "wdir", "precip",
    "sog", "ffmc", "dmc", "dc", "isi", "bui", "fwi", "dsr", "wx", "wy", "x", "y"
    ]
    for col in float_columns:
        df[col] = df[col].astype(float)
    return df

#transform reporting weather forecast layer
def reporting_weather_forecast_transform(df):
    df['rep_date'] = pd.to_datetime(df['rep_date']).dt.tz_localize(None)
    df['wmo'] = df['wmo'].astype('int64')
    df['timezone'] = df['timezone'].astype('int64')
    float_columns = [
    "latitude", "longitude", "elevation", "temp", "rh", "ws", "wdir", "precip",
    "sog", "ffmc", "dmc", "dc", "isi", "bui", "fwi", "dsr", "wx", "wy", "x", "y"
    ]
    for col in float_columns:
        df[col] = df[col].astype(float)
    return df

#load gpkg, apply transform, validate and upload to postgis
def load_gpkg_to_postgis(gpkg_path: str, table_name: str, db_url='postgresql://postgres:K2><X*T$Jad#gQg2@34.23.205.32:5432/postgres'):

    acquisition_date = None
    if table_name in ['active_fires', 'fire_danger', 'fire_perimeter_estimates']:
        acquisition_date = pd.Timestamp(datetime.today().date())


    # Load the GeoPackage
    gdf = gpd.read_file(gpkg_path)

    #define the transformation dictionary
    transform_dict = {
        'fire_danger': fire_danger_transform,
        'active_fires': active_fire_transform,
        'fire_history': fire_history_transform,
        'fire_perimeter_estimates': fire_perimeter_transform,
        'forecast_weather_stations': forecast_weather_transform,
        'm3_hotspots': M3_transform,
        'reporting_weather_stations': reporting_weather_transform,
        'reporting_weather_stations_forecast': reporting_weather_forecast_transform
    }

    #attempt transformation otherwise continue
    try:
        gdf = transform_dict[table_name](gdf)
    except:
        print(f'No transformations for {table_name}')
        pass

    # Add acquisition_date if applicable
    if acquisition_date:
        print(acquisition_date)
        gdf["acquisition_date"] = acquisition_date

    # Ensure CRS is EPSG:4326
    if gdf.crs != "EPSG:4326":
        gdf = gdf.to_crs("EPSG:4326")

    validated_gdf =  validate_table(gdf, table_name)

    if validated_gdf.empty:
        raise ValueError("GeoDataFrame is empty after validation.")


    # Connect to DB
    engine = create_engine(db_url)
    validated_gdf.to_postgis(
        name=table_name,
        con=engine,
        if_exists='replace',
        index=False,
        dtype={'geometry': Geometry('GEOMETRY', srid=4326)}  # Specify the geometry type and SRID
    )

#set file path dictionary to retain data
file_path_dict = {
    'fire_danger': 'wfs_layers/fire_danger_2024-01-01.gpkg',
    'active_fires': 'wfs_layers/active_fires_2024-01-01.gpkg',
    'fire_history': 'wfs_layers/fire_history_2024_2024-01-01.gpkg',
    'fire_history': 'wfs_layers/fire_history_ytd_2024-01-01.gpkg',
    'fire_perimeter_estimates': 'wfs_layers/fire_perimeter_estimate_2024-01-01.gpkg',
    'forecast_weather_stations': 'wfs_layers/forecast_weather_stations_2024-01-01.gpkg',
    'm3_hotspots': 'wfs_layers/m3_hotspots_2024-01-01.gpkg',
    'reporting_weather_stations': 'wfs_layers/reporting_weather_stations_2024-01-01.gpkg',
    'reporting_weather_stations_forecast': 'wfs_layers/reporting_weather_stations_forecast_2024-01-01.gpkg'
}

for table, file_path in file_path_dict.items():
    print(f'Processing table: {table} from file path: {file_path}')
    load_gpkg_to_postgis(file_path, table)




Processing table: fire_danger from file path: wfs_layers/fire_danger_2024-01-01.gpkg
2025-06-07 00:00:00
Processing table: active_fires from file path: wfs_layers/active_fires_2024-01-01.gpkg
2025-06-07 00:00:00
Processing table: fire_history from file path: wfs_layers/fire_history_ytd_2024-01-01.gpkg
Processing table: fire_perimeter_estimates from file path: wfs_layers/fire_perimeter_estimate_2024-01-01.gpkg
2025-06-07 00:00:00
Processing table: forecast_weather_stations from file path: wfs_layers/forecast_weather_stations_2024-01-01.gpkg
Processing table: m3_hotspots from file path: wfs_layers/m3_hotspots_2024-01-01.gpkg
Processing table: reporting_weather_stations from file path: wfs_layers/reporting_weather_stations_2024-01-01.gpkg
Processing table: reporting_weather_stations_forecast from file path: wfs_layers/reporting_weather_stations_forecast_2024-01-01.gpkg
