In [2]:
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pandas as pd


In [54]:
input_file = 'data_parquet/dynamic_historical_meteorology.parquet'
output_dir = 'data_parquet/dynamic_historical_meteorology'

In [3]:
table = pq.read_table(input_file)

Make sure that 'time' is a timestamp object

In [4]:

if table.schema.field("time").type != pa.timestamp("ns"):
    table = table.set_column(
        table.schema.get_field_index("time"),
        "time",
        pc.cast(table["time"], pa.timestamp("ns"))
    )

Add year column for partitioning

In [5]:
year_array = pc.year(table["time"])
table = table.append_column("year", year_array)

Cast the gauge field to `large_string` to overcome sorting issues resulting from 32-bit offsets.

In [6]:
table = table.set_column(
    table.schema.get_field_index("gauge"),
    "gauge",
    pc.cast(table["gauge"], pa.large_string())
)

In [19]:
print(table.schema)

time: timestamp[ns]
Tair: double
Qair: double
PSurf: double
Wind_E: double
Wind_N: double
SWdown: double
LWdown: double
CAPE: double
CRainf_frac: double
PotEvap: double
Rainf: double
gauge: large_string
year: int64
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 1559


Sort by stream_gage, time (critical for pruning performance)

In [8]:
%%time
table = table.sort_by([
    ("gauge", "ascending"),
    ("time", "ascending")
])

CPU times: user 5min 4s, sys: 1min 15s, total: 6min 19s
Wall time: 9min 8s


Use dictionary encoding for the `gauge` column, since it repeats many times. This is to improve performance.

In [None]:
# table = table.set_column(
#     table.schema.get_field_index("gauge"),
#     "gauge",
#     pc.dictionary_encode(table["gauge"])
# )

Write the data using hive partitioning

In [45]:
%%time

parquet_format = ds.ParquetFileFormat()
write_options = parquet_format.make_write_options(
    compression="zstd",
)

partitioning = ds.partitioning(
    pa.schema([
        ("gauge", pa.large_string()),
        #("year", pa.int64())
    ]),
    flavor="hive"  # This creates gauge=X/year=Y/ paths
)

ds.write_dataset(
    table,
    base_dir=output_dir,
    format=parquet_format,
    partitioning=partitioning,  
    existing_data_behavior="overwrite_or_ignore",
    max_rows_per_file=5_000_000,
    max_rows_per_group=250_000,
)

CPU times: user 1min 43s, sys: 35.9 s, total: 2min 18s
Wall time: 59.4 s


Test accessing the hive partitioned data

In [30]:
from datetime import datetime

In [55]:
%%time
dataset = ds.dataset(output_dir,
                     format="parquet",
                     partitioning='hive')

CPU times: user 19.3 ms, sys: 55.9 ms, total: 75.1 ms
Wall time: 256 ms


In [56]:
gauges = ["STREAM-gauge-1618",
          "STREAM-gauge-1619",
          "STREAM-gauge-1620"]

In [57]:
%%time

result = dataset.to_table(
    filter=(
        (ds.field("gauge").isin(gauges)) &
        (ds.field("time") >= datetime(2001, 1, 1)) &
        (ds.field("time") < datetime(2010, 1, 1))
    )
)

CPU times: user 62.9 ms, sys: 161 ms, total: 224 ms
Wall time: 147 ms


In [58]:
%%time
df = result.to_pandas()

CPU times: user 16.1 ms, sys: 52 ms, total: 68.1 ms
Wall time: 106 ms


In [59]:
df.gauge.unique()

array(['STREAM-gauge-1618', 'STREAM-gauge-1619', 'STREAM-gauge-1620'],
      dtype=object)

Test Access from HydroShare

In [6]:
from utils import S3hsclient as hsclient

In [7]:
hs = hsclient.S3HydroShare()

Username:  TonyCastronova
Password for TonyCastronova:  ········


In [8]:
fpath = 'tonycastronova/248ec0f13d6c4580b2faa66425cb58c3/data/contents/dynamic_historical_meteorology'

In [9]:
from datetime import datetime

In [10]:
%%time
dataset = ds.dataset(
    fpath,
    format="parquet",
    partitioning='hive',
    filesystem=hs.get_s3_filesystem() 
)

CPU times: user 132 ms, sys: 66.6 ms, total: 199 ms
Wall time: 959 ms


In [13]:
%%time

st = datetime(2000,1,1)
et = datetime(2010,1,1)
gauges = ["STREAM-gauge-1702",
          "STREAM-gauge-1619",
          "STREAM-gauge-1620"]

result = dataset.to_table(
    filter=(
        (ds.field("gauge").isin(gauges)) &
        (ds.field("time") >= st) &
        (ds.field("time") < et)
    )
)

CPU times: user 404 ms, sys: 207 ms, total: 611 ms
Wall time: 3.05 s


In [14]:
result.to_pandas()

