In [None]:
import geopandas as gpd
import requests as req
import tempfile as tmp
import datetime as dt
import os
import folium as fl

from zipfile import ZipFile

In [None]:
### This file is part of HMSView, a tool for visualizing and analyzing data from the HMS (Hazard Mapping System).

In [None]:
### HMS data query

In [None]:
# The query window is anchored on yesterday.
today = dt.date.today()
start_day = today - dt.timedelta(days=1)

# Seven consecutive dates, newest first (start_day, start_day - 1, ...).
rolling_days = [
    start_day - dt.timedelta(days=offset)
    for offset in range(7)
]

# One zipped HMS smoke-polygon shapefile URL per date, in the same order as rolling_days.
url_list = [
    "https://satepsanone.nesdis.noaa.gov/pub/FIRE/web/HMS/Smoke_Polygons/Shapefile/"
    f"{day.strftime('%Y')}/{day.strftime('%m')}/hms_smoke{day.strftime('%Y%m%d')}.zip"
    for day in rolling_days
]

url_list
def open_zipped_data(url: str) -> "gpd.GeoDataFrame":
    """Download a zipped shapefile and load it into a GeoDataFrame.

    The archive is fetched over HTTP, unpacked into a temporary directory,
    and a shapefile found at the top level of the archive is read with
    geopandas. The temporary file and directory are removed before returning.

    Args:
        url: Address of a ``.zip`` archive holding a ``.shp`` file (and its
            sidecar files) at the top level of the archive.

    Returns:
        The contents of the shapefile. When the archive holds several
        shapefiles, the alphabetically first one is used so the result does
        not depend on directory listing order.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        FileNotFoundError: If the archive contains no top-level ``.shp`` file.
    """
    with req.get(url, timeout=500) as response:
        response.raise_for_status()
        payload = response.content

    with tmp.TemporaryDirectory() as extract_dir:
        with tmp.TemporaryFile() as archive_file:
            archive_file.write(payload)
            archive_file.seek(0)
            with ZipFile(archive_file, "r") as archive:
                archive.extractall(extract_dir)

        shapefile_names = sorted(
            name for name in os.listdir(extract_dir) if name.lower().endswith(".shp")
        )
        if not shapefile_names:
            # Previously this fell through to an UnboundLocalError at the return.
            raise FileNotFoundError(f"No .shp file found in archive downloaded from {url}")

        # Read while the temporary directory still exists.
        return gpd.read_file(os.path.join(extract_dir, shapefile_names[0]))
# State boundaries from the Census TIGER 2024 archive, reprojected to EPSG:4326.
states = (
    open_zipped_data("https://www2.census.gov/geo/tiger/TIGER2024/STATE/tl_2024_us_state.zip")
    .to_crs("EPSG:4326")
)
states.head()

# Merge all state polygons into a single outline used for clipping below.
states_dissolved = states.dissolve()

# Smoke data: one reprojected frame per URL, in url_list order.
dat = [
    open_zipped_data(smoke_url).to_crs("EPSG:4326")
    for smoke_url in url_list
]

# Clip each day to the state outline, then merge polygons that share a Density value.
melded = [
    daily_smoke.clip(states_dissolved).dissolve(by=['Density'], as_index=False)
    for daily_smoke in dat
]

In [None]:
from dataclasses import dataclass

