In [None]:
import re
from pathlib import Path
from typing import List, Union, Dict

import requests
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup as soup
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import StratifiedKFold, cross_val_score, train_test_split
from sklearn.preprocessing import LabelEncoder

In [None]:
# Root URL of the "dst_final" pages on the Kyoto WDC server (per the hostname).
# get_month_data appends "<year><month>/index.html" to it to reach one month's page.
BASE_URL_DST_FINAL = 'https://wdc.kugi.kyoto-u.ac.jp/dst_final/'

In [None]:
def get_month_data(year: str, month: str) -> str:
    """
    Download one month's Dst page and return the text of its first <pre> block.

    year [str | int]: four-digit year, e.g. '2003'
    month [str | int]: month number, e.g. '11' (zero-padded to two digits if needed)

    Returns the raw text table as a single string.

    Raises:
        ValueError: if year or month is not numeric.
        requests.RequestException: on network failure, timeout or HTTP error status.
        IndexError: if the page contains no <pre> element.
    """
    # Normalise so that '1' and 1 both address the "01" page.
    period = f"{int(year):04d}{int(month):02d}"
    # Strip the base URL's trailing slash so the path never contains "//".
    url = f"{BASE_URL_DST_FINAL.rstrip('/')}/{period}/index.html"

    # Without a timeout requests can block forever on a stalled connection.
    ans = requests.get(url=url, timeout=30)
    # Fail on 4xx/5xx here instead of trying to parse an error page.
    ans.raise_for_status()

    data = soup(ans.text, "html.parser")
    pre_blocks = data.find_all("pre")
    if not pre_blocks:
        raise IndexError(f"no <pre> block found at {url}")
    return pre_blocks[0].text

In [None]:
def clean_month_data_text(data_text: str, header_lines: int = 6) -> List[str]:
    """
    Split the page text into lines and drop the empty ones and the header.

    data_text [str]: raw text of the month's page
    header_lines [int]: number of leading non-empty lines to discard
        (defaults to 6, the value this notebook has always used)

    Returns the remaining non-empty lines in their original order.
    Only exactly-empty lines are removed; whitespace-only lines are kept.
    """
    non_empty_lines = [line for line in data_text.split('\n') if line != '']
    return non_empty_lines[header_lines:]


def clean_single_line(line: str) -> List[int]:
    """
    Parse one fixed-width table row into its integer values.

    line [str]: one data row of the month's table

    Layout read by this parser: the first 2 characters are skipped, then the
    rest is consumed in groups of 33 characters. Each group is 1 leading
    character (dropped) followed by 8 fields of 4 characters. Trailing
    characters that do not fill a whole group are ignored.

    Returns the parsed values in row order (24 for a three-group row).
    Raises ValueError if a 4-character field is not a valid integer.
    """
    # Skip the 2-character row prefix, then cut the rest into 33-character groups.
    groups = re.findall(r'.{33}', line[2:])
    # Drop each group's leading character so only the 4-character fields remain.
    fields_text = ''.join(group[1:] for group in groups)
    # Slice into 4-character fields and convert each to int.
    return [int(field) for field in re.findall(r'.{4}', fields_text)]


def clean_multiples_lines(lines: List[str]) -> List[List[int]]:
    """
    Parse every table row with clean_single_line.

    lines [List[str]]: data rows of the month's table

    Returns one list of integers per input row, in the same order.
    """
    return [clean_single_line(line) for line in lines]

In [None]:
def generate_df(trusted_data: List[List[int]], year: str, month: str) -> pd.DataFrame:
    """
    Build the per-day DataFrame for one month.

    trusted_data [List[List[int]]]: one list of 24 hourly values per day
    year [str]: four-digit year, e.g. '2003'
    month [str]: month number, e.g. '11'

    Returns a DataFrame indexed by day of month (starting at 1) with:
        0..23    hourly values, where column 0 holds the row's LAST input value
        date     the day's timestamp
        dst_min  minimum of the 24 hourly columns

    The input lists are not modified, so the function can be called repeatedly
    on the same data and give the same result.
    """
    hour_columns = list(range(0, 24))

    # Fix hour labelling: 24h00 -> 00h00. Each row's last value is moved to the front.
    # Build new lists rather than pop/insert on the caller's rows; in-place rotation
    # shifted the data again every time the cell was re-run.
    # NOTE(review): the value is placed at hour 0 of the SAME day -- confirm that is
    # intended rather than hour 0 of the following day.
    rotated_rows = [[row[-1], *row[:-1]] for row in trusted_data]

    df = pd.DataFrame(rotated_rows, columns=hour_columns)
    df.index = df.index+1  # index is the day of the month
    df['date'] = [pd.to_datetime(f"{year}-{month}-{day}", format='%Y-%m-%d') for day in df.index]
    # Row-wise minimum over the hourly columns only ('date' is excluded).
    df['dst_min'] = df[hour_columns].min(axis=1)
    return df

