In [None]:
import pandas as pd
import re
import pandera as pa
from pandera.typing import DataFrame
from typing import Annotated
import os
from collections import Counter
import numpy as np
from datetime import datetime
import pytz

In [None]:
# Load the original station catalogue and normalise its text columns before re-saving it.
df_websirenes = pd.read_parquet('websirenes_stations_original.parquet')
df_websirenes['estacao'] = df_websirenes['estacao'].str.strip()

# Description -> trimmed, lower-case, whitespace runs collapsed to "_", "/" turned into ".".
# The last step is Series.replace with regex=True, which substitutes inside each string.
estacao_desc = (
    df_websirenes['estacao_desc']
    .str.strip()
    .str.lower()
    .str.replace(r'\s+', '_', regex=True)
    .replace(r'/', '.', regex=True)
)
df_websirenes['estacao_desc'] = estacao_desc

df_websirenes.to_parquet('websirenes_stations.parquet', index=False)
df_websirenes.head()


In [None]:
# With regex=False, Series.replace only swaps values that are exactly 'la';
# substrings inside longer descriptions are left untouched.
(
    df_websirenes['estacao_desc']
    .replace('la', '.', regex=False)
    .unique()
)

In [None]:
# Load the rain-gauge parquet of a single station as a sample frame.
station_id = "urca"
df = pd.read_parquet(f"../../data/ws/alertario/rain_gauge/{station_id}.parquet")
df.head()

In [None]:
def show_timezones_name():
    """Print every timezone name listed in ``pytz.all_timezones``."""
    timezone_names = pytz.all_timezones
    print(timezone_names)

def get_UTC_offset_from_timezone_name(timezone_name: str) -> str:
    """Return the UTC offset currently in effect for ``timezone_name``.

    The offset is taken from "now" in that zone and formatted with
    ``strftime('%z')`` (sign followed by HHMM).
    """
    zone = pytz.timezone(timezone_name)
    current_time = datetime.now(zone)
    return current_time.strftime('%z')

class WebSireneSchema(pa.DataFrameModel):
    """Pandera model for a parsed WebSirenes station DataFrame.

    The frame is indexed by ``horaLeitura`` (timezone-aware datetime) and
    carries the station name, eight nullable float measurement columns and an
    integer station id.
    """
    # Index dtype: pd.DatetimeTZDtype with unit "ns" and a tz string of the form "UTC<offset>".
    # The offset is computed once, when this class body executes, from the offset that
    # America/Sao_Paulo has at that moment.
    horaLeitura: pa.typing.Index[Annotated[
        pd.DatetimeTZDtype, "ns", f"UTC{get_UTC_offset_from_timezone_name('America/Sao_Paulo')}"]
    ]
    # Station name column (non-nullable string).
    nome: str
    # Nullable float measurement columns.
    # NOTE(review): the names suggest accumulation windows (15 min, 30 min, 1 h ... 96 h) —
    # confirm meaning and units with the data provider.
    m15: float = pa.Field(nullable=True)
    m30: float = pa.Field(nullable=True)
    h01: float = pa.Field(nullable=True)
    h02: float = pa.Field(nullable=True)
    h03: float = pa.Field(nullable=True)
    h04: float = pa.Field(nullable=True)
    h24: float = pa.Field(nullable=True)
    h96: float = pa.Field(nullable=True)
    # Integer station identifier (non-nullable).
    station_id: int