@dataclass
class hms_data_handler:
    """Download and prepare HMS smoke polygons for a rolling window of days.

    Constructing an instance performs all downloads: the Census TIGER 2024
    state boundaries plus one HMS smoke shapefile per day in the window. Each
    day's polygons are clipped to the dissolved state outline, dissolved by
    ``Density`` and given a ``style`` column of folium style dicts.

    Attributes:
        start_date: Most recent day to fetch. ``None`` (the default) means
            "yesterday", resolved when the instance is created.
        date_delta: Number of consecutive days to fetch, counting back from
            ``start_date``.
        smoke_url_list: ``[date, url]`` pairs, newest first (set in
            ``__post_init__``).
        smoke_data: ``[date, GeoDataFrame]`` pairs in the same order (set in
            ``__post_init__``).
    """

    # A default of ``dt.date.today() - ...`` would be evaluated once, when the
    # class is defined, and go stale in a long-lived kernel; None defers it.
    start_date: "dt.date | None" = None
    date_delta: int = 7

    def __post_init__(self):
        """Resolve the date window, then download, clip and style the smoke data."""
        if self.start_date is None:
            self.start_date = dt.date.today() - dt.timedelta(days=1)

        rolling_days = [
            self.start_date - dt.timedelta(days=offset)
            for offset in range(self.date_delta)
        ]

        # __open_zipped_data__ already returns EPSG:4326, so no further
        # reprojection is needed before dissolving or clipping.
        state_outline = self.__open_zipped_data__(
            "https://www2.census.gov/geo/tiger/TIGER2024/STATE/tl_2024_us_state.zip"
        ).dissolve()

        self.smoke_url_list = [
            [day, f"https://satepsanone.nesdis.noaa.gov/pub/FIRE/web/HMS/Smoke_Polygons/Shapefile/{day.strftime('%Y')}/{day.strftime('%m')}/hms_smoke{day.strftime('%Y%m%d')}.zip"]
            for day in rolling_days
        ]

        self.smoke_data = []
        for day, smoke_url in self.smoke_url_list:
            daily_smoke = (
                self.__open_zipped_data__(smoke_url)
                .clip(state_outline)
                .dissolve(by=['Density'], as_index=False)
            )
            styled = daily_smoke.assign(style=daily_smoke.apply(self.smoke_style_row, axis=1))
            self.smoke_data.append([day, styled])

    @staticmethod
    def __open_zipped_data__(url: str) -> "gpd.GeoDataFrame":
        """Download a zipped shapefile and return it reprojected to EPSG:4326.

        Args:
            url: Address of a ``.zip`` archive holding a ``.shp`` file (and
                its sidecar files) at the top level of the archive.

        Returns:
            The shapefile's contents in EPSG:4326. When the archive holds
            several shapefiles, the alphabetically first one is used.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            FileNotFoundError: If the archive contains no top-level ``.shp``.
        """
        with req.get(url, timeout=500) as response:
            response.raise_for_status()
            payload = response.content

        with tmp.TemporaryDirectory() as extract_dir:
            with tmp.TemporaryFile() as archive_file:
                archive_file.write(payload)
                archive_file.seek(0)
                with ZipFile(archive_file, "r") as archive:
                    archive.extractall(extract_dir)

            shapefile_names = sorted(
                name for name in os.listdir(extract_dir) if name.lower().endswith(".shp")
            )
            if not shapefile_names:
                # Previously this fell through to an UnboundLocalError at the return.
                raise FileNotFoundError(f"No .shp file found in archive downloaded from {url}")

            # Read while the temporary directory still exists.
            shapefile_path = os.path.join(extract_dir, shapefile_names[0])
            return gpd.read_file(shapefile_path).to_crs("EPSG:4326")

    @staticmethod
    def smoke_style_row(row):
        """Return the folium style dict for one smoke polygon row.

        Args:
            row: Mapping-like row exposing a ``'Density'`` entry.

        Returns:
            A dict with ``fillColor``, ``weight`` and ``color``. Light, Medium
            and Heavy each get their own colour; any other density value gets
            the translucent fallback colour.
        """
        density = row['Density']
        palette = (('Light', "#b5b5b5"), ('Medium', '#6b6b6b'), ('Heavy', '#AC0000'))
        colour = next((hex_code for level, hex_code in palette if density == level), "#0201015A")
        return {'fillColor': colour, 'weight': 1, "color": colour}

In [None]:
### handle the data clip by state 

In [None]:
# Packaged handler imported from the project's utilities.data_handler module
# (its source is not part of this notebook).
from utilities.data_handler import HmsDataHandler

# NOTE(review): what the constructor does (downloads, caching) is not visible here — confirm against the module.
y = HmsDataHandler()

In [None]:
# Fetch the smoke and fire datasets from the packaged handler.
# NOTE(review): the return structures are not visible here; later cells index dat_smoke as dat_smoke[i][j].
dat_smoke = y.get_smoke_data()
dat_fire = y.get_fire_data()

In [None]:
# Inspect the second element of the first smoke entry.
dat_smoke[0][1]

In [None]:
# String form of the first element of the first smoke entry.
str(dat_smoke[0][0])

