## Import Libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import geopandas as gpd
import os
from glob import glob
import warnings
import argparse
from tqdm import tqdm
import json
from shapely.geometry import Point
from datetime import datetime, timedelta
from scipy.stats import weibull_min
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
from tqdm.contrib.concurrent import process_map

# Silence every warning category globally so notebook output stays readable.
# NOTE: this also hides potentially useful library warnings while debugging.
warnings.simplefilter('ignore')

## Define the Parameters

In [None]:
# Crop code used in input/output file names (e.g. '{CROP}_phenology.csv');
# 'ww' is presumably winter wheat.
CROP = 'ww'
# Hexagon size as encoded in the grid file name ('..._{DISTANCE}sqkm_...').
DISTANCE = 2.5
# Projected CRS every layer is converted to (EPSG:25832, ETRS89 / UTM 32N).
EPSG = 25832

# Path of the directories
WORK_DIR = '/beegfs/halder/GITHUB/RESEARCH/landscape-yield-analysis/'
os.chdir(WORK_DIR)
MAIN_DATA_DIR = '/beegfs/halder/DATA/'
WORK_DATA_DIR = os.path.join(WORK_DIR, 'data')
WORK_TEMP_DIR = os.path.join(WORK_DIR, 'temp')

# Define the DWD data directory
DWD_DATA_DIR = '/beegfs/common/data/climate/dwd/csvs/germany_ubn_1951-01-01_to_2024-08-30'

# Results are grouped by grid size and crop: output/<DISTANCE>/<CROP>
OUT_DIR = os.path.join(WORK_DIR, 'output', str(DISTANCE), CROP)
if not os.path.exists(OUT_DIR):
    os.makedirs(OUT_DIR, exist_ok=True)
    print('Output directory successfully created!')
else:
    print('Output directory already existed.')

## Load Hexagonal Grid for Germany

In [None]:
# Hexagonal grid shapefile matching the configured cell size and CRS.
GRID_PATH = os.path.join(WORK_DATA_DIR, 'VECTOR', f'DE_Hexbins_{DISTANCE}sqkm_EPSG_{EPSG}.shp')

# Keep only the cell identifier and its polygon; ids are cast to int so they
# join cleanly against other tables.
grids_gdf = gpd.read_file(GRID_PATH)[['id', 'geometry']]
grids_gdf['id'] = grids_gdf['id'].astype(int)

print('Successfully read the grids!')
grids_gdf.plot();

In [None]:
# Read the NUTS boundaries, keep level-1 units (the German federal states) and
# reproject them to the working CRS.
DE_gdf = gpd.read_file(os.path.join(MAIN_DATA_DIR, 'DE_NUTS', 'DE_NUTS_3.shp'))
DE_gdf = DE_gdf[DE_gdf['LEVL_CODE'] == 1]
DE_gdf = DE_gdf.to_crs(f'EPSG:{EPSG}')

# Assign each hexagon the state nearest to its centroid. A nearest join (not a
# plain intersects) also gives a state to hexagons whose centroid lies outside
# every state polygon.
grids_centroids = grids_gdf.copy()
grids_centroids['geometry'] = grids_gdf.centroid

grids_centroids = gpd.sjoin_nearest(left_df=grids_centroids, right_df=DE_gdf[['NUTS_NAME', 'geometry']],
                                    how='inner')
# sjoin_nearest returns one row per equidistant nearest neighbour. Keep a single
# state per hexagon so the merge below cannot duplicate grid cells (duplicates
# would later double-count that hexagon's DWD cells).
grids_centroids = grids_centroids.drop_duplicates(subset='id', keep='first')

grids_gdf = pd.merge(left=grids_gdf, right=grids_centroids[['id', 'NUTS_NAME']], how='inner', on='id')

# Specify the DE DWD grid lookup file path
DE_DWD_json_path = os.path.join(MAIN_DATA_DIR, 'DE_DWD_Lat_Lon', 'latlon_to_rowcol.json')

with open(DE_DWD_json_path, encoding='utf-8') as f:
    data = json.load(f)

# Each entry is unpacked as [[lat, lon], [row, col]]; build one point per DWD
# grid cell (shapely expects x=lon, y=lat).
records = [
    {'row': row, 'col': col, 'geometry': Point(lon, lat)}
    for (lat, lon), (row, col) in data
]

latlon_gdf = gpd.GeoDataFrame(records, geometry='geometry', crs='EPSG:4326')
latlon_gdf = latlon_gdf.to_crs(f'EPSG:{EPSG}')

