In [2]:
# Notebook test: read partitioned parquet dataset and compute combined-key means with Polars
import polars as pl
from pathlib import Path

# Adjust this to your partitioned dataset directory
parquet_dir = "/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/modis.parquet/**/*.parquet"

# Variables and grouping columns to test
variables = ["median"]            # replace with your numeric columns
group_cols = ["year"] # group_cols = ["pixel_id", "tile_ix", "tile_iy"]        # replace with your partition / grouping cols

# Lazy read the parquet dataset (Polars can read partitioned parquet directories)
lf = pl.scan_parquet(str(parquet_dir))

# # Build a combined key (string) from multiple grouping columns
# key_expr = pl.concat_str([pl.col(c).cast(pl.Utf8) for c in group_cols], separator="_").alias("_group_key")

# Aggregation expressions: sum and count per variable (so we can compute means robustly)
agg_exprs = []
for v in variables:
    agg_exprs.append(pl.col(v).sum().alias(f"{v}_sum"))
    agg_exprs.append(pl.col(v).count().alias(f"{v}_count"))

# Group, aggregate and collect results
grp = (
    lf.group_by(group_cols)
      .agg(agg_exprs)
      .collect(engine="streaming")   # materialize into an eager DataFrame for inspection
)

# Compute means from sum/count and keep only mean columns
for v in variables:
    grp = grp.with_columns((pl.col(f"{v}_sum") / pl.col(f"{v}_count")).alias(f"{v}_mean"))

mean_cols = [f"{v}_mean" for v in variables]
means_df = grp.select(group_cols + mean_cols)

# Show top rows
print("Combined-key means (sample):")
print(means_df.head(20))

Combined-key means (sample):
shape: (20, 2)
┌──────┬─────────────┐
│ year ┆ median_mean │
│ ---  ┆ ---         │
│ i16  ┆ f64         │
╞══════╪═════════════╡
│ 2015 ┆ 284.264173  │
│ 2016 ┆ 284.637266  │
│ 2020 ┆ 284.577406  │
│ 2000 ┆ 285.50184   │
│ 2009 ┆ 284.064664  │
│ …    ┆ …           │
│ 2003 ┆ 284.018582  │
│ 2004 ┆ 283.678275  │
│ 2007 ┆ 284.247513  │
│ 2008 ┆ 284.068709  │
│ 2010 ┆ 284.532078  │
└──────┴─────────────┘


In [None]:
# Notebook test: read partitioned parquet dataset and compute combined-key means with DuckDB
import duckdb
import pandas as pd
from pathlib import Path

# Adjust this to your partitioned dataset directory
parquet_dir = "/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/modis.parquet/**/*.parquet"

# Variables and grouping columns to test
variables = ["median"]  # replace with your numeric columns
group_cols = ["pixel_id", "tile_ix", "tile_iy"]  # replace with your partition / grouping cols

# Create a DuckDB connection
con = duckdb.connect()

# Build the combined key expression for SQL
key_parts = " || '_' || ".join([f"CAST({col} AS VARCHAR)" for col in group_cols])
key_expr = f"({key_parts}) AS _group_key"

# Build aggregation expressions
agg_exprs = [f"AVG({v}) AS {v}_mean" for v in variables]

# Construct the SQL query
query = f"""
SELECT 
    {key_expr},
    {', '.join(agg_exprs)}
FROM read_parquet('{parquet_dir}')
GROUP BY {', '.join(group_cols)}
"""

print("Executing query...")
print(query)
print()

# Execute and get results as pandas DataFrame
con.execute(query)

# Close connection
con.close()

Executing query...

SELECT 
    (CAST(pixel_id AS VARCHAR) || '_' || CAST(tile_ix AS VARCHAR) || '_' || CAST(tile_iy AS VARCHAR)) AS _group_key,
    AVG(median) AS median_mean
FROM read_parquet('/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/modis.parquet/**/*.parquet')
GROUP BY pixel_id, tile_ix, tile_iy




FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [None]:
output_means = pd.read_parquet("")

In [5]:
import pyarrow.parquet as pq
import pyarrow as pa

# Path to your parquet file
means_file = "/scicore/home/meiera/schulz0022/projects/growth-and-temperature/output_means.parquet"

# Method 1: Read first N rows efficiently (RECOMMENDED)
# This reads only the needed row groups, very memory efficient
parquet_file_obj = pq.ParquetFile(means_file)
table_head = parquet_file_obj.read_row_group(6912).slice(0, 10)  # First 10 rows
df_head = table_head.to_pandas()
print("Method 1 - First 10 rows:")
print(df_head)
print()


Method 1 - First 10 rows:
     _group_key  median_mean
0  3526551_8_33   300.157143
1  3600279_8_33   297.202381
2  3655575_8_33   296.320476
3  3829655_8_33   300.792381
4  2607000_8_33   299.920000
5  3006360_8_33   298.316190
6  3211160_8_33   300.183810
7  3233688_8_33   299.119048
8  3360664_8_33   300.736190
9  2592665_8_33   299.787143



