In [None]:
import pandas as pd
import numpy as np
from datetime import datetime
import os
import glob
from geopy.distance import geodesic # Used for Haversine distance calculation

print("Required libraries imported.")

Required libraries imported.


In [11]:
%conda install tabulate

Channels:
 - defaults
 - conda-forge
Platform: win-64
Collecting package metadata (repodata.json): ...working... done
Solving environment: ...working... done

## Package Plan ##

  environment location: c:\Users\sanac\anaconda3\envs\big_data

  added / updated specs:
    - tabulate


The following NEW packages will be INSTALLED:

  tabulate           pkgs/main/win-64::tabulate-0.9.0-py312haa95532_0 



Downloading and Extracting Packages: ...working... done
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Note: you may need to restart the kernel to use updated packages.


In [None]:
# --- Configuration ---

data_directory = "C:\\Users\\sanac\\Downloads\\Big Data\\T-drive Taxi Trajectories\\release\\taxi_log_2008_by_id"

# Output directory for the PREPROCESSED files (ready for Kafka)
# These files will contain calculated speed, distance, and outlier flags.
preprocessed_output_dir = "C:\\Users\\sanac\\Downloads\\Big Data\\T-drive Taxi Trajectories\\release\\taxi_preprocessed_for_kafka"

# Ensure output directory exists
os.makedirs(preprocessed_output_dir, exist_ok=True)

print(f"Original data directory: {data_directory}")
print(f"Preprocessed data will be saved to: {preprocessed_output_dir}")
print("Output directory ensured.")

# --- Quick path verification ---
if os.path.exists(data_directory):
    print(f"\nVerification: data_directory '{data_directory}' exists.")
    test_files = glob.glob(os.path.join(data_directory, "*.txt"))
    print(f"Verification: Found {len(test_files)} .txt files in source directory.")
    if len(test_files) > 0:
        print(f"Verification: Example source file: {test_files[0]}")
    else:
        print("WARNING: No .txt files found in the data_directory. Double-check your path and file extensions!")
else:
    print(f"ERROR: data_directory '{data_directory}' does NOT exist. Please correct the path.")

Original data directory: C:\Users\sanac\Downloads\Big Data\T-drive Taxi Trajectories\release\taxi_log_2008_by_id
Preprocessed data will be saved to: C:\Users\sanac\Downloads\Big Data\T-drive Taxi Trajectories\release\taxi_preprocessed_for_kafka
Output directory ensured.

Verification: data_directory 'C:\Users\sanac\Downloads\Big Data\T-drive Taxi Trajectories\release\taxi_log_2008_by_id' exists.
Verification: Found 10357 .txt files in source directory.
Verification: Example source file: C:\Users\sanac\Downloads\Big Data\T-drive Taxi Trajectories\release\taxi_log_2008_by_id\1.txt


In [None]:
# Preprocessing Function

