In [1]:
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import gc

print("Loading datasets...")
data_dir = os.path.join('private', 'data', 'transformed')

expected_files = [
    'merged_inflight_parts.parquet',
    'combined_rma.parquet',
    'merged_flight_data.parquet',
    'mtbf.parquet'
]

datasets = {}
pd.reset_option('display.float_format')

# Only load the files we need
file_path = os.path.join(data_dir, 'merged_inflight_parts.parquet')
if not os.path.exists(file_path):
    raise FileNotFoundError(f"File {file_path} not found")

merged_inflight_parts_df = pd.read_parquet(file_path)

# Convert to same dtypes
for col in merged_inflight_parts_df.select_dtypes(include=['float64']).columns:
    merged_inflight_parts_df[col] = pd.to_numeric(merged_inflight_parts_df[col], downcast='float')
for col in merged_inflight_parts_df.select_dtypes(include=['int64']).columns:
    merged_inflight_parts_df[col] = pd.to_numeric(merged_inflight_parts_df[col], downcast='integer')

# Convert string columns to category
categorical_candidate_cols = merged_inflight_parts_df.select_dtypes(include=['object']).columns
for col in categorical_candidate_cols:
    nunique = merged_inflight_parts_df[col].nunique()
    if nunique < len(merged_inflight_parts_df) * 0.5:  # If fewer than 50% of values are unique
        merged_inflight_parts_df[col] = merged_inflight_parts_df[col].astype('category')

print("Datasets loaded")

# Default decommision date is EIS + 10 years
merged_inflight_parts_df['DecommissionDate'] = pd.NaT
mask = merged_inflight_parts_df['DecommissionDate'].isna()
merged_inflight_parts_df.loc[mask, 'DecommissionDate'] = merged_inflight_parts_df.loc[mask, 'EntryIntoServiceDate'] + pd.Timedelta(days=365.25 * 10)

print(f"\nInfight parts dataset shape: {merged_inflight_parts_df.shape}")
print(f"Number of unique parts: {merged_inflight_parts_df['PartNumber'].nunique()}")
print(f"Number of unique aircraft: {merged_inflight_parts_df['Tail'].nunique()}")

# Concat the ProgramCode and Shipset Name Fields to create a unique identifier for each shipset
merged_inflight_parts_df['ProgramShipsetNumber'] = merged_inflight_parts_df['ProgramCode'] + '-' + merged_inflight_parts_df['ShipsetName']

# Count unique shipsets and tails
print(f"Number of unique shipsets: {merged_inflight_parts_df['ProgramShipsetNumber'].nunique()}")
print(f"Number of unique tails: {merged_inflight_parts_df['Tail'].nunique()}")

# Count number of missing OwnerCompanyId values
missing_owner_company = merged_inflight_parts_df['OwnerCompanyId'].isna().sum()
print(f"Number of missing OwnerCompanyId values: {missing_owner_company} ({(missing_owner_company / merged_inflight_parts_df.shape[0]) * 100:.2f}%)")

# Calculate percentage of unique values in key columns
key_columns = ['PartNumber', 'Tail', 'ProgramShipsetNumber', 'OwnerCompanyId']
print("\nUnique values in key columns")
for col in key_columns:
    total_values = merged_inflight_parts_df[col].count()
    unique_values = merged_inflight_parts_df[col].nunique()
    if total_values > 0:
        unique_percent = (unique_values / total_values) * 100
    else:
        unique_percent = 0
    print(f"{col}: {unique_values} unique values ({unique_percent:.2f}%) out of {total_values} non-null values")

# Remove redundant columns and columns with no unique values
redundant_columns = ['partnumber', 'productname', 'ShipsetName', 'ShipsetNumber', 
                     'EquipmentEntryId', 'DeliveryEquipmentEntryId',
                     'InstallLocationType', 'EntryType', 'ShipsetType', 
                     'definitionlevel', 'useequipmentfamily']
drop_cols = [col for col in redundant_columns if col in merged_inflight_parts_df.columns]
merged_inflight_parts_df.drop(columns=drop_cols, inplace=True)

# New identifier column
merged_inflight_parts_df['UniquePartInstallationId'] = (
    merged_inflight_parts_df['ProgramShipsetNumber'] + '-' + 
    merged_inflight_parts_df['PartNumber'] + '-' + 
    merged_inflight_parts_df['InstallLocation'].fillna('Unknown')
)

sample_size = min(100000, len(merged_inflight_parts_df))
sample_df = merged_inflight_parts_df.sample(sample_size)
unique_counts = sample_df.groupby('UniquePartInstallationId').size().reset_index(name='count')
duplicates = unique_counts[unique_counts['count'] > 1]
print(f"Number of duplicate UniquePartInstallationIds in sample: {len(duplicates)}")

if len(duplicates) > 0:
    agg_dict = {
        'operatingmode': lambda x: ';'.join(sorted(x.dropna().astype(str).unique())),
        'maximuminputpower': 'max',
    }
    
    all_other_columns = [col for col in merged_inflight_parts_df.columns 
                         if col not in agg_dict.keys() and col != 'UniquePartInstallationId']
    
    for col in all_other_columns:
        agg_dict[col] = 'first'
    
    # Group and aggregate in chunks
    chunk_size = 100000
    num_chunks = (len(merged_inflight_parts_df) + chunk_size - 1) // chunk_size
    aggregated_chunks = []
    
    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(merged_inflight_parts_df))
        print(f"Processing chunk {i+1}/{num_chunks} for aggregation...")
        
        chunk = merged_inflight_parts_df.iloc[start_idx:end_idx]
        unique_ids = chunk['UniquePartInstallationId']
        agg_chunk = chunk.groupby('UniquePartInstallationId').agg(agg_dict)        
        agg_chunk = agg_chunk.reset_index()
        aggregated_chunks.append(agg_chunk)
        
        del chunk
        del unique_ids
        gc.collect()
    
    # Combine chunks
    merged_inflight_parts_df = pd.concat(aggregated_chunks, ignore_index=True)
    del aggregated_chunks
    gc.collect()
else:
    print("No duplicates found in sample, skipping aggregation.")

## Turn into weekly time series
# Get range of dates
min_date = merged_inflight_parts_df['EntryIntoServiceDate'].min()
max_date = merged_inflight_parts_df['DecommissionDate'].max()

# Create weekly date range
date_range = pd.date_range(min_date, max_date, freq='W-MON')
print(f"Date range: {date_range[0]} to {date_range[-1]}")

# Sort by install date for efficiency in the loop
merged_inflight_parts_df = merged_inflight_parts_df.sort_values('EntryIntoServiceDate')

# Dictionaries for running totals
running_fleet_totals = {}
running_tail_totals = {}
running_location_totals = {}
running_company_totals = {}

# Create a list of required columns to keep
required_columns = ['UniquePartInstallationId', 'PartNumber', 'Tail', 'InstallLocation', 
                    'OwnerCompanyId', 'DeliveryQuantity', 'EntryIntoServiceDate', 
                    'DecommissionDate', 'ProgramShipsetNumber']

# Divide date range into chunks to process
time_chunks = [date_range[i:i+52] for i in range(0, len(date_range), 52)]
all_weekly_data = []

