# Data preprocessing

This Jupyter notebook contains most of the preprocessing steps needed to read the dataset available for Rio. It is used to generate usable CSV files from AlertaRio's dataset.

In [None]:
import os
import json
import pandas as pd
import re
import numpy as np

import subprocess

from time import time
from threading import Thread
from threading import Lock

## Unzipping archive

The archives must be placed under `data/archive` and must not be renamed. They can be downloaded from AlertaRio's [download page](http://alertario.rio.rj.gov.br/download/).

In [None]:
# Unpack the downloaded archives by running the extractor script with bash.
# The call is the cell's last expression, so the status value returned by
# os.system is what the cell displays.
extract_command = "bash extractor.bash"
os.system(extract_command)

## Preparation

Prepare the CSV file creation step by defining a regular expression to handle file paths and by moving into the directory that contains the data to extract.

In [None]:
# Remember the directory the notebook started in the first time this cell runs.
# Without this, re-running the cell would look for "data/input" relative to the
# directory already entered and fail with FileNotFoundError.
if "PROJECT_ROOT" not in globals():
    PROJECT_ROOT = os.getcwd()

# Always resolve the input directory from the remembered starting directory,
# which makes the cell safe to execute more than once per kernel session.
os.chdir(os.path.join(PROJECT_ROOT, "data/input"))

In [None]:
# Single pattern used to split a path into a directory part and a file part.
#   group 1: either a run of word characters reaching the end of the string
#            (such a final component is treated as a directory name), or
#            everything up to and including the last "/" (or "//") separator;
#   group 4: any remaining text, i.e. the file name.
pattern_path = re.compile(r'(\w+$|(.+(\/\/|\/))+)|(.+)')


def get_path(path):
    """Return the directory part of ``path``.

    Everything up to and including the last slash is kept. A final component
    made only of word characters (letters, digits, underscore) is kept as
    well; anything else after the last slash is dropped as a file name.

    Returns an empty string when ``path`` is a bare file name such as
    ``"file.csv"``.
    """
    return pattern_path.sub(r'\1', path)


def get_file(path):
    """Return the file-name part of ``path``.

    This is the text that ``get_path`` drops. The result is an empty string
    when ``path`` ends with a slash or when its final component is made only
    of word characters.
    """
    return pattern_path.sub(r'\4', path)


def create_path(path):
    """Create the directory part of ``path`` if it does not exist yet.

    Does nothing when ``path`` has no directory part or when the directory
    part already exists.
    """
    safe_path = get_path(path)
    # A bare file name yields an empty directory part; os.makedirs("") would
    # raise FileNotFoundError, so there is nothing to create in that case.
    if safe_path and not os.path.exists(safe_path):
        # exist_ok=True tolerates the directory being created by someone else
        # between the existence check above and this call.
        os.makedirs(safe_path, exist_ok=True)

# Sample inputs: a bare name, directories and a full file path written with
# "//" separators, then the same four shapes written with "/" separators.
demo_paths = [
    "SIMCosta_fsedsdfBoia",
    "SIMCosta_fsedsdfBoia//",
    "SIMCozfqzfrhyksta_Boia//SIsqdqzdMCosta_Boia//SIMCosta_fsedsdfBoia//",
    "SIMCozfqzfrhyksta_Boia//SIsqdqzdMCosta_Boia//SIMCosta_fsedsdfBoia//SIMCOSTA_RJ-1_MET_2015-07-29_2016-10-13.csv",
    "SIMCosta_fsedsdfBoia",
    "SIMCosta_fsedsdfBoia/",
    "SIMCozfqzfrhyksta_Boia/SIsqdqzdMCosta_Boia/SIMCosta_fsedsdfBoia/",
    "SIMCozfqzfrhyksta_Boia/SIsqdqzdMCosta_Boia/SIMCosta_fsedsdfBoia/SIMCOSTA_RJ-1_MET_2015-07-29_2016-10-13.csv",
]

# Print the directory part of every sample first, then the file part of each.
for splitter in (get_path, get_file):
    for sample in demo_paths:
        print(splitter(sample))

## Pluviometric/rainfall station

Check whether there is data to extract for the pluviometric stations.

List of the rainfall stations currently in the `data/input` folder.

In [None]:
# Build the list of rainfall station names from the entries of AlertaRio_DadosPluv:
#   - `sort` orders the listing so that `uniq` can collapse repeated names,
#   - the first `sed` reduces names shaped like <letters/underscores>_<digits>_Plv.txt
#     to their leading <letters/underscores> part,
#   - the last `sed` deletes every line ending in "zip".
pluv_station = !ls AlertaRio_DadosPluv | sort | sed "s/\([A-Za-z\_]\+\)\_[0-9]\+\_Plv\.txt/\1/g" | uniq | sed "/zip$/d"
# Display how many names were found, followed by the names themselves.
len(pluv_station), pluv_station