def preprocess_taxi_data_for_kafka(file_path):
    """
    Loads, cleans, and enriches a single taxi's trajectory data for Kafka replay.
    Calculates time difference, distance, and speed. Flags potential outliers.
    """
    try:
        # 1. Load Data
        # Data structure: taxiId, timestamp, longitude, latitude
        df = pd.read_csv(file_path, header=None, names=['taxi_id_raw', 'datetime_str', 'longitude', 'latitude'])

        if df.empty:
            return None, None # Return None if DataFrame is empty

        # Extract taxi_id from filename (more reliable for consistent ID)
        taxi_id = int(os.path.basename(file_path).replace('.txt', ''))
        df['taxi_id'] = taxi_id

        # 2. Parse Data Types
        # Convert datetime string to datetime objects
        df['datetime'] = pd.to_datetime(df['datetime_str'])

        # 3. Sort Data by Timestamp (Crucial for sequential processing)
        df = df.sort_values(by='datetime').reset_index(drop=True)

        # 4. Remove Duplicate Consecutive Points (identical timestamp, longitude, latitude)
        # These are redundant for real-time streaming and add no value.
        df = df.drop_duplicates(subset=['datetime', 'longitude', 'latitude'], keep='first').reset_index(drop=True)

        # 5. Calculate Time Difference (in minutes)
        # Needed for speed calculation
        df['prev_datetime'] = df['datetime'].shift(1)
        df['time_diff'] = (df['datetime'] - df['prev_datetime']).dt.total_seconds() / 60 # In minutes

        # 6. Calculate Distance (Haversine formula, in meters)
        # This requires the previous latitude/longitude
        df['prev_latitude'] = df['latitude'].shift(1)
        df['prev_longitude'] = df['longitude'].shift(1)

        # Function to apply Haversine distance
        # Only calculate if both current and previous points are available
        def calculate_haversine_distance(row):
            if pd.isna(row['prev_latitude']) or pd.isna(row['prev_longitude']):
                return np.nan
            coords_1 = (row['prev_latitude'], row['prev_longitude'])
            coords_2 = (row['latitude'], row['longitude'])
            # geodesic returns distance in meters by default if unit is not specified,
            # but Haversine is explicitly asked for. geopy.distance.haversine provides this.
            return geodesic(coords_1, coords_2).meters # Returns distance in meters

        df['distance'] = df.apply(calculate_haversine_distance, axis=1)

        # 7. Calculate Speed (meters/second and km/hour)
        # Speed = Distance / Time
        df['speed_mps'] = df['distance'] / (df['time_diff'] * 60) # Convert time_diff from minutes to seconds
        df['speed_kmph'] = df['speed_mps'] * 3.6 # Convert m/s to km/h

        # Handle cases where time_diff is zero or near zero to avoid division by zero for speed.
        # If time_diff is 0, speed is 0. If distance > 0 and time_diff <=0, it's an error.
        df.loc[df['time_diff'] <= 0, 'speed_mps'] = 0
        df.loc[df['time_diff'] <= 0, 'speed_kmph'] = 0


        # 8. Identify Potential Outliers (Flagging)
        # Based on common sense thresholds for urban taxi data
        MAX_TIME_DIFF_MINUTES = 24 * 60 # 24 hours (for very large gaps)
        MAX_SPEED_KMPH = 120 # Unrealistic speed for urban taxi
        MIN_SPEED_KMPH_FOR_MOVEMENT = 0.5 # Effectively stationary
        MAX_DISTANCE_METERS = 50000 # Teleportation error
        SPEED_LIMIT_FOR_WARNING = 50 # km/h as per project description

        df['is_outlier_time'] = (df['time_diff'] > MAX_TIME_DIFF_MINUTES) | (df['time_diff'] <= 0)
        df['is_outlier_speed'] = (df['speed_kmph'] > MAX_SPEED_KMPH) | \
                                 ((df['speed_kmph'] < MIN_SPEED_KMPH_FOR_MOVEMENT) & (df['distance'] > 50)) # Low speed, high distance indicates GPS drift
        df['is_outlier_distance'] = (df['distance'] > MAX_DISTANCE_METERS) # Teleportation

        # Combine outlier flags. These points are problematic.
        df['is_outlier'] = df['is_outlier_time'] | df['is_outlier_speed'] | df['is_outlier_distance']

        # Add a flag for the specific speed limit mentioned for warnings
        df['is_speeding'] = (df['speed_kmph'] > SPEED_LIMIT_FOR_WARNING) & (~df['is_outlier_speed']) # Only flag if not already an extreme outlier speed

        # Handle the very first point of each trajectory:
        # It has no previous point, so time_diff, distance, speed will be NaN.
        # These NaNs will be present for the first row of each taxi in the cleaned data.
        # For output, we can decide to keep them as NaN or fill. For Kafka, passing NaN is fine.

        # Select and reorder columns for clarity in the output file
        # We include datetime_str as well for easy Kafka payload creation
        output_df = df[[
            'taxi_id', 'datetime_str', 'longitude', 'latitude',
            'time_diff', 'distance', 'speed_mps', 'speed_kmph',
            'is_outlier_time', 'is_outlier_speed', 'is_outlier_distance', 'is_outlier', 'is_speeding'
        ]].copy()

        return output_df, None # We don't need a separate outliers_df as we're just flagging

    except Exception as e:
        print(f"Error processing {os.path.basename(file_path)}: {e}")
        return None, None