Unnamed: 0,time,Tair,Qair,PSurf,Wind_E,Wind_N,SWdown,LWdown,CAPE,CRainf_frac,PotEvap,Rainf,year,gauge
0,2006-04-01 08:00:00,281.611412,0.005264,96063.631146,1.221399,5.110041,0.000000,314.248670,102.041793,0.246646,-0.084661,0.018745,2006,STREAM-gauge-1702
1,2006-04-01 09:00:00,281.143228,0.005236,95995.751389,1.295613,4.950364,0.000000,303.622641,114.155856,0.292564,-0.084661,0.010839,2006,STREAM-gauge-1702
2,2006-04-01 10:00:00,281.259167,0.005262,95990.242188,1.718918,4.702481,0.000000,303.622173,78.082368,0.057125,-0.085430,0.009109,2006,STREAM-gauge-1702
3,2006-04-01 11:00:00,281.375162,0.005287,95984.751352,2.141978,4.454532,7.411759,303.621838,42.009565,0.089939,-0.085430,0.058204,2006,STREAM-gauge-1702
4,2006-04-01 12:00:00,281.491163,0.005313,95979.265964,2.564905,4.206528,79.393703,326.063951,5.936678,0.060418,-0.085430,0.105755,2006,STREAM-gauge-1702
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
87666,2006-04-01 03:00:00,284.206940,0.005448,96418.440378,1.385154,5.884345,0.000000,340.547583,6.698358,0.172468,-0.132741,0.223010,2006,STREAM-gauge-1702
87667,2006-04-01 04:00:00,283.653784,0.005405,96345.421824,1.280783,5.732732,0.000000,340.548825,30.402976,0.368417,-0.100952,0.038814,2006,STREAM-gauge-1702
87668,2006-04-01 05:00:00,283.100599,0.005363,96272.429098,1.176606,5.581403,0.000000,340.550019,54.106333,0.273921,-0.100952,0.032555,2006,STREAM-gauge-1702
87669,2006-04-01 06:00:00,282.547588,0.005321,96199.449218,1.072630,5.430145,0.000000,314.246562,77.812451,0.283168,-0.100952,0.054521,2006,STREAM-gauge-1702


In [63]:
st = datetime(2000,1,1)
et = datetime(2000,2,2)
gauges = ["STREAM-gauge-1618",
          "STREAM-gauge-1619",
          "STREAM-gauge-1620"]
table = pq.read_table(
    fpath,
    filesystem = hs.get_s3_filesystem(), 
    filters = [
        (ds.field("gauge").isin(gauges)),
        ('time', '>=', st),
        ('time', '<=', et),
    ],
    flavor='hive',
)
#subset = table.to_pandas()

TypeError: read_table() got an unexpected keyword argument 'flavor'

In [14]:
table.to_pandas()

Unnamed: 0,time,Tair,Qair,PSurf,Wind_E,Wind_N,SWdown,LWdown,CAPE,CRainf_frac,PotEvap,Rainf,gauge
0,2000-01-01 00:00:00,272.652169,0.003005,99197.498305,2.912050,-0.408867,0.000000,221.407839,0.000000,0.0,-0.008211,0.0,STREAM-gauge-2000
1,2000-01-01 01:00:00,272.786622,0.002990,99260.702296,2.547032,0.081325,0.000000,221.407839,0.000000,0.0,-0.005101,0.0,STREAM-gauge-2000
2,2000-01-01 02:00:00,272.927149,0.002974,99324.049951,2.188461,0.580743,0.000000,221.401768,0.000000,0.0,-0.005101,0.0,STREAM-gauge-2000
3,2000-01-01 03:00:00,273.067694,0.002959,99387.233278,1.823822,1.080548,0.000000,216.664975,0.000000,0.0,-0.005101,0.0,STREAM-gauge-2000
4,2000-01-01 04:00:00,272.919989,0.002941,99320.889603,1.454362,1.439106,0.000000,216.665362,0.000000,0.0,-0.003905,0.0,STREAM-gauge-2000
...,...,...,...,...,...,...,...,...,...,...,...,...,...
764,2000-02-01 20:00:00,270.954655,0.002604,98280.432322,7.051822,-1.477537,191.788314,259.732809,0.002477,0.0,0.103686,0.0,STREAM-gauge-2000
765,2000-02-01 21:00:00,271.019621,0.002618,98318.881673,6.917108,-1.628350,124.375420,243.931457,0.003716,0.0,0.103686,0.0,STREAM-gauge-2000
766,2000-02-01 22:00:00,270.243670,0.002545,98352.216701,6.953515,-1.999457,31.627926,243.935388,10.018799,0.0,0.021812,0.0,STREAM-gauge-2000
767,2000-02-01 23:00:00,269.461637,0.002471,98385.684617,6.984235,-2.379774,0.000000,243.938932,20.004383,0.0,0.021812,0.0,STREAM-gauge-2000


In [25]:
import pyarrow as pa
import pyarrow.dataset as ds

test = pa.table({
    "gauge": ["A", "B"],
    "year": [2020, 2021],
    "value": [1, 2]
})

partitioning = ds.partitioning(
    pa.schema([
        ("gauge", pa.large_string()),
        ("year", pa.int64())
    ]),
    flavor="hive"  # This creates gauge=X/year=Y/ paths
)

parquet_format = ds.ParquetFileFormat()
write_options = parquet_format.make_write_options(compression="zstd")

ds.write_dataset(
    test,
    base_dir="/tmp/test_partitioning",
    format=parquet_format,
    partitioning=partitioning,  # Use the explicit Hive partitioning
    existing_data_behavior="delete_matching",  # Clean slate
    max_rows_per_group=250_000,
)

In [26]:
for root, dirs, files in os.walk("/tmp/test_partitioning"):
    print(root)

/tmp/test_partitioning
/tmp/test_partitioning/gauge=B
/tmp/test_partitioning/gauge=B/year=2021
/tmp/test_partitioning/A
/tmp/test_partitioning/A/2020
/tmp/test_partitioning/gauge=A
/tmp/test_partitioning/gauge=A/year=2020
/tmp/test_partitioning/B
/tmp/test_partitioning/B/2021