for chunk_idx, time_chunk in enumerate(time_chunks):
    print(f"Processing time chunk {chunk_idx+1}/{len(time_chunks)} ({time_chunk[0]} to {time_chunk[-1]})...")
    
    chunk_weekly_data = []
    
    for week_start in time_chunk:
        week_end = week_start + timedelta(days=6)
        
        # Identify events for this week using boolean
        install_mask = (
            (merged_inflight_parts_df['EntryIntoServiceDate'] > week_start - timedelta(days=7)) &
            (merged_inflight_parts_df['EntryIntoServiceDate'] <= week_end)
        )
        
        decommision_mask = (
            (merged_inflight_parts_df['DecommissionDate'] > week_start - timedelta(days=7)) &
            (merged_inflight_parts_df['DecommissionDate'] <= week_end)
        )
        
        # Process install events
        for _, part in merged_inflight_parts_df[install_mask][required_columns].iterrows():
            part_num = part['PartNumber']
            quantity = part['DeliveryQuantity']
            tail = part['Tail']
            location = part['InstallLocation']
            company = part['OwnerCompanyId']
            
            # Update running totals
            running_fleet_totals[part_num] = running_fleet_totals.get(part_num, 0) + quantity
            running_tail_totals[(part_num, tail)] = running_tail_totals.get((part_num, tail), 0) + quantity
            running_location_totals[(part_num, location)] = running_location_totals.get((part_num, location), 0) + quantity
            running_company_totals[(part_num, company)] = running_company_totals.get((part_num, company), 0) + quantity
        
        # Process decommision events
        for _, part in merged_inflight_parts_df[decommision_mask][required_columns].iterrows():
            part_num = part['PartNumber']
            quantity = part['DeliveryQuantity']
            tail = part['Tail']
            location = part['InstallLocation']
            company = part['OwnerCompanyId']
            
            # Update running totals
            running_fleet_totals[part_num] = running_fleet_totals.get(part_num, 0) - quantity
            running_tail_totals[(part_num, tail)] = running_tail_totals.get((part_num, tail), 0) - quantity
            running_location_totals[(part_num, location)] = running_location_totals.get((part_num, location), 0) - quantity
            running_company_totals[(part_num, company)] = running_company_totals.get((part_num, company), 0) - quantity
        
        # Find active parts for this week
        active_mask = (
            (merged_inflight_parts_df['EntryIntoServiceDate'] <= week_end) & 
            (merged_inflight_parts_df['DecommissionDate'] >= week_start)
        )
        
        current_week_parts = merged_inflight_parts_df[active_mask][required_columns].copy()
        # Add week start
        current_week_parts['WeekStart'] = week_start
        
        # Add running totals
        if min_date <= week_start <= max_date:
            # Add running totals using vectorized operations where possible
            current_week_parts['RunningFleetTotal'] = current_week_parts['PartNumber'].map(
                lambda pn: max(0, running_fleet_totals.get(pn, 0))
            )
            
            current_week_parts['RunningTailTotal'] = current_week_parts.apply(
                lambda row: max(0, running_tail_totals.get((row['PartNumber'], row['Tail']), 0)), axis=1
            )
            current_week_parts['RunningLocationTotal'] = current_week_parts.apply(
                lambda row: max(0, running_location_totals.get((row['PartNumber'], row['InstallLocation']), 0)), axis=1
            )
            current_week_parts['RunningCompanyTotal'] = current_week_parts.apply(
                lambda row: max(0, running_company_totals.get((row['PartNumber'], row['OwnerCompanyId']), 0)), axis=1
            )
        else:
            # Set running totals to 0 outside the date range
            current_week_parts['RunningFleetTotal'] = 0
            current_week_parts['RunningTailTotal'] = 0
            current_week_parts['RunningLocationTotal'] = 0
            current_week_parts['RunningCompanyTotal'] = 0
        
        chunk_weekly_data.append(current_week_parts)
    
    # Combine weeks in this chunk
    if chunk_weekly_data:
        time_chunk_df = pd.concat(chunk_weekly_data, ignore_index=True)
        all_weekly_data.append(time_chunk_df)
        
        del chunk_weekly_data
        gc.collect()
        
        print(f"Time chunk {chunk_idx+1} completed with {len(time_chunk_df)} rows")
        del time_chunk_df
        gc.collect()

# Combine chunks
weekly_sequence_df = pd.concat(all_weekly_data, ignore_index=True)

del all_weekly_data
del merged_inflight_parts_df
gc.collect()

print("Weekly sequences created!")
print(f"Number of weeks covered: {weekly_sequence_df['WeekStart'].nunique()}")
print(f"Number of unique parts: {weekly_sequence_df['PartNumber'].nunique()}")
print(f"Date range: {weekly_sequence_df['WeekStart'].min()} to {weekly_sequence_df['WeekStart'].max()}")

# Save to parquet
processed_data_dir = os.path.join('private', 'data', 'processed')
if not os.path.exists(processed_data_dir):
    os.makedirs(processed_data_dir)

# Save in chunks
rows_per_chunk = 1000000
num_save_chunks = (len(weekly_sequence_df) + rows_per_chunk - 1) // rows_per_chunk

if num_save_chunks <= 1:
    output_path = os.path.join(processed_data_dir, 'weekly_sequence_1_parts.parquet')
    weekly_sequence_df.to_parquet(output_path)
    print("Data saved!")
else:
    # Save in multiple chunks and then recombine
    print(f"Saving data in {num_save_chunks} chunks...")
    temp_file_paths = []
    
    for i in range(num_save_chunks):
        start_idx = i * rows_per_chunk
        end_idx = min((i + 1) * rows_per_chunk, len(weekly_sequence_df))
        
        chunk = weekly_sequence_df.iloc[start_idx:end_idx]
        temp_path = os.path.join(processed_data_dir, f'weekly_sequence_1_parts_temp_{i}.parquet')
        chunk.to_parquet(temp_path)
        temp_file_paths.append(temp_path)
        
        del chunk
        gc.collect()
    
    combined_df = pd.read_parquet(temp_file_paths[0])
    
    for i in range(1, len(temp_file_paths)):
        next_chunk = pd.read_parquet(temp_file_paths[i])
        combined_df = pd.concat([combined_df, next_chunk], ignore_index=True)
        del next_chunk
        gc.collect()
    
    # Save the combined data
    output_path = os.path.join(processed_data_dir, 'weekly_sequence_1_parts.parquet')
    combined_df.to_parquet(output_path)
    
    # Clean up temporary files
    for temp_path in temp_file_paths:
        os.remove(temp_path)
    
    print("Data saved and temporary files cleaned up!")

Loading datasets...
Datasets loaded

Infight parts dataset shape: (21347, 42)
Number of unique parts: 326
Number of unique aircraft: 331
Number of unique shipsets: 375
Number of unique tails: 331
Number of missing OwnerCompanyId values: 2298 (10.76%)