# Attach every DWD grid point that intersects a hexagon (one row per pair).
grids_gdf_dwd = gpd.sjoin(left_df=grids_gdf, right_df=latlon_gdf, how='inner', predicate='intersects')
grids_gdf_dwd.drop(columns=['index_right', 'geometry'], inplace=True)

for col in ['id', 'row', 'col']:
    grids_gdf_dwd[col] = grids_gdf_dwd[col].astype(int)

# (row, col) tuple identifying the DWD cell of each row.
grids_gdf_dwd['rowcol'] = list(zip(grids_gdf_dwd['row'], grids_gdf_dwd['col']))

print(grids_gdf_dwd.shape)
grids_gdf_dwd.head()

## Process the phenology data

In [None]:
# Load the phenology observations. The CSV's *_DOY columns actually contain
# calendar dates, so they are renamed to *_DATE and true day-of-year columns
# are derived from the parsed dates.
phenology_df = pd.read_csv(os.path.join(MAIN_DATA_DIR, 'DE_Crop_Phenology', f'{CROP}_phenology.csv'))
phenology_df.rename(
    columns={f'{name}_DOY': f'{name}_DATE' for name in ('Sowing', 'Flowering', 'Harvest')},
    inplace=True,
)
for date_col in ['Sowing_DATE', 'Flowering_DATE', 'Harvest_DATE']:
    event = date_col.partition('_')[0]
    phenology_df[date_col] = pd.to_datetime(phenology_df[date_col], format='%Y-%m-%d')
    phenology_df[f'{event}_DOY'] = phenology_df[date_col].dt.dayofyear

# Median sowing / harvest day-of-year per STATE_NAME, rounded to whole days.
median_doys = (
    phenology_df
    .groupby('STATE_NAME')[['Sowing_DOY', 'Harvest_DOY']]
    .median()
    .round()
    .astype(int)
    .reset_index()
)
# Window start is 30 days before median sowing; values that would drop to
# DOY <= 0 wrap around on a 365-day year (v - 30 + 365 == v + 335).
median_doys['Start_DOY'] = (median_doys['Sowing_DOY'] - 30).where(
    median_doys['Sowing_DOY'] > 30, median_doys['Sowing_DOY'] + 335
)
median_doys['End_DOY'] = median_doys['Harvest_DOY']

phenology_df = phenology_df.merge(median_doys, on='STATE_NAME', suffixes=('', '_MEDIAN'))

# Years 1952-2023 inclusive.
years = list(range(1952, 2024))

# One row per (state, year), carrying that state's median DOYs.
phenology_median = (
    pd.MultiIndex.from_product([median_doys['STATE_NAME'], years], names=['STATE_NAME', 'Year'])
    .to_frame(index=False)
    .merge(median_doys, on='STATE_NAME', how='left')
)

# Assuming your DataFrame is named df
def calculate_duration(row):
    start_doy = row['Start_DOY']
    end_doy = row['End_DOY']
    year = row['Year']
    
    if end_doy >= start_doy:
        return end_doy - start_doy
    else:
        # Check for leap year (i.e. next year)
        next_year = year + 1
        is_leap = (next_year % 4 == 0 and (next_year % 100 != 0 or next_year % 400 == 0))
        year_length = 366 if is_leap else 365
        return (year_length - start_doy) + end_doy

# Season length in days for every (state, year) row.
phenology_median['Duration'] = phenology_median.apply(calculate_duration, axis='columns')

def doy_to_date(year, doy, event='sow'):
    """Convert a 1-based day-of-year into a ``datetime``.

    Sowing-side events (``event == 'sow'``) are anchored to the year *before*
    ``year``; any other event is anchored to ``year`` itself. ``doy == 1``
    maps to 1 January of the anchor year.
    """
    anchor_year = year - 1 if event == 'sow' else year
    new_year = datetime(anchor_year, 1, 1)
    return new_year + timedelta(days=int(doy) - 1)

# Turn each median DOY into a calendar date. Sowing-side columns (Sowing,
# Start) are anchored to the year before 'Year'; harvest-side columns
# (Harvest, End) are anchored to 'Year' itself.
phenology_median['Sowing_DATE'] = phenology_median.apply(
    lambda r: doy_to_date(r['Year'], r['Sowing_DOY'], 'sow'), axis='columns')
phenology_median['Harvest_DATE'] = phenology_median.apply(
    lambda r: doy_to_date(r['Year'], r['Harvest_DOY'], 'harvest'), axis='columns')

phenology_median['Start_DATE'] = phenology_median.apply(
    lambda r: doy_to_date(r['Year'], r['Start_DOY'], 'sow'), axis='columns')