print("Preprocessing function 'preprocess_taxi_data_for_kafka' defined.")

Preprocessing function 'preprocess_taxi_data_for_kafka' defined.


In [None]:
# Main Execution Flow for Preprocessing

print("Starting data preprocessing for Kafka replay...")

# Get a list of all taxi data files
all_taxi_files = glob.glob(os.path.join(data_directory, "*.txt"))
if not all_taxi_files:
    print(f"ERROR: No .txt files found in {data_directory}. Please check the 'data_directory' path in Cell 2.")
    print("Workflow aborted.")
else:
    print(f"Found {len(all_taxi_files)} taxi files in {data_directory}.")

    num_files_to_process = len(all_taxi_files) # Process ALL files

    print(f"\n--- Preprocessing {num_files_to_process} files for Kafka replay ---")
    processed_count = 0
    skipped_count = 0

    for i, file_path in enumerate(all_taxi_files[:num_files_to_process]):
        if (i + 1) % 500 == 0 or (i + 1) == num_files_to_process:
            print(f"  Processing file {i+1}/{num_files_to_process}: {os.path.basename(file_path)}")

        preprocessed_df, _ = preprocess_taxi_data_for_kafka(file_path)

        if preprocessed_df is not None and not preprocessed_df.empty:
            # Save the preprocessed DataFrame
            output_file_name = os.path.join(preprocessed_output_dir, os.path.basename(file_path))
            preprocessed_df.to_csv(output_file_name, index=False) # Save without pandas index
            processed_count += 1
        else:
            print(f"  Skipped empty or problematic file: {os.path.basename(file_path)}")
            skipped_count += 1

    print(f"\nPreprocessing complete for {num_files_to_process} files.")
    print(f"Successfully preprocessed and saved {processed_count} files to {preprocessed_output_dir}.")
    print(f"Skipped {skipped_count} empty or problematic files.")

    print("\n--- Data Preprocessing Workflow Complete ---")
    print("The files in the 'taxi_preprocessed_for_kafka' directory are now ready to be used by your Kafka producer.")

Starting data preprocessing for Kafka replay...
Found 10357 taxi files in C:\Users\sanac\Downloads\Big Data\T-drive Taxi Trajectories\release\taxi_log_2008_by_id.

--- Preprocessing 10357 files for Kafka replay ---
  Skipped empty or problematic file: 10115.txt
  Skipped empty or problematic file: 10352.txt
  Skipped empty or problematic file: 1089.txt
  Processing file 500/10357: 1125.txt
  Skipped empty or problematic file: 1497.txt
  Processing file 1000/10357: 1576.txt
  Skipped empty or problematic file: 1947.txt
  Processing file 1500/10357: 2025.txt
  Processing file 2000/10357: 2476.txt
  Processing file 2500/10357: 2926.txt
  Skipped empty or problematic file: 2929.txt
  Skipped empty or problematic file: 2945.txt
  Skipped empty or problematic file: 295.txt
  Skipped empty or problematic file: 3050.txt
  Skipped empty or problematic file: 3160.txt
  Skipped empty or problematic file: 3194.txt
  Processing file 3000/10357: 3376.txt
  Processing file 3500/10357: 3826.txt
  Skip

  return cls(*args)


Error processing 5850.txt: Latitude must be in the [-90; 90] range.
  Skipped empty or problematic file: 5850.txt
  Skipped empty or problematic file: 5972.txt
  Skipped empty or problematic file: 6030.txt
  Processing file 6000/10357: 6076.txt
  Skipped empty or problematic file: 6236.txt
  Skipped empty or problematic file: 6322.txt


  return cls(*args)


