### ERA5 Extraction

In [None]:
import ee 
import pandas as pd
import os
from datetime import datetime
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
%cd ..


In [11]:
ee.Authenticate()
ee.Initialize(project='my-project-410920') ### YOUR PROJECT HERE

In [12]:
variables_to_extract = ['temperature_2m', 'total_precipitation_sum', 'u_component_of_wind_10m',
						'v_component_of_wind_10m', 'surface_pressure', 'snowfall_sum',
						'snowmelt_sum', 'dewpoint_temperature_2m',
    					]

In [92]:
xy_df_sub1 = pd.read_csv('data/xy_df/xy_df_sub5.csv')
n_pred = 5
lookup = pd.read_csv('lookup.csv')

In [94]:
grid_to_coord = {}
# count = 0
for ind, row in lookup.iterrows():
	# if row['grid_id'] == '(89, 209)':
	# 	print(row['latitude'], row['longitude'])
	if (row['latitude']) != 0 or (row['longitude']) != 0:
		# count += 1
		grid_to_coord[row['grid_id']] = (row['latitude'], row['longitude']) 

# print(grid_to_coord.keys())
# print('(89, 209)' in grid_to_coord.keys())
# print(grid_to_coord['(45, 348)'])

In [95]:
count = 0
for ind, row in xy_df_sub1.iterrows():
    if (row['latitude'] == 0) and (row['longitude'] == 0):
        latitude, longitude = grid_to_coord.get(row['grid_id'], (None, None))
        if latitude is None or longitude is None:
            count += 1
            
        xy_df_sub1.at[ind, 'latitude'] = latitude
        xy_df_sub1.at[ind, 'longitude'] = longitude
        

Series([], Name: grid_id, dtype: object)

In [100]:
def create_directories(row, path):
    '''
    create directories that will be populated according to the dataset
    '''
    flood_target = row[f'target_flood_{n_pred}']

    # Sanitize the row name to create a valid folder name
    directory_name = f'extracted_{row.name}'
    directory_name = directory_name.replace('/', '_').replace(' ', '_')

    # Create the directory for this row, ensuring it's valid
    if flood_target == 1:
        output_dir = os.path.join(f'{path}/target_flood', directory_name)
    else:
        output_dir = os.path.join(f'{path}/no_flood_target', directory_name)

    os.makedirs(output_dir, exist_ok=True)

# Ensure directories are created before running the multithreaded execution
for _, row in tqdm(xy_df_sub1.iterrows(), total=len(xy_df_sub1)):
    create_directories(row, 'data/era5/target_5')

100%|██████████| 48262/48262 [00:03<00:00, 13234.86it/s]


In [101]:
# Load the Earth Engine ImageCollection
IC = ee.ImageCollection('ECMWF/ERA5_LAND/MONTHLY_AGGR').select(variables_to_extract)