In [None]:
def make_classification(df: pd.DataFrame, classification_rules: Dict[str, List[int]]):
    """
    Add a 'classification' column to df, in place, based on each row's 'dst_min'.

    df [DataFrame]: must contain a 'dst_min' column; any index is accepted
    classification_rules [Dict[str, sequence of int]]: category -> accepted values
        example:
            {
                'fraca':            np.array(range(-31, -51, -1)),
                'moderada':         np.array(range(-51, -101, -1)),
                'intensa':          np.array(range(-101, -251, -1)),
                'super_intensa':    np.array(range(-251, -1001, -1)),
            }

    A row receives the first category (in dict order) whose values contain its
    dst_min. Rows matching no category are left as NaN. Returns None.
    """
    dst_values = df['dst_min'].to_numpy()
    # Object dtype from the start: the column holds strings, with NaN for "no class".
    labels = np.full(len(df), np.nan, dtype=object)

    for category, index_range in classification_rules.items():
        # Only rows still unlabelled may be claimed, so the first matching rule wins.
        matches = np.isin(dst_values, np.asarray(index_range)) & pd.isna(labels)
        labels[matches] = category

    # Positional assignment: independent of how df is indexed.
    df['classification'] = labels

In [None]:
def remove_storms_by_date(df: pd.DataFrame, dates: List[str]):
    """
    Return a copy of df without the rows whose 'date' is in `dates`.

    df: DataFrame
        columns:
            date pd.datetime
    dates: list[str]
        format: YYYY-MM-DD

    The result is re-indexed from 0. df itself is not modified and may have
    any index.

    Raises ValueError if a date string does not match YYYY-MM-DD.
    """
    excluded_dates = pd.to_datetime(list(dates), format='%Y-%m-%d')
    keep_row = ~df['date'].isin(excluded_dates)
    # Filter positionally so the result does not depend on df's index labels.
    return df[keep_row.to_numpy()].reset_index(drop=True)

In [None]:
def save_processed_data(df, year, month):
    """
    Append df under key 'df' in processed_data/<year><month>.h5.

    df: DataFrame to store
    year, month: used only to build the file name

    The 'processed_data' directory is created if it does not exist.
    NOTE(review): this appends, so saving the same month twice stores its rows
    twice -- confirm that is intended.
    """
    Path('processed_data').mkdir(parents=True, exist_ok=True)
    # The context manager closes the store even if append() raises.
    with pd.HDFStore(path=f'processed_data/{year}{month}.h5') as processed_data:
        processed_data.append('df', df)

In [None]:
def plot_dst_graph(df):
    """
    Plot every cell of df as one continuous line, rows laid end to end.

    df: DataFrame whose cells are all plotted, row by row, left to right.
        Presumably only the hourly columns should be passed -- verify at the call site.

    The x value of the k-th cell (1-based) is k / number_of_columns, so each
    row spans one unit on the x axis. Draws on the current matplotlib axes.
    """
    n_columns = df.columns.size
    n_rows = len(df)

    # High resolution: one point per cell.
    axis_x = [(position + 1) / n_columns for position in range(n_columns * n_rows)]
    axis_y = [value for row in range(n_rows) for value in df.iloc[row].to_list()]

    plt.plot(axis_x, axis_y)

    # Low-resolution alternative (one point per row):
    # df.min(axis=1).plot()

In [None]:
# Storm classes by minimum Dst, as encoded below:
#   [fraca / weak]                  -30 nT > Dst >=  -50 nT
#   [moderada / moderate]           -50 nT > Dst >= -100 nT
#   [intensa / intense]            -100 nT > Dst >= -250 nT
#   [super_intensa / super-intense] -250 nT > Dst
#
# Each rule lists the integer values it accepts. The super-intense class has no
# lower limit in the table above, but the array stops at -1000 nT.
classification_rules = {
    'fraca':            np.arange(-31, -51, -1),
    'moderada':         np.arange(-51, -101, -1),
    'intensa':          np.arange(-101, -251, -1),
    'super_intensa':    np.arange(-251, -1001, -1),
}

In [None]:
# Month to analyse. Both values are strings: they are interpolated into the
# page URL and later into each day's date string.
year = '2003'
month = '11'