Error processing 6340.txt: Latitude must be in the [-90; 90] range.
  Skipped empty or problematic file: 6340.txt
  Processing file 6500/10357: 6526.txt
  Skipped empty or problematic file: 6717.txt
  Processing file 7000/10357: 6977.txt
  Processing file 7500/10357: 7426.txt
  Skipped empty or problematic file: 7583.txt


  df['datetime'] = pd.to_datetime(df['datetime_str'])


  Processing file 8000/10357: 7877.txt
  Skipped empty or problematic file: 8209.txt
  Processing file 8500/10357: 8326.txt
  Skipped empty or problematic file: 8424.txt
  Processing file 9000/10357: 8777.txt
  Processing file 9500/10357: 9226.txt
  Processing file 10000/10357: 9677.txt
  Skipped empty or problematic file: 9874.txt


  return cls(*args)


Error processing 9949.txt: Latitude must be in the [-90; 90] range.
  Skipped empty or problematic file: 9949.txt
  Processing file 10357/10357: 9999.txt

Preprocessing complete for 10357 files.
Successfully preprocessed and saved 10333 files to C:\Users\sanac\Downloads\Big Data\T-drive Taxi Trajectories\release\taxi_preprocessed_for_kafka.
Skipped 24 empty or problematic files.

--- Data Preprocessing Workflow Complete ---
The files in the 'taxi_preprocessed_for_kafka' directory are now ready to be used by your Kafka producer.


### Data Preprocessing for Real-time Analytics

Our preprocessing transforms raw taxi trajectory files into a clean, enriched format suitable for streaming into Kafka and processing by Apache Flink. This is essential because the raw T-Drive data is unstructured, noisy, and lacks the derived features needed for real-time analysis.

For each individual taxi's trajectory:

We begin by **loading the raw GPS points** (taxi ID, timestamp, longitude, latitude) into a Pandas DataFrame. Extracting the `taxi_id` from the filename ensures consistent and reliable identification for each taxi, providing a solid foundation for further steps.

Next, we **parse the timestamp strings** into proper datetime objects. This is crucial because mathematical operations and chronological sorting cannot be performed on text, enabling accurate time difference calculations vital for understanding movement.

We then **rigorously sort all GPS points chronologically**. This step is absolutely critical, as calculating features like distance and speed relies on the precise sequence of points. Without proper ordering, our insights in Flink would be nonsensical.

To streamline the data flow, we **remove identical consecutive GPS records** (same time, same location). These redundant points add no analytical value and unnecessarily inflate the data volume in our Kafka stream, making processing less efficient.

We then **calculate the time difference** (in minutes) between each consecutive GPS point. This `time_diff` is a core metric, essential for understanding how long a taxi took to move between locations, directly feeding into our speed calculations.

Following this, we **calculate the real-world distance** (in meters) between consecutive GPS points using the Haversine formula. This accounts for the Earth's curvature, providing accurate spatial measurements necessary for determining actual travel.

With time and distance in hand, we **compute the taxi's speed** in both meters per second and kilometers per hour. Logic is included to handle zero or negative time differences by setting speed to zero, preventing errors. Speed is a primary analytical insight for real-time monitoring and a crucial input for Flink's "Calculate speed" operator.

Finally, we **flag potential outliers** rather than removing them. This critical data quality step adds boolean indicators for problematic points (e.g., unusually large time gaps, unrealistic speeds over 120 km/h, impossible distances, or exceeding a 50 km/h speed limit). This is a deliberate design choice:

* **Preserving Context:** Flagging retains the original data, providing full context for downstream analysis.
* **Flink's Responsibility:** Our Flink application is designed to make the final decisions on how to handle these flagged points in real-time. It can choose to filter them for aggregations, trigger alerts, or include them based on specific business rules.
* **Flexibility & Debugging:** This approach allows us to adapt outlier definitions without re-processing raw data and simplifies debugging by enabling traceability of data quality issues.

By performing these steps, we transform raw GPS logs into a clean, feature-rich dataset, perfectly primed for Kafka streaming and robust real-time analytics with Flink.