class WebSirenesParser:
    """Parser for the whitespace-separated WebSirenes station text files.

    Each file starts with a header line of column names followed by one record
    per line: station name, reading timestamp (``YYYY-MM-DD HH:MM:SS-HH``) and
    nine whitespace-separated values (eight measurements plus the station id).
    Measurements may use a decimal comma and the literal ``null`` for missing
    values.
    """

    def list_files(self, directory: str = 'websirenes_defesa_civil') -> list[str]:
        """Return the entry names found in ``directory`` (order as given by ``os.listdir``)."""
        return os.listdir(directory)

    def _get_name_pattern(self) -> str:
        """Regex capturing the station name: everything before the first date."""
        return r'^(?P<name>.+?)(?=\s+\d{4}-\d{2}-\d{2})'

    def _get_date_pattern(self) -> str:
        """Regex capturing the timestamp with its hours-only negative UTC offset."""
        return r'(?P<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}-\d{2})'

    def _get_timeframe_pattern(self) -> str:
        """Regex capturing the rest of the line (measurement values and station id)."""
        return r"(?P<timeframe>.+)$"

    def _get_complete_pattern(self) -> str:
        """Regex for a full record: name, timestamp and values separated by whitespace."""
        return rf"{self._get_name_pattern()}\s+{self._get_date_pattern()}\s+{self._get_timeframe_pattern()}"

    def _extract_features(self, line: str) -> tuple:
        """Parse one record line.

        Returns:
            ``(name, timestamp, m15, m30, h01, h02, h03, h04, h24, h96, station_id)``
            where ``timestamp`` is a timezone-aware ``datetime``, the
            measurements are floats (``nan`` for ``null``) and ``station_id``
            is an ``int``.

        Raises:
            ValueError: if the line does not match the record layout, does not
                hold exactly nine values, or a value is not numeric.
        """
        match = re.search(self._get_complete_pattern(), line)
        if match is None:
            raise ValueError(f"Line does not match the expected WebSirenes layout: {line!r}")

        name = match.group('name')
        # The file stores the offset as hours only ("-03"); strptime's %z needs "-0300".
        date = match.group('date') + '00'

        values = [
            np.nan if token == 'null' else float(token.replace(',', '.'))
            for token in match.group('timeframe').split()
        ]
        if len(values) != 9:
            raise ValueError(f"Expected 9 values after the timestamp, got {len(values)}: {line!r}")
        m15, m30, h01, h02, h03, h04, h24, h96, station_id = values

        return (
            name,
            datetime.strptime(date, '%Y-%m-%d %H:%M:%S%z'),
            m15, m30, h01, h02, h03, h04, h24, h96,
            int(station_id)
        )

    def _parse_txt_file(self, file_path: str) -> tuple[list[str], list[tuple]]:
        """Read a station file and return ``(header_columns, parsed_rows)``.

        Blank lines (for example a trailing empty line) are skipped.
        """
        file_data: list[tuple] = []
        # utf-8-sig transparently drops a leading BOM if the file has one.
        with open(file_path, 'r', encoding='utf-8-sig') as file:
            header = file.readline().strip().split()
            for line in file:
                if not line.strip():
                    continue
                file_data.append(self._extract_features(line))
        return header, file_data

    def read_station_name_id_txt_file(self, file_path: str) -> tuple[str, int]:
        """Return ``(station_name, station_id)`` taken from the first record of the file.

        Raises:
            ValueError: if the line after the header is not a valid record.
        """
        with open(file_path, 'r', encoding='utf-8-sig') as file:
            file.readline()  # header line is not needed here
            features = self._extract_features(file.readline())
        return features[0], features[-1]

    def get_time_resolution(self, dates: pd.Series) -> dict:
        """Count how often each gap between consecutive timestamps occurs.

        Works positionally, so it accepts a Series with any index, a
        ``DatetimeIndex`` or a plain sequence.

        Returns:
            Mapping ``{time_delta: occurrences}``; empty for fewer than two dates.
        """
        stamps = list(dates)
        return dict(Counter(later - earlier for earlier, later in zip(stamps, stamps[1:])))

    # Annotations mentioning the schema are quoted so that defining this class
    # does not require the schema to exist yet.
    def get_dataframe(self, file_path: str) -> "DataFrame[WebSireneSchema]":
        """Parse ``file_path`` into a DataFrame indexed by ``horaLeitura``.

        The ``id`` column is renamed to ``station_id`` and the frame returned
        is the one produced by ``WebSireneSchema.validate``.
        """
        header, file_data = self._parse_txt_file(file_path)
        df = (
            pd.DataFrame(file_data, columns=header)
            .rename(columns={'id': 'station_id'})
            .set_index('horaLeitura')
        )
        return WebSireneSchema.validate(df)

    def assert_is_sorted_by_date(self, df: "DataFrame[WebSireneSchema]") -> None:
        """Raise ``AssertionError`` unless the index is monotonically increasing."""
        assert df.index.is_monotonic_increasing, 'DataFrame index is not sorted by date'

websirenes_parser = WebSirenesParser()

# Preview run: only the first listed file is parsed and checked.
for file in websirenes_parser.list_files()[:1]:
    df = websirenes_parser.get_dataframe(
        os.path.join('websirenes_defesa_civil', file)
    )
    websirenes_parser.assert_is_sorted_by_date(df)
    print(f'Data de início de operação: {df.index[0]}')
print(df.shape)
df.head()

In [None]:
# True when at least one m15 reading is missing in the frame loaded above.
df['m15'].isna().to_numpy().any()

In [None]:
import pandas as pd

# Toy example of the name normalisation: trim, lower-case, collapse whitespace
# runs to "_" and turn "/" into ".".
df = pd.DataFrame(
    {'names': ['my name', 'my     name', '     my  name 2   / name 3 ']}
)