def process_grid_point(args):
    '''
    process entries of xy_df_sub1 and populate appropriate directories
    '''
    index, row = args
    latitude_to_extract = row['latitude']
    longitude_to_extract = row['longitude']
    year_to_process = int(row['year'])
    grid_id = row['grid_id']
    flood_target = row[f'target_flood_{n_pred}']
    point = ee.Geometry.Point(longitude_to_extract, latitude_to_extract)

    # Create the output directory
    output_dir = f'data/era5/target_{n_pred}/target_flood/extracted_{index}' if flood_target == 1 else f'data/era5/target_{n_pred}/no_flood_target/extracted_{index}'
    os.makedirs(output_dir, exist_ok=True)

    # Define the date range for the entire period (5 years)
    start_date = datetime(year_to_process - 4, 1, 1)
    end_date = datetime(year_to_process, 12, 31)

    # Filter the ImageCollection for the entire date range
    era5_tp = IC.filterDate(start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'))

    # Extract the time series for the entire period
    try:
        time_series = era5_tp.getRegion(point, scale=100000).getInfo()
        time_series_df = pd.DataFrame(time_series[1:], columns=time_series[0])

        # Convert the time column to a readable date format
        time_series_df['time'] = pd.to_datetime(time_series_df['time'], unit='ms')

        # Melt the DataFrame to the desired format
        melted_df = time_series_df.melt(
            id_vars=['time'],
            value_vars=variables_to_extract,
            var_name='id',
            value_name='value'
        )

        # Sort by variable name (id) and then by time
        melted_df = melted_df.sort_values(by=['id', 'time'])

        # Save the entire melted DataFrame to a single CSV file
        output_file = os.path.join(output_dir, f'era5_data.csv')
        melted_df.to_csv(output_file, index=False)

    except Exception as e:
        print(f"Failed to process grid point {grid_id, row.name, point, longitude_to_extract, latitude_to_extract}: {e}")


In [102]:
with ThreadPoolExecutor(max_workers=8) as executor:
	list(tqdm(executor.map(process_grid_point, [(index, row) for index, row in xy_df_sub1.iterrows()]), total=len(xy_df_sub1)))

100%|██████████| 48262/48262 [1:03:55<00:00, 12.58it/s] 


In [103]:
def populate_dir(xy_df_sub1, path):
	'''
	read directories and flatten weather vars
	'''
	directories = sorted([f for f in os.listdir(path) if not f.startswith('.')])
	num_features = 480

	# Create column names for the new features
	column_names = [f'feature_{i}' for i in range(num_features)]

	# Create an empty list to store the rows of data
	data = []

	# Loop through directories and process each one
	for directory in tqdm(directories):
		# Extract index from directory name
		idx = int(directory.split('_')[1])

		# Read and flatten features
		features_df = pd.read_csv(f'{path}/{directory}/era5_data.csv')
		values = list(features_df['value'])

		# Gather the entire row's data (keep all existing columns)
		row_data = xy_df_sub1.loc[idx].values  # All columns in that row

		# Append the original row data + the new flattened feature values
		data.append(list(row_data) + list(values))

	# Create a new DataFrame with all original columns and the new features
	return pd.DataFrame(data, columns=xy_df_sub1.columns.tolist() + column_names)

In [104]:
xy_df_sub_new_no_flood = populate_dir(xy_df_sub1, 'data/era5/target_5/no_flood_target')
xy_df_sub_new_flood = populate_dir(xy_df_sub1, 'data/era5/target_5/target_flood')

100%|██████████| 47668/47668 [00:34<00:00, 1377.69it/s]
100%|██████████| 594/594 [00:00<00:00, 1315.59it/s]


In [119]:
xy_df_sub1_new_combined = pd.concat([xy_df_sub_new_no_flood, xy_df_sub_new_flood], axis=0)
xy_df_sub1_new_combined.to_csv('xy_df_sub_5_combined.csv', index=True)

In [120]:
xy_df_sub1_new_combined

Unnamed: 0,Unnamed: 0.1,grid_id,year,flood_amt,storm_amt,earthquake_amt,extreme temperature _amt,landslide_amt,volcanic activity_amt,drought_amt,...,feature_470,feature_471,feature_472,feature_473,feature_474,feature_475,feature_476,feature_477,feature_478,feature_479
0,0,"(45, 348)",1960,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,-0.065926,-1.259714,-1.017257,-0.316835,-1.078647,-0.714023,-0.494092,-1.079635,-0.884315,-0.278880
1,1,"(45, 348)",1961,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,-0.827393,-1.128178,-0.851815,-0.948912,-0.720752,-0.325468,-0.686827,-1.966628,-1.372923,-1.526947
2,10,"(45, 348)",1970,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,-0.286424,-1.477790,-0.369442,-0.694219,-1.491534,-1.156600,-1.244941,-1.335228,-0.803331,-1.001159
3,100,"(45, 350)",2001,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,-0.645275,-0.927491,-0.782172,-1.245552,-0.693470,-0.964625,-1.113105,-0.475281,-0.160082,-0.303555
4,1000,"(56, 123)",2016,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,1.001294,1.630767,0.898941,-0.054849,0.669643,-0.322016,0.956401,0.335617,-0.508404,-0.685915
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
589,9594,"(88, 210)",1996,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.035985,0.319355,0.427845,0.654719,0.660096,0.395050,0.083282,0.132421,0.016070,-0.137990
590,9599,"(88, 210)",2001,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.140352,0.295562,0.506684,0.652696,0.633152,0.438817,0.152811,0.094741,0.102379,-0.057797
591,9604,"(88, 210)",2006,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.203236,0.382482,0.411757,0.803954,0.847347,0.678435,0.700937,0.474789,0.122861,-0.096496
592,9608,"(88, 210)",2010,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.079394,0.352450,0.477821,0.789055,0.960232,1.067485,0.606735,0.336443,0.207730,0.157218