In [8]:
all_preprocessed_files = glob.glob(os.path.join(preprocessed_output_dir, "*.txt"))

# List to store metrics for each taxi
taxi_metrics_list = []

print(f"Aggregating metrics from {len(all_preprocessed_files)} preprocessed files...")

for i, file_path in enumerate(all_preprocessed_files):
    if (i + 1) % 1000 == 0:
        print(f"  Aggregating file {i+1}/{len(all_preprocessed_files)}")

    try:
        df_preprocessed = pd.read_csv(file_path)
        if df_preprocessed.empty:
            continue

        taxi_id = df_preprocessed['taxi_id'].iloc[0] # Get taxi_id from the DataFrame

        # Calculate metrics for this taxi
        num_points = len(df_preprocessed)
        # For total_time_minutes, sum up the valid time_diffs
        # Drop NaN for start/end time and count of outliers
        total_time_minutes = df_preprocessed['time_diff'].sum()
        total_distance_meters = df_preprocessed['distance'].sum()

        # Handle potential division by zero if total_time_minutes is 0
        average_speed_kmph = (total_distance_meters / (total_time_minutes * 60) * 3.6) if total_time_minutes > 0 else 0

        # Calculate start and end time from actual datetime objects if possible, otherwise use strings
        start_time = df_preprocessed['datetime_str'].min() if not df_preprocessed['datetime_str'].empty else None
        end_time = df_preprocessed['datetime_str'].max() if not df_preprocessed['datetime_str'].empty else None

        num_outliers = df_preprocessed['is_outlier'].sum() # Count flagged outliers

        taxi_metrics_list.append({
            'taxi_id': taxi_id,
            'num_points': num_points,
            'total_time_minutes': total_time_minutes,
            'total_distance_meters': total_distance_meters,
            'average_speed_kmph': average_speed_kmph,
            'start_time': start_time,
            'end_time': end_time,
            'num_outliers': num_outliers
        })
    except Exception as e:
        print(f"Error aggregating {os.path.basename(file_path)}: {e}")

taxi_metrics_df = pd.DataFrame(taxi_metrics_list)
print("\ntaxi_metrics_df created successfully.")
print(taxi_metrics_df.head())

# Optional: Save the metrics DataFrame for future use
taxi_metrics_df.to_csv(os.path.join(preprocessed_output_dir, "taxi_metrics_summary.csv"), index=False)
print(f"Taxi metrics summary saved to {os.path.join(preprocessed_output_dir, 'taxi_metrics_summary.csv')}")

Aggregating metrics from 10333 preprocessed files...
  Aggregating file 1000/10333
  Aggregating file 2000/10333
  Aggregating file 3000/10333
  Aggregating file 4000/10333
  Aggregating file 5000/10333
  Aggregating file 6000/10333
  Aggregating file 7000/10333
  Aggregating file 8000/10333
  Aggregating file 9000/10333
  Aggregating file 10000/10333

taxi_metrics_df created successfully.
   taxi_id  num_points  total_time_minutes  total_distance_meters  \
0        1         564         8655.383333           4.414151e+05   
1       10        5398         8885.950000           1.181000e+06   
2      100        1272         8444.383333           1.353615e+06   
3     1000        1693         8875.250000           1.447631e+06   
4    10000        1558         8877.750000           6.958293e+05   

   average_speed_kmph           start_time             end_time  num_outliers  
0            3.059935  2008-02-02 15:36:08  2008-02-08 15:51:31            24  
1            7.974390  2008-02-0

### Purpose: Summarizing Preprocessed Taxi Data for Selection

This section of the code systematically aggregates key statistics from *each* preprocessed taxi trajectory file. The primary goal is to build a comprehensive summary table (`taxi_metrics_df`) that allows us to understand the quality and characteristics of every taxi's data. This summary is then used to intelligently select the "best" or "most representative" taxi IDs for our Kafka/Flink simulation, ensuring we work with high-quality, relevant data.