names = df['names']
cleaned_names = (
    names
    .str.strip()
    .str.lower()
    .str.replace(r'\s+', '_', regex=True)
    .replace(r'/', '.', regex=True)
)

print(cleaned_names)


In [None]:
import unicodedata

def remove_accents(input_str):
    """Return ``input_str`` with combining marks (accents) stripped.

    The text is decomposed with NFKD so accented letters become a base
    character plus combining marks; the combining marks are then dropped.
    """
    decomposed = unicodedata.normalize('NFKD', input_str)
    kept_chars = (char for char in decomposed if not unicodedata.combining(char))
    return ''.join(kept_chars)

# Distinct station descriptions from the catalogue, used below as the set of known stations.
df_websirenes_estacao_desc_unique = pd.unique(df_websirenes['estacao_desc'])

def produce_hourly_data(df: pd.DataFrame) -> "pd.DataFrame | None":
    """Build the on-the-hour precipitation frame for one parsed station file.

    The station name in ``nome`` is normalised (trimmed, lower-cased, accents
    removed via ``remove_accents``, whitespace runs -> ``_``, ``/`` -> ``.``)
    and looked up in the module-level ``df_websirenes_estacao_desc_unique``.
    Interior gaps in the precipitation series are filled forward and then
    backward (at most 4 consecutive values each), and only rows whose
    timestamp falls exactly on the hour are kept.

    The input frame is not modified.

    Args:
        df: Frame indexed by ``horaLeitura`` with at least ``nome`` and ``m15``.

    Returns:
        A frame with columns ``datetime``, ``estacao_desc`` and
        ``precipitation_sum``, or ``None`` when ``df`` is empty or the station
        is not present in the station catalogue.
    """
    if df.empty:
        print("Empty station DataFrame received; nothing to process")
        return None

    estacao_desc = (
        df['nome']
        .str.strip()
        .str.lower()
        .apply(remove_accents)
        .str.replace(r'\s+', '_', regex=True)
        .str.replace('/', '.', regex=False)
    )
    station_desc_name = estacao_desc.iloc[0]

    if station_desc_name not in df_websirenes_estacao_desc_unique:
        error_color = '\033[91m'
        reset_color = '\033[0m'
        print(f"{error_color}Station {station_desc_name} not found in websirenes_stations.parquet{reset_color}")
        return None

    # NOTE(review): m15 is taken as the precipitation value of each hourly row; if m15 is a
    # 15-minute accumulation, h01 may be the intended hourly total — confirm.
    hourly_df = (
        df.assign(estacao_desc=estacao_desc)  # same index as df, so values align row by row
        .reset_index()
        .rename(columns={'horaLeitura': 'datetime', 'm15': 'precipitation_sum'})
        .loc[:, ['datetime', 'estacao_desc', 'precipitation_sum']]
        .copy()
    )
    hourly_df['datetime'] = pd.to_datetime(hourly_df['datetime'])
    # Fill only gaps surrounded by valid readings, up to 4 consecutive values per direction.
    hourly_df['precipitation_sum'] = (
        hourly_df['precipitation_sum']
        .ffill(limit_area="inside", limit=4)
        .bfill(limit_area="inside", limit=4)
    )
    hourly_df = hourly_df[hourly_df['datetime'].dt.minute == 0]

    # Report the share of missing values of the precipitation column only; averaging over
    # every column would dilute it with the never-null datetime/name columns.
    missing_mask = hourly_df['precipitation_sum'].isnull()
    if missing_mask.any():
        print(f"Sirene Station {station_desc_name} has missing precipitation {missing_mask.mean() * 100:.2f}%")
    return hourly_df

# Convert every station file to an hourly parquet named after the normalised station.
station_files = websirenes_parser.list_files()  # list the directory once, not on every iteration
total_files = len(station_files)

for i, file in enumerate(station_files):
    print(f"Processing {i + 1}/{total_files}")
    station_df = websirenes_parser.get_dataframe(os.path.join('websirenes_defesa_civil', file))
    websirenes_parser.assert_is_sorted_by_date(station_df)
    hourly_df = produce_hourly_data(station_df)
    # Skip stations that are unknown or that yield no on-the-hour rows.
    if hourly_df is None or hourly_df.empty:
        continue
    # `df` is only replaced by successful results, so the summary below never sees None.
    df = hourly_df
    print(f'Data de início de operação: {df.datetime.min()}')
    station_name = df['estacao_desc'].iloc[0]
    df.to_parquet(f"{station_name}.parquet")
print(df.shape)
df.head()