# Writing ISD data to partitioned parquet

You'll need to download a year of ISD data, e.g. from Microsoft's Planetary Computer.
We're using https://github.com/gadomski/pyisd to read the data.

In [40]:
import dask
import dask.dataframe
import dask_geopandas
import pandas
import os
import isd
import isd.pandas

def read_to_data_frame(path):
    with isd.open(path) as iterator:
        records = list(iterator)
    data_frame = isd.pandas.data_frame(records)
    timestamp = pandas.to_datetime(data_frame[["year", "month", "day", "hour"]])
    data_frame["timestamp"] = timestamp
    data_frame.set_index("timestamp")
    return data_frame

data_frames = []
for file_name in os.listdir("isd/2020"):
    path = os.path.join("isd/2020", file_name)
    data_frames.append(dask.delayed(read_to_data_frame)(path))
    if len(data_frames) > 5:
        break

data_frame = dask.dataframe.from_delayed(data_frames)

ValueError: ('Partitions must be sorted ascending with the index', [0, 0, 0, 0, 0, 0], [2753, 11052, 26167, 8689, 12264, 8544])

In [56]:
import dask
import dask.dataframe
import dask_geopandas
import pandas
import os
import isd
import isd.pandas

def read_to_data_frame(path):
    with isd.open(path) as iterator:
        records = list(iterator)
    data_frame = isd.pandas.data_frame(records)
    timestamp = pandas.to_datetime(data_frame[["year", "month", "day", "hour", "minute"]])
    data_frame["timestamp"] = timestamp
    data_frame.set_index("timestamp")
    return data_frame

data_frames = []
for file_name in os.listdir("isd/2020"):
    path = os.path.join("isd/2020", file_name)
    data_frames.append(dask.delayed(read_to_data_frame)(path))
    if len(data_frames) == 10:
        break

data_frame = dask.dataframe.from_delayed(data_frames)
data_frame = data_frame.set_index("timestamp")
data_frame = data_frame.repartition(freq="7d")
data_frame.head()

Unnamed: 0_level_0,usaf_id,ncei_id,year,month,day,hour,minute,data_source,latitude,longitude,...,air_temperature,air_temperature_quality_code,dew_point_temperature,dew_point_temperature_quality_code,sea_level_pressure,sea_level_pressure_quality_code,additional_data,remarks,element_quality_data,original_observation_data
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2020-01-01,266661,99999,2020,1,1,0,0,4,53.6,24.05,...,1.9,1,-2.1,1,1019.1,1,GA1071+009001061GE19MSL +99999+99999GF107991...,SYN08626825 42560 73110 10019 21021 30005 4019...,,
2020-01-01,67000,99999,2020,1,1,0,0,4,46.25,6.133,...,2.1,1,1.1,1,1035.0,1,AA101000091AA206000091MA1999999098371MD1990001...,SYN08806700 06/// /0103 10021 20011 39837 4035...,,
2020-01-01,67150,99999,2020,1,1,0,0,4,46.183,7.033,...,-0.9,1,-1.0,1,1035.1,1,AA101000091AA206000091MA1999999097471MD1810021...,SYN09406715 06/// /3402 11009 21010 39747 4035...,,
2020-01-01,606720,99999,2020,1,1,0,0,4,24.217,5.533,...,2.4,1,-10.4,1,,9,,SYN03060672 36/// /0810 10024 21104=,,
2020-01-01,478540,99999,2020,1,1,0,0,4,32.084,131.451,...,3.0,1,3.0,1,,9,GA1021+009141999GE19MSL +99999+99999GF199999...,MET084METAR RJFN 010000Z 27007KT 9999 FEW030 0...,,


In [57]:
geo_data_frame = dask_geopandas.from_dask_dataframe(data_frame)
geo_data_frame = geo_data_frame.set_geometry(
    dask_geopandas.points_from_xy(data_frame, "latitude", "longitude")
)

In [58]:
geo_data_frame.to_parquet("isd/parquet", partition_on=["year", "month"])


This metadata specification does not yet make stability promises.  We do not yet recommend using this in a production setting unless you are able to rewrite your Parquet/Feather files.

  return func(*args, **kwargs)


In [61]:
data_frame = dask_geopandas.read_parquet("isd/parquet")
data_frame

<dask_geopandas.GeoSeries | 42 tasks | 21 npartitions>