In [None]:
def smoke_style_row(row):
    """Pick the folium style dict for a smoke polygon from its ``'Density'`` value.

    Light, Medium and Heavy each map to their own colour; every other value
    falls back to the translucent default. Fill and outline share the colour.
    """
    density = row['Density']
    colour = "#0201015A"
    for level, hex_code in (('Light', "#b5b5b5"), ('Medium', '#6b6b6b'), ('Heavy', '#AC0000')):
        if density == level:
            colour = hex_code
            break
    return {'fillColor': colour, 'weight': 1, "color": colour}

# Add a 'style' column to every daily frame, holding the style dict chosen per row.
melded_styled = [
    daily_smoke.assign(style=daily_smoke.apply(smoke_style_row, axis=1))
    for daily_smoke in melded
]

In [None]:
# One GroupBy object per daily frame, keyed on the 'Density' column.
smoke_grouped_density = [
    styled_day.groupby('Density')
    for styled_day in melded_styled
]

In [None]:
# geojs_smoke_light = fl.GeoJson(light_gdf, name = 'light')
# geojs_smoke_medium = fl.GeoJson(medium_gdf, name = 'medium')
# geojs_smoke_heavy = fl.GeoJson(heavy_gdf, name = 'heavy')


In [None]:
# from folium.plugins import GroupedLayerControl

# m = fl.Map(location=[39.8283, -98.5795], zoom_start=4)
# density_group = []

# for x in melded_styled:
    
#     grouped_smoke = x.groupby('Density')

#     for group_name, group_data in grouped_smoke:

#         density_group.append(fl.FeatureGroup(name = f"{group_name}"))
        
#         # layer_group = fl.FeatureGroup(name = f"{group_name[0]}")
#         fl.GeoJson(group_data).add_to(density_group[-1])

#         # m.add_child(f_group[-1])
#         density_group[-1].add_to(m)

#         # fl.features.GeoJsonPopup(
#         #     fields=['Density', 'Start'],
#         #     aliases=['Smoke Density', 'Capture Start Time']).add_to(f_group[-1])

#     # for group_name, group_data in smoke_grouped_time:
#     #     print(group_name)
#     #     time_group.append(fl.FeatureGroup(name = f"{group_name}"))
#     #     fl.GeoJson(group_data).add_to(time_group[-1])
#     #     time_group[-1].add_to(m)

# geojs_states = fl.GeoJson(states, 
#                           name='states', 
#                           control=False, 
#                           zoom_on_click=True,
#                           style_function=lambda x: {'fillColor': "#d0d0d06c", 'weight': .5, "color": "#333232"}
#                           ).add_to(m)

# # fl.LayerControl().add_to(m)   

# fl.plugins.Geocoder().add_to(m)
    
# fl.plugins.GroupedLayerControl(
#     groups = {'Density': density_group},
#     collapsed = False,
#     exclusive_groups=False
# ).add_to(m)

# m

In [None]:
# Inspect the first clipped-and-dissolved frame (built from the first URL, i.e. start_day).
melded[0]

In [None]:
# Tree control

# TODO: fix labels for historic layers, Start/End are not accurate for the day

from folium.plugins.treelayercontrol import TreeLayerControl
from folium.plugins.geocoder import Geocoder

m = fl.Map(location=[39.8283, -98.5795], zoom_start=4)


def _density_children(smoke_gdf, **layer_options):
    """Build the Light/Medium/Heavy tree nodes for one frame.

    Each node's GeoJson layer is created from the rows of that density and
    added to ``m`` immediately, in Light, Medium, Heavy order.
    """
    nodes = []
    for node_label, density in (
        ("Light Smoke", 'Light'),
        ("Medium Smoke", 'Medium'),
        ("Heavy Smoke", 'Heavy'),
    ):
        density_layer = fl.GeoJson(smoke_gdf[smoke_gdf['Density'] == density], **layer_options)
        nodes.append({"label": node_label, "layer": density_layer.add_to(m)})
    return nodes


# One collapsible node per day; its layers start hidden. The node label is parsed
# from the 'Start' value of the day's first row.
historic_smoke_control = []
for daily_smoke in melded_styled:
    day_label = str(dt.datetime.strptime(daily_smoke['Start'][0], '%Y%m%d %H%M'))
    historic_smoke_control.append(
        {
            'label': day_label,
            "selectAllCheckbox": "Un/select All",
            'children': _density_children(daily_smoke, show=False),
        }
    )