In [None]:
# Show top rows
print("Combined-key means (sample):")
print(means_df.head(20))
print(f"\nTotal groups: {len(means_df)}")

# Convert a specific variable means to pandas Series (index = combined key)
ps = means_df.set_index("_group_key")[f"{variables[0]}_mean"]
print(f"\nExample pandas Series for {variables[0]}:")
print(ps.head())


In [None]:
# Notebook test: read partitioned parquet dataset and compute combined-key means with Polars
import polars as pl
from pathlib import Path

# Adjust this to your partitioned dataset directory
parquet_dir = "/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/modis_subset.parquet/**/*.parquet"

# Variables and grouping columns to test
variables = ["median"]            # replace with your numeric columns
group_cols = ["year"] # group_cols = ["pixel_id", "tile_ix", "tile_iy"]        # replace with your partition / grouping cols

# Lazy read the parquet dataset (Polars can read partitioned parquet directories)
lf = pl.scan_parquet(str(parquet_dir))


In [None]:
result = df.select(
    pl.fold(
        acc=pl.lit(0),
        function=operator.add,
        exprs=pl.col("a", "b"),
    ).alias("sum_fold"),
    pl.sum_horizontal(pl.col("a", "b")).alias("sum_horz"),
)

In [None]:

# # Build a combined key (string) from multiple grouping columns
# key_expr = pl.concat_str([pl.col(c).cast(pl.Utf8) for c in group_cols], separator="_").alias("_group_key")

# Aggregation expressions: sum and count per variable (so we can compute means robustly)
agg_exprs = []
for v in variables:
    agg_exprs.append(pl.col(v).sum().alias(f"{v}_sum"))
    agg_exprs.append(pl.col(v).count().alias(f"{v}_count"))

# Group, aggregate and collect results
grp = (
    lf.group_by(group_cols)
      .agg(agg_exprs)
      .collect(engine="streaming")   # materialize into an eager DataFrame for inspection
)

# Compute means from sum/count and keep only mean columns
for v in variables:
    grp = grp.with_columns((pl.col(f"{v}_sum") / pl.col(f"{v}_count")).alias(f"{v}_mean"))

mean_cols = [f"{v}_mean" for v in variables]
means_df = grp.select(group_cols + mean_cols)

# Show top rows
print("Combined-key means (sample):")
print(means_df.head(20))

In [2]:
import dask.dataframe as dd

In [50]:
parquet_dir = "/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/_modis.parquet/ix=3/iy=23/data.parquet"
output_dir = "/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/.streamreg/test.parquet"
subset = dd.read_parquet(parquet_dir)
keys = ["pixel_id", "year"]
vars_to_demean = ["modis_median", "viirs_annual"]

In [27]:
target_partition_size = 100 * 1000 * 1000
subset.set_index("pixel_id", sorted=True).repartition(partition_size="100MB").to_parquet(output_dir)

In [99]:
subset = dd.read_parquet(output_dir)[vars_to_demean + ["year"]]

In [100]:
entity_means = subset.groupby(keys[0])[vars_to_demean].agg("mean")
means_merged = dd.merge(subset, entity_means, how="left", on=keys[0], suffixes=("", "_tmpmean"))
for var in vars_to_demean:
    means_merged[var] = means_merged[var] - means_merged[var + "_tmpmean"]
result = means_merged.drop(columns = [var + "_tmpmean" for var in vars_to_demean])

In [101]:
time_means = result.groupby(keys[1], sort=False)[vars_to_demean].agg("mean")
means_merged = dd.merge(subset, time_means, how="left", on=keys[1], suffixes=("", "_tmpmean"))
for var in vars_to_demean:
    means_merged[var] = means_merged[var] - means_merged[var + "_tmpmean"]
result = means_merged.drop(columns = [var + "_tmpmean" for var in vars_to_demean])

result = result.set_index(subset.index, sort=False)

In [90]:
result.shape[0].compute()

81217584

In [88]:
means_merged.shape[0].compute()

81217584

In [103]:
result.head(50)

Unnamed: 0_level_0,modis_median,viirs_annual,year
pixel_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
844523714379776,281.358486,,2000
844523714379776,281.364819,,2001
844523714379776,281.274131,,2002
844523714379776,281.861379,,2003
844523714379776,281.937368,,2004
844523714379776,282.136143,,2005
844523714379776,282.064746,,2006
844523714379776,282.01438,,2007
844523714379776,281.769123,,2008
844523714379776,281.863954,,2009


In [42]:
index_subset = subset.index.compute()

In [44]:
index_subset == test

array([False, False, False, ..., False, False, False], shape=(81217584,))

In [None]:
subset.to_parquet("/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/.streamreg/modis_subset.parquet", partition_on=entity_cols)

