In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from datetime import datetime
import os
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:


def decompose_timestamp(df):
    """Extract temporal components from timestamp"""
    df['datetime'] = pd.to_datetime(df['timestamp'])
    
    df['year'] = df['datetime'].dt.year
    df['month'] = df['datetime'].dt.month
    df['day'] = df['datetime'].dt.day
    df['weekday'] = df['datetime'].dt.weekday  # Monday=0, Sunday=6
    df['hour'] = df['datetime'].dt.hour
    df['minute'] = df['datetime'].dt.minute

    df['is_weekend'] = df['weekday'].apply(lambda x: 1 if x >= 5 else 0)
    df['part_of_day'] = df['hour'].apply(lambda x: 
                                        'morning' if 6 <= x < 12 else
                                        'afternoon' if 12 <= x < 17 else
                                        'evening' if 17 <= x < 20 else
                                        'night')
    
    df['season'] = df['month'].apply(lambda x:
                                    'winter' if x in [12, 1, 2] else
                                    'spring' if x in [3, 4, 5] else
                                    'summer' if x in [6, 7, 8] else
                                    'fall')
    
    # Cyclical encoding
    df['month_sin'] = np.sin(2 * np.pi * df['month']/12)
    df['month_cos'] = np.cos(2 * np.pi * df['month']/12)
    df['hour_sin'] = np.sin(2 * np.pi * df['hour']/24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour']/24)
    df['weekday_sin'] = np.sin(2 * np.pi * df['weekday']/7)
    df['weekday_cos'] = np.cos(2 * np.pi * df['weekday']/7)
    df['minute_sin'] = np.sin(2 * np.pi * df['minute']/7)
    df['minute_cos'] = np.cos(2 * np.pi * df['minute']/7)
    
    return df


def process_boolean_features(df):
    """Convert boolean features to integers"""
    bool_cols = ['closed', 'is_german_holiday', 'is_swiss_holiday', 'is_french_holiday']
    
    for col in bool_cols:
        if col in df.columns:
            if df[col].dtype == bool:
                df[col] = df[col].astype(int)
            elif df[col].dtype == object:
                df[col] = df[col].map({'True': 1, 'False': 0})
    
    return df



def preprocess_theme_park_data_memory_efficient(df, output_file='processed_data.parquet', batch_size=100000, temp_dir='temp_efficient'):
    """
    Memory-efficient implementation that uses a temporary directory approach
    but processes and writes each batch only once.
    """

    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)
    
    for f in os.listdir(temp_dir):
        if f.endswith('.parquet'):
            os.remove(os.path.join(temp_dir, f))
    
    total_rows = len(df)
    print(f"Total rows to process: {total_rows}")
    
    # First, fit encoders and scalers on a sample to get feature names
    sample_size = min(100000, len(df))
    sample_df = df.sample(sample_size, random_state=42)
    
    sample_df = decompose_timestamp(sample_df)
    sample_df = process_boolean_features(sample_df)
    
    cat_cols = ['ride_name','part_of_day', 'season']
    encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
    encoder.fit(sample_df[cat_cols])
    

    num_cols = [
        'temperature', 'rain', 'wind', 'year'
    ]
    
    
    num_cols = [col for col in num_cols if col in df.columns]
    
    scaler = StandardScaler()
    scaler.fit(sample_df[num_cols])
    
    batch_files = []
    
    for start_idx in range(0, len(df), batch_size):

        end_idx = min(start_idx + batch_size, len(df))
        batch = df.iloc[start_idx:end_idx].copy()
        
        batch_num = (start_idx // batch_size) + 1
        print(f"Processing batch {batch_num}: rows {start_idx} to {end_idx}")
        
        batch = decompose_timestamp(batch)
        
        # we drop those because they're now stored as cyclical encoded features
        batch = batch.drop(columns=['month', 'day', 'hour', 'minute'], errors='ignore')
        batch = process_boolean_features(batch)
        
        encoded_cats = encoder.transform(batch[cat_cols])
        encoded_df = pd.DataFrame(
            encoded_cats,
            columns=encoder.get_feature_names_out(cat_cols),
            index=batch.index
        )
        
        batch[num_cols] = scaler.transform(batch[num_cols])
        
        batch = pd.concat([batch.drop(cat_cols, axis=1), encoded_df], axis=1)
        
        cols_to_drop = ['timestamp', 'datetime']
        batch = batch.drop(columns=[col for col in cols_to_drop if col in batch.columns])
        
        temp_file = os.path.join(temp_dir, f"batch_{batch_num}.parquet")
        batch.to_parquet(temp_file, index=False)
        batch_files.append(temp_file)
        
        # Release memory
        del batch
        del encoded_df
        
        progress = (end_idx / total_rows) * 100
        print(f"Progress: {progress:.2f}%")
    
    print(f"All batches processed. Creating final output file...")
    
    # Create the final output file
    if os.path.exists(output_file):
        os.remove(output_file)
    
    dfs = []
    
    # Process in smaller groups to avoid memory issues
    concat_batch_size = 5 
    for i in range(0, len(batch_files), concat_batch_size):
        batch_group = batch_files[i:i+concat_batch_size]
        print(f"Combining batch files {i+1} to {min(i+concat_batch_size, len(batch_files))}")
        
        group_dfs = [pd.read_parquet(file) for file in batch_group]
        combined_df = pd.concat(group_dfs, ignore_index=True)
        
        mode = 'w' if i == 0 else 'a'
        combined_df.to_parquet(output_file, index=False, engine='fastparquet', append=(mode=='a'))
        
        # Clean up
        for df_obj in group_dfs:
            del df_obj
        del combined_df
    
    print(f"All data combined and saved to {output_file}")
    
    for file in batch_files:
        os.remove(file)
    
    print("Temporary files removed")
    
    transformers = {'encoder': encoder, 'scaler': scaler}
    return transformers

In [3]:
data_input_dir = "../data/processed"
input_file = os.path.join(data_input_dir, "ep", "merged_with_ride_features.parquet")
ep_df = pd.read_parquet(input_file)
print(ep_df.columns.unique())
ep_df.drop(columns=['feature_attraction_type', 'feature_category', 'feature_max_height', 'feature_track_length', 'feature_max_speed', 'feature_g_force',
       'feature_min_age', 'feature_min_height', 'feature_capacity_per_hour'], errors='ignore', inplace=True)

Index(['ride_name', 'timestamp', 'wait_time', 'closed', 'temperature', 'rain',
       'wind', 'is_german_holiday', 'is_swiss_holiday', 'is_french_holiday',
       'feature_attraction_type', 'feature_category', 'feature_max_height',
       'feature_track_length', 'feature_max_speed', 'feature_g_force',
       'feature_min_age', 'feature_min_height', 'feature_capacity_per_hour'],
      dtype='object')


In [4]:
transformers = preprocess_theme_park_data_memory_efficient(ep_df, 'processed_data.parquet', batch_size=1000000)


Total rows to process: 14196903
Processing batch 1: rows 0 to 1000000
Progress: 7.04%
Processing batch 2: rows 1000000 to 2000000
Progress: 14.09%
Processing batch 3: rows 2000000 to 3000000
Progress: 21.13%
Processing batch 4: rows 3000000 to 4000000
Progress: 28.18%
Processing batch 5: rows 4000000 to 5000000
Progress: 35.22%
Processing batch 6: rows 5000000 to 6000000
Progress: 42.26%
Processing batch 7: rows 6000000 to 7000000
Progress: 49.31%
Processing batch 8: rows 7000000 to 8000000
Progress: 56.35%
Processing batch 9: rows 8000000 to 9000000
Progress: 63.39%
Processing batch 10: rows 9000000 to 10000000
Progress: 70.44%
Processing batch 11: rows 10000000 to 11000000
Progress: 77.48%
Processing batch 12: rows 11000000 to 12000000
Progress: 84.53%
Processing batch 13: rows 12000000 to 13000000
Progress: 91.57%
Processing batch 14: rows 13000000 to 14000000
Progress: 98.61%
Processing batch 15: rows 14000000 to 14196903
Progress: 100.00%
All batches processed. Creating final outp

In [5]:
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile("processed_data.parquet")
all_columns = parquet_file.schema.names

columns_to_read = [col for col in all_columns 
                   if not (col.startswith("feature_attraction_type") or col.startswith("feature_category") or col.startswith("feature"))]

table = pq.read_table("processed_data.parquet")
ep_df_preview = table.slice(0, 1000000).to_pandas()


In [6]:
len(ep_df_preview.columns)

69

In [6]:
ep_df_preview.columns

Index(['wait_time', 'closed', 'temperature', 'rain', 'wind',
       'is_german_holiday', 'is_swiss_holiday', 'is_french_holiday', 'year',
       'weekday', 'is_weekend', 'month_sin', 'month_cos', 'hour_sin',
       'hour_cos', 'weekday_sin', 'weekday_cos', 'minute_sin', 'minute_cos',
       'ride_name_alpine express enzian',
       'ride_name_arena of football  be part of it', 'ride_name_arthur',
       'ride_name_atlantica supersplash', 'ride_name_atlantis adventure',
       'ride_name_baaa express', 'ride_name_bellevue ferris wheel',
       'ride_name_blue fire megacoaster', 'ride_name_castello dei medici',
       'ride_name_dancing dingie', 'ride_name_euromir',
       'ride_name_eurosat  cancan coaster', 'ride_name_eurotower',
       'ride_name_fjordrafting',
       'ride_name_jim button  journey through morrowland',
       'ride_name_josefinas magical imperial journey',
       'ride_name_kolumbusjolle', 'ride_name_madame freudenreich curiosits',
       'ride_name_matterhornblitz', 

In [7]:
ep_df_preview

Unnamed: 0,wait_time,closed,temperature,rain,wind,is_german_holiday,is_swiss_holiday,is_french_holiday,year,weekday,...,ride_name_voltron nevera powered by rimac,ride_name_whale adventures northern lights,part_of_day_afternoon,part_of_day_evening,part_of_day_morning,part_of_day_night,season_fall,season_spring,season_summer,season_winter
0,1.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0
1,0.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0
2,0.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0
3,1.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0
4,1.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
999995,0.0,1,,,,0,0,0,2017,2,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0
999996,0.0,1,,,,0,0,0,2017,2,...,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0
999997,0.0,1,,,,0,0,0,2017,2,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0
999998,0.0,1,,,,0,0,0,2017,2,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0


In [8]:
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile("processed_data.parquet")
all_columns = parquet_file.schema.names

columns_to_read = [col for col in all_columns 
                   if not (col.startswith("ride_name") or col.startswith("season")or col.startswith("part"))]

table = pq.read_table("processed_data.parquet", columns=columns_to_read)
ep_df_column_analyze = table.slice(0, 1000000).to_pandas()


In [9]:
ep_df_column_analyze

Unnamed: 0,wait_time,closed,temperature,rain,wind,is_german_holiday,is_swiss_holiday,is_french_holiday,year,weekday,is_weekend,month_sin,month_cos,hour_sin,hour_cos,weekday_sin,weekday_cos,minute_sin,minute_cos
0,1.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,0,5.000000e-01,-0.866025,0.707107,-0.707107,0.781831,0.623490,0.000000,1.000000
1,0.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,0,5.000000e-01,-0.866025,0.707107,-0.707107,0.781831,0.623490,0.000000,1.000000
2,0.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,0,5.000000e-01,-0.866025,0.707107,-0.707107,0.781831,0.623490,0.000000,1.000000
3,1.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,0,5.000000e-01,-0.866025,0.707107,-0.707107,0.781831,0.623490,0.000000,1.000000
4,1.0,0,0.132163,-0.303297,-0.991343,0,0,0,2017,1,0,5.000000e-01,-0.866025,0.707107,-0.707107,0.781831,0.623490,0.000000,1.000000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
999995,0.0,1,,,,0,0,0,2017,2,0,-2.449294e-16,1.000000,0.707107,-0.707107,0.974928,-0.222521,0.974928,-0.222521
999996,0.0,1,,,,0,0,0,2017,2,0,-2.449294e-16,1.000000,0.707107,-0.707107,0.974928,-0.222521,0.974928,-0.222521
999997,0.0,1,,,,0,0,0,2017,2,0,-2.449294e-16,1.000000,0.707107,-0.707107,0.974928,-0.222521,0.974928,-0.222521
999998,0.0,1,,,,0,0,0,2017,2,0,-2.449294e-16,1.000000,0.707107,-0.707107,0.974928,-0.222521,0.974928,-0.222521


In [25]:
print(sorted(ep_df_column_analyze["month_sin"].unique()))
print(sorted(ep_df_column_analyze["month_cos"].unique()))

[np.float64(-1.0), np.float64(-0.8660254037844386), np.float64(-0.8660254037844384), np.float64(-0.5000000000000004), np.float64(-0.4999999999999997), np.float64(-2.4492935982947064e-16), np.float64(1.2246467991473532e-16), np.float64(0.49999999999999994)]
[np.float64(-1.0), np.float64(-0.8660254037844388), np.float64(-0.8660254037844387), np.float64(-0.5000000000000004), np.float64(-1.8369701987210297e-16), np.float64(0.5000000000000001), np.float64(0.8660254037844384), np.float64(1.0)]


In [13]:
ep_df_column_analyze.columns

Index(['wait_time', 'closed', 'temperature', 'rain', 'wind',
       'is_german_holiday', 'is_swiss_holiday', 'is_french_holiday', 'year',
       'month', 'day', 'weekday', 'hour', 'minute', 'is_weekend', 'month_sin',
       'month_cos', 'hour_sin', 'hour_cos', 'weekday_sin', 'weekday_cos',
       'part_of_day_afternoon', 'part_of_day_evening', 'part_of_day_morning',
       'part_of_day_night', 'season_fall', 'season_spring', 'season_summer',
       'season_winter'],
      dtype='object')

In [19]:
ep_df_column_analyze["minute"].unique()

array([ 0,  5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55], dtype=int32)