# Top-level tree: the first frame shown on its own, plus the per-day history.
overlay_tree = {
    "label": "Smoke Density Data",
    "selectAllCheckbox": True,
    "children": [
        {
            "label": "Yesterday Smoke Data",
            "selectAllCheckbox": True,
            "children": _density_children(melded_styled[0], overlay=True),
        },
        {
            "label": "Last 7 Days Smoke Data",
            "selectAllCheckbox": True,
            "collapsed": True,
            "children": historic_smoke_control,
        },
    ],
}

tree_control = TreeLayerControl(overlay_tree=overlay_tree, collapsed=False).add_to(m)

# State outlines with a light translucent fill.
geojs_states = fl.GeoJson(
    states,
    name='states',
    control=True,
    zoom_on_click=True,
    style_function=lambda x: {'fillColor': "#d0d0d06c", 'weight': .5, "color": "#333232"},
).add_to(m)

Geocoder(position="topleft").add_to(m)

m

In [None]:
# Write the current map to 'hmsview_map.html' (path is relative to the working directory).
m.save('hmsview_map.html')

In [None]:
# Feature group holding the heavy-smoke polygons of the first (most recent) day.
# The original referenced `smoke_grouped`, which is never defined in this notebook,
# and called get_group() without a group key; the per-day GroupBy list
# `smoke_grouped_density` is used instead.
# NOTE(review): get_group raises KeyError when that day has no 'Heavy' rows — confirm this is acceptable.
heavy_group = fl.FeatureGroup(name= 'Heavy Smoke')
fl.GeoJson(smoke_grouped_density[0].get_group('Heavy')).add_to(heavy_group)

In [None]:
# Attach a popup to the state layer that shows the STUSPS and NAME fields
# under the aliases 'State Code' and 'State Name'.
fl.features.GeoJsonPopup(
                        fields=['STUSPS', 'NAME'],
                        aliases=['State Code', 'State Name']).add_to(geojs_states)

# NOTE(review): geojs_states was already added to m in the cell that created it — confirm this second add is needed.
geojs_states.add_to(m)

In [None]:
# Attach a Density/Start popup to the light-smoke layer and add the layer to the map.
# NOTE(review): geojs_smoke_light is only assigned in commented-out code earlier in this
# notebook, so this cell presumably raises NameError on a fresh kernel — confirm.
fl.features.GeoJsonPopup(
                        fields=['Density', 'Start'],
                        aliases=['Smoke Density', 'Capture Start Time']).add_to(geojs_smoke_light)
geojs_smoke_light.add_to(m)

In [None]:
# Attach a Density/Start popup to the medium-smoke layer and add the layer to the map.
# NOTE(review): geojs_smoke_medium is only assigned in commented-out code earlier in this
# notebook, so this cell presumably raises NameError on a fresh kernel — confirm.
fl.features.GeoJsonPopup(
                        fields=['Density', 'Start'],
                        aliases=['Smoke Density', 'Capture Start Time']).add_to(geojs_smoke_medium)
geojs_smoke_medium.add_to(m)

In [None]:
# Attach a Density/Start popup to the heavy-smoke layer and add the layer to the map.
# NOTE(review): geojs_smoke_heavy is only assigned in commented-out code earlier in this
# notebook, so this cell presumably raises NameError on a fresh kernel — confirm.
fl.features.GeoJsonPopup(
                        fields=['Density', 'Start'],
                        aliases=['Smoke Density', 'Capture Start Time']).add_to(geojs_smoke_heavy)
geojs_smoke_heavy.add_to(m)

In [None]:
# Add a folium LayerControl to the map.
# NOTE(review): m already received a TreeLayerControl in the tree-control cell — confirm both controls are wanted.
fl.LayerControl().add_to(m)

In [None]:
# Display the map as this cell's output.
m

### FAST API AND SQLALCHEMY

In [None]:
from fastapi import FastAPI

# Application object that the route decorators below register their handlers on.
app = FastAPI()

@app.get("/")
async def root():
    """Handle GET ``/`` by returning a fixed greeting payload."""
    greeting = {"message": "Hello World"}
    return greeting

@app.get("/hello/{name}")
async def say_hello(name: str):
    """Handle GET ``/hello/{name}`` by greeting the ``name`` path parameter."""
    greeting = f"Hello, {name}!"
    return {"message": greeting}