# Download the month's page text, drop empty lines and the header, then parse
# each remaining row into a list of integers.
month_data = get_month_data(year=year, month=month)
cleaned_data = clean_month_data_text(data_text=month_data)
trusted_data = clean_multiples_lines(lines=cleaned_data)


In [None]:
# One row per day: hourly columns 0..23 plus 'date' and 'dst_min'.
df = generate_df(trusted_data=trusted_data, year=year, month=month)
# Adds the 'classification' column to df in place (NaN where no rule matches).
make_classification(df, classification_rules)

In [None]:
# Days to exclude from the analysis (YYYY-MM-DD).
# NOTE(review): these dates are in July 2000, but the month loaded above is
# November 2003, so none of them match and nothing is removed -- confirm which
# of the two is intended.
storms_to_remove_by_date = [
    '2000-07-01',
    '2000-07-02',
    '2000-07-03',
    '2000-07-29',
    '2000-07-30'
]
# dropna() discards every row containing a NaN, which includes the days that
# received no classification.
df_filtered = remove_storms_by_date(df, storms_to_remove_by_date).dropna()

In [None]:
# Display the filtered frame. It was already passed through dropna() when it
# was created, so there is nothing left to drop here.
df_filtered

In [None]:
def class_count(df: pd.DataFrame):
    """
    Print how many rows of df fall in each storm class.

    df [DataFrame]: must contain a 'classification' column

    The four known classes are always listed, in fixed order, with 0 when
    absent. Any other label found in the column is counted and appended
    after them. Missing (NaN) labels are ignored. Returns None.
    """
    count = {"fraca": 0, "moderada": 0, "intensa": 0, "super_intensa": 0}
    # value_counts covers every row, including the first one, and skips NaN.
    for label, occurrences in df['classification'].value_counts().items():
        # int() keeps the printed dict free of NumPy scalar reprs.
        count[label] = count.get(label, 0) + int(occurrences)
    print(count)
# Class balance of the filtered days. Run this before the modelling cell, which
# replaces the string labels in df_filtered with integer codes.
class_count(df_filtered)

In [None]:
def accuracy_range(results):
    """
    Print the mean accuracy and a mean +/- 2 standard deviations band.

    results: object exposing .mean() and .std() (e.g. the array returned by
        cross_val_score), holding accuracies as fractions

    Both lines are printed as percentages with two decimals. Returns None.
    """
    average = results.mean()
    spread = results.std()
    print('Acurácia média: {:.2f}%'.format(average*100))

    lower_bound = (average - 2*spread)*100
    upper_bound = (average + 2*spread)*100
    print('Intervalo de acurácia: [{:.2f}% ~ {:.2f}%]'.format(lower_bound, upper_bound))

In [None]:
def intervalo_prec(results):
    """
    Print the mean precision and a mean +/- 2 standard deviations band.

    results: object exposing .mean() and .std() (e.g. the array returned by
        cross_val_score), holding precisions as fractions

    The summary is printed exactly once, as percentages with two decimals.
    Returns None.
    """
    mean = results.mean()
    dv = results.std()
    print('Precisão média: {:.2f}%'.format(mean*100))
    print('Intervalo de Precisão: [{:.2f}% ~ {:.2f}%]'.format((mean - 2*dv)*100, (mean + 2*dv)*100))

In [None]:
# Naive Bayes baseline: predict the storm class from the daily minimum Dst.

SEED = 10
np.random.seed(SEED)  # kept for any code relying on NumPy's global RNG

le = LabelEncoder()
# Replace the string labels with integer codes. assign() builds a new frame
# instead of writing a column into a frame that may be a slice of another one.
# The original labels can be recovered with le.inverse_transform.
df_filtered = df_filtered.assign(
    classification=le.fit_transform(df_filtered['classification'])
)

# Single feature (dst_min) as an (n_samples, 1) array; target is the encoded class.
X = df_filtered['dst_min'].values.reshape(-1, 1)
y = df_filtered['classification']

# Train/test split of the filtered frame. random_state is passed explicitly so
# the split does not depend on what else has consumed the global RNG.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.30, stratify=y, random_state=SEED
)
print('O dataset de treino possui {} tempestades e o de teste {} tempestades.'.format(X_train.shape[0], X_test.shape[0]))

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
model = GaussianNB()
accuracy = cross_val_score(model, X_train, y_train, cv=cv, scoring='accuracy')

accuracy_range(accuracy)

# TODO: precision is not evaluated yet. For this multiclass target the scorer
# needs an averaging mode, and the reporting helper is intervalo_prec:
# precision = cross_val_score(model, X_train, y_train, cv=cv, scoring='precision_macro')
# intervalo_prec(precision)