**Key Actions & Rationale:**

* **Iterate Through Preprocessed Files:** The code loops through every single preprocessed `.txt` file, loading each one into a DataFrame. This ensures we capture metrics for all taxis.
* **Calculate Per-Taxi Metrics:** For each taxi, it calculates essential summary statistics directly from the preprocessed data:
    * **Number of Points:** Indicates data density and duration of the recorded trajectory.
    * **Total Time & Distance:** Sums up the calculated `time_diff` and `distance` values to give a total duration and total distance traveled for the entire trip, crucial for understanding activity.
    * **Average Speed:** Computes the overall average speed for the taxi, providing a single metric to gauge its general movement behavior.
    * **Start/End Times:** Captures the span of the recorded journey.
    * **Outlier Count:** Sums the `is_outlier` flags to quantify the data quality, indicating how many potentially problematic points were found in that taxi's trajectory.
* **Store Metrics:** Each taxi's calculated metrics are stored as a row in a growing list, eventually forming the `taxi_metrics_df`.
* **Generate Final Summary Table:** Converts the collected metrics into a structured Pandas DataFrame. This format is ideal for filtering, sorting, and analyzing the overall characteristics of your taxi fleet's data.
* **Persist Summary to CSV:** The `taxi_metrics_df` is saved to `taxi_metrics_summary.csv`. This is vital for reproducibility; it means you don't need to re-run the lengthy aggregation process every time you want to review taxi characteristics or refine your selection criteria. It acts as a persistent snapshot of your dataset's metadata.

In [4]:
#Select "Best" Taxi IDs 

taxi_metrics_summary_path = os.path.join(preprocessed_output_dir, "taxi_metrics_summary.csv")

try:
    taxi_metrics_df = pd.read_csv(taxi_metrics_summary_path)
    print(f"Loaded taxi_metrics_df from: {taxi_metrics_summary_path}")
except FileNotFoundError:
    print(f"Error: taxi_metrics_summary.csv not found at {taxi_metrics_summary_path}.")
    print("Please ensure you have run the preprocessing and metrics aggregation step to generate this file.")
    # In a real script, you might sys.exit() here
    raise # Or raise an error to stop execution if in a script

print("\nOriginal taxi_metrics_df head:")
print(taxi_metrics_df.head())
print(f"Total taxis in summary: {len(taxi_metrics_df)}")

# --- Define Selection Criteria ---
MIN_POINTS = 5000
MIN_AVG_SPEED_KMPH = 10
MAX_AVG_SPEED_KMPH = 40
MAX_OUTLIER_PROPORTION = 0.02
MIN_TOTAL_DISTANCE_KM = 50
MIN_TOTAL_TIME_HOURS = 10

# --- Apply Filters ---
print("\nApplying selection filters...")
filtered_taxis_df = taxi_metrics_df[
    (taxi_metrics_df['num_points'] >= MIN_POINTS) &
    (taxi_metrics_df['average_speed_kmph'] >= MIN_AVG_SPEED_KMPH) &
    (taxi_metrics_df['average_speed_kmph'] <= MAX_AVG_SPEED_KMPH) &
    (taxi_metrics_df['num_outliers'] / taxi_metrics_df['num_points'] <= MAX_OUTLIER_PROPORTION) &
    (taxi_metrics_df['total_distance_meters'] / 1000 >= MIN_TOTAL_DISTANCE_KM) &
    (taxi_metrics_df['total_time_minutes'] / 60 >= MIN_TOTAL_TIME_HOURS)
].copy()

filtered_taxis_df = filtered_taxis_df.sort_values(by='total_distance_meters', ascending=False)

print(f"\nFiltered taxis meeting all criteria: {len(filtered_taxis_df)} out of {len(taxi_metrics_df)} original taxis.")
print("\nSelected Taxis (top 10 based on criteria and sorted by total distance):")
print(filtered_taxis_df.head(10).to_markdown(index=False))

# Populate selected_taxi_ids for the next block
selected_taxi_ids = filtered_taxis_df['taxi_id'].tolist()

