In [1]:
%%writefile ../../src/utils/dataset.py
from enum import Enum

# Define the dataset enum
class Dataset(Enum):
    """Catalogue of the datasets known to the project.

    Every member's value is a plain dict with three keys:

    * ``'id'``        -- short string identifier of the dataset.
    * ``'index_col'`` -- column name handed to the reader as the index column
      (``GenSubdivision`` also groups rows by it).
    * ``'is_geo'``    -- whether ``GenSubdivision`` reads the dataset through
      the geodata reader instead of the plain SQL reader.
    """

    FIRE = dict(id='fire', index_col="division_id", is_geo=False)
    SUBDIVISION = dict(id='subdivision', index_col="cid", is_geo=True)
    # The index column is left as an empty string for this dataset.
    WEATHER = dict(id='weather', index_col="", is_geo=False)
    LIGHTNING = dict(id='lightning', index_col="division_id", is_geo=False)

Overwriting ../../src/utils/dataset.py


In [2]:
%%writefile ../../src/utils/generate_subdivision.py
from sqlalchemy.engine import URL
from sqlalchemy import create_engine
from pandas import read_sql, DataFrame
from geopandas import read_postgis, GeoDataFrame
from utils.dataset import Dataset


class GenSubdivision():
    """Reads a dataset from the database and groups its rows per subdivision.

    The dataset to load is selected with a ``Dataset`` member; its value dict
    supplies the index column and tells whether the rows are read as geodata.
    """

    def __init__(
            self,
            d_full:Dataset,
            s:Dataset = Dataset.SUBDIVISION,
            db_url:URL = None
    ) -> None:
        """Create the database engine and remember which datasets to use.

        Args:
            d_full (Dataset): dataset whose rows ``gen_subdivisions`` loads
                and groups.
            s (Dataset, optional): dataset describing the subdivisions; its
                ``'index_col'`` is used by ``get_subdivision_dataset``.
                ``None`` falls back to ``Dataset.SUBDIVISION``.
            db_url (URL, optional): connection URL passed to
                ``create_engine``. The default of ``None`` is forwarded
                unchanged, so a real URL must be supplied for the engine to
                be created.
        """
        self.engine = create_engine(db_url)
        self.d_full = d_full
        # `s` used to be accepted and then dropped; keep it so the
        # subdivision reader honours the caller's choice.
        self.s = s if s is not None else Dataset.SUBDIVISION

    def __get_subdivision_data_query(self) -> str:
        """Return the query selecting every row of table ``"S"``."""
        query =  """SELECT * FROM "S";"""
        return query

    def __get_lightning_data_query(self) -> str:
        """Return the query selecting every row of table ``"L_s"``."""
        query =  """SELECT * FROM "L_s";"""
        return query

    def __get_fire_data_query(self) -> str:
        """Return the query selecting fire rows whose ``cause`` is ``'L'``.

        Only ``division_id``, ``start_date`` and ``area_burnt_ha`` are
        selected from table ``"F_s"``.
        """
        # NOTE(review): cause = 'L' presumably means lightning-caused -- confirm.
        query = """
            SELECT
                fs.division_id,
                fs.start_date,
                fs.area_burnt_ha
            FROM
                "F_s" fs
            WHERE
                fs.cause = 'L'
        """
        return query

    def __get_weather_data_query(self) -> str:
        """Return the weather query, which is currently an empty string."""
        query = ""
        return query

    def __read_geodata(
            self,
            query:str,
            index_col:str = None,
            geom_col:str = 'geometry',
            crs:str = "EPSG:4326"
    ) -> GeoDataFrame:
        """Run ``query`` through ``read_postgis`` on this instance's engine.

        Args:
            query (str): SQL text to execute.
            index_col (str, optional): column to use as the index.
            geom_col (str, optional): name of the geometry column.
            crs (str, optional): coordinate reference system passed to
                ``read_postgis``.

        Returns:
            GeoDataFrame: the query result.
        """
        data = read_postgis(
            sql = query,
            con = self.engine,
            geom_col = geom_col,
            index_col = index_col,
            crs = crs
        )
        return data

    def __read_data(
            self,
            query:str,
            index_col:str = None,
    ) -> DataFrame:
        """Run ``query`` through ``read_sql`` on this instance's engine.

        Args:
            query (str): SQL text to execute.
            index_col (str, optional): column to use as the index.

        Returns:
            DataFrame: the query result.
        """
        data = read_sql(
            sql = query,
            con = self.engine,
            index_col = index_col
        )
        return data

    def get_subdivision_dataset(self) -> GeoDataFrame:
        """Load the subdivision table as geodata.

        Returns:
            GeoDataFrame: rows of table ``"S"``, indexed by the
            ``'index_col'`` of the subdivision dataset given at construction
            (``Dataset.SUBDIVISION`` by default).
        """
        subdivision_data_query = self.__get_subdivision_data_query()
        subdivision_data = self.__read_geodata(
            subdivision_data_query,
            index_col = self.s.value['index_col']
        )
        return subdivision_data

    def __get_data_query(self) -> str:
        """ Generates the query to get the appropriate data

        Raises:
            ValueError: The dataset type is invalid.

        Returns:
            str: dataset query
        """
        if self.d_full == Dataset.LIGHTNING:
            return self.__get_lightning_data_query()
        elif self.d_full == Dataset.WEATHER:
            return self.__get_weather_data_query()
        elif self.d_full == Dataset.FIRE:
            return self.__get_fire_data_query()
        else:
            raise ValueError("Invalid return dataset type!!!")

    def gen_subdivisions(self):
        """Load the selected dataset and group its rows by its index column.

        Raises:
            ValueError: ``d_full`` is not one of the supported datasets.
            NotImplementedError: the selected dataset has no query yet
                (its query string is empty).

        Returns:
            The pandas ``groupby`` object over the loaded data, keyed by the
            dataset's ``'index_col'``; iterating it yields
            ``(key, rows)`` pairs.
        """
        data_query = self.__get_data_query()
        # Fail fast instead of sending an empty statement to the database,
        # which would only surface as a driver-level error.
        if not data_query.strip():
            raise NotImplementedError(
                f"No query is defined for dataset {self.d_full!r}"
            )

        spec = self.d_full.value
        index_col = spec['index_col']
        if spec['is_geo']:
            data = self.__read_geodata(
                data_query,
                index_col = index_col
            )
        else:
            data = self.__read_data(
                data_query,
                index_col = index_col
            )
        # The index column became the frame's index on read; groupby
        # resolves the name against the index level.
        d_map = data.groupby(
            by = index_col
        )

        return d_map

