## Setup

In [None]:
import io
import os
import re
from typing import Sequence
from src import util

from dotenv import load_dotenv

import numpy as np
import pandas as pd
import warnings
from pandas.errors import ParserWarning
pd.set_option('future.no_silent_downcasting', True)

In [None]:
# Load environment variables (python-dotenv), e.g. from a local .env file.
load_dotenv()

# Root folder of the raw yearly text files, and root folder for the CSV export.
data_path = os.environ.get("EVENTS_PATH")
csv_path = os.environ.get("EVENTS_CSV_PATH")

# Inclusive span of years handled by the notebook.
begin_year, end_year = 1996, 2024
events_range = range(begin_year, end_year + 1)

# Parsed frames, keyed by year and then by dataset name ('DSD' / 'events').
data = {}

## Read Files

### DSD Functions

In [None]:
def find_data_row_dsd(file_lines: list[str], year_) -> int | None :
    for i, line in enumerate(file_lines):
        if year_ >= 2001:
            if  "#------------------------" in line.strip():
                return i + 1
        else:
            if 'Date   10.7cm  Number   Hemis. Regions Field   Flux   C  M  X'  in line.strip():
                return i + 1

    return None

In [None]:
def read_dsd(path_, year_) -> pd.DataFrame:
    """Read the data rows of a yearly DSD text file into a DataFrame.

    The file is read whole, the header is located with
    ``find_data_row_dsd``, and every line after it is parsed as fixed-width
    text. ``-999`` and ``'*'`` are treated as missing values.

    Args:
        path_: Path of the DSD text file.
        year_: Year the file covers. It selects the header rule, the column
            labels (for years up to 1996 the first and third columns are
            labelled ``day`` and ``year`` instead of ``year`` and ``day``)
            and whether the explicit column layout is used.

    Returns:
        One row per data line, with the 16 columns named in ``col_names``.
        An empty frame with those columns when the header is not found.
    """
    col_names = ['year', 'month', 'day', 'radio_flux_10.7cm', 'sunspot_number',
                'sunspot_area', 'new_regions', 'mean_solar_field', 'goes_xray_bkgd_flux',
                'flares_c', 'flares_m', 'flares_x', 'flares_optical_s', 'flares_optical_1',
                'flares_optical_2', 'flares_optical_3']

    # Half-open character ranges, one per entry of col_names.
    col_specs = [(0, 4), (5, 7), (8, 10), (11, 18), (19, 25), (26, 33), (34, 40),
                (41, 47), (48, 54), (55, 58), (59, 61), (62, 64), (65, 68),
                (69, 71), (72, 74), (75, 77)]

    if year_ <= 1996:
        col_names[0] = 'day'
        col_names[2] = 'year'

    with open(path_, 'r', errors='ignore') as f:
        all_lines = f.readlines()

    data_row = find_data_row_dsd(all_lines, year_)
    if data_row is None:
        print(f"AVISO: Cabeçalho DSD não encontrado em {path_}. Pulando arquivo.")
        return pd.DataFrame(columns=col_names)

    # pandas.read_fwf takes the layout through `colspecs`; any other keyword
    # spelling is dropped silently and the columns are inferred instead.
    # NOTE(review): the explicit layout is applied only from 2001 onward (the
    # same cutoff find_data_row_dsd uses for the header rule). Its month field
    # is two characters wide, so it cannot hold the month names that older
    # files apparently carry; those files keep column inference, which is what
    # every year effectively used before. Confirm the 2001+ files all share
    # this layout.
    layout = col_specs if year_ >= 2001 else 'infer'

    data_buffer = io.StringIO("".join(all_lines[data_row:]))

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", ParserWarning)
        return pd.read_fwf(data_buffer,
                           names=col_names,
                           na_values=[-999, '*'],
                           colspecs=layout,
                           index_col=False)

In [None]:
def format_dsd(df_) -> pd.DataFrame:
    """Index a DSD frame by date and drop the separate date columns.

    Month values may be numbers or three-letter English month names (any
    case, surrounding blanks ignored). Years below 100 are shifted by 1900.

    Args:
        df_: Frame with at least ``year``, ``month`` and ``day`` columns.
            It is not modified.

    Returns:
        A copy indexed by the assembled ``ds`` datetime, without the
        ``year``, ``month`` and ``day`` columns.
    """
    month_numbers = {
        'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
        'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12,
    }

    out = df_.copy()

    # Month names become numbers; anything still non-numeric turns into NaN.
    months = out['month'].astype(str).str.strip().str.lower().replace(month_numbers)
    months = pd.to_numeric(months, errors='coerce')

    # Two-digit years are taken to be 19xx.
    years = pd.to_numeric(out['year'], errors='coerce')
    years = years.mask(years < 100, years + 1900)

    out['ds'] = pd.to_datetime({'year': years, 'month': months, 'day': out['day']})

    return out.set_index('ds').drop(columns=['month', 'day', 'year'])