phenology_median['End_DATE'] = phenology_median.apply(
    lambda r: doy_to_date(r['Year'], r['End_DOY'], 'harvest'), axis='columns')

print(phenology_median.shape)
phenology_median.head()

## Process the climate data

In [None]:
def get_phenology(state_name, year, table=None):
    """Look up the median phenology record for one state and year.

    Args:
        state_name: Value matched against the ``STATE_NAME`` column.
        year: Value matched against the ``Year`` column.
        table: DataFrame to search. Defaults to the module-level
            ``phenology_median`` table.

    Returns:
        The first matching row as a ``dict`` (column name -> value), or
        ``None`` when no row matches. This lets callers skip missing
        state/year combinations instead of failing with an IndexError.
    """
    if table is None:
        table = phenology_median
    matches = table[(table['STATE_NAME'] == state_name) & (table['Year'] == year)]
    if matches.empty:
        return None
    return matches.iloc[0].to_dict()

def extract_climate_data_by_grids(hex_id, out_dir):
    """Write per-season daily climate CSVs for one hexagon.

    The daily DWD series of every DWD cell attached to the hexagon (via
    ``grids_gdf_dwd``) are averaged day by day. The result is sliced to each
    year's ``Sowing_DATE`` .. ``Harvest_DATE`` window from the hexagon's state
    phenology and written to ``<out_dir>/<hex_id>_<year>.csv``.

    Args:
        hex_id: Integer hexagon id as used in ``grids_gdf`` / ``grids_gdf_dwd``.
        out_dir: Existing directory the CSV files are written to.

    Returns:
        None. Nothing is written when none of the hexagon's climate files
        can be read. Years without a phenology record, or without climate
        data inside the window, are skipped.
    """
    state_name = grids_gdf[grids_gdf['id'] == hex_id]['NUTS_NAME'].iloc[0]
    row_cols = np.array(grids_gdf_dwd[grids_gdf_dwd['id'] == hex_id]['rowcol'])

    # Keep only the DWD cells whose climate file exists on disk.
    file_paths = []
    for row, col in row_cols:
        fpath = os.path.join(DWD_DATA_DIR, str(row), f'daily_mean_RES1_C{col}R{row}.csv.gz')
        if os.path.exists(fpath):
            file_paths.append(fpath)

    # Read every available climate file into a list of DataFrames. Unreadable
    # files are reported and skipped so one bad file does not abort the hexagon.
    clim_dfs = []
    for f in file_paths:
        try:
            df = pd.read_csv(f, delimiter='\t', usecols=['Date', 'Precipitation', 'TempMin', 'TempMax', 'Radiation'])
            df['Date'] = pd.to_datetime(df['Date'])
            clim_dfs.append(df)
        except Exception as e:
            print(f"Error reading file {f}: {e}")
            continue

    if not clim_dfs:
        print(f"No climate files found for hex ID {hex_id}")
        return

    # Average the hexagon's DWD cells day by day.
    clim_merged = pd.concat(clim_dfs).groupby('Date').mean().reset_index()

    for year in sorted(phenology_median['Year'].unique()):
        # A missing state/year pair may surface either as None or as an
        # IndexError from the lookup. Skip the year in both cases rather than
        # crashing the worker (e.g. when a state has no phenology data).
        try:
            pheno = get_phenology(state_name, year)
        except IndexError:
            pheno = None
        if pheno is None:
            continue

        start_date = pheno['Sowing_DATE']
        end_date = pheno['Harvest_DATE']

        # Daily climate from sowing to harvest, both ends inclusive.
        clim_year = clim_merged[clim_merged['Date'].between(start_date, end_date)]

        if clim_year.empty:
            continue

        save_path = os.path.join(out_dir, f'{hex_id}_{year}.csv')
        clim_year.to_csv(save_path, index=False)

In [None]:
OUT_DIR_climate = os.path.join(OUT_DIR, 'climate')

if not os.path.exists(OUT_DIR_climate):
    os.makedirs(OUT_DIR_climate, exist_ok=True)
    print('Directory successfully created!')
else:
    print('Directory already exists!')

# Every hexagon that has at least one DWD grid cell attached.
hex_ids = grids_gdf_dwd['id'].unique()


def wrapper(hex_id):
    """Process one hexagon, writing its CSVs into ``OUT_DIR_climate``.

    Kept as a named module-level function (not a lambda) so the process
    pool can pickle it.
    """
    extract_climate_data_by_grids(hex_id, OUT_DIR_climate)


# Fan the hexagons out over 70 worker processes, one hexagon per task,
# with a tqdm progress bar.
process_map(wrapper, hex_ids, max_workers=70, chunksize=1)
print('Climate data computation complete!')