In [1]:
import pandavro as pdx
import pandas as pd
from pathlib import Path
from typing import *
import dateutil
import io
import os
from tqdm.notebook import tqdm

In [10]:
# Shorthand for ``pd.IndexSlice`` (pandas' helper for label-based slicing of a
# MultiIndex). It is not referenced by any of the cells shown here.
idx = pd.IndexSlice

In [2]:
# Folder, relative to the current user's home directory, that is scanned for
# the station ``.txt`` files.
files_path = Path.home().joinpath("working_dir/inmet/")

In [3]:
def parse_date(string):
    """Parse a date string, reading ambiguous dates as day-first.

    Args:
        string: Textual date such as ``"31/12/1999"``. With ``dayfirst=True``
            an ambiguous value like ``"01/02/2020"`` is read as 1 February.

    Returns:
        The ``datetime.datetime`` produced by ``dateutil.parser.parse``.

    Raises:
        Whatever ``dateutil.parser.parse`` raises for text it cannot parse.
    """
    # A bare ``import dateutil`` is not guaranteed to load the ``parser``
    # submodule (it may only be reachable because another library imported
    # it), so import it explicitly here.
    import dateutil.parser

    return dateutil.parser.parse(string, dayfirst=True)


def clean_metadata(metadata_text: str) -> dict:
    """Extract station metadata from the header block of a station file.

    The block is read positionally, one entry per line:

    0. station name, followed by an ``(OMM...`` suffix that is discarded
    1. latitude
    2. longitude
    3. altitude
    4. status (the whole line is stored as-is)
    5. operation start date

    Every line except the status line is expected to look like
    ``label: value``; the value is everything after the first ``:``.

    Args:
        metadata_text: The header block, one entry per newline-separated line.

    Returns:
        A dict with the keys ``station_name``, ``municipality``, ``state``,
        ``latitude``, ``longitude``, ``altitude``, ``status`` and
        ``operation_start`` (in that order). All values are strings;
        ``operation_start`` is ``str()`` of what ``parse_date`` returns.

    Raises:
        IndexError: If the block has fewer than six lines, a labelled line
            has no ``:``, or the station name contains no ``-``.
    """
    def value_after_colon(line: str) -> str:
        # Split on the first ':' only, so a value that itself contains ':'
        # is kept whole instead of being cut at its first colon.
        return line.split(":", 1)[1]

    metadata_lines = metadata_text.split("\n")
    metadata = {}

    station_name = (value_after_colon(metadata_lines[0]).split("(OMM")[0]
                                                        .strip())
    metadata['station_name'] = station_name

    # The name is "<municipality> - <state>". Split on the LAST hyphen so a
    # hyphenated municipality keeps all of its parts and the state is always
    # the trailing component.
    name_parts = station_name.rsplit('-', 1)
    metadata['municipality'] = name_parts[0].strip()
    metadata['state'] = name_parts[1].strip()

    metadata['latitude'] = value_after_colon(metadata_lines[1]).strip()
    metadata['longitude'] = value_after_colon(metadata_lines[2]).strip()
    metadata['altitude'] = value_after_colon(metadata_lines[3]).strip()
    metadata['status'] = metadata_lines[4]
    metadata['operation_start'] = str(
        parse_date(value_after_colon(metadata_lines[5]).strip())
    )
    return metadata

    