Unique values in key columns
PartNumber: 326 unique values (1.53%) out of 21347 non-null values
Tail: 331 unique values (1.69%) out of 19530 non-null values
ProgramShipsetNumber: 375 unique values (1.76%) out of 21347 non-null values
OwnerCompanyId: 49 unique values (0.26%) out of 19049 non-null values
Number of duplicate UniquePartInstallationIds in sample: 2020
Processing chunk 1/1 for aggregation...
Date range: 2012-03-26 00:00:00 to 2058-11-25 00:00:00
Processing time chunk 1/47 (2012-03-26 00:00:00 to 2013-03-18 00:00:00)...
Time chunk 1 completed with 3715 rows
Processing time chunk 2/47 (2013-03-25 00:00:00 to 2014-03-17 00:00:00)...
Time chunk 2 completed with 17353 rows
Processing time chunk 3/47 (2014-03-24 00:00:00 to 2015-03-1

In [2]:
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import gc

# Define directories
processed_data_dir = os.path.join('private', 'data', 'processed')
data_dir = os.path.join('private', 'data', 'transformed')

print("Loading weekly sequence data...")
# Load the weekly sequence data
input_path = os.path.join(processed_data_dir, 'weekly_sequence_1_parts.parquet')
essential_columns = ['PartNumber', 'WeekStart', 'UniquePartInstallationId', 'RunningFleetTotal', 
                    'RunningTailTotal', 'RunningLocationTotal', 'RunningCompanyTotal', 'Tail']
weekly_sequence_df = pd.read_parquet(input_path, columns=essential_columns)

print("Weekly sequence loaded")
print(f"Memory usage of weekly_sequence_df: {weekly_sequence_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print("Loading RMA data...")
input_path = os.path.join(data_dir, 'combined_rma.parquet')

# First all available columns to see what's actually in the file
try:
    temp_df = pd.read_parquet(input_path, columns=None)
    available_columns = temp_df.columns.tolist()
    print(f"Available columns in RMA file: {available_columns}")
    del temp_df
    gc.collect()
    
    rma_columns = ['rma_number', 'part_number', 'received_date', 'ship_date', 'repair_duration']
    
    # Add optional columns if they exist
    if 'fault_category' in available_columns:
        rma_columns.append('fault_category')
    elif 'fault_code' in available_columns:
        rma_columns.append('fault_code')  # Alternative column
        
    if 'repair_reason_category' in available_columns:
        rma_columns.append('repair_reason_category')
    elif 'return_reason_category' in available_columns:
        rma_columns.append('return_reason_category')  # Alternative column
        
    if 'received_year' in available_columns:
        rma_columns.append('received_year')
        
except Exception as e:
    print(f"Error reading all columns: {str(e)}")
    rma_columns = ['rma_number', 'part_number', 'received_date', 'ship_date', 'repair_duration']

combined_rma_df = pd.read_parquet(input_path, columns=rma_columns)

# Fix data types issues
for col in combined_rma_df.select_dtypes(include=['float64']).columns:
    combined_rma_df[col] = pd.to_numeric(combined_rma_df[col], downcast='float')
for col in combined_rma_df.select_dtypes(include=['int64']).columns:
    combined_rma_df[col] = pd.to_numeric(combined_rma_df[col], downcast='integer')

# Use categories for string columns
for col in combined_rma_df.select_dtypes(include=['object']).columns:
    if combined_rma_df[col].nunique() < len(combined_rma_df) * 0.5:
        combined_rma_df[col] = combined_rma_df[col].astype('category')

# Check for missing values in critical columns
missing_values = combined_rma_df['received_date'].isna().sum()
print(f"Missing received_date values: {missing_values} ({missing_values/len(combined_rma_df)*100:.2f}%)")

# Clean the RMA data
print("Cleaning RMA data...")
combined_rma_df = combined_rma_df.dropna(subset=['received_date'])

# Check for duplicates
if 'received_year' not in combined_rma_df.columns:
    combined_rma_df['received_year'] = combined_rma_df['received_date'].dt.year

# Remove any date before 2012
combined_rma_df = combined_rma_df[combined_rma_df['received_date'] >= '2012-01-01']
if 'ship_date' in combined_rma_df.columns:
    # Handle null ship dates
    combined_rma_df = combined_rma_df[
        combined_rma_df['ship_date'].isna() | 
        (combined_rma_df['ship_date'] >= '2012-01-01')
    ]

print(f"Date range: {combined_rma_df['received_date'].min()} to {combined_rma_df['received_date'].max()}")

# Fix timezones
for date_col in ['received_date', 'ship_date']:
    if date_col in combined_rma_df.columns:
        if hasattr(combined_rma_df[date_col].dtype, 'tz') and combined_rma_df[date_col].dtype.tz is not None:
            combined_rma_df[date_col] = combined_rma_df[date_col].dt.tz_localize(None)

# Rename part_number to match weekly_sequence_df
combined_rma_df = combined_rma_df.rename(columns={'part_number': 'PartNumber'})

# Process RMA data for weekly sequences
print("Processing RMA data for weekly sequences...")
min_date = combined_rma_df['received_date'].min()
max_date = combined_rma_df['received_date'].max()
date_range = pd.date_range(min_date, max_date, freq='W-MON')
print(f"RMA date range: {date_range[0]} to {date_range[-1]}")

# Get unique part numbers in RMA data
unique_part_numbers = combined_rma_df['PartNumber'].unique()
print(f"Number of unique part numbers in RMA data: {len(unique_part_numbers)}")

# Process in chunks
chunk_size = 1000  # Process 1000 part numbers at a time
num_chunks = (len(unique_part_numbers) + chunk_size - 1) // chunk_size
all_rma_weekly_data = []

print(f"Processing {len(unique_part_numbers)} part numbers in {num_chunks} chunks...")

for chunk_idx in range(num_chunks):
    start_idx = chunk_idx * chunk_size
    end_idx = min((chunk_idx + 1) * chunk_size, len(unique_part_numbers))
    chunk_part_numbers = unique_part_numbers[start_idx:end_idx]
    
    print(f"Processing part number chunk {chunk_idx+1}/{num_chunks} ({start_idx} to {end_idx})...")
    
    # Filter RMA data to only this chunk's part numbers
    chunk_rma_df = combined_rma_df[combined_rma_df['PartNumber'].isin(chunk_part_numbers)]
    
    # Initialize tracking for this chunk
    chunk_rma_weekly_data = []
    chunk_running_rma_totals = {}
    
    # Process each week for these part numbers
    for week_start in date_range:
        week_end = week_start + timedelta(days=6)
        
        received_mask = (
            (chunk_rma_df['received_date'] >= week_start) &
            (chunk_rma_df['received_date'] <= week_end)
        )
        
        shipped_mask = (
            (chunk_rma_df['ship_date'].notna()) &
            (chunk_rma_df['ship_date'] >= week_start) &
            (chunk_rma_df['ship_date'] <= week_end)
        )
        
        # Get counts for part numbers in this chunk for this week
        received_parts = chunk_rma_df[received_mask]['PartNumber'].value_counts().to_dict()
        shipped_parts = chunk_rma_df[shipped_mask]['PartNumber'].value_counts().to_dict()
        
        # Update data for each part number in this chunk
        for part_num in chunk_part_numbers:
            received_count = received_parts.get(part_num, 0)
            shipped_count = shipped_parts.get(part_num, 0)
            
            # Only add to the results if there's activity
            if received_count > 0 or shipped_count > 0 or chunk_running_rma_totals.get(part_num, 0) > 0:
                in_repair = chunk_running_rma_totals.get(part_num, 0) + received_count - shipped_count
                
                chunk_rma_weekly_data.append({
                    'PartNumber': part_num,
                    'WeekStart': week_start,
                    'ReceivedCount': received_count,
                    'ShippedCount': shipped_count,
                    'InRepair': in_repair
                })
                
                # Update running total
                chunk_running_rma_totals[part_num] = in_repair
    
    # Convert chunk results to DataFrame
    if chunk_rma_weekly_data:
        chunk_df = pd.DataFrame(chunk_rma_weekly_data)
        all_rma_weekly_data.append(chunk_df)
        
        del chunk_rma_weekly_data
        del chunk_running_rma_totals
        del chunk_rma_df
        gc.collect()
    
    # Report progress
    print(f"Completed part number chunk {chunk_idx+1}/{num_chunks}")

# Combine all chunks
print("Combining all chunks...")
if all_rma_weekly_data:
    rma_weekly_df = pd.concat(all_rma_weekly_data, ignore_index=True)
    
    del all_rma_weekly_data
    gc.collect()
    
    print(f"RMA weekly data created with {len(rma_weekly_df)} rows")
    print(f"Memory usage of rma_weekly_df: {rma_weekly_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
else:
    print("No RMA data to process.")
    rma_weekly_df = pd.DataFrame(columns=['PartNumber', 'WeekStart', 'ReceivedCount', 'ShippedCount', 'InRepair'])

# Merge in chunks
print("Merging RMA data into weekly sequence...")

# Get unique weeks in the weekly sequence for chunking
unique_weeks = weekly_sequence_df['WeekStart'].unique()
unique_weeks = np.sort(unique_weeks)

# Process in chunks of weeks
week_chunk_size = 52  # Process one year at a time
num_week_chunks = (len(unique_weeks) + week_chunk_size - 1) // week_chunk_size
all_merged_chunks = []

for week_chunk_idx in range(num_week_chunks):
    start_idx = week_chunk_idx * week_chunk_size
    end_idx = min((week_chunk_idx + 1) * week_chunk_size, len(unique_weeks))
    chunk_weeks = unique_weeks[start_idx:end_idx]
    
    print(f"Processing week chunk {week_chunk_idx+1}/{num_week_chunks} ({start_idx} to {end_idx})...")
    
    # Filter weekly sequence to only this chunk's weeks
    sequence_chunk = weekly_sequence_df[weekly_sequence_df['WeekStart'].isin(chunk_weeks)].copy()
    rma_chunk = rma_weekly_df[rma_weekly_df['WeekStart'].isin(chunk_weeks)]
    
    # Merge this chunk
    if not rma_chunk.empty:
        merged_chunk = sequence_chunk.merge(
            rma_chunk,
            how='left',
            on=['WeekStart', 'PartNumber']
        )
        
        # Fill missing values with 0
        for col in ['ReceivedCount', 'ShippedCount', 'InRepair']:
            if col in merged_chunk.columns:
                # Only fill 0s for dates between min_date and max_date
                date_mask = (merged_chunk['WeekStart'] >= min_date) & (merged_chunk['WeekStart'] <= max_date)
                merged_chunk.loc[date_mask, col] = merged_chunk.loc[date_mask, col].fillna(0)
    else:
        merged_chunk = sequence_chunk.copy()
        for col in ['ReceivedCount', 'ShippedCount', 'InRepair']:
            merged_chunk[col] = np.nan
            # Only fill 0s for dates between min_date and max_date
            date_mask = (merged_chunk['WeekStart'] >= min_date) & (merged_chunk['WeekStart'] <= max_date)
            merged_chunk.loc[date_mask, col] = 0
    
    all_merged_chunks.append(merged_chunk)
    
    del sequence_chunk
    del rma_chunk
    gc.collect()
    
    print(f"Completed week chunk {week_chunk_idx+1}/{num_week_chunks}")

# Combine all merged chunks
print("Combining all merged chunks...")
rma_sequence_df = pd.concat(all_merged_chunks, ignore_index=True)

del all_merged_chunks
del weekly_sequence_df
del rma_weekly_df
gc.collect()

print(f"Final merged data created with {len(rma_sequence_df)} rows")
print(f"Memory usage of final data: {rma_sequence_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Save in chunks
rows_per_save_chunk = 1000000
num_save_chunks = (len(rma_sequence_df) + rows_per_save_chunk - 1) // rows_per_save_chunk

if num_save_chunks <= 1:
    print("Saving merged data...")
    output_path = os.path.join(processed_data_dir, 'weekly_sequence_2_rma.parquet')
    rma_sequence_df.to_parquet(output_path)
    print("Data saved!")
else:
    # Save in multiple chunks and then recombine
    print(f"Saving data in {num_save_chunks} chunks...")
    temp_file_paths = []
    
    for i in range(num_save_chunks):
        start_idx = i * rows_per_save_chunk
        end_idx = min((i + 1) * rows_per_save_chunk, len(rma_sequence_df))
        
        chunk = rma_sequence_df.iloc[start_idx:end_idx]
        temp_path = os.path.join(processed_data_dir, f'weekly_sequence_2_rma_temp_{i}.parquet')
        chunk.to_parquet(temp_path)
        temp_file_paths.append(temp_path)
        
        del chunk
        gc.collect()
        
        print(f"Saved chunk {i+1}/{num_save_chunks}")
    
    print("All chunks saved!")
    print(f"Temporary files saved at: {', '.join(temp_file_paths)}")

# Combine the files
print("Combining saved chunks...")
combined_df = pd.read_parquet(temp_file_paths[0])
for i in range(1, len(temp_file_paths)):
    next_chunk = pd.read_parquet(temp_file_paths[i])
    combined_df = pd.concat([combined_df, next_chunk], ignore_index=True)
    del next_chunk
    gc.collect()
# Save the combined data
output_path = os.path.join(processed_data_dir, 'weekly_sequence_2_rma.parquet')
combined_df.to_parquet(output_path)

# Clean up temporary files  
for temp_path in temp_file_paths:
    os.remove(temp_path)
print("Data saved and temporary files cleaned up!")
print("All processing completed!")
print(f"Final data saved at: {output_path}")

Loading weekly sequence data...
Weekly sequence loaded
Memory usage of weekly_sequence_df: 1806.81 MB
Loading RMA data...
Available columns in RMA file: ['source', 'rma_number', 'part_number', 'serial_number', 'customer', 'status', 'received_date', 'ship_date', 'part_description', 'fault_code', 'lru_name', 'workshop_location', 'flight_hours', 'return_reason', 'warranty_end_date', 'return_reason_category', 'repair_duration']
Missing received_date values: 105712 (36.45%)
Cleaning RMA data...
Date range: 2012-05-07 00:00:00+00:00 to 2025-04-01 12:00:00+00:00
Processing RMA data for weekly sequences...
RMA date range: 2012-05-07 00:00:00 to 2025-03-31 00:00:00
Number of unique part numbers in RMA data: 358
Processing 358 part numbers in 1 chunks...
Processing part number chunk 1/1 (0 to 358)...
Completed part number chunk 1/1
Combining all chunks...
RMA weekly data created with 76916 rows
Memory usage of rma_weekly_df: 7.28 MB
Merging RMA data into weekly sequence...
Processing week chunk 

In [3]:
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import gc
import re

# Define directories
processed_data_dir = os.path.join('private', 'data', 'processed')
data_dir = os.path.join('private', 'data', 'transformed')

print("Loading weekly sequence data...")
# Load the weekly sequence data efficiently
input_path = os.path.join(processed_data_dir, 'weekly_sequence_2_rma.parquet')

essential_columns = ['PartNumber', 'WeekStart', 'Tail', 'ReceivedCount', 'ShippedCount', 'InRepair']
rma_sequence_df = pd.read_parquet(input_path, columns=essential_columns)

print("Weekly sequence loaded")

# Standardize tail numbers function - do this early to avoid duplicate computation
def standardize_tail(tail):
    if not isinstance(tail, str):
        return tail
    # Remove all non-alphanumeric chars
    tail = re.sub(r'[^A-Za-z0-9]', '', tail)
    return tail.upper()

# Apply standardization
rma_sequence_df['StandardTail'] = rma_sequence_df['Tail'].apply(
    lambda x: standardize_tail(x) if pd.notna(x) else x
)

# Load the flight data with memory optimizations
print("Loading flight data...")
input_path = os.path.join(data_dir, 'merged_flight_data.parquet')
# Only load columns we need
flight_columns = ['TailNumber', 'FlightStartTime', 'FlightEndTime', 'FlightDuration',
                  'RawResets', 'TotalPassengers', 'BusinessClass', 'EconomyClass',
                  'SeatResets', 'Airline', 'AircraftType']
merged_flight_data_df = pd.read_parquet(input_path, columns=flight_columns)

# Optimize memory usage
for col in merged_flight_data_df.select_dtypes(include=['float64']).columns:
    merged_flight_data_df[col] = pd.to_numeric(merged_flight_data_df[col], downcast='float')
for col in merged_flight_data_df.select_dtypes(include=['int64']).columns:
    merged_flight_data_df[col] = pd.to_numeric(merged_flight_data_df[col], downcast='integer')

# Use categories for string columns with low cardinality
string_cols = ['TailNumber', 'Airline', 'AircraftType']
for col in string_cols:
    if col in merged_flight_data_df.columns:
        merged_flight_data_df[col] = merged_flight_data_df[col].astype('category')

print(f"Memory usage of merged_flight_data_df: {merged_flight_data_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(merged_flight_data_df.head())

# Clean and filter the flight data
print("Cleaning flight data...")
# Filter out future dates
current_date = datetime.now()
merged_flight_data_df = merged_flight_data_df[merged_flight_data_df['FlightStartTime'] <= current_date]

# Remove flights with unrealistic durations
merged_flight_data_df = merged_flight_data_df[
    (merged_flight_data_df['FlightDuration'] >= timedelta(minutes=30)) &
    (merged_flight_data_df['FlightDuration'] <= timedelta(hours=24))
]

print(f"Date range: {merged_flight_data_df['FlightStartTime'].min()} to {merged_flight_data_df['FlightEndTime'].max()}")
print(f"After filtering: {len(merged_flight_data_df)} rows")

# Standardize tail numbers in flight data
merged_flight_data_df['StandardTail'] = merged_flight_data_df['TailNumber'].apply(
    lambda x: standardize_tail(x) if pd.notna(x) else x
)

# Create WeekStart column
merged_flight_data_df['WeekStart'] = pd.to_datetime(
    merged_flight_data_df['FlightStartTime']
).dt.to_period('W-MON').dt.start_time

# Find overlapping tails
flight_unique_tails = set(merged_flight_data_df['StandardTail'].dropna().unique())
sequence_unique_tails = set(rma_sequence_df['StandardTail'].dropna().unique())
overlap_tails = flight_unique_tails.intersection(sequence_unique_tails)

print(f"Unique tails in flight data: {len(flight_unique_tails)}")
print(f"Unique tails in sequence data: {len(sequence_unique_tails)}")
print(f"Overlapping unique tails: {len(overlap_tails)}")
if len(overlap_tails) > 0:
    print(f"Sample overlapping tails: {list(overlap_tails)[:5]}")

# Filter flight data to only overlapping tails before aggregation
filtered_flight_data = merged_flight_data_df[merged_flight_data_df['StandardTail'].isin(overlap_tails)]
print(f"Filtered flight data from {len(merged_flight_data_df)} to {len(filtered_flight_data)} rows")

del merged_flight_data_df
gc.collect()

# Process in chunks
tail_list = list(overlap_tails)
tail_chunk_size = 50  # Process 50 tails at a time
num_tail_chunks = (len(tail_list) + tail_chunk_size - 1) // tail_chunk_size
all_flight_weekly_data = []

print(f"Processing {len(tail_list)} tails in {num_tail_chunks} chunks...")

for chunk_idx in range(num_tail_chunks):
    start_idx = chunk_idx * tail_chunk_size
    end_idx = min((chunk_idx + 1) * tail_chunk_size, len(tail_list))
    chunk_tails = tail_list[start_idx:end_idx]
    
    print(f"Processing tail chunk {chunk_idx+1}/{num_tail_chunks} ({start_idx} to {end_idx})...")
    
    # Filter data for this chunk of tails
    chunk_data = filtered_flight_data[filtered_flight_data['StandardTail'].isin(chunk_tails)]
    
    # Aggregate flight data by tail and week
    chunk_weekly = chunk_data.groupby(['StandardTail', 'WeekStart']).agg({
        'RawResets': lambda x: x.sum() if not x.isna().all() else None,
        'TotalPassengers': lambda x: x.fillna(0).sum(),
        'BusinessClass': lambda x: x.fillna(0).sum(),
        'EconomyClass': lambda x: x.fillna(0).sum(),
        'FlightDuration': lambda x: x.dropna().dt.total_seconds().sum() / 3600,
        'SeatResets': lambda x: x.fillna(0).sum(),
        'Airline': lambda x: x.dropna().iloc[0] if not x.dropna().empty else None,
        'AircraftType': lambda x: x.dropna().iloc[0] if not x.dropna().empty else None
    }).reset_index()
    
    all_flight_weekly_data.append(chunk_weekly)
    
    # Clean up
    del chunk_data
    gc.collect()
    
    print(f"Completed tail chunk {chunk_idx+1}/{num_tail_chunks}")

# Combine all chunks
print("Combining all chunks...")
if all_flight_weekly_data:
    flight_weekly_data = pd.concat(all_flight_weekly_data, ignore_index=True)
    del all_flight_weekly_data
    gc.collect()
    
    print("Weekly flight data created:")
    print(flight_weekly_data.head())
    
    spot_check = flight_weekly_data[
        (flight_weekly_data['WeekStart'] >= '2023-01-01') & 
        (flight_weekly_data['WeekStart'] <= '2023-01-07')
    ]
    if not spot_check.empty:
        print(spot_check.head())
else:
    print("No flight data to process")
    flight_weekly_data = pd.DataFrame(columns=['StandardTail', 'WeekStart', 'RawResets', 
                                              'TotalPassengers', 'BusinessClass', 'EconomyClass',
                                              'FlightDuration', 'SeatResets', 'Airline', 'AircraftType'])

flight_lookup = {}
for _, row in flight_weekly_data.iterrows():
    tail = row['StandardTail']
    week = row['WeekStart']
    
    if tail not in flight_lookup:
        flight_lookup[tail] = {}
    
    flight_lookup[tail][week] = {
        'RawResets': row['RawResets'],
        'TotalPassengers': row['TotalPassengers'],
        'BusinessClass': row['BusinessClass'],
        'EconomyClass': row['EconomyClass'],
        'FlightDuration': row['FlightDuration'],
        'SeatResets': row['SeatResets'],
        'Airline': row['Airline'],
        'AircraftType': row['AircraftType']
    }

del flight_weekly_data
gc.collect()

# Optimized function to find closest flight week
def find_closest_flight_week(row, max_weeks_diff=52):
    tail = row['StandardTail']
    week = row['WeekStart']
    
    # Skip if tail is not in flight data or is null
    if pd.isna(tail) or tail not in flight_lookup:
        return [None] * 10  # Return list of None values for all expected columns
    
    # Get all weeks for this tail
    weeks_data = flight_lookup[tail]
    if not weeks_data:
        return [None] * 10
    
    # Calculate time difference
    week_diffs = []
    for w in weeks_data:
        days_diff = abs((w - week).days)
        weeks_diff = days_diff / 7
        if weeks_diff <= max_weeks_diff: 
            week_diffs.append((weeks_diff, w))
    
    if not week_diffs:
        return [None] * 10
    
    # Find closest week
    closest_diff, closest_week = min(week_diffs, key=lambda x: x[0])
    
    # Get the flight data for the closest week
    flight_data = weeks_data[closest_week]
    
    return [
        flight_data['RawResets'],
        flight_data['TotalPassengers'],
        flight_data['BusinessClass'],
        flight_data['EconomyClass'],
        flight_data['FlightDuration'],
        flight_data['SeatResets'],
        flight_data['Airline'],
        flight_data['AircraftType'],
        closest_diff,
        closest_week
    ]

# Filter RMA sequence data to overlapping tails
rma_filtered = rma_sequence_df[rma_sequence_df['StandardTail'].isin(overlap_tails)].copy()
print(f"Filtered from {len(rma_sequence_df)} to {len(rma_filtered)} rows with overlapping tails")

# Process in smaller chunks
chunk_size = 5000  # Smaller chunks to reduce memory pressure
num_chunks = (len(rma_filtered) + chunk_size - 1) // chunk_size

# Preallocate arrays for more efficient result storage
result_columns = ['RawResets', 'TotalPassengers', 'BusinessClass', 'EconomyClass', 
                 'FlightDuration', 'SeatResets', 'Airline', 'AircraftType', 
                 'week_diff', 'flight_WeekStart']
results = {col: np.empty(len(rma_filtered), dtype=object) for col in result_columns}

print(f"Processing {len(rma_filtered)} rows in {num_chunks} chunks...")
for i in range(num_chunks):
    start_idx = i * chunk_size
    end_idx = min((i + 1) * chunk_size, len(rma_filtered))
    chunk = rma_filtered.iloc[start_idx:end_idx]
    
    print(f"Processing chunk {i+1}/{num_chunks} ({start_idx} to {end_idx})...")
    
    # Process each row in the chunk
    for j, (_, row) in enumerate(chunk.iterrows()):
        result_values = find_closest_flight_week(row)
        
        for col_idx, col in enumerate(result_columns):
            results[col][start_idx + j] = result_values[col_idx]
    
    # Print progress
    if (i+1) % 5 == 0 or i+1 == num_chunks:
        print(f"Completed {i+1}/{num_chunks} chunks ({(i+1)/num_chunks*100:.1f}%)")
        gc.collect()

# Add results to rma_filtered
for col in result_columns:
    rma_filtered[col] = results[col]

del results
gc.collect()

# Prepare remaining data with correct data types
print("Adding flight columns to remaining flight data...")
rma_remaining = rma_sequence_df[~rma_sequence_df['StandardTail'].isin(overlap_tails)].copy()

for col in result_columns:
    if col == 'flight_WeekStart':
        rma_remaining[col] = pd.Series(dtype='datetime64[ns]')
    elif col == 'Airline' or col == 'AircraftType':
        rma_remaining[col] = pd.Series(dtype='object')
    else:
        rma_remaining[col] = pd.Series(dtype='float64')

print("Saving data in separate files...")
filtered_path = os.path.join(processed_data_dir, 'flight_sequence_filtered.parquet')
remaining_path = os.path.join(processed_data_dir, 'flight_sequence_remaining.parquet')

rma_filtered.to_parquet(filtered_path)
rma_remaining.to_parquet(remaining_path)

del rma_filtered
del rma_remaining
del rma_sequence_df
gc.collect()

# Final output path
output_path = os.path.join(processed_data_dir, 'weekly_sequence_3_flight.parquet')

# Define a function to divide files into smaller chunks
def process_large_file_in_chunks(file_path, chunk_size=100000):
    try:
        file_info = pd.read_parquet(file_path, columns=[])
        total_rows = len(file_info)
        del file_info
        gc.collect()
        
        for i in range(0, total_rows, chunk_size):
            end_row = min(i + chunk_size, total_rows)
            print(f"  Reading rows {i} to {end_row} of {total_rows}...")
            chunk = pd.read_parquet(file_path, rows=slice(i, end_row))
            yield chunk
            
    except Exception as e:
        print(f"Chunking failed, reading entire file: {str(e)}")
        yield pd.read_parquet(file_path)

try:
    print(f"Processing filtered file: {filtered_path}")
    first_chunk = True
    for chunk in process_large_file_in_chunks(filtered_path):
        if first_chunk:
            print(f"  Writing first chunk to output...")
            chunk.to_parquet(output_path)
            first_chunk = False
        else:
            print(f"  Appending chunk to output...")
            chunk.to_parquet(output_path, append=True)
        
        del chunk
        gc.collect()
    
    # Append the remaining data
    print(f"Processing remaining file: {remaining_path}")
    for chunk in process_large_file_in_chunks(remaining_path):
        print(f"  Appending chunk to output...")
        # Ensure column data types match
        for col in chunk.columns:
            if col in result_columns:
                try:
                    sample = pd.read_parquet(output_path, rows=1)
                    if col in sample.columns:
                        chunk[col] = chunk[col].astype(sample[col].dtype)
                    del sample
                except:
                    pass 
                
        chunk.to_parquet(output_path, append=True)
        
        del chunk
        gc.collect()
    
    print("All files combined successfully")
    
except Exception as e:
    print(f"Error in pandas approach: {str(e)}")
    print("Falling back to simplest approach...")
    
    try:
        filtered_data = pd.read_parquet(filtered_path)
        remaining_data = pd.read_parquet(remaining_path)
        combined_data = pd.concat([filtered_data, remaining_data], ignore_index=True)
        combined_data.to_parquet(output_path)
        
        del filtered_data
        del remaining_data
        del combined_data
        gc.collect()
                
    except Exception as e:        
        print(f"Files saved separately at {filtered_path} and {remaining_path}")        

try:
    os.remove(filtered_path)
    os.remove(remaining_path)
    print("Temporary files removed")
except Exception as e:
    print(f"Could not remove temporary files: {str(e)}")

# Validate final output
try:
    sample = pd.read_parquet(output_path, rows=5)
    print("\nSample of combined data:")
    print(sample[['StandardTail', 'WeekStart', 'ReceivedCount', 'ShippedCount', 'InRepair']].head())
    
    # Count rows with flight data
    stats = pd.read_parquet(output_path, columns=['RawResets'])
    matched_rows = stats['RawResets'].notna().sum()
    total_rows = len(stats)
    print(f"\nRows with flight data: {matched_rows} out of {total_rows} ({matched_rows/total_rows*100:.2f}%)")
    
    del sample
    del stats
    gc.collect()
except Exception as e:
    print(f"Validation failed: {str(e)}")

print("Data saved!")

Loading weekly sequence data...
Weekly sequence loaded
Loading flight data...
Memory usage of merged_flight_data_df: 22.57 MB
  TailNumber FlightStartTime FlightEndTime FlightDuration  RawResets  \
0     PR-MHF             NaT           NaT            NaT       <NA>   
1    RPC9911             NaT           NaT            NaT       <NA>   
2    RPC9911             NaT           NaT            NaT       <NA>   
3    RPC9911             NaT           NaT            NaT       <NA>   
4     PT-MXE             NaT           NaT            NaT       <NA>   

   TotalPassengers  BusinessClass  EconomyClass  SeatResets Airline  \
0              NaN            NaN           NaN         NaN     LAN   
1              NaN            NaN           NaN         NaN     PAL   
2              NaN            NaN           NaN         NaN     PAL   
3              NaN            NaN           NaN         NaN     PAL   
4              NaN            NaN           NaN         NaN     LAN   

      Aircraft

  chunk_weekly = chunk_data.groupby(['StandardTail', 'WeekStart']).agg({


Completed tail chunk 1/3
Processing tail chunk 2/3 (50 to 100)...


  chunk_weekly = chunk_data.groupby(['StandardTail', 'WeekStart']).agg({


Completed tail chunk 2/3
Processing tail chunk 3/3 (100 to 137)...


  chunk_weekly = chunk_data.groupby(['StandardTail', 'WeekStart']).agg({


Completed tail chunk 3/3
Combining all chunks...
Weekly flight data created:
  StandardTail  WeekStart  RawResets  TotalPassengers  BusinessClass  \
0        3BNBM 2019-06-04       <NA>              NaN            NaN   
1        3BNBM 2019-07-02       <NA>              NaN            NaN   
2        3BNBM 2024-04-09       <NA>              NaN            NaN   
3        3BNBM 2024-04-16       <NA>              NaN            NaN   
4        3BNBM 2024-04-23       <NA>              NaN            NaN   

   EconomyClass  FlightDuration  SeatResets Airline AircraftType  
0           NaN             NaN         NaN    <NA>         <NA>  
1           NaN             NaN         NaN    <NA>         <NA>  
2           NaN             NaN         NaN    <NA>         <NA>  
3           NaN             NaN         NaN    <NA>         <NA>  
4           NaN             NaN         NaN    <NA>         <NA>  
Filtered from 6909194 to 2843069 rows with overlapping tails
Processing 2843069 rows in 

In [4]:
import os
import pandas as pd
import numpy as np
from datetime import timedelta

# Define paths
processed_data_dir = os.path.join('private', 'data', 'processed')

print("Loading data...")
input_path = os.path.join(processed_data_dir, 'weekly_sequence_3_flight.parquet')
flight_sequence_df = pd.read_parquet(input_path)
print("Weekly sequence loaded")

# Make sure data_dir is defined correctly
data_dir = os.path.join('private', 'data', 'transformed') 
input_path = os.path.join(data_dir, 'mtbf.parquet')
mtbf_df = pd.read_parquet(input_path)
print("MTBF data loaded")

for df in [flight_sequence_df, mtbf_df]:
    for col in df.select_dtypes(include=['float64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    for col in df.select_dtypes(include=['int64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')

print(mtbf_df.head())
duplicates = mtbf_df[mtbf_df.duplicated(subset=['PartNumber', 'Month', 'Airline'], keep=False)]
print(f"Number of duplicate partnumbers: {len(duplicates)}")

# date range
min_date = mtbf_df['Month'].min()
max_date = mtbf_df['Month'].max()
print(f"Date range: {min_date} to {max_date}")

# Create a weekly date range
mtbf_df['WeekStart'] = pd.to_datetime(mtbf_df['Month'].dt.to_period('W-MON').dt.start_time)

# Unique values summary
print("\nUnique values in essential columns")
for col in ['PartNumber', 'Month', 'Airline', 'ContractualMTBF', 'Failures', 'FlightHours']:
    total_values = mtbf_df[col].count()  # Count non-NaN values
    unique_values = mtbf_df[col].nunique()
    if total_values > 0:
        unique_percent = (unique_values / total_values) * 100
    else:
        unique_percent = 0
    print(f"{col}: {unique_values} unique values ({unique_percent:.2f}%) out of {total_values} non-null values")

# Find overlapping partnumbers
flight_unique_parts = set(flight_sequence_df['PartNumber'].dropna().unique())
mtbf_unique_parts = set(mtbf_df['PartNumber'].dropna().unique())
overlap_parts = flight_unique_parts.intersection(mtbf_unique_parts)

print(f"Unique parts in flight data: {len(flight_unique_parts)}")
print(f"Unique parts in MTBF data: {len(mtbf_unique_parts)}")
print(f"Overlapping unique parts: {len(overlap_parts)}")

mtbf_df_filtered = mtbf_df[mtbf_df['PartNumber'].isin(overlap_parts)].copy()
print(f"Filtered MTBF data from {len(mtbf_df)} to {len(mtbf_df_filtered)} rows")

# Add the WeekStart column to flight_sequence_df to avoid doing it multiple times
if 'WeekStart' not in flight_sequence_df.columns:
    flight_sequence_df['WeekStart'] = pd.to_datetime(flight_sequence_df['Date'].dt.to_period('W-MON').dt.start_time)

flight_filtered = flight_sequence_df[flight_sequence_df['PartNumber'].isin(overlap_parts)].copy()
print(f"Filtered flight data from {len(flight_sequence_df)} to {len(flight_filtered)} rows")

# Aggregate MTBF data by PartNumber and WeekStart
print("Aggregating MTBF data by PartNumber and WeekStart...")
mtbf_weekly_data = mtbf_df_filtered.groupby(['PartNumber', 'WeekStart']).agg({
    'ContractualMTBF': 'first',  # Contractual MTBF per week
    'Failures': 'sum',  # Total failures per week
    'Airline': 'first',  # Airline
}).reset_index()

print("Weekly MTBF data created")
print(mtbf_weekly_data.head())

del mtbf_df
del mtbf_df_filtered
import gc
gc.collect()

mtbf_lookup = {}
for _, row in mtbf_weekly_data.iterrows():
    part = row['PartNumber']
    week = row['WeekStart']
    
    if part not in mtbf_lookup:
        mtbf_lookup[part] = {}
    
    mtbf_lookup[part][week] = {
        'ContractualMTBF': row['ContractualMTBF'],
        'Failures': row['Failures'],
        'Airline': row['Airline']
    }

del mtbf_weekly_data
gc.collect()

def find_closest_mtbf_week(row, max_weeks_diff=52):
    part = row['PartNumber']
    week = row['WeekStart']
    airline = row.get('Airline')
    
    # Skip if part is not in MTBF data
    if pd.isna(part) or part not in mtbf_lookup:
        return pd.Series([None, None, None, None], 
                         index=['mtbf_ContractualMTBF', 'mtbf_Failures', 'mtbf_week_diff', 'mtbf_WeekStart'])
    
    # Find all weeks for this part
    weeks_data = mtbf_lookup[part]
    if not weeks_data:
        return pd.Series([None, None, None, None], 
                         index=['mtbf_ContractualMTBF', 'mtbf_Failures', 'mtbf_week_diff', 'mtbf_WeekStart'])
    
    # Filter by airline
    if not pd.isna(airline):
        matching_weeks = {w: data for w, data in weeks_data.items() 
                         if data['Airline'] == airline}
        if matching_weeks:
            weeks_data = matching_weeks
    
    # Calculate time differences
    week_diffs = []
    for w in weeks_data:
        days_diff = abs((w - week).days)
        weeks_diff = days_diff / 7
        if weeks_diff <= max_weeks_diff:
            week_diffs.append((weeks_diff, w))
    
    if not week_diffs:
        return pd.Series([None, None, None, None], 
                         index=['mtbf_ContractualMTBF', 'mtbf_Failures', 'mtbf_week_diff', 'mtbf_WeekStart'])
    
    # Find closest week
    closest_diff, closest_week = min(week_diffs, key=lambda x: x[0])
    mtbf_data = weeks_data[closest_week]
    
    return pd.Series([
        mtbf_data['ContractualMTBF'],
        mtbf_data['Failures'],
        closest_diff,
        closest_week
    ], index=['mtbf_ContractualMTBF', 'mtbf_Failures', 'mtbf_week_diff', 'mtbf_WeekStart'])

# Process in chunks
chunk_size = 5000 
num_chunks = (len(flight_filtered) + chunk_size - 1) // chunk_size

mtbf_ContractualMTBF = np.empty(len(flight_filtered), dtype=object)
mtbf_Failures = np.empty(len(flight_filtered), dtype=object)
mtbf_week_diff = np.empty(len(flight_filtered), dtype=object)
mtbf_WeekStart = np.empty(len(flight_filtered), dtype=object)

print(f"Processing {len(flight_filtered)} rows in {num_chunks} chunks...")
for i in range(num_chunks):
    start_idx = i * chunk_size
    end_idx = min((i + 1) * chunk_size, len(flight_filtered))
    chunk = flight_filtered.iloc[start_idx:end_idx]
    
    print(f"Processing chunk {i+1}/{num_chunks} ({start_idx} to {end_idx})...")
    
    # Process each row in the chunk
    for j, (idx, row) in enumerate(chunk.iterrows()):
        result = find_closest_mtbf_week(row)
        mtbf_ContractualMTBF[start_idx + j] = result['mtbf_ContractualMTBF']
        mtbf_Failures[start_idx + j] = result['mtbf_Failures']
        mtbf_week_diff[start_idx + j] = result['mtbf_week_diff']
        mtbf_WeekStart[start_idx + j] = result['mtbf_WeekStart']
    
    # Print progress update and clear memory
    if (i+1) % 5 == 0 or i+1 == num_chunks:
        print(f"Completed {i+1}/{num_chunks} chunks ({(i+1)/num_chunks*100:.1f}%)")
        gc.collect()  # Force garbage collection

# Add results to flight_filtered
flight_filtered['mtbf_ContractualMTBF'] = mtbf_ContractualMTBF
flight_filtered['mtbf_Failures'] = mtbf_Failures
flight_filtered['mtbf_week_diff'] = mtbf_week_diff
flight_filtered['mtbf_WeekStart'] = mtbf_WeekStart

# Add columns to flight_remaining
print("Adding MTBF columns to remaining flight data...")
flight_remaining = flight_sequence_df[~flight_sequence_df['PartNumber'].isin(overlap_parts)]
for col in ['mtbf_ContractualMTBF', 'mtbf_Failures', 'mtbf_week_diff', 'mtbf_WeekStart']:
    flight_remaining[col] = None

# Combine the data efficiently
print("Combining datasets...")
indexes_to_update = flight_filtered.index
mtbf_sequence_df = flight_sequence_df.copy()

# Update only the necessary rows with MTBF data
for col in ['mtbf_ContractualMTBF', 'mtbf_Failures', 'mtbf_week_diff', 'mtbf_WeekStart']:
    mtbf_sequence_df.loc[indexes_to_update, col] = flight_filtered[col].values

matched_rows = mtbf_sequence_df['mtbf_ContractualMTBF'].notna().sum()
print(f"Rows with MTBF data after matching: {matched_rows} out of {len(mtbf_sequence_df)} ({matched_rows/len(mtbf_sequence_df)*100:.2f}%)")

# Show distribution of week differences
if 'mtbf_week_diff' in mtbf_sequence_df.columns and mtbf_sequence_df['mtbf_week_diff'].notna().any():
    week_diff_stats = mtbf_sequence_df['mtbf_week_diff'].describe()
    print("\nWeek difference statistics:")
    print(week_diff_stats)
    
    print("\nDistribution of week differences:")
    week_ranges = [(0, 4), (4, 12), (12, 26), (26, 52)]
    for start, end in week_ranges:
        count = ((mtbf_sequence_df['mtbf_week_diff'] >= start) & (mtbf_sequence_df['mtbf_week_diff'] < end)).sum()
        pct = count / matched_rows * 100 if matched_rows > 0 else 0
        print(f"{start}-{end} weeks: {count} ({pct:.1f}%)")
    
    count = (mtbf_sequence_df['mtbf_week_diff'] >= 52).sum()
    pct = count / matched_rows * 100 if matched_rows > 0 else 0
    print(f"52+ weeks: {count} ({pct:.1f}%)")

if matched_rows > 0:
    sample_matched = mtbf_sequence_df[mtbf_sequence_df['mtbf_ContractualMTBF'].notna()].sample(min(5, matched_rows))
    print(sample_matched[['PartNumber', 'WeekStart', 'mtbf_WeekStart', 'mtbf_week_diff', 'mtbf_ContractualMTBF', 'mtbf_Failures']])

# Save the results
print("Saving merged data...")
output_path = os.path.join(processed_data_dir, 'weekly_sequence_4_mtbf.parquet')
mtbf_sequence_df.to_parquet(output_path)
print("Data saved!")

Loading data...
Weekly sequence loaded
MTBF data loaded
   MTBFID Airline  PartNumber PartGroup DetailPartGroup  \
0       1     TSC  00-5103-30       RDU            RDU1   
1       2     TSC  00-5105-01       RDU            RDU1   
2       3     TSC  00-5105-30       RDU            RDU1   
3       4     TSC  00-5108-01       RDU            RDU1   
4       5     TSC  00-6204-10       REU             REU   

                               Description      Month  PoweredOnHours  \
0                           RDU, 12.1 Inch 2020-05-01             0.0   
1                            RDU, 8.9 Inch 2020-05-01             0.0   
2                            RDU, 8.9 Inch 2020-05-01             0.0   
3                     RDU, 15.3 Inch, PCAP 2020-05-01             0.0   
4  Remote Jack Unit Extender Unit 2 (REU2) 2020-05-01             0.0   

   FlightHours  Failures  NFF  ContractualMTBF  UpdateCount  
0          0.0         0    0            12000            0  
1          0.0         1  

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  flight_remaining[col] = None
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  flight_remaining[col] = None
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  flight_remaining[col] = None
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value ins

Combining datasets...
Rows with MTBF data after matching: 989942 out of 6909194 (14.33%)

Week difference statistics:
count     989942.000000
unique       104.000000
top            1.857143
freq      140950.000000
Name: mtbf_week_diff, dtype: float64

Distribution of week differences:
0-4 weeks: 624798 (63.1%)
4-12 weeks: 61796 (6.2%)
12-26 weeks: 107395 (10.8%)
26-52 weeks: 195953 (19.8%)
52+ weeks: 0 (0.0%)
         PartNumber  WeekStart mtbf_WeekStart mtbf_week_diff  \
3388596  00-6243-10 2019-07-08     2019-12-31      25.142857   
4350860  00-6233-02 2023-12-25     2023-05-30      29.857143   
489440   00-5024-01 2020-03-16     2020-03-31       2.142857   
342087   00-6203-01 2019-03-04     2019-02-26       0.857143   
3939376  00-5155-12 2022-02-28     2022-05-31      13.142857   

        mtbf_ContractualMTBF mtbf_Failures  
3388596                    0             0  
4350860                15000             0  
489440                 35000             0  
342087                

In [5]:
import pandas as pd
import os
import numpy as np
from datetime import datetime, timedelta
import gc

# Define directories
processed_data_dir = os.path.join('private', 'data', 'processed')
data_dir = os.path.join('private', 'data', 'transformed')

print("Loading weekly sequence data...")
input_path = os.path.join(processed_data_dir, 'weekly_sequence_4_mtbf.parquet')

needed_columns = [
    'PartNumber', 'WeekStart', 'ContractualMTBF', 'EntryIntoServiceDate',
    'RunningFleetTotal', 'ReceivedCount', 'ShippedCount', 'InRepair',
    'RawResets', 'Failures', 'SeatResets', 'TotalPassengers', 'Tail',
    'StandardTail', 'Airline', 'FlightDuration', 'BusinessClass', 'EconomyClass'
]

try:
    mtbf_sequence_df = pd.read_parquet(input_path, columns=needed_columns)
except Exception as e:
    file_schema = pd.read_parquet(input_path, columns=None).head(0)
    available_columns = file_schema.columns.tolist()    
    existing_columns = [col for col in needed_columns if col in available_columns]
    mtbf_sequence_df = pd.read_parquet(input_path, columns=existing_columns)

print("Weekly sequence loaded")

# Create Year column
mtbf_sequence_df['Year'] = mtbf_sequence_df['WeekStart'].dt.year

for col in mtbf_sequence_df.select_dtypes(include=['float64']).columns:
    mtbf_sequence_df[col] = pd.to_numeric(mtbf_sequence_df[col], downcast='float')
for col in mtbf_sequence_df.select_dtypes(include=['int64']).columns:
    mtbf_sequence_df[col] = pd.to_numeric(mtbf_sequence_df[col], downcast='integer')

# Contractual MTBF should be updated at the existence of a record then carried forward
if 'ContractualMTBF' in mtbf_sequence_df.columns:
    unique_parts = mtbf_sequence_df['PartNumber'].unique()
    chunk_size = 1000  
    num_chunks = (len(unique_parts) + chunk_size - 1) // chunk_size
    
    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(unique_parts))
        chunk_parts = unique_parts[start_idx:end_idx]
        
        part_mask = mtbf_sequence_df['PartNumber'].isin(chunk_parts)
        
        # Fill this chunk of parts
        mtbf_sequence_df.loc[part_mask, 'ContractualMTBF'] = (
            mtbf_sequence_df.loc[part_mask]
            .groupby('PartNumber')['ContractualMTBF']
            .ffill()
        )
        
        # Print progress
        if (i+1) % 10 == 0 or i+1 == num_chunks:
            print(f"Completed {i+1}/{num_chunks} chunks ({(i+1)/num_chunks*100:.1f}%)")
        
        gc.collect()

print(f"Date range: {mtbf_sequence_df['WeekStart'].min()} to {mtbf_sequence_df['WeekStart'].max()}")
print(f"Number of unique parts: {mtbf_sequence_df['PartNumber'].nunique()}")

# Define fields for analysis
fields = [
    'EntryIntoServiceDate',    # Important for equipment part age
    'RunningFleetTotal',       # Critical for normalizing failure rates
    'ReceivedCount',           # Number of parts received for repair
    'ShippedCount',            # Number of parts shipped after repair
    'InRepair',                # Number of parts currently in repair
    'RawResets',               # System resets - indicator of issues
    'Failures',                # Actual failures - primary prediction target
    'ContractualMTBF',         # Expected time between failures - benchmark
    'SeatResets',              # Passenger-facing issues
    'TotalPassengers',         # For normalizing by usage
]

# Filter to only fields that exist in dataframe
fields = [field for field in fields if field in mtbf_sequence_df.columns]
print("Calculating yearly completeness...")
yearly_completeness = {}
unique_years = mtbf_sequence_df['Year'].unique()
unique_years.sort()

for year in unique_years:
    if year > 2026: 
        continue
        
    year_data = mtbf_sequence_df[mtbf_sequence_df['Year'] == year]
    
    field_completeness = {}
    for field in fields:
        field_completeness[field] = year_data[field].notna().mean()
    
    yearly_completeness[year] = field_completeness
    
    del year_data
    gc.collect()

yearly_completeness_df = pd.DataFrame.from_dict(yearly_completeness, orient='index')
yearly_completeness_df = yearly_completeness_df.loc[yearly_completeness_df.index <= 2026]

print("\nYearly completeness by field:")
pd.set_option('display.float_format', '{:.2%}'.format)
print(yearly_completeness_df)

# Find years with good data for training/validation
year_completeness = yearly_completeness_df.mean(axis=1) > 0.8  # Average across all fields
complete_years = year_completeness[year_completeness].index.tolist()

# Find continuous ranges of years
year_ranges = []
current_range = []

for year in sorted(complete_years):
    if len(current_range) == 0 or year == current_range[-1] + 1:
        current_range.append(year)
    else:
        if current_range:  
            year_ranges.append(current_range)  
        current_range = [year]
if current_range:  
    year_ranges.append(current_range)

# Find the longest range 
if year_ranges: 
    longest_range = max(year_ranges, key=len)
    print(f"Longest range: {longest_range}")
else:
    longest_range = []
    print("No year ranges found")

# Determine train/validation split with safety checks
if longest_range and len(longest_range) > 3: 
    # Use most recent year with complete failure data for validation
    failure_complete_years = [year for year, row in yearly_completeness_df.iterrows() 
                            if row.get('Failures', 0) > 0.8 and year in longest_range]
    
    if failure_complete_years:
        # Find the most recent year with complete failure data
        most_recent_complete = max(failure_complete_years)
        validate_years = [most_recent_complete]
        train_years = [year for year in longest_range if year < most_recent_complete]
    else:
        validate_years = [longest_range[-1]]
        train_years = longest_range[:-1]
elif longest_range:  # Some data but not enough for good split
    print("Not enough continuous years of data for reliable validation")
    validate_years = [longest_range[-1]]
    train_years = longest_range[:-1] if len(longest_range) > 1 else []
else:
    print("No complete years found, using default year range")
    validate_years = []
    train_years = []

print(f"Training years: {train_years}")
print(f"Validation years: {validate_years}")

# Define key fields for correlation analysis
key_fields = [
    'RunningFleetTotal',   
    'ReceivedCount',         
    'ShippedCount',          
    'InRepair',              
    'RawResets',            
    'Failures',              
    'ContractualMTBF',       
    'SeatResets',            
    'TotalPassengers'        
]

# Filter to only fields that exist in our dataframe
key_fields = [field for field in key_fields if field in mtbf_sequence_df.columns]

# Filter to recent years for correlation analysis
print("Filtering data for correlation analysis...")
year_filter = (mtbf_sequence_df['Year'] >= 2019) & (mtbf_sequence_df['Year'] <= 2025)
training_data_sample = mtbf_sequence_df[year_filter].sample(
    min(100000, len(mtbf_sequence_df[year_filter])), 
    random_state=42
)
print(f"Using {len(training_data_sample)} rows for correlation analysis")

# Clean data for correlation
print("Preparing data for correlation...")
corr_df = training_data_sample[key_fields].copy()

# Convert to numeric and handle NAs
for col in key_fields:
    # First check the column type and any potential issues
    print(f"Column {col}: dtype={corr_df[col].dtype}, na_count={corr_df[col].isna().sum()}")
    # Convert to numeric
    corr_df[col] = pd.to_numeric(corr_df[col], errors='coerce')
    # Replace NaN values with 0 for correlation calculation
    corr_df[col] = corr_df[col].fillna(0)

# Calculate correlation
try:
    key_field_corr = corr_df.corr()
    print("\nCorrelation matrix for key fields:")
    print(key_field_corr)
    
    for field in key_fields:
        print(f"\nTop correlations with {field}:")
        field_corr = key_field_corr[field].sort_values(ascending=False)
        print(field_corr.drop(field).head(5))
except Exception as e:
    print(f"Error calculating correlation: {e}")    
    # More aggressive cleaning approach
    for col in key_fields:
        # Keep only columns that can be converted to float
        try:
            corr_df[col] = corr_df[col].astype(float)
        except:
            print(f"Dropping bad column: {col}")
            corr_df = corr_df.drop(columns=[col])
    
    # Try correlation again with remaining columns
    if len(corr_df.columns) >= 2:
        key_field_corr = corr_df.corr()
        print("\nCorrelation matrix after cleaning:")
        print(key_field_corr)
    else:
        print("Not enough valid columns for correlation analysis")

# Check for data period coverage by field
print("\nData period coverage by field:")
for field in key_fields:
    if field in mtbf_sequence_df.columns:
        # Sample data to avoid memory issues
        sample_size = min(1000000, len(mtbf_sequence_df))
        sampled_df = mtbf_sequence_df.sample(sample_size)
        
        # Get first and last non-null date for each field
        field_data = sampled_df[sampled_df[field].notna()]
        if len(field_data) > 0:
            first_date = field_data['WeekStart'].min()
            last_date = field_data['WeekStart'].max()
            print(f"{field}: {len(field_data)} values in sample from {first_date} to {last_date}")
        else:
            print(f"{field}: No valid data in sample")
        
        # Clean up
        del sampled_df
        del field_data
        gc.collect()

# Check for variance in fields
print("\nVariance by field:")
for field in key_fields:
    try:
        # Sample data to avoid memory issues
        sample_size = min(1000000, len(mtbf_sequence_df))
        field_sample = mtbf_sequence_df[field].dropna().sample(sample_size)
        
        if len(field_sample) > 0:
            field_sample = pd.to_numeric(field_sample, errors='coerce')
            var = field_sample.var()
            print(f"{field} variance (from sample): {var}")
        else:
            print(f"{field}: No valid data for variance calculation")
    except Exception as e:
        print(f"{field} variance calculation error: {e}")

# Check data types
print("\nData types by field:")
for field in key_fields:
    if field in mtbf_sequence_df.columns:
        dtype = mtbf_sequence_df[field].dtype
        print(f"{field}: {dtype}")

# Save the final sequence data
print("Saving final sequence data...")
output_path = os.path.join(processed_data_dir, 'weekly_sequence_5_final.parquet')

# Save in chunks if the data is large
rows_per_chunk = 1000000
num_save_chunks = (len(mtbf_sequence_df) + rows_per_chunk - 1) // rows_per_chunk

if num_save_chunks <= 1:
    # Small enough to save as one file
    mtbf_sequence_df.to_parquet(output_path)
    print("Data saved!")
else:
    # Save in multiple chunks
    print(f"Saving data in {num_save_chunks} chunks...")
    temp_paths = []
    
    for i in range(num_save_chunks):
        start_idx = i * rows_per_chunk
        end_idx = min((i + 1) * rows_per_chunk, len(mtbf_sequence_df))
        
        chunk = mtbf_sequence_df.iloc[start_idx:end_idx]
        temp_path = os.path.join(processed_data_dir, f'weekly_sequence_5_final_temp_{i}.parquet')
        chunk.to_parquet(temp_path)
        temp_paths.append(temp_path)
        
        del chunk
        gc.collect()
        
        print(f"Saved chunk {i+1}/{num_save_chunks}")
    
    print("All chunks saved!")

    # Combine chunks into one file if needed
    combined_df = pd.concat([pd.read_parquet(path) for path in temp_paths], ignore_index=True)
    combined_df.to_parquet(output_path)
    print(f"Combined data saved to {output_path}")
    # Clean up temporary files
    for path in temp_paths:
        os.remove(path)
        print(f"Removed temporary file: {path}")
    print("Temporary files removed")
    

Loading weekly sequence data...
Weekly sequence loaded
Date range: 2012-03-26 00:00:00 to 2058-11-25 00:00:00
Number of unique parts: 326
Calculating yearly completeness...

Yearly completeness by field:
      ReceivedCount  ShippedCount  InRepair  RawResets  SeatResets  \
2012         90.94%        90.94%    90.94%      0.00%       0.00%   
2013        100.00%       100.00%   100.00%      0.00%       0.00%   
2014        100.00%       100.00%   100.00%      0.00%       0.00%   
2015        100.00%       100.00%   100.00%      0.00%       0.00%   
2016        100.00%       100.00%   100.00%      0.00%       0.00%   
2017        100.00%       100.00%   100.00%      0.00%       0.00%   
2018        100.00%       100.00%   100.00%      0.00%       0.98%   
2019        100.00%       100.00%   100.00%      0.01%       1.26%   
2020        100.00%       100.00%   100.00%      0.12%       0.13%   
2021        100.00%       100.00%   100.00%      0.07%       0.07%   
2022        100.00%       