# Processing Data - Creating and Cleaning Consolidated Data Files

This notebook includes the following sections:
- Combining Files
- Cleaning and Filtering
- Binning the Lightning Data
- Joining the Datasets
- Calculating Current Category and Intensification Change

This notebook should be executed after the `data_file_cleaning.ipynb` notebook. 

In the Combining Files section, we use the Google Drive API to grab and combine all the separate files in the folder where our data lives. We also parse the tropical cyclone (TC) ID and name from each of the file names to add as columns. This section will create intermediate files in a directory called `processed_files`, and `Combined_Reduced_Trackfile.txt` and `Combined_WWLLN_Locations.txt` in a directory called `combined_files` for use in the next section. The `processed_files` directory and its contents can be deleted after completion of this section.

In the Cleaning and Filtering section, we perform some post-processing on the data by adding column headers, filtering to tropical cyclones that are category 1 or higher, and calculating the direct distance of each lightning strike to the TC storm center. This will create additional `Filtered_Reduced_Trackfile.csv` and `Filtered_WWLLN_Locations.txt` files. 

The Binning the Lightning Data section creates 30 minute bins for use in evaluating lightning burst thresholds and comparison with TC wind and pressure data. This section yields `WWLLN_innercore.csv`, `WWLLN_rainband.csv`, `WWLLN_innercore_w_time.csv`, `WWLLN_innercore_timebin_count.csv`,  `WWLLN_rainband_timebin_count.csv`, `WWLLN_rainband_w_time.csv`.

In Joining the Datasets, we join the WWLLN datasets with the trackfile dataset. We join each 30-minute bin with the nearest wind and pressure data. This section does not create any files.

The last part of this notebook, Calculating Current Category and Intensification Change, calculates category and intensification change bins for use in the burst threshold analysis. This portion will create the `innercore_joined.csv`, `innercore_joined_w_time.csv`, `rainband_joined.csv`, `rainband_joined_w_time.csv` files for use in analysis.

This notebook outputs the following files:
- `combined_files/Combined_Reduced_Trackfile.txt`
- `combined_files/Combined_WWLLN_Locations.txt`
- `Filtered_Reduced_Trackfile.csv`
- `Filtered_WWLLN_Locations.txt`
- `WWLLN_innercore.csv`
- `WWLLN_rainband.csv`
- `WWLLN_innercore_w_time.csv`
- `WWLLN_innercore_timebin_count.csv`
- `WWLLN_rainband_timebin_count.csv`
- `WWLLN_rainband_w_time.csv`
- `innercore_joined.csv`*
- `innercore_joined_w_time.csv`
- `rainband_joined.csv`*
- `rainband_joined_w_time.csv`

*denotes files used in the subsequent lightning burst threshold analysis.

### Combining Files
We use the [Google Drive API](https://developers.google.com/drive/api/guides/about-sdk) to download the files previously uploaded in `data_upload.ipynb` to consolidate the individual files. The first half of the code works if the Google Drive API is already set up (refer to instructions in `data_upload.ipynb`). The code after we create the list of files is not dependent on the Google Drive API. This section will create intermediate files in a directory called `processed_files`, and `Combined_Reduced_Trackfile.txt` and `Combined_WWLLN_Locations.txt` in a directory called `combined_files` for use in the next section. The `processed_files` directory and its contents can be deleted after completion of this section.

Let's start by installing necessary packages and then importing them.

In [None]:
# Install necessary packages
!pip install google-api-python-client google-auth google-auth-oauthlib google-auth-httplib2

In [None]:
# Import packages
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from threading import Thread
from queue import Queue
import os
import polars as pl
from googleapiclient.http import MediaIoBaseDownload
from io import BytesIO
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import pickle
import polars as pl
import pandas as pd
import numpy as np

Similar to the function in `data_file_cleaning.ipynb`, we use the following function to authenticate the Google Drive API. This will open a browser to perform the authentication process. 

Check if a `token.pickle` file already exists before running the following code. If the file exists, it is recommended to delete it before running the code below.

In [17]:
# Scopes for accessing Google Drive
SCOPES = ['https://www.googleapis.com/auth/drive']

# Authenticate and create the service object
def authenticate_drive_api():
    creds = None
    # Token file for saving the authentication
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)
    # If there are no credentials, perform authentication
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'client_secrets.json', SCOPES)  # Ensure 'credentials.json' is downloaded from Google API Console
            creds = flow.run_local_server(port=0)
        # Save the credentials for future use
        with open('token.pickle', 'wb') as token:
            pickle.dump(creds, token)
    return build('drive', 'v3', credentials=creds)