Running the CSV generator on every rainfall station using multithreading.

In [None]:
# One-hour time delta.
# NOTE(review): `h` is not referenced by any cell visible here — confirm it is still needed.
h = pd.Timedelta("1h")
# True: run one thread per station; False: process the stations one after another.
threaded = True

def plv_station_generator(station, l):
    """Run the CSV generator script for one rainfall station.

    Parameters
    ----------
    station : str
        Station name; the script receives ``AlertaRio_DadosPluv/<station>``
        as its only argument.
    l : lock object (e.g. ``threading.Lock``)
        Held only while printing, so that progress messages coming from
        concurrent workers are not interleaved.

    Returns
    -------
    int
        Exit code of the ``bash`` process that ran the script.
    """
    # `with` releases the lock even if print() raises.
    with l:
        print("Starting : ", station)

    # The command is passed as an argument list, without a shell, so a station
    # name containing spaces or shell metacharacters stays a single argument.
    completed = subprocess.run(
        ["bash", "../../csv-generator.bash", f"AlertaRio_DadosPluv/{station}"]
    )

    with l:
        # Surface script failures instead of silently dropping the exit code.
        if completed.returncode != 0:
            print("Failed : ", station, "- exit code", completed.returncode)
        print("Finishing : ", station)

    return completed.returncode

# Start the wall-clock timer and create the lock shared by every worker.
timeS = time()
lock = Lock()
print("Starting")

if not threaded:
    """Iterable version"""
    # Process the stations one at a time in the current thread.
    for station in pluv_station:
        plv_station_generator(station, lock)
else:
    """Thread version"""
    # Launch one worker thread per station as soon as it is created...
    threads = []
    for station in pluv_station:
        threads.append(Thread(target=plv_station_generator, args=(station, lock)))
        threads[-1].start()
    # ...then wait until every worker has finished.
    for t in threads:
        t.join()

# Report the elapsed time in seconds.
elapsed = time() - timeS
print(elapsed)
print("Complete")

## Meteorological station

Check whether there is data to extract for the meteorological stations.

List of the meteorological stations currently in the `data/input` folder.

In [None]:
# Build the list of meteorological station names from the entries of AlertaRio_DadosMet:
#   - `sort` orders the listing so that `uniq` can collapse repeated names,
#   - the first `sed` reduces names shaped like <letters/underscores>_<digits>_Met.txt
#     to their leading <letters/underscores> part,
#   - the last `sed` deletes every line ending in "zip".
met_station = !ls AlertaRio_DadosMet | sort | sed "s/\([A-Za-z\_]\+\)\_[0-9]\+\_Met\.txt/\1/g" | uniq | sed "/zip$/d"
# Display how many names were found, followed by the names themselves.
len(met_station), met_station

Running the CSV generator on every meteorological station using multithreading.

In [None]:
# One-hour time delta.
# NOTE(review): `h` is not referenced by any cell visible here — confirm it is still needed.
h = pd.Timedelta("1h")
# True: run one thread per station; False: process the stations one after another.
threaded = True

def met_station_generator(station, l):
    """Run the CSV generator script for one meteorological station.

    Parameters
    ----------
    station : str
        Station name; the script receives ``AlertaRio_DadosMet/<station>``
        as its only argument.
    l : lock object (e.g. ``threading.Lock``)
        Held only while printing, so that progress messages coming from
        concurrent workers are not interleaved.

    Returns
    -------
    int
        Exit code of the ``bash`` process that ran the script.
    """
    # `with` releases the lock even if print() raises.
    with l:
        print("Starting : ", station)

    # The command is passed as an argument list, without a shell, so a station
    # name containing spaces or shell metacharacters stays a single argument.
    completed = subprocess.run(
        ["bash", "../../csv-generator.bash", f"AlertaRio_DadosMet/{station}"]
    )

    with l:
        # Surface script failures instead of silently dropping the exit code.
        if completed.returncode != 0:
            print("Failed : ", station, "- exit code", completed.returncode)
        print("Finishing : ", station)

    return completed.returncode

# Start the wall-clock timer and create the lock shared by every worker.
timeS = time()
lock = Lock()
print("Starting")

if not threaded:
    """Iterable version"""
    # Process the stations one at a time in the current thread.
    for station in met_station:
        met_station_generator(station, lock)
else:
    """Thread version"""
    # Launch one worker thread per station as soon as it is created...
    threads = []
    for station in met_station:
        threads.append(Thread(target=met_station_generator, args=(station, lock)))
        threads[-1].start()
    # ...then wait until every worker has finished.
    for t in threads:
        t.join()

# Report the elapsed time in seconds.
elapsed = time() - timeS
print(elapsed)
print("Complete")