### Events Functions

In [None]:
def find_data_row_events(file_lines_, date_: pd.Timestamp) -> int | None:
    for i, line in enumerate(file_lines_):
        if date_ <= pd.to_datetime('1998-05-08'):
            if  "Reg#" in line.strip():
                return i + 1
        else:
            if "#----------------------------------------------------------" in line.strip():
                return i + 1

    return None

In [None]:
def find_event_date(file_lines, date_: pd.Timestamp) -> pd.Timestamp | None:
    for i, line in enumerate(file_lines):
        if date_ <= pd.to_datetime('1998-05-08'):
            if f"EDITED EVENTS for {date_.year}" in line.strip():
                date_str = line.strip()[17:]
                return pd.to_datetime(date_str)
        else:
            if f":Date: {date_.year}" in line.strip():
                date_str = line.strip()[6:]
                return pd.to_datetime(date_str)

    return None

In [None]:
def read_events_lines(path_, date_: pd.Timestamp) -> pd.DataFrame:
    """Read the raw report lines of one daily events file.

    Args:
        path_: Path of the events text file.
        date_: Date the file covers; selects the header style and is used as
            the report date when the header carries none.

    Returns:
        A frame with one row per non-blank data line: ``raw_line`` holds the
        unparsed text and ``date`` the report date. Lines containing
        ``NO EVENT REPORTS`` are dropped. When the data header is not found,
        an empty frame with only ``raw_line`` is returned.
    """
    with open(path_, 'r', errors='ignore') as f:
        file_lines = f.readlines()

    data_row = find_data_row_events(file_lines, date_)
    if data_row is None:
        print(f"AVISO: Cabeçalho de Eventos não encontrado em {path_}")
        return pd.DataFrame({'raw_line': []})

    data_lines = [line.rstrip('\n') for line in file_lines[data_row:]]
    df = pd.DataFrame(data_lines, columns=['raw_line'])

    is_placeholder = df['raw_line'].str.contains("NO EVENT REPORTS", na=False)
    is_blank = df['raw_line'].str.strip() == ''
    df = df[~is_placeholder & ~is_blank].copy()

    event_date = find_event_date(file_lines, date_)
    if event_date is None:
        # Without a date no timestamp can be built for these rows later on,
        # so fall back to the date the caller asked for instead of storing None.
        print(f"AVISO: Data do evento não encontrada em {path_}. Usando {date_.date()}.")
        event_date = date_
    df['date'] = event_date

    return df

In [None]:
def create_timestamps(df_: pd.DataFrame, column_name_: str) -> pd.Series:
    """Combine each row's date with an ``HHMM`` time string.

    Args:
        df_: Frame with a datetime ``date`` column and the time column.
        column_name_: Name of the column holding ``HHMM`` strings.

    Returns:
        A datetime Series; rows whose combined text does not parse as
        ``%Y-%m-%d %H%M`` (including missing times) become ``NaT``.
    """
    return pd.to_datetime(
        df_['date'].dt.strftime('%Y-%m-%d') + ' ' + df_[column_name_],
        format='%Y-%m-%d %H%M',
        errors='coerce',
    )

In [None]:
def nullify_invalid_time_patterns(df_: pd.DataFrame, columns_names_: Sequence[str], pattern_: re.Pattern[str]) -> pd.DataFrame:
    """Blank out values that do not match a pattern.

    Args:
        df_: Source frame; it is not modified.
        columns_names_: String columns to check.
        pattern_: Compiled pattern a value must match (from its start) to be
            kept.

    Returns:
        A copy of ``df_`` in which every non-matching or missing value of
        the listed columns is NaN.
    """
    cleaned = df_.copy()

    for name in columns_names_:
        is_valid = cleaned[name].str.match(pattern_, na=False)
        cleaned[name] = cleaned[name].where(is_valid, np.nan)

    return cleaned