Loaded taxi_metrics_df from: C:\Users\sanac\Downloads\Big Data\T-drive Taxi Trajectories\release\taxi_preprocessed_for_kafka\taxi_metrics_summary.csv

Original taxi_metrics_df head:
   taxi_id  num_points  total_time_minutes  total_distance_meters  \
0        1         564         8655.383333           4.414151e+05   
1       10        5398         8885.950000           1.181000e+06   
2      100        1272         8444.383333           1.353615e+06   
3     1000        1693         8875.250000           1.447631e+06   
4    10000        1558         8877.750000           6.958293e+05   

   average_speed_kmph           start_time             end_time  num_outliers  
0            3.059935  2008-02-02 15:36:08  2008-02-08 15:51:31            24  
1            7.974390  2008-02-02 13:32:03  2008-02-08 17:38:00            62  
2            9.617858  2008-02-02 18:44:59  2008-02-08 15:29:22             7  
3            9.786523  2008-02-02 13:34:52  2008-02-08 17:30:07            12  
4  

### Rationale for Taxi Selection Criteria

The specific values chosen for filtering taxis are based on a combination of data exploration and project goals:

* **`MIN_POINTS = 5000` (Number of GPS Points):** Ensures selected taxis have a **sufficiently long and detailed trajectory**, representing substantial activity, not just short, incomplete trips.
* **`MIN_AVG_SPEED_KMPH = 10` & `MAX_AVG_SPEED_KMPH = 40` (Average Speed):**
    * **10 km/h (Min):** Filters out taxis that were predominantly **stationary or had negligible average movement**, indicating idle time or noisy data over the entire period. Focuses on active taxis.
    * **40 km/h (Max):** Excludes taxis with **unrealistically high average speeds for urban operations**, which might signify data errors or non-representative highway travel not relevant to urban taxi patterns.
* **`MAX_OUTLIER_PROPORTION = 0.02` (Outlier Proportion):** Limits chosen taxis to those with **minimal data quality issues** (e.g., extreme speeds, time gaps, distances). A 2% threshold helps ensure the selected data is cleaner and more reliable for streaming.
* **`MIN_TOTAL_DISTANCE_KM = 50` (Total Distance):** Selects taxis that have **covered a meaningful distance**, confirming they were actively used for travel rather than just static logging.
* **`MIN_TOTAL_TIME_HOURS = 10` (Total Time):** Ensures taxis have **recorded data over a significant duration**, providing rich, continuous activity patterns suitable for real-time simulation.

These criteria collectively define a "high-quality" and "representative" taxi trajectory subset, allowing us to focus our Kafka/Flink simulation on data that best reflects typical and reliable urban taxi operations.

In [8]:
# --- Further limit the number of selected taxis if needed for your environment ---
MAX_SELECTED_TAXI_FILES_FOR_YOUR_RUN = 10 # Example: only use the top 10 best taxis for your Flink testing

if len(selected_taxi_ids) > MAX_SELECTED_TAXI_FILES_FOR_YOUR_RUN:
    final_selected_taxi_ids = selected_taxi_ids[:MAX_SELECTED_TAXI_FILES_FOR_YOUR_RUN]
    print(f"\nFurther limiting the final selection to top {MAX_SELECTED_TAXI_FILES_FOR_YOUR_RUN} taxis.")
else:
    final_selected_taxi_ids = selected_taxi_ids
    print(f"\nUsing all {len(final_selected_taxi_ids)} selected taxis (no further limit applied).")

print(f"\nFinal count of taxi IDs to use: {len(final_selected_taxi_ids)}")

print("\n--- Details for Final Selected Taxis ---")
# Filter the original filtered_taxis_df (which is already sorted and clean)
# to only include the final_selected_taxi_ids.
# Using isin() is efficient for matching multiple IDs.
final_selected_taxis_details_df = filtered_taxis_df[
    filtered_taxis_df['taxi_id'].isin(final_selected_taxi_ids)
].copy()