# Initialize the service object
service = authenticate_drive_api()


Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=389849867563-4uggnm57nqe52156v32gj1lkosoqpoem.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A37635%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive&state=GyfvJGpr2mtpuRzI6AP8xi3h1k3QIH&access_type=offline


This next function grabs the list of all files in a specified folder that are not trashed and stores them into a list. Each file has an ID and name attribute that we will use later.

In [27]:
def find_files(folder_id):
    # Query to find files in the specified folder
    query = f"'{folder_id}' in parents and trashed=false"
    files = []

    # List files in the folder and append to list
    page_token = None
    while True:
        response = service.files().list(
            q=query,
            spaces='drive',
            fields='nextPageToken, files(id, name)',
            pageToken=page_token
        ).execute()

        files += response.get('files', [])

        page_token = response.get('nextPageToken', None)
        if page_token is None:
            break
    return files

Call the function to find files in the specified folder. The folder ID can be found as the string after the "folders/" part of the URL for the Google Drive folder. This will give us a list of files to iterate through for the rest of the notebook.

In [28]:
# Get the list of files in the folder
folder_id = '14idmMBbM5xXZg4b61iINHbBTl2z4yLeD'
files = find_files(folder_id)

Next, we split out the tropical cyclone ID and name from each of the files to add as a separate column. We then save the files in the `processed_files` directory for use later.

In [None]:
# Process each file to add cyclone ID and name as columns
# Directory to save the processed files locally
output_dir = "processed_files"
os.makedirs(output_dir, exist_ok=True)