In [None]:
def format_events(df_raw: pd.DataFrame) -> pd.DataFrame:
    """Split raw event report lines into typed columns.

    Each ``raw_line`` is cut at fixed character positions. The ``begin``,
    ``max`` and ``end`` fields are kept only when they are exactly four
    digits and are then combined with the row's ``date`` into timestamps.
    Empty strings and ``////`` become NaN.

    Args:
        df_raw: Frame with ``raw_line`` (text) and ``date`` (datetime)
            columns.

    Returns:
        A frame with the columns ``date, event, begin, max, end, obs, q,
        type, loc_frq, particulars, reg#``; empty (columns only) when
        ``df_raw`` is empty.
    """
    final_cols = ['date','event', 'begin', 'max', 'end', 'obs', 'q', 'type', 'loc_frq', 'particulars', 'reg#']

    if df_raw.empty:
        return pd.DataFrame(columns=final_cols)

    na_values = ['','////']

    # Character range of every field inside a raw line (stop=None: to the end).
    field_slices = {
        'event_num': (0, 5),
        'event_plus': (5, 11),
        'begin': (11, 18),
        'max': (18, 28),
        'end': (28, 34),
        'obs': (34, 39),
        'q': (39, 43),
        'type': (43, 48),
        'loc_frq': (48, 58),
        'particulars': (58, 76),
        'reg#': (76, None),
    }

    df = pd.DataFrame({'date': df_raw['date']})
    for field, (start, stop) in field_slices.items():
        df[field] = df_raw['raw_line'].str.slice(start, stop).str.strip()

    # The event id is the number plus its suffix, with the letters A, B and U removed.
    df['event_plus'] = df['event_plus'].fillna('')
    df['event'] = (df['event_num'] + df['event_plus']).str.replace(r'[ABU]','',regex=True).str.strip()

    time_columns = ('begin', 'max', 'end')
    pattern = re.compile(r'^\d{4}$')
    df = nullify_invalid_time_patterns(df, time_columns, pattern)

    for column_name in time_columns:
        df[column_name] = create_timestamps(df, column_name)

    # A max/end time earlier than begin is moved to the following day
    # (presumably the event ran past midnight).
    one_day = pd.Timedelta(days=1)
    for column_name in ('max', 'end'):
        before_begin = df[column_name] < df['begin']
        df[column_name] = df[column_name].mask(before_begin, df[column_name] + one_day)

    df = df.replace(na_values, np.nan)

    return df[final_cols]

### Main

In [None]:
# Missing event files are only reported from this date onward.
first_reported_date = pd.to_datetime("1996-07-31")

# Read every configured year: one DSD file per year plus one events file per day.
for y in events_range:
    data[y] = {}

    year_dir = os.path.join(data_path,f"{y}")
    if not os.path.isdir(year_dir):
        print(f"ERRO: Diretório não encontrado, pulando ano {y}")
        continue

    dsd_file_name = f"{y}_DSD.txt"
    dsd_file_path = os.path.join(year_dir,dsd_file_name)

    df_day = read_dsd(dsd_file_path, y)
    df_day = format_dsd(df_day)

    data[y]['DSD'] = df_day
    print(f"success reading {y} DSD")

    events_dir = os.path.join(year_dir,f"{y}_events")
    if not os.path.isdir(events_dir):
        print(f"ERRO: Diretório não encontrado, pulando ano {y}")
        continue

    df_events_list = []
    # Set lookup keeps the per-day existence check cheap.
    files_in_dir = set(os.listdir(events_dir))

    for date in pd.date_range(f"{y}-01-01", f"{y}-12-31"):
        file_name = f"{date.strftime('%Y%m%d')}events.txt"

        if file_name not in files_in_dir:
            if date >= first_reported_date:
                print(f"AVISO : Arquivo não encontrado, pulando {date}")
            continue

        full_path = os.path.join(events_dir, file_name)
        df_file = read_events_lines(full_path, date)
        if not df_file.empty:
            df_events_list.append(df_file)

    # pd.concat raises ValueError on an empty list, so a year without any
    # usable event file is represented by an empty raw frame instead.
    if df_events_list:
        df_events = pd.concat(df_events_list, ignore_index=True)
    else:
        df_events = pd.DataFrame(columns=['raw_line', 'date'])

    df_events = format_events(df_events)
    data[y]['events'] = df_events
    print(f"Success reading {y} events")

In [None]:
# Notebook inspection cell: display the parsed DSD frame for 1996.
data[1996]['DSD']

### Exporting CSVs

In [None]:
# NOTE(review): util.create_dirs is defined outside this file; presumably it creates
# one output folder per year of events_range under csv_path — confirm.
util.create_dirs(csv_path,events_range)

In [None]:
# Write one DSD CSV and one events CSV per year.
for y in events_range:
    # The read loop leaves a year's entry empty (or without 'events') when a
    # directory is missing, so each dataset is looked up defensively.
    year_data = data.get(y, {})
    year_dir = os.path.join(csv_path,str(y))

    df_dsd = year_data.get('DSD')
    if df_dsd is None:
        print(f"AVISO: Sem dados DSD para {y}, CSV não gerado")
    else:
        # The date index is written as the first CSV column.
        df_dsd.to_csv(os.path.join(year_dir,f"{y}_DSD.csv"))

    df_events = year_data.get('events')
    if df_events is None:
        print(f"AVISO: Sem dados de eventos para {y}, CSV não gerado")
    else:
        df_events.to_csv(os.path.join(year_dir,f"{y}_events.csv"), index=False)