# Transform BSEC Weather Station Data into Parquet Format

In [None]:
# need to have the following libraries installed into this notebook's kernel

In [None]:
import geopandas as gpd
import numpy as np
import pandas as pd
from glob import glob
import pyarrow as pa
import rioxarray as rio
from shapely import Point
import xarray as xr

In [None]:
# download the raw data from https://data.msdlive.org/records/6yawb-zyx60
# and set the path here
root_path_to_raw_data = '.'

In [None]:
# rename columns of interest to snake case
rename_map = {
    'Station ID': 'station_id',
    'Site Name': 'site_name',
    'Address': 'address',
    'Site Type': 'site_type',
    'Station Type': 'station_type',
    'Longitude': 'longitude',
    'Latitude': 'latitude',
    'Start Date': 'start_date',
}

In [None]:
# collect all the weather files
weather_files = sorted(glob(f'{root_path_to_raw_data}/data/5min/**/*.csv'))

In [None]:
# aggregate all the weather data by station into a gigantic dataframe
agg = []
for f in weather_files:
    ds = pd.read_csv(f)
    ds['stationID'] = '_'.join(f.split('-')[1].split('_')[:2])
    agg.append(ds)
agg = pd.concat(agg, ignore_index=True)
agg = agg.drop(columns=['tz', 'obsTimeLocal', 'epoch']).set_index([
    'stationID', 'obsTimeUtc', 'lat', 'lon'
]).dropna(
    axis=0,
    how='all',
).reset_index()
agg['obsTimeUtc'] = pd.to_datetime(agg.obsTimeUtc, format='mixed', utc=True)

In [None]:
# load the station locations
stations = pd.read_csv(f'{root_path_to_raw_data}/documents/Stations-Locations.csv')
stations = gpd.GeoDataFrame(
    stations, geometry=gpd.points_from_xy(stations.Longitude, stations.Latitude)
)
stations = stations.rename(columns=rename_map)[[*rename_map.values(), 'geometry']]
stations['station_id'] = stations['station_id'] + '_' + stations['station_type']

# use this to move stations a little that are stacked on top of each other
def jitter(row, sigma=0.0001):
    if stations.geometry.value_counts().loc[row.geometry] > 1:
        row['geometry'] = Point(row.geometry.x + np.random.normal(0, sigma), row.geometry.y + np.random.normal(0, sigma))
    return row

# subselect stations that actually have data
stations = stations[(~stations.is_empty) & (stations.station_id.isin(agg.stationID.unique()))].apply(jitter, axis=1).set_crs('epsg:4326')

# write the stations to geojson
stations.to_file('./static/data/station_locations.json', driver="GeoJSON")

In [None]:
# write the weather data to parquet
for g in agg.groupby('stationID'):
    fname = f"./static/data/{g[0]}_5min_data.parquet"
    g[1].to_parquet(fname, index=False)