# Process each file
for file in files:
    file_id = file['id']
    file_name = file['name']

    # Extract the cyclone ID and name from the filename
    cyclone_id = '_'.join(file_name.split('_')[:3])
    cyclone_name = file_name.split('_')[3]

    # Download the file content
    request = service.files().get_media(fileId=file_id)
    file_stream = BytesIO()
    downloader = MediaIoBaseDownload(file_stream, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
    file_stream.seek(0)
    content = file_stream.read().decode('utf-8')

    # Add the cyclone id and name as a new column using Polars
    df = pl.read_csv(BytesIO(content.encode('utf-8')),separator='\t', has_header=False)
    df = df.with_columns([
    pl.lit(cyclone_id).alias("cyclone_id"),
    pl.lit(cyclone_name).alias("cyclone_name")
    ])

    # Save the modified DataFrame locally
    output_file_path = os.path.join(output_dir, file_name)
    df.write_csv(output_file_path, separator='\t',include_header=False)

    print(f"Processed and saved: {output_file_path}")

Next, we combine each of the trackfiles in the `processed_files` folder into one file, and each of the WWLLN location files into one file. This will give us two output files in the `combined_files` folder - `Combined_Reduced_Trackfile.txt` and `Combined_WWLLN_Locations.txt`. We will use these files as the basis for the next portions.

In [37]:
import glob

# Directories for processed files and output
input_dir = "processed_files"
output_dir = "combined_files"
os.makedirs(output_dir, exist_ok=True)

# File patterns to combine
patterns = {
    "Reduced_Trackfile": os.path.join(input_dir, "*Reduced_Trackfile*.txt"),
    "WWLLN_Locations": os.path.join(input_dir, "*WWLLN_Locations*.txt")
}

# Combine files based on patterns
for pattern_name, pattern_path in patterns.items():
    combined_content = []
    output_file_path = os.path.join(output_dir, f"Combined_{pattern_name}.txt")

    # Find all matching files
    matching_files = glob.glob(pattern_path)
    print(f"Combining {len(matching_files)} files for pattern '{pattern_name}'...")

    with open(output_file_path, "w") as output_file:
        for file_path in matching_files:
            with open(file_path, "r") as input_file:
                for line in input_file:
                    output_file.write(line)

    print(f"Combined file saved: {output_file_path}")

Combining 992 files for pattern 'Reduced_Trackfile'...
Combined file saved: combined_files/Combined_Reduced_Trackfile.txt
Combining 994 files for pattern 'WWLLN_Locations'...
Combined file saved: combined_files/Combined_WWLLN_Locations.txt


### Cleaning and Filtering
In this section we add a column header to the files and filter down to TCs that are category 1 and above. Category 1 is defined using the [Saffir-Simpson Hurricane Wind Scale](https://www.nhc.noaa.gov/aboutsshws.php), where the maximum sustained wind speed is between 64-82 kt. We calculate each TC's category using the Saffir-Simpson Scale and save it in a new column. We then calculate the direct distance of each lightning strike to the storm center and denote it as inner core or rainband. This section outputs `Filtered_Reduced_Trackfile.csv` and `Filtered_WWLLN_Locations.txt` files.

Start by importing the necessary libraries and files created earlier.

In [None]:
# Read the trackfile and WWLLN file
track_file = pd.read_csv("intermediate_data/Combined_Reduced_Trackfile.txt", sep="\t")
track_file = track_file.drop(track_file.columns[8], axis=1)

# Use chunk due to the large file size
chunksize = 100000
chunks = []

for chunk in pd.read_csv(
    "intermediate_data/Combined_WWLLN_Locations.txt",
    delim_whitespace=True,
    chunksize=chunksize
):
    chunks.append(chunk)

locations_WWLLN = pd.concat(chunks, ignore_index=True)

Let's add headers to the two dataframes for better readability.

In [15]:
track_file.columns = ['year', 'month', 'day','hour','lat','lon','pressure', 'knots', 'storm_code', 'storm_name']
locations_WWLLN.columns = ['year', 'month', 'day', 'hour', 'min', 'sec','lat','lon','distance_from_storm_center_km_east', 'distance_from_storm_center_km_north', 'storm_code','storm_name']

Next, we process the trackfile data by creating the list of storm codes that meet the category 1 or higher requirement. We will use this list to filter the wind speed/pressure data as well as the lightning data.

In [None]:
# calculate the max wind speed for each storm code
max_wind_speed = track_file.groupby('storm_code').agg(
    max_wind_speed=('knots', 'max')
).reset_index()
max_wind_speed.head()

Unnamed: 0,storm_code,max_wind_speed
0,ATL_10_1,85
1,ATL_10_10,30
2,ATL_10_11,135
3,ATL_10_12,120
4,ATL_10_13,105


In [44]:
# filter by max >= 64 knots
storm_filter = max_wind_speed[max_wind_speed["max_wind_speed"] >= 64].copy()

# calculate the TC category using the max wind speed
storm_filter["category"] = storm_filter["max_wind_speed"].apply(
    lambda x: 1 if 64 <= x <= 82 else (2 if 82 < x <= 95 else (3 if 95 < x <= 112 else (4 if 112 < x <= 136 else (5 if x > 136 else 0))))
)
storm_filter = storm_filter[["storm_code", "category"]]

# strip the basin from the storm code
storm_filter["basin"] = storm_filter["storm_code"].str.extract(r"^(.*?)_")
storm_filter.head()

Unnamed: 0,storm_code,category,basin
0,ATL_10_1,2,ATL
2,ATL_10_11,4,ATL
3,ATL_10_12,4,ATL
4,ATL_10_13,3,ATL
5,ATL_10_14,1,ATL


In [43]:
print(f"Overall number of TCs: {len(max_wind_speed)}, category 1 or higher number of TCs: {len(storm_filter)}")

Overall number of TCs: 982, category 1 or higher number of TCs: 473


In [45]:
# filter the trackfile data by the storm filter
track_file_filtered = track_file[track_file["storm_code"].isin(storm_filter["storm_code"])]
# join the category column by storm code
track_file_filtered = pd.merge(track_file_filtered, storm_filter, how='inner', on='storm_code')

track_file_filtered.head()

Unnamed: 0,year,month,day,hour,lat,lon,pressure,knots,storm_code,storm_name,category,basin
0,2020,10,20,0,12.1,-80.0,0,15,ATL_20_28,Zeta,2,ATL
1,2020,10,20,6,12.5,-80.1,0,15,ATL_20_28,Zeta,2,ATL
2,2020,10,20,12,12.8,-80.2,0,15,ATL_20_28,Zeta,2,ATL
3,2020,10,20,18,13.2,-80.3,0,15,ATL_20_28,Zeta,2,ATL
4,2020,10,21,0,13.8,-80.4,0,15,ATL_20_28,Zeta,2,ATL


Let's save this as a csv file for use in analysis.

In [None]:
track_file_filtered.to_csv('intermediate_data/Filtered_Reduced_Trackfile.csv', index=False)

Next, let's focus on the WWLLN dataset. Start by filtering the WWLLN dataset by the storm filter created above.

In [None]:
# filter WWLLN dataset by the storm filter
locations_WWLLN_filtered = locations_WWLLN[locations_WWLLN["storm_code"].isin(storm_filter["storm_code"])]
locations_WWLLN_filtered.head()

Now let's calculate the direct distance of each lightning instance from the storm center using a simple triangle calculation. We have the north and east distances from center, so we use the Pythagorean theorem to simply calculate the missing hypotenuse. We also create an indicator for inner core lightning and another for rainband lightning. Inner core lightning is defined as within 100km of storm center, while rainband lightning is defined as between 200-400km of storm center.

In [None]:
locations_WWLLN_filtered['hypotenuse_disance_from_storm_center'] = np.sqrt(locations_WWLLN_filtered['distance_from_storm_center_km_east'] ** 2 +locations_WWLLN['distance_from_storm_center_km_north'] ** 2)
locations_WWLLN_filtered["inner_core_ind"] = locations_WWLLN_filtered["hypotenuse_disance_from_storm_center"].apply(
    lambda x: 1 if x <= 100 else 0
)
locations_WWLLN_filtered["rainband_ind"] = locations_WWLLN_filtered["hypotenuse_disance_from_storm_center"].apply(
    lambda x: 1 if (x >= 200 and x <= 400) else 0
)

Let's save this as a txt file for future use.

In [None]:
locations_WWLLN_filtered.to_csv("intermediate_data/Filtered_WWLLN_Locations.txt", sep=' ', index=False)

### Binning the Lightning Data
To compare the lightning bursts with TC wind and pressure data, we must bin and join the trackfile and WWLLN datasets. Because of the size of the data we're working with, we can restart the kernel here to keep memory use more optimal. 

We will first bin the lightning data into 30-minute bins, yielding 48 bins a day. Each bin contains a count of lightning events during that timeframe. We will then join the binned data to the trackfile data.

Let's start by importing necessary libraries and files again.

In [1]:
# read in the WWLLN file from earlier, but filtered to keep either rainband or innercore

chunksize = 100000
chunks = []

for chunk in pd.read_csv(
    "intermediate_data/Filtered_WWLLN_Locations.txt",
    delim_whitespace=True,
    chunksize=chunksize
):
    chunks.append(chunk)

locations_WWLLN_filtered_ = pd.concat(chunks, ignore_index=True)

NameError: name 'pd' is not defined

In [None]:
locations_WWLLN_filtered_.head()

For the initial analysis, we are only interested in either inner core or rainband lighting. We drop all other lightning events that are not categorized as either.

In [None]:
# retain only the inner core or rainband lightning events using the indicator columns
locations_WWLLN_filtered_inner_rainband = locations_WWLLN_filtered_[
    ~((locations_WWLLN_filtered_["rainband_ind"] == 0) & (locations_WWLLN_filtered_["inner_core_ind"] == 0))
]

In [None]:
# filter out just the inner core data
locations_WWLLN_filtered_innercore = locations_WWLLN_filtered_inner_rainband[locations_WWLLN_filtered_inner_rainband['inner_core_ind'] == 1]
# (2611951, 15)

In [None]:
# filter out just the rainband data
locations_WWLLN_filtered_rainband = locations_WWLLN_filtered_inner_rainband[locations_WWLLN_filtered_inner_rainband['rainband_ind'] == 1]
# (10146702, 15)

Save the inner core and rainband datasets as separate csv files.

In [None]:
locations_WWLLN_filtered_innercore.to_csv("intermediate_data/WWLLN_innercore.csv", index=False)
locations_WWLLN_filtered_rainband.to_csv("intermediate_data/WWLLN_rainband.csv", index=False)

Get the min and max timestamps per storm using the track files. We use this to retain the entire length of the storm regardless of how much of the storm length the lightning events cover.

In [None]:
track_file = pd.read_csv('intermediate_data/Filtered_Reduced_Trackfile.csv')

In [None]:
# Ensure 'storm_code' is a column and not an index
track_file = track_file.reset_index(drop=True)

# Add 'min' column if not already done
track_file['min'] = 0  # Only if 'min' column hasn't been added

# Ensure that month, day, hour, and minute are padded to two digits
track_file['datetime'] = pd.to_datetime(
    track_file['year'].astype(str) + '-' +
    track_file['month'].astype(str).str.zfill(2) + '-' +
    track_file['day'].astype(str).str.zfill(2) + ' ' +
    track_file['hour'].astype(str).str.zfill(2) + ':' +
    track_file['min'].astype(str).str.zfill(2) + ':00'
)

# If 'storm_code' is an index or part of a multi-index, reset it
if 'storm_code' in track_file.index.names:
    track_file = track_file.reset_index(drop=False)

# Group by 'storm_code' and calculate min and max times
full_time_period_df = (
    track_file.groupby('storm_code', group_keys=False)['datetime']
    .agg(min_time='min', max_time='max')
    .reset_index()  # Reset the result
)

# Check the result
print(full_time_period_df.head())
# Export to csv for future reference
full_time_period_df.to_csv('intermediate_data/storm_time_period.csv')

We can restart the kernel again here to free up memory. Next, we will create 30-minute bins for the inner core dataset. The rainband dataset will need the addition of shear quadrants. All rainband data processing can be found in the `rainband_data.ipynb` notebook. 

In [7]:
# read in the files we just exported
locations_WWLLN_filtered_innercore = pd.read_csv(r"C:\Users\user\Desktop\25 WI\WWLLN_innercore.csv")
full_time_period_df = pd.read_csv("intermediate_data/storm_time_period.csv")

In [12]:
# Ensure sec column is valid
locations_WWLLN_filtered_innercore['sec'] = locations_WWLLN_filtered_innercore['sec'].apply(lambda x: 0 if x == 60 else x)

# Create a datetime column
locations_WWLLN_filtered_innercore['datetime'] = pd.to_datetime(
    locations_WWLLN_filtered_innercore['year'].astype(str) + '-' +
    locations_WWLLN_filtered_innercore['month'].astype(str).str.zfill(2) + '-' +
    locations_WWLLN_filtered_innercore['day'].astype(str).str.zfill(2) + ' ' +
    locations_WWLLN_filtered_innercore['hour'].astype(str).str.zfill(2) + ':' +
    locations_WWLLN_filtered_innercore['min'].astype(str).str.zfill(2) + ':' +
    locations_WWLLN_filtered_innercore['sec'].astype(str).str.zfill(2)
)

# Define a function to apply the 30-minute binning for each storm_code group
def add_time_bin(group):
    group['time_bin'] = group['datetime'].dt.floor('30min')
    return group

# Reset the index to avoid ambiguity during grouping
locations_WWLLN_filtered_innercore = locations_WWLLN_filtered_innercore.reset_index(drop=True)

# Group by storm_code and apply the binning function
locations_WWLLN_filtered_innercore = locations_WWLLN_filtered_innercore.groupby('storm_code', group_keys=False).apply(add_time_bin)

# Group by bins and get the count per 30-minute bin
locations_WWLLN_filtered_innercore_grouped = locations_WWLLN_filtered_innercore.groupby(['storm_code', 'time_bin'])
locations_WWLLN_filtered_innercore_timebin = locations_WWLLN_filtered_innercore_grouped.size().reset_index(name='lightning_count')

# Add missing 30-minute bins for each storm_code
def add_missing_bins(group):
    storm_code = group['storm_code'].iloc[0]
    # Use track file start and end dates to retain entirety of storm length
    min_time = full_time_period_df.loc[full_time_period_df['storm_code'] == storm_code, 'min_time'].values[0]
    max_time = full_time_period_df.loc[full_time_period_df['storm_code'] == storm_code, 'max_time'].values[0]

    # Create a full range of 30-minute bins for the time period of this storm
    full_bins = pd.DataFrame({'time_bin': pd.date_range(min_time, max_time, freq='30min')})

    # Merge the full_bins with the original group
    merged = full_bins.merge(group[['storm_code', 'time_bin', 'lightning_count']], how='left', on='time_bin')

    # Fill missing lightning_count with 0 where there is no data
    merged['lightning_count'] = merged['lightning_count'].fillna(0).astype(int)

    # Fill missing storm_code with the first valid entry in the group
    merged['storm_code'] = merged['storm_code'].fillna(method='ffill')

    return merged

# Apply the function to each storm_code
locations_WWLLN_filtered_innercore_timebin = locations_WWLLN_filtered_innercore_timebin.groupby('storm_code', group_keys=False).apply(add_missing_bins)

# Sort the final result
locations_WWLLN_filtered_innercore_timebin = locations_WWLLN_filtered_innercore_timebin.sort_values(by=['storm_code', 'time_bin'])

# Print the resulting DataFrame with the new 'time_bin' and 'lightning_count' columns
locations_WWLLN_filtered_innercore_timebin.head()

# # Export both the ungrouped and grouped datasets
locations_WWLLN_filtered_innercore.to_csv("intermediate_data/WWLLN_innercore_w_time.csv", index=False)
locations_WWLLN_filtered_innercore_timebin.to_csv("intermediate_data/WWLLN_innercore_timebin_count.csv", index=False)


  locations_WWLLN_filtered_innercore = locations_WWLLN_filtered_innercore.groupby('storm_code', group_keys=False).apply(add_time_bin)
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm_code'].fillna(method='ffill')
  merged['storm_code'] = merged['storm

### Joining the Datasets
The trackfile data includes wind and pressure data at regular intervals for each TC. Since there is no function that approximates the behavior of TC wind and pressure change, we will not interpolate the data for timestamps between the intervals - rather, we will join each lightning event to the nearest wind and pressure data and perform our analysis as such. We recognize that this process is a source of error, but we believe that joining to the nearest timestamp will keep this known error as low as possible for the sake of this analysis.

Restart the kernel again here to free up memory.

In [58]:
# read in the necessary files
filtered_reduced_trackfile = pd.read_csv("intermediate_data/Filtered_Reduced_Trackfile.csv")
locations_WWLLN_filtered_innercore_timebin = pd.read_csv("intermediate_data/WWLLN_innercore_timebin_count.csv")
locations_WWLLN_filtered_innercore = pd.read_csv("intermediate_data/WWLLN_innercore_w_time.csv")

We convert the datetime columns into datetime data types to ensure that dates are represented correctly and yield an accurate join.

In [59]:
# convert the datetime columns to the correct data type for inner core data
locations_WWLLN_filtered_innercore_timebin["time_bin"] = pd.to_datetime(locations_WWLLN_filtered_innercore_timebin["time_bin"])

locations_WWLLN_filtered_innercore_timebin["year"] = locations_WWLLN_filtered_innercore_timebin["time_bin"].dt.year
locations_WWLLN_filtered_innercore_timebin["month"] = locations_WWLLN_filtered_innercore_timebin["time_bin"].dt.month
locations_WWLLN_filtered_innercore_timebin["day"] = locations_WWLLN_filtered_innercore_timebin["time_bin"].dt.day
locations_WWLLN_filtered_innercore_timebin["hour"] = locations_WWLLN_filtered_innercore_timebin["time_bin"].dt.hour
locations_WWLLN_filtered_innercore_timebin["minute"] = locations_WWLLN_filtered_innercore_timebin["time_bin"].dt.minute

In [60]:
# clean up the minute columns
locations_WWLLN_filtered_innercore.rename(columns={'min': 'minute'}, inplace=True)
filtered_reduced_trackfile['minute'] = 0

In [61]:
filtered_reduced_trackfile.head()

Unnamed: 0,year,month,day,hour,lat,lon,pressure,knots,storm_code,storm_name,category,basin,minute
0,2020,10,20,0,12.1,-80.0,0,15,ATL_20_28,Zeta,2,ATL,0
1,2020,10,20,6,12.5,-80.1,0,15,ATL_20_28,Zeta,2,ATL,0
2,2020,10,20,12,12.8,-80.2,0,15,ATL_20_28,Zeta,2,ATL,0
3,2020,10,20,18,13.2,-80.3,0,15,ATL_20_28,Zeta,2,ATL,0
4,2020,10,21,0,13.8,-80.4,0,15,ATL_20_28,Zeta,2,ATL,0


In [62]:
locations_WWLLN_filtered_innercore_timebin = locations_WWLLN_filtered_innercore_timebin.sort_values(
    ["storm_code", "year", "month", "day", "hour"]
).reset_index(drop=True)

filtered_reduced_trackfile = filtered_reduced_trackfile.sort_values(
    ["storm_code", "year", "month", "day", "hour"]
).reset_index(drop=True)

In [63]:
# convert to polars for faster processing
innercore_count_pl = pl.from_pandas(locations_WWLLN_filtered_innercore_timebin)
innercore_w_time_pl = pl.from_pandas(locations_WWLLN_filtered_innercore)
innercore_w_time_pl = innercore_w_time_pl.with_columns(
    pl.col("time_bin").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S")
)

tracks_pl = pl.from_pandas(filtered_reduced_trackfile)

We find that Olivia, EPAC_18_17, has missing data. The storm has one line of data for 8/27/18 and then nothing until 8/30/18. After discussions with our sponsors, we decided to drop the 8/27/18 - 8/29/18 data.

In [64]:
innercore_count_pl = innercore_count_pl.filter(
    ~((pl.col("storm_code") == "EPAC_18_17") &
    (pl.col("time_bin") < pl.datetime(2018, 8, 30)) )
)

innercore_w_time_pl = innercore_w_time_pl.filter(
    ~((pl.col("storm_code") == "EPAC_18_17") &
    (pl.col("time_bin") < pl.datetime(2018, 8, 30)) )
)

In [65]:
innercore_count_pl = innercore_count_pl.with_columns(
    pl.col("year").cast(pl.Int64),
    pl.col("month").cast(pl.Int64),
    pl.col("day").cast(pl.Int64)
)

tracks_pl = tracks_pl.with_columns(
    pl.col("year").cast(pl.Int64),
    pl.col("month").cast(pl.Int64),
    pl.col("day").cast(pl.Int64)
)

In [66]:
# sort by time
innercore_count_pl = innercore_count_pl.sort(["year", "month", "day", "hour"])
innercore_w_time_pl = innercore_w_time_pl.sort(["year", "month", "day", "hour"])

tracks_pl = tracks_pl.sort(["year", "month", "day", "hour"])

Since our WWLLN lightning data and TC trackfile data operate on different time intervals (trackfile data has more regular intervals, while WWLLN lightning data represents the exact time a lightning event occurs), we will join the datasets using the "nearest" strategy and the `join_asof` function.

In [67]:


# join the binned count datasets using the nearest strategy
innercore_joined = innercore_count_pl.join_asof(
    tracks_pl,
    on="hour",
    by=["year", "month", "day","storm_code"],
    strategy="nearest",  # "backward" (default) or "forward"
    tolerance=24
)




This can lead to invalid results. Ensure the asof key is sorted
  innercore_joined = innercore_count_pl.join_asof(

This can lead to invalid results. Ensure the asof key is sorted
  innercore_joined = innercore_count_pl.join_asof(


In [68]:
# join the ungrouped datasets using the nearest strategy
innercore_joined_w_time = innercore_w_time_pl.join_asof(
    tracks_pl,
    on="hour",
    by=["year", "month", "day","storm_code"],  # Optional: match on multiple keys
    strategy="nearest"  # "backward" (default) or "forward"
)


This can lead to invalid results. Ensure the asof key is sorted
  innercore_joined_w_time = innercore_w_time_pl.join_asof(

This can lead to invalid results. Ensure the asof key is sorted
  innercore_joined_w_time = innercore_w_time_pl.join_asof(


Let's do a quick sanity check for nulls in our datasets. Start by converting the data back to pandas dataframes.

In [69]:
# convert back to pandas to check nulls
innercore_timebin_joined = innercore_joined.to_pandas()

innercore_joined_w_time = innercore_joined_w_time.to_pandas()

We create a function to check for nulls in the datasets.

In [70]:
innercore_joined_w_time.isnull().sum()
#innercore_joined_w_time[innercore_joined_w_time.isna().any(axis=1)]['storm_code']

year                                    0
month                                   0
day                                     0
hour                                    0
minute                                  0
sec                                     0
lat                                     0
lon                                     0
distance_from_storm_center_km_east      0
distance_from_storm_center_km_north     0
storm_code                              0
storm_name                              0
hypotenuse_disance_from_storm_center    0
inner_core_ind                          0
rainband_ind                            0
datetime                                0
time_bin                                0
lat_right                               0
lon_right                               0
pressure                                0
knots                                   0
storm_name_right                        0
category                                0
basin                             

### Calculating Current Category and Intensification Change
After joining the WWLLN lightning events to its nearest wind and pressure data, we will then calculate the intensity change and category at the time of each lightning event. Intensity change is defined as the difference between the current wind speed and the wind speed 24 hours later. If the exact 24 hour timestamp cannot be found, we will compare with the closest timestamp to 24 hours. The current category of the TC at the time of the lightning event is evaluated based on the current wind speed. Once again, we refer to the [Saffir-Simpson Hurricane Wind Scale](https://www.nhc.noaa.gov/aboutsshws.php).

**Intensification Change Bins**
| Intensification Stage | Change in Winds (Knots) in 24 Hours (Jiang and Ramirez, 2013)|
| --------------------- | ----------------------|
| Weakening | <-30 to -10 |
| Neutral | -10 to 10 |
| Intensifying | 10 to >30 |

**TC Category Bins**
| TC Category | Sustained Winds (knots) | 
|  ---------- | ------------|
| 1 | 64-82 kt |
| 2 | 83-95 kt |
| 3 | 96-112 kt |
| 4 | 113-136 kt |
| 5 | 137 kt or higher | 

In [71]:
def knot_category(row):
    if 64 <= row['knots'] < 83:
        return 1
    elif 83 <= row['knots'] < 96:
        return 2
    elif 96 <= row['knots'] < 113:
        return 3
    elif 113 <= row['knots'] < 136:
        return 4
    elif row['knots'] >= 137:
        return 5
    else:
        return 'Unidentified'

In [72]:
def intensification(row):
    if row['24_hour_knots_diff'] < -30:
        return 'Rapidly Weakening'
    elif -30 <= row['24_hour_knots_diff'] < -10:
        return 'Weakening'
    elif -10 <= row['24_hour_knots_diff'] < 10:
        return 'Neutral'
    elif 10 <= row['24_hour_knots_diff'] < 30:
        return 'Intensifying'
    elif 30 <= row['24_hour_knots_diff']:
        return 'Rapidly Intensifying'
    else:
        return 'Unidentified'


In [73]:
pd.set_option('display.max_rows', 500)

# Sort by 'storm_code' and 'time_bin'
innercore_timebin_joined = innercore_timebin_joined.sort_values(by=['storm_code', 'time_bin'])

innercore_timebin_joined['24_hour_knots_diff'] = innercore_timebin_joined.groupby('storm_code')['knots'].shift(periods=-48) - innercore_timebin_joined['knots']
innercore_timebin_joined['24_hour_pressure_diff'] = innercore_timebin_joined.groupby('storm_code')['pressure'].shift(periods=-48) - innercore_timebin_joined['pressure']

# Save or display the DataFrame
innercore_timebin_joined.groupby('storm_code')['24_hour_knots_diff'].max()
innercore_timebin_joined.groupby('storm_code')['24_hour_pressure_diff'].max()

innercore_timebin_joined.isnull().sum()

time_bin                     0
storm_code                2939
lightning_count              0
year                         0
month                        0
day                          0
hour                         0
minute                       0
lat                       2939
lon                       2939
pressure                  2939
knots                     2939
storm_name                2939
category                  2939
basin                     2939
minute_right              2939
24_hour_knots_diff       25595
24_hour_pressure_diff    25595
dtype: int64

In [74]:
innercore_timebin_joined['TC_Category'] = innercore_timebin_joined.apply(knot_category, axis=1)
innercore_timebin_joined['Intensification_Category'] = innercore_timebin_joined.apply(intensification, axis=1)

In [75]:
innercore_timebin_joined.groupby('basin')['TC_Category'].value_counts()

basin  TC_Category 
ATL    Unidentified    31813
       1                7481
       2                2793
       3                1672
       4                1368
       5                 276
CPAC   Unidentified     2908
       1                 576
       2                 168
       4                 102
       3                  36
       5                  18
EPAC   Unidentified    27048
       1                5628
       2                2790
       3                2395
       4                1866
       5                 114
IO     Unidentified     6615
       1                1410
       3                 498
       2                 462
       4                 450
       5                  24
SHEM   Unidentified    36940
       1                7888
       2                4176
       3                2922
       4                2466
       5                 300
WPAC   Unidentified    36771
       1               10346
       2                5693
       4               

In [76]:
innercore_timebin_joined.groupby('basin')['Intensification_Category'].value_counts()

basin  Intensification_Category
ATL    Neutral                     22237
       Intensifying                11180
       Weakening                    4606
       Unidentified                 4032
       Rapidly Intensifying         2375
       Rapidly Weakening             973
CPAC   Neutral                      1640
       Intensifying                 1127
       Weakening                     452
       Unidentified                  336
       Rapidly Weakening             133
       Rapidly Intensifying          120
EPAC   Neutral                     14955
       Intensifying                 9621
       Weakening                    6467
       Unidentified                 3936
       Rapidly Intensifying         3139
       Rapidly Weakening            1723
IO     Neutral                      3104
       Intensifying                 2412
       Unidentified                 1248
       Weakening                    1187
       Rapidly Intensifying          754
       Rapidly Weakening 

In [77]:
innercore_timebin_joined.to_csv("innercore_timebin_joined.csv", index=False)