# Print the full details in a nice Markdown table format
print(final_selected_taxis_details_df.to_markdown(index=False))


# Save the list of selected taxi IDs to a file. Your Kafka producer will read this.
# Ensure preprocessed_output_dir is accessible/defined if running in a new session or script
with open(os.path.join(preprocessed_output_dir, "selected_taxi_ids.txt"), "w") as f:
    for taxi_id in final_selected_taxi_ids:
        f.write(f"{taxi_id}\n")

print(f"List of selected taxi IDs saved to {os.path.join(preprocessed_output_dir, 'selected_taxi_ids.txt')}")

print("\n--- Taxi ID Selection Workflow Complete ---")
print("You now have a 'selected_taxi_ids.txt' file that your Kafka producer can use.")


Further limiting the final selection to top 10 taxis.

Final count of taxi IDs to use: 10

--- Details for Final Selected Taxis ---
|   taxi_id |   num_points |   total_time_minutes |   total_distance_meters |   average_speed_kmph | start_time          | end_time            |   num_outliers |
|----------:|-------------:|---------------------:|------------------------:|---------------------:|:--------------------|:--------------------|---------------:|
|      4867 |         9324 |              8888    |             5.88452e+06 |              39.7245 | 2008-02-02 13:31:10 | 2008-02-08 17:39:10 |            144 |
|      5414 |         6837 |              8767.8  |             5.80351e+06 |              39.7147 | 2008-02-02 13:32:01 | 2008-02-08 15:39:49 |             17 |
|      4401 |         9380 |              8886.93 |             5.71104e+06 |              38.558  | 2008-02-02 13:31:44 | 2008-02-08 17:38:40 |             51 |
|      1971 |         5806 |              8700.13 |      

In [None]:
# --- ADDED CODE BELOW ---
print("\n--- Details for Final Selected Taxis ---")

final_selected_taxis_details_df = filtered_taxis_df[
    filtered_taxis_df['taxi_id'].isin(final_selected_taxi_ids)
].copy()

# --- Apply number formatting for display purposes ---
# Create a dictionary of formatters for specific columns
formatters = {
    'total_distance_meters': '{:,.2f}'.format,  # Format as float with 2 decimal places and comma separator
    'total_time_minutes': '{:,.2f}'.format,    # Same for total_time_minutes
    'average_speed_kmph': '{:,.2f}'.format     # Same for average_speed_kmph
    # You can add more formatters for other columns if needed
}

# Create a display copy to apply formatting without altering the original DataFrame
display_df = final_selected_taxis_details_df.copy()

# Apply the formatting directly to the columns for display
display_df['total_distance_meters'] = display_df['total_distance_meters'].apply(lambda x: f'{x:,.2f}')
display_df['total_time_minutes'] = display_df['total_time_minutes'].apply(lambda x: f'{x:,.2f}')
display_df['average_speed_kmph'] = display_df['average_speed_kmph'].apply(lambda x: f'{x:,.2f}')


print(display_df.to_markdown(index=False))



--- Details for Final Selected Taxis ---
|   taxi_id |   num_points | total_time_minutes   | total_distance_meters   |   average_speed_kmph | start_time          | end_time            |   num_outliers |
|----------:|-------------:|:---------------------|:------------------------|---------------------:|:--------------------|:--------------------|---------------:|
|      4867 |         9324 | 8,888.00             | 5,884,518.43            |                39.72 | 2008-02-02 13:31:10 | 2008-02-08 17:39:10 |            144 |
|      5414 |         6837 | 8,767.80             | 5,803,508.12            |                39.71 | 2008-02-02 13:32:01 | 2008-02-08 15:39:49 |             17 |
|      4401 |         9380 | 8,886.93             | 5,711,040.58            |                38.56 | 2008-02-02 13:31:44 | 2008-02-08 17:38:40 |             51 |
|      1971 |         5806 | 8,700.13             | 5,562,400.83            |                38.36 | 2008-02-02 13:31:12 | 2008-02-08 14:31:20 |    