# Data processing

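This notebook processes the data files shared by algebra: it combines the consumption and weather CSV files, assigns each power station its nearest weather station, and merges the two datasets by timestamp.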

In [1]:
import glob
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.spatial import cKDTree

## Parsing and combining all data files

In [2]:
# Collect the CSV paths. `glob` returns files in filesystem order, so sort them
# to keep the row order of the combined frames reproducible across machines.
consumption_csv_files = sorted(glob.glob('./consumption_data/*.csv'))
weather_csv_files = sorted(glob.glob('./weather_data/*.csv'))

# Parse every file, reading `date_time` as a datetime column.
consumption_frames = [pd.read_csv(file, parse_dates=['date_time']) for file in consumption_csv_files]
weather_frames = [pd.read_csv(file, parse_dates=['date_time']) for file in weather_csv_files]

# Stack the per-file frames into one DataFrame per dataset with a fresh 0..n-1 index.
consumption_df = pd.concat(consumption_frames, ignore_index=True)
weather_df = pd.concat(weather_frames, ignore_index=True)

In [3]:
# Filling in empty coordinates for Aqaba
#
# Rows with missing coordinates can be inspected with:
#   consumption_df[consumption_df['x'].isna()]
# Reference coordinates, written as (y, x):
#   29.37757259358926, 34.978111588533196   AQABA
#   31.788204771724143, 36.232508452369466  MWAGGER
# NOTE(review): every row with a missing coordinate receives the AQABA values;
# this assumes AQABA is the only station with missing coordinates - confirm.
consumption_df['y'] = consumption_df['y'].fillna(29.37757259358926)
consumption_df['x'] = consumption_df['x'].fillna(34.978111588533196)
# errors='ignore' keeps the cell re-runnable once the column has been removed.
consumption_df = consumption_df.drop(columns=['weather_source_id'], errors='ignore')

In [4]:
# Preview the first rows of the combined consumption data.
consumption_df.head()

Unnamed: 0,id,name,x,y,id.1,date_time,power,location_id
0,3,Mwaqar (MWQAR),36.127889,31.796571,171905,2022-01-16 16:15:00,0.002792,3
1,3,Mwaqar (MWQAR),36.127889,31.796571,171906,2022-01-16 16:30:00,0.002792,3
2,3,Mwaqar (MWQAR),36.127889,31.796571,171907,2022-01-16 16:45:00,0.002792,3
3,3,Mwaqar (MWQAR),36.127889,31.796571,171908,2022-01-16 17:00:00,0.002792,3
4,3,Mwaqar (MWQAR),36.127889,31.796571,171909,2022-01-16 17:15:00,0.002792,3


In [5]:
# Preview the first rows of the combined weather data.
weather_df.head()

Unnamed: 0,id,name,source,lon,lat,alt,id.1,date_time,horizontal_visibility_km,cloud_cover_oktas,wind_speed_knot,wind_direction_degree,irradiance,relative_humidity_percent,air_temperature_c,rainfall_mm,weather_source_id,pressure_mbar,air_density_c,module_temp_c
0,24,Queen Alia International Airport,JMD,36.019444,31.727222,722.0,4381376,2021-05-25 00:00:00,6.0,0.0,,,,,,,24,,,
1,24,Queen Alia International Airport,JMD,36.019444,31.727222,722.0,4381377,2021-05-25 03:00:00,6.0,,,,,,,,24,,,
2,24,Queen Alia International Airport,JMD,36.019444,31.727222,722.0,4381378,2021-05-25 06:00:00,6.0,0.0,,,,,,,24,,,
3,24,Queen Alia International Airport,JMD,36.019444,31.727222,722.0,4385595,2021-06-11 09:00:00,6.0,,,,,,,,24,,,
4,24,Queen Alia International Airport,JMD,36.019444,31.727222,722.0,4381382,2021-05-25 18:00:00,6.0,0.0,,,,,,,24,,,


## Verifying data types of all columns

In [6]:
# Print the dtype of every column: consumption frame first, then weather frame.
print(consumption_df.dtypes)
print(weather_df.dtypes)

id                      int64
name                   object
x                     float64
y                     float64
id.1                    int64
date_time      datetime64[ns]
power                 float64
location_id             int64
dtype: object
id                                    int64
name                                 object
source                               object
lon                                 float64
lat                                 float64
alt                                 float64
id.1                                  int64
date_time                    datetime64[ns]
horizontal_visibility_km            float64
cloud_cover_oktas                   float64
wind_speed_knot                     float64
wind_direction_degree               float64
irradiance                          float64
relative_humidity_percent           float64
air_temperature_c                   float64
rainfall_mm                         float64
weather_source_id                     int6

## Validating weather data completeness per column/station

In [7]:
# Fraction (0.0 - 1.0) of missing values per column for each weather station.
# Averaging the boolean NaN mask per station equals sum / len, but it is
# vectorized and avoids `groupby(...).apply` on a frame that still contains the
# grouping column, which newer pandas versions deprecate.
nan_per_station = weather_df.isna().groupby(weather_df['id']).mean()
nan_per_station

Unnamed: 0_level_0,id,name,source,lon,lat,alt,id.1,date_time,horizontal_visibility_km,cloud_cover_oktas,wind_speed_knot,wind_direction_degree,irradiance,relative_humidity_percent,air_temperature_c,rainfall_mm,weather_source_id,pressure_mbar,air_density_c,module_temp_c
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.594841,0.74537,1.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.918429,0.958836,0.056672,0.056672,0.056672,0.056672,0.056672,0.056672,0.0,1.0,1.0,1.0
3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0
4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0
5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0
6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.984803,0.992522,0.005157,0.005157,0.005157,0.005157,0.005157,0.005157,0.0,1.0,1.0,1.0
7,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0
8,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0
9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.971305,0.984991,0.002268,0.002268,0.002268,0.002268,0.002268,0.002268,0.0,1.0,1.0,1.0
11,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0


## Selecting data-rich weather columns as features

In [8]:
# Restrict the weather frame to the identifier columns and the selected feature columns.
weather_features = [
    # station identifiers
    'id', 'name',
    # station position and observation time
    'lon', 'lat', 'alt', 'date_time',
    # weather measurements
    'wind_speed_knot', 'wind_direction_degree', 'irradiance',
    'relative_humidity_percent', 'air_temperature_c', 'rainfall_mm',
]

weather_df = weather_df.loc[:, weather_features]

In [9]:
# Always timing out, reviewing the data by each index manually below.
#
# weather_grouped = weather_df.groupby('id')
# weather_grouped.head()
# for id, group in weather_grouped:
#     plt.figure(figsize=(10, 5))
#     plt.plot(group['date_time'], group['air_temperature_c'], marker='o', label=f'Location {id}')
#     plt.title(f'Temperature Trend for Location {id}')
#     plt.xlabel('Date')
#     plt.ylabel('Temperature')
#     plt.grid(True)
#     plt.legend()
#     plt.show()

# location_id = 3
# loc_df = weather_df[weather_df['id'] == location_id]
# plt.figure(figsize=(10, 5))
# plt.plot(loc_df['date_time'], loc_df['air_temperature_c'], label=f'Location {location_id}')
# plt.title(f'Temperature Trend for Location {location_id}')
# plt.xlabel('Date')
# plt.ylabel('Temperature')
# plt.grid(True)
# plt.xticks(rotation=45)
# plt.legend()
# plt.show()

## Finding nearest weather station to power station by coords

In [10]:
def find_nearest_stations(consumption_stations, weather_stations):
    """Attach the ID of the closest weather station to every consumption station.

    Parameters
    ----------
    consumption_stations : DataFrame with coordinate columns 'x' and 'y'.
    weather_stations : DataFrame with columns 'id', 'lon' and 'lat'.

    Returns
    -------
    A copy of ``consumption_stations`` with a fresh 0..n-1 index and an extra
    'weather_station_id' column. The input frame is left untouched.

    Closeness is the KD-tree's default distance computed directly on the
    (lat, lon) / (y, x) coordinate values.
    """
    # Index the weather stations spatially, ordered as (lat, lon).
    station_tree = cKDTree(weather_stations[['lat', 'lon']].to_numpy())

    # Look up the single nearest weather station for each (y, x) point.
    query_points = consumption_stations[['y', 'x']].to_numpy()
    _, nearest_positions = station_tree.query(query_points)

    # Translate positional hits into station IDs, re-indexed 0..n-1 so they line
    # up with the re-indexed consumption stations below.
    matched_ids = weather_stations['id'].iloc[nearest_positions].reset_index(drop=True)

    return consumption_stations.reset_index(drop=True).assign(weather_station_id=matched_ids)

In [11]:
# Find the nearest weather station for each consumption station
weather_stations = weather_df[['id', 'lon', 'lat']].drop_duplicates()
consumption_stations = consumption_df[['id', 'x', 'y']].drop_duplicates()

consumption_stations = find_nearest_stations(
    consumption_stations, weather_stations,
)

# Drop any mapping left over from a previous run of this cell so that re-running
# it does not produce `weather_station_id_x` / `weather_station_id_y` columns.
consumption_df = consumption_df.drop(columns=['weather_station_id'], errors='ignore').merge(
    consumption_stations[['id', 'weather_station_id']], on='id', how='left',
)
consumption_df

Unnamed: 0,id,name,x,y,id.1,date_time,power,location_id,weather_station_id
0,3,Mwaqar (MWQAR),36.127889,31.796571,171905,2022-01-16 16:15:00,0.002792,3,24
1,3,Mwaqar (MWQAR),36.127889,31.796571,171906,2022-01-16 16:30:00,0.002792,3,24
2,3,Mwaqar (MWQAR),36.127889,31.796571,171907,2022-01-16 16:45:00,0.002792,3,24
3,3,Mwaqar (MWQAR),36.127889,31.796571,171908,2022-01-16 17:00:00,0.002792,3,24
4,3,Mwaqar (MWQAR),36.127889,31.796571,171909,2022-01-16 17:15:00,0.002792,3,24
...,...,...,...,...,...,...,...,...,...
3810235,44,Tafila City (TAFILA_CITY),35.640048,30.823988,3640892,2024-06-03 23:00:00,16.526047,44,15
3810236,44,Tafila City (TAFILA_CITY),35.640048,30.823988,3640893,2024-06-03 23:15:00,16.526047,44,15
3810237,44,Tafila City (TAFILA_CITY),35.640048,30.823988,3640894,2024-06-03 23:30:00,15.557803,44,15
3810238,44,Tafila City (TAFILA_CITY),35.640048,30.823988,3640895,2024-06-03 23:45:00,15.508034,44,15


In [12]:
# Check if any power station got associated with low quality weather data, e.g. weather ID 1
# Lists, per power station `id`, the distinct weather station IDs found on its rows.
consumption_df.groupby('id')['weather_station_id'].unique()

id
1     [23]
2     [28]
3     [24]
4     [11]
5      [8]
6      [5]
7     [22]
8     [24]
9      [7]
10    [19]
11    [20]
12     [8]
13    [24]
14    [25]
15    [15]
16    [15]
17    [28]
18    [18]
19    [18]
20     [4]
21    [29]
22    [17]
23     [5]
24    [12]
25     [2]
26    [27]
27     [3]
28     [8]
29    [21]
30    [11]
31    [21]
32    [26]
33    [11]
34     [9]
35     [8]
36    [12]
37    [20]
38    [26]
39    [21]
40    [12]
41    [27]
42     [9]
44    [15]
45    [26]
68    [18]
Name: weather_station_id, dtype: object

In [13]:
# Save combined processed df files to CSV.
# Create the output folder first so the cell also works on a fresh checkout.
os.makedirs('./processed', exist_ok=True)
consumption_df.to_csv('./processed/00-combined-consumption.csv')
weather_df.to_csv('./processed/00-combined-weather.csv')

## Map weather data to Power data by time + nearest station ID

In [23]:
def find_closest_time(row, weather_data):
    """Return the weather observation closest in time to a power reading.

    Parameters
    ----------
    row : mapping/Series with 'weather_station_id' and 'date_time'.
    weather_data : DataFrame with at least 'id' and 'date_time' columns.

    Returns
    -------
    The single row (as a Series) of ``weather_data`` for the station
    ``row['weather_station_id']`` whose 'date_time' is nearest to
    ``row['date_time']``.

    Raises
    ------
    ValueError
        If ``weather_data`` holds no rows for the requested station.
    """
    # Filter the weather data for the relevant weather station
    relevant_weather_data = weather_data[weather_data['id'] == row['weather_station_id']]
    if relevant_weather_data.empty:
        raise ValueError(
            f"No weather data for weather station {row['weather_station_id']!r}"
        )

    # Absolute time distance between every observation and the power reading
    time_diff = (relevant_weather_data['date_time'] - row['date_time']).abs()

    # Select by position rather than by label: a label lookup returns several
    # rows (a DataFrame) whenever the index contains repeated labels.
    return relevant_weather_data.iloc[time_diff.argmin()]

In [24]:
def merge_power_weather(power_data, weather_data):
    """
    Merge power station data with weather station data based on station ID and nearest timestamp.

    Each power row is paired with the weather row returned by
    ``find_closest_time``. Columns present in both rows (e.g. 'id', 'name') are
    kept side by side with '_power' / '_weather' suffixes instead of letting the
    weather value overwrite the power value. 'date_time' keeps the power
    reading's timestamp, matching the column layout produced by the
    ``pd.merge_asof``-based ``parallel_merge``.

    Parameters
    ----------
    power_data : DataFrame with 'weather_station_id' and 'date_time' columns.
    weather_data : DataFrame with 'id' and 'date_time' columns.

    Returns
    -------
    DataFrame with one row per row of ``power_data``.
    """
    time_key = 'date_time'
    merged_data = []

    # Iterate through power station rows
    for _, power_row in power_data.iterrows():
        closest_weather_row = find_closest_time(power_row, weather_data)

        power_fields = power_row.to_dict()
        weather_fields = closest_weather_row.to_dict()
        # Columns that exist on both sides and therefore need disambiguation.
        shared = (power_fields.keys() & weather_fields.keys()) - {time_key}

        combined_row = {}
        for key, value in power_fields.items():
            combined_row[f'{key}_power' if key in shared else key] = value
        for key, value in weather_fields.items():
            if key == time_key:
                # Keep the power reading's timestamp as the single time column.
                continue
            combined_row[f'{key}_weather' if key in shared else key] = value
        merged_data.append(combined_row)

    # Convert to DataFrame
    return pd.DataFrame(merged_data)

In [None]:
# Perform the merge (row-wise reference implementation).
# merge_power_weather rescans the weather frame for every consumption row, which
# does not finish on the full consumption frame (~3.8M rows), so it is run on a
# small sample only, as a cross-check. The optimized section below rebuilds
# `merged_df` for the full dataset.
merged_df = merge_power_weather(consumption_df.head(1000), weather_df)

## Map weather data to Power data by time + nearest station ID (Optimized)

In [21]:
!pip install tqdm



In [22]:
from multiprocessing import Pool
from functools import partial
from tqdm import tqdm

In [23]:
# Optimize data types for reduced memory usage
def optimize_data_types(df):
    """Return a copy of ``df`` with smaller column dtypes.

    * float64 columns become float32.
    * int64 columns become int32, but only when every value fits into int32.
    * object columns become category when fewer than half of the rows hold
      distinct values.

    The input frame is not modified; use the returned frame.
    """
    # Shallow copy: the column assignments below land on the copy only, so the
    # caller's frame keeps its dtypes and untouched columns are not duplicated.
    optimized = df.copy(deep=False)

    for col in optimized.select_dtypes(include=['float64']).columns:
        optimized[col] = optimized[col].astype('float32')

    int32_limits = np.iinfo(np.int32)
    for col in optimized.select_dtypes(include=['int64']).columns:
        values = optimized[col]
        # astype('int32') wraps out-of-range values silently, so downcast only
        # when the whole column fits.
        if values.empty or (
            values.min() >= int32_limits.min and values.max() <= int32_limits.max
        ):
            optimized[col] = values.astype('int32')

    for col in optimized.select_dtypes(include=['object']).columns:
        # Convert to category if unique values < 50% of rows
        if optimized[col].nunique() < 0.5 * len(optimized):
            optimized[col] = optimized[col].astype('category')

    return optimized

In [24]:
# Create a KDTree for efficient nearest neighbor search
# def build_time_kdtree(weather_data):
#     weather_timestamps = weather_data['date_time'].astype('int64') // 10**9  # Convert to seconds
#     return cKDTree(weather_timestamps.values.reshape(-1, 1))

In [25]:
# Find the closest weather data row for a given power station row
# def find_closest_time_kdtree(row, weather_tree, weather_data):
#     consumption_time = row['date_time'].timestamp()
#     dist, idx = weather_tree.query([[consumption_time]])
#     return weather_data.iloc[idx[0]]

In [26]:
# Parallel merge function for processing batches
# def parallel_merge(batch, weather_data_by_station):
#     merged_data = []

#     for _, row in batch.iterrows():
#         station_id = row['weather_station_id']
#         if station_id not in weather_data_by_station:
#             continue

#         station_weather_data = weather_data_by_station[station_id]
#         weather_tree = build_time_kdtree(station_weather_data)
#         closest_weather_row = find_closest_time_kdtree(row, weather_tree, station_weather_data)

#         merged_row = {**row.to_dict(), **closest_weather_row.to_dict()}
#         merged_data.append(merged_row)

#     return merged_data

# Usage of asof method for vectorized improvements
def parallel_merge(batch, weather_data_by_station):
    """Attach the nearest-in-time weather observation to every row of ``batch``.

    Parameters
    ----------
    batch : DataFrame of power readings with 'weather_station_id' and
        'date_time' columns.
    weather_data_by_station : mapping of weather station ID to that station's
        weather DataFrame (with a 'date_time' column).

    Returns
    -------
    DataFrame with the rows of ``batch`` grouped by weather station and ordered
    by 'date_time' within each station. Columns present on both sides get the
    suffixes '_power' and '_weather'.

    Raises
    ------
    KeyError
        If a station referenced by ``batch`` is missing from
        ``weather_data_by_station``.
    """
    # One merged frame per weather station present in the batch
    all_merged_data = []

    for station_id, station_batch in batch.groupby('weather_station_id'):
        if station_id not in weather_data_by_station:
            raise KeyError(f"No weather data available for weather station {station_id!r}")
        station_weather_data = weather_data_by_station[station_id]

        # pd.merge_asof rejects unsorted keys. Sort only when needed, with a
        # stable algorithm, so already-sorted input keeps its exact row order.
        if not station_batch['date_time'].is_monotonic_increasing:
            station_batch = station_batch.sort_values('date_time', kind='mergesort')
        if not station_weather_data['date_time'].is_monotonic_increasing:
            station_weather_data = station_weather_data.sort_values('date_time', kind='mergesort')

        # Use pandas merge_asof to find the closest timestamps
        merged = pd.merge_asof(
            station_batch,
            station_weather_data,
            left_on='date_time',
            right_on='date_time',
            direction='nearest',
            suffixes=('_power', '_weather')
        )
        all_merged_data.append(merged)

    return pd.concat(all_merged_data, ignore_index=True)

In [28]:
# Main merge function to process in parallel
def merge_power_weather_parallel(power_data, weather_data, batch_size=10000, n_jobs=4):
    """Merge power readings with weather observations using a process pool.

    Both frames are passed through ``optimize_data_types``, the weather data is
    split per station ID, the power data is cut into consecutive slices of
    ``batch_size`` rows, and every slice is handed to ``parallel_merge`` in a
    worker process.

    Parameters
    ----------
    power_data : DataFrame of power readings (needs 'weather_station_id' and
        'date_time').
    weather_data : DataFrame of weather observations (needs 'id' and
        'date_time').
    batch_size : number of power rows per batch.
    n_jobs : number of worker processes.

    Returns
    -------
    DataFrame with the merged batches concatenated in batch order. When
    ``power_data`` has no rows, an empty frame with the power columns is
    returned instead of failing.

    NOTE(review): the per-station weather dict travels with every batch via
    ``partial``; presumably acceptable at the current data size - confirm.
    """
    # Reduce data size
    power_data = optimize_data_types(power_data)
    weather_data = optimize_data_types(weather_data)

    # Index weather data by station ID
    weather_data_by_station = {
        station_id: group for station_id, group in weather_data.groupby('id')
    }

    # Split power data into batches
    batches = [power_data.iloc[i:i + batch_size] for i in range(0, len(power_data), batch_size)]
    if not batches:
        # Nothing to merge; pd.concat on an empty list would raise ValueError.
        return power_data.iloc[0:0].copy()

    # Partial function to pass additional arguments to parallel function
    process_batch = partial(parallel_merge, weather_data_by_station=weather_data_by_station)

    # Use multiprocessing pool for parallel processing with progress bar.
    # imap yields results in batch order, so the output order is deterministic.
    with Pool(n_jobs) as pool:
        results = list(
            tqdm(
                pool.imap(process_batch, batches),
                total=len(batches),
                desc="Processing Batches",
                unit="batch"
            )
        )

    # Combine all results
    return pd.concat(results, ignore_index=True)

In [29]:
# Order both frames chronologically before merging: pd.merge_asof (used inside
# parallel_merge) needs its time keys in sorted order.
consumption_df = consumption_df.sort_values('date_time', ignore_index=True)
weather_df = weather_df.sort_values('date_time', ignore_index=True)

# Run the batched, multi-process merge over the full dataset.
merged_df = merge_power_weather_parallel(
    consumption_df,
    weather_df,
    batch_size=50000,
    n_jobs=6,
)

Processing Batches: 100%|██████████| 77/77 [02:52<00:00,  2.24s/batch]


In [35]:
# Preview the first rows of the merged power + weather data.
merged_df.head()

Unnamed: 0,id_power,name_power,x,y,id.1,date_time,power,location_id,weather_station_id,id_weather,name_weather,lon,lat,alt,wind_speed_knot,wind_direction_degree,irradiance,relative_humidity_percent,air_temperature_c,rainfall_mm
0,25,Shedia (SHEDIA),36.134983,29.930689,2032129,2021-12-21 00:15:00,5.396791,25,2,2,Al Jafer,36.135555,30.342222,865.0,,,,,,
1,25,Shedia (SHEDIA),36.134983,29.930689,2032130,2021-12-21 00:30:00,5.218123,25,2,2,Al Jafer,36.135555,30.342222,865.0,,,,,,
2,25,Shedia (SHEDIA),36.134983,29.930689,2032131,2021-12-21 00:45:00,5.530919,25,2,2,Al Jafer,36.135555,30.342222,865.0,,,,,,
3,25,Shedia (SHEDIA),36.134983,29.930689,2032132,2021-12-21 01:00:00,6.579748,25,2,2,Al Jafer,36.135555,30.342222,865.0,,,,,,
4,25,Shedia (SHEDIA),36.134983,29.930689,2032133,2021-12-21 01:15:00,6.92664,25,2,2,Al Jafer,36.135555,30.342222,865.0,,,,,,


In [32]:
# Create the output folder first so the export also works on a fresh checkout.
os.makedirs('./processed', exist_ok=True)
merged_df.to_csv('./processed/00-combined-merged.csv')