def clean_data(data_text: str) -> pd.DataFrame:
    """Parse the ``;``-separated measurement table of a station file.

    Args:
        data_text: The table as text. It is read without a header row, and
            the first parsed row (label 0) is then discarded.

    Returns:
        A DataFrame indexed by ``(station_id, timestamp)`` with the numeric
        columns ``temperature_dry``, ``temperature_wet``,
        ``humidity_relative``, ``air_pressure``, ``wind_direction`` and
        ``wind_speed``. ``timestamp`` is the parsed ``date`` plus the hours
        and minutes encoded in the ``hour`` field. ``wind_direction`` is the
        parsed value multiplied by 10.
    """
    column_names = ['station_id',
                    'date',
                    'hour',
                    'temperature_dry',
                    'temperature_wet',
                    'humidity_relative',
                    'air_pressure',
                    'wind_direction',
                    'wind_speed']
    # Everything after the three key columns is converted to numbers below.
    measurement_columns = column_names[3:]

    def hour_part(hhmm):
        # First two characters of the ``hour`` field.
        return float(hhmm[0:2])

    def minute_part(hhmm):
        # Remaining characters of the ``hour`` field.
        return float(hhmm[2:])

    parsed = pd.read_csv(io.StringIO(data_text), delimiter=';',
                         names=column_names, index_col=False)
    # Row label 0 is dropped (presumably the file's own header line).
    rows = parsed.loc[1:]

    day = rows['date'].map(parse_date)
    minutes = pd.to_timedelta(rows['hour'].map(minute_part), unit='m')
    hours = pd.to_timedelta(rows['hour'].map(hour_part), unit='h')

    indexed = (rows.assign(timestamp=day + hours + minutes)
                   .drop(['date', 'hour'], axis='columns')
                   .set_index(['station_id', 'timestamp']))

    numeric = {name: pd.to_numeric(indexed[name]) for name in measurement_columns}
    # NOTE(review): the factor 10 presumably converts a direction code in tens
    # of degrees into degrees — confirm against the data provider's docs.
    numeric['wind_direction'] = numeric['wind_direction'] * 10
    return indexed.assign(**numeric)


def clean_data_file(text: str) -> Tuple[pd.DataFrame, dict]:
    """Split one station file into its measurements and its metadata.

    The file is cut on runs of twenty dashes. The third section (index 2) is
    handed to ``clean_metadata`` and the last section to ``clean_data``; both
    are stripped of surrounding whitespace first.

    Args:
        text: Full contents of a station file.

    Returns:
        ``(data, metadata)``: the result of ``clean_data`` followed by the
        result of ``clean_metadata``.

    Raises:
        IndexError: If the text contains fewer than three sections.
    """
    sections = text.split('--------------------')
    station_metadata = clean_metadata(sections[2].strip())
    measurements = clean_data(sections[-1].strip())
    return measurements, station_metadata

In [4]:
# Parse every ``.txt`` file found directly under ``files_path`` and combine
# the per-station results.
# ``os.listdir`` returns entries in arbitrary order, so the names are sorted
# to make the row order of the combined frame (and of the files later written
# from it) reproducible across runs and machines.
files_list = sorted(file for file in os.listdir(files_path) if file.endswith('.txt'))

# Per-station accumulators, kept under their own names so that
# ``stations_data`` / ``stations_metadata`` only ever hold DataFrames.
data_by_station = {}
metadata_by_station = {}
for file in tqdm(files_list):
    # The station id is the file name up to its first dot.
    station_id = file.split('.')[0]
    filepath = files_path / file
    # NOTE(review): no explicit encoding is passed, so the platform default is
    # used — confirm it matches the encoding of the station files.
    with open(filepath, 'r') as fid:
        text = fid.read()
    (data, metadata) = clean_data_file(text)
    data_by_station[station_id] = data
    metadata_by_station[station_id] = metadata

# All measurements stacked into one frame, indexed as returned by
# ``clean_data_file``.
stations_data = pd.concat(list(data_by_station.values()))
# One row per station id, one column per metadata key.
stations_metadata = pd.DataFrame(metadata_by_station).T.rename_axis('station_id')
print('done')

HBox(children=(FloatProgress(value=0.0, max=234.0), HTML(value='')))


done


In [5]:
# ``DataFrame.to_csv`` does not create missing parent directories, so make
# sure the output folder exists before writing (a no-op when it already does).
output_dir = Path('data')
output_dir.mkdir(parents=True, exist_ok=True)
stations_data.to_csv(output_dir / 'measurements.csv')
stations_metadata.to_csv(output_dir / 'stations.csv')

In [None]:
# ``stations_data`` carries ``station_id`` and ``timestamp`` in its index.
# pandavro appears to build the Avro schema and records from the frame's
# columns only, so the index levels are moved into ordinary columns first;
# otherwise the keys identifying each measurement would not be written.
pdx.to_avro('data/measurements.avro', stations_data.reset_index())