In [4]:
subset['tile_ix'].max().compute()

6

In [17]:
keys

[['tile_ix', 'tile_iy', 'pixel_id'], 'year']

In [30]:
test = subset.groupby(time_cols)[["median", "viirs_annual"]].transform("mean", meta=({'median': 'f8', 'viirs_annual': 'f8'})).compute()

In [31]:
test

Unnamed: 0,median,viirs_annual
66167067,297.726274,0.366847
66167070,297.534712,0.372514
66167073,297.552540,
66167074,297.036426,
66167076,297.722244,
...,...,...
43265317,297.319552,
43265319,297.258862,
43265323,296.987941,
43265324,297.193668,


In [1]:
import pandas as pd
test = pd.read_parquet("/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/__modis.parquet/ix=0/iy=0/data.parquet")
test.columns

Index(['year', 'median', 'mean', 'rollmax3', 'viirs_annual', 'ntl_harm',
       'reg_fav', 'subdivisions', 'countries', 'HDI_VH', 'HDI_HI', 'WB_LM',
       'WB_UM', 'HDI_ME', 'WB_HI', 'pixel_id', 'tile_ix', 'tile_iy',
       'median_unit_demeaned', 'mean_unit_demeaned', 'rollmax3_unit_demeaned',
       'viirs_annual_unit_demeaned', 'ntl_harm_unit_demeaned',
       'reg_fav_unit_demeaned', 'median_twoway_demeaned',
       'mean_twoway_demeaned', 'rollmax3_twoway_demeaned',
       'viirs_annual_twoway_demeaned', 'ntl_harm_twoway_demeaned',
       'reg_fav_twoway_demeaned'],
      dtype='object')

In [9]:
data

Unnamed: 0,Array,Chunk
Bytes,10.82 GiB,1.00 MiB
Shape,"(33601, 86401)","(512, 512)"
Dask graph,11154 chunks in 2 graph layers,11154 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 10.82 GiB 1.00 MiB Shape (33601, 86401) (512, 512) Dask graph 11154 chunks in 2 graph layers Data type float32 numpy.ndarray",86401  33601,

Unnamed: 0,Array,Chunk
Bytes,10.82 GiB,1.00 MiB
Shape,"(33601, 86401)","(512, 512)"
Dask graph,11154 chunks in 2 graph layers,11154 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.82 GiB,1.00 MiB
Shape,"(33601, 86401)","(512, 512)"
Dask graph,11154 chunks in 2 graph layers,11154 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 10.82 GiB 1.00 MiB Shape (33601, 86401) (512, 512) Dask graph 11154 chunks in 2 graph layers Data type float32 numpy.ndarray",86401  33601,

Unnamed: 0,Array,Chunk
Bytes,10.82 GiB,1.00 MiB
Shape,"(33601, 86401)","(512, 512)"
Dask graph,11154 chunks in 2 graph layers,11154 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [5]:
import polars as pl
data = pl.scan_parquet(str("/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/modis.parquet/**/*.parquet"))
data.head().collect()

year,modis_median,modis_mean,modis_rollmax3,viirs_annual,ntl_harm,reg_fav,HDI_ME,HDI_VH,WB_LM,HDI_HI,WB_HI,WB_UM,pixel_id,__index_level_0__
i16,f64,f64,f64,f64,f64,bool,bool,bool,bool,bool,bool,bool,u64,i64
2000,,,,,0.0,False,False,False,False,False,False,False,1705984,17493
2001,,,,,0.0,False,False,False,False,False,False,False,1705984,17494
2002,,,,,0.0,False,False,False,False,False,False,False,1705984,17495
2003,,,,,0.0,False,False,False,False,False,False,False,1705984,17496
2004,,,,,0.0,False,False,False,False,False,False,False,1705984,17497


---

In [2]:
import pandas as pd

In [7]:
pd.read_parquet("/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/scratch/optimized_parts/part-*.parquet")[["pixel_id", "year"]]

Unnamed: 0,pixel_id,year
0,1688892812505922,2007
1,1688892812505922,2008
2,1688892812505922,2009
3,1688892812505922,2010
4,1688892812505922,2011
...,...,...
45907360,1688897106188193,2010
45907361,1688897106188193,2011
45907362,1688897106188193,2012
45907363,1688897106188193,2013


In [6]:
pd.read_parquet("/scicore/home/meiera/schulz0022/projects/growth-and-temperature/data_nobackup/assembled/scratch/demeaned_parts/part-*.parquet")[["pixel_id", "year"]]

Unnamed: 0,pixel_id,year
0,1688892811822688,2002
1,1688892811822688,2003
2,1688892811822688,2004
3,1688892811822688,2005
4,1688892811822688,2006
...,...,...
45907360,1688892812861880,2013
45907361,1688892812861880,2014
45907362,1688892812861880,2015
45907363,1688892812861880,2016