Overwriting ../../src/utils/generate_subdivision.py


In [3]:
import os
from dotenv import load_dotenv

from sqlalchemy.engine import URL

import sys
# Append the project's source directory to the module search path so the
# `utils` package imported below can be resolved from this notebook.
src_path = "../../src/"
sys.path.append(src_path)
from utils.generate_subdivision import GenSubdivision
from utils.dataset import Dataset

In [4]:
# Relative path of the .env file that is handed to load_dotenv below.
PATH_TO_DOT_ENV = "../../.env"

# Driver name and host used when the database URL is assembled.
DATABASE_TYPE = "postgresql"
DATABASE_HOST = "localhost"


In [5]:
# Read the .env file so its entries can be looked up through os.environ.
load_dotenv(PATH_TO_DOT_ENV)

# os.environ.get returns None when a variable is not set; no error is raised
# here, so a missing entry only shows up when the value is used later.
DATABASE_NAME = os.environ.get("DATABASE_NAME")
POSTGRES_USER = os.environ.get("POSTGRES_USER")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD")
POSTGRES_HOST_PORT = os.environ.get("POSTGRES_HOST_PORT")
# NOTE(review): not referenced by the cells visible here -- confirm it is needed.
POSTGRES_CONTAINER_PORT = os.environ.get("POSTGRES_CONTAINER_PORT")

In [6]:
# Assemble the connection URL from the constants and environment values
# defined above; the port is the value read from POSTGRES_HOST_PORT.
DATABASE_URL = URL.create(
    DATABASE_TYPE,
    username=POSTGRES_USER,
    password=POSTGRES_PASSWORD,  # plain (unescaped) text
    host=DATABASE_HOST,
    port=POSTGRES_HOST_PORT,
    database=DATABASE_NAME,
)

In [7]:
# Build a generator for the fire dataset against the configured database.
generator = GenSubdivision(
    d_full = Dataset.FIRE,
    db_url = DATABASE_URL
)
# Grouping of the fire rows keyed by the dataset's index column.
d_map = generator.gen_subdivisions()

In [8]:
# Take only the first (subdivision id, rows) pair of the grouping.
# next() raises StopIteration on an empty grouping, whereas a for/break loop
# would leave s_id and data unbound -- or silently stale from an earlier run.
s_id, data = next(iter(d_map))

print(f"subdivision id: {s_id}")
print("Data:")
data

subdivision id: 0
Data:


Unnamed: 0_level_0,start_date,area_burnt_ha
division_id,Unnamed: 1_level_1,Unnamed: 2_level_1
0,1984-08-05,2872.493135
0,1985-06-06,462.123006
0,1985-06-21,1074.188447
0,1985-06-21,1823.131117
0,1985-06-23,2392.212324
...,...,...
0,1984-06-25,392.451179
0,1984-06-25,411.862271
0,1984-06-25,420.630552
0,1984-08-05,149.720018
