In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import pandas as pd
import glob
import os
import numpy as np
from tqdm import tqdm

def resample_station_data(df: pd.DataFrame, station_id_col: str, timestamp_col: str, freq: str = '10min') -> pd.DataFrame:
    """
    Resamples irregular time-series data using a robust and efficient pd.merge_asof method.
    """
    df = df.sort_values(timestamp_col)
    all_resampled_dfs = []
    for station_id, station_df in tqdm(df.groupby(station_id_col), desc="Resampling stations"):
        if station_df.empty:
            continue
        start_time = station_df[timestamp_col].min().floor(freq)
        end_time = station_df[timestamp_col].max().ceil(freq)
        time_grid = pd.DataFrame({timestamp_col: pd.date_range(start=start_time, end=end_time, freq=freq)})
        resampled_station = pd.merge_asof(
            left=time_grid,
            right=station_df,
            on=timestamp_col,
            direction='nearest',
            tolerance=pd.Timedelta('15min')
        )
        resampled_station[station_id_col] = station_id
        resampled_station = resampled_station.ffill().bfill()
        resampled_station.dropna(subset=[c for c in resampled_station.columns if c not in [timestamp_col]], inplace=True)
        all_resampled_dfs.append(resampled_station)
    if not all_resampled_dfs:
        return pd.DataFrame()
    final_df = pd.concat(all_resampled_dfs, ignore_index=True)
    return final_df

# --- Configuration ---
DATA_DIR = '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data/'
OUTPUT_DIR = '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/'
BATCH_SIZE = 50
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- 1. Process and Clean Site Information (Dimension Table) ---
print("--- Step 1: Consolidating and Cleaning Site Information ---")
site_files = glob.glob(os.path.join(DATA_DIR, '*_site.csv'))

if not site_files:
    print(f"Error: No site files found in '{DATA_DIR}'.")
    sites_info_df = None
else:
    all_sites_df = pd.concat((pd.read_csv(file) for file in site_files), ignore_index=True)
    print(f"Loaded {len(all_sites_df)} records from {len(site_files)} site files.")
    cols_to_drop_from_sites = ['sarea', 'ar']
    all_sites_df = all_sites_df.drop(columns=[col for col in cols_to_drop_from_sites if col in all_sites_df.columns])
    sites_info_df = all_sites_df.sort_values('sno').drop_duplicates(subset='sno', keep='last').copy()
    print(f"Created a clean lookup table with {len(sites_info_df)} unique stations.")

# --- 2. Process Snapshot Data in Batches to Conserve Memory ---
print("\n--- Step 2: Processing Snapshot Data in Batches ---")
slot_files = sorted(glob.glob(os.path.join(DATA_DIR, '*_slot.csv')))
processed_batch_files = []

if not slot_files or sites_info_df is None:
    print(f"Error: No snapshot/slot files found or site info is missing. Halting.")
else:
    num_batches = int(np.ceil(len(slot_files) / BATCH_SIZE))
    for i in range(num_batches):
        start_index = i * BATCH_SIZE
        end_index = start_index + BATCH_SIZE
        batch_files = slot_files[start_index:end_index]

        print(f"\n--- Processing Batch {i+1}/{num_batches} ---")

        batch_df = pd.concat((pd.read_csv(file) for file in batch_files), ignore_index=True)
        print(f"Loaded {len(batch_df)} records from {len(batch_files)} files.")

        timestamp_col = 'infoTime'
        numeric_cols = ['total', 'available_rent_bikes', 'available_return_bikes']

        batch_df[timestamp_col] = pd.to_datetime(batch_df[timestamp_col], errors='coerce')
        batch_df.dropna(subset=[timestamp_col], inplace=True)

        for col in numeric_cols:
            if col in batch_df.columns:
                batch_df[col] = pd.to_numeric(batch_df[col], errors='coerce')

        batch_df.dropna(subset=[c for c in numeric_cols if c in batch_df.columns], inplace=True)

        resampled_batch_df = resample_station_data(batch_df, station_id_col='sno', timestamp_col=timestamp_col)

        # --- THE FIX IS HERE ---
        # Include 'sareaen' as it's a critical clustering/categorical feature.
        site_info_to_merge = sites_info_df[['sno', 'sna', 'latitude', 'longitude', 'sareaen']].copy()

        # Rename for consistency before merging
        site_info_to_merge = site_info_to_merge.rename(columns={'latitude': 'lat', 'longitude': 'lng'})

        final_batch_df = pd.merge(resampled_batch_df, site_info_to_merge, on='sno', how='left')

        batch_output_path = os.path.join(OUTPUT_DIR, f'temp_batch_{i+1}.csv')
        final_batch_df.to_csv(batch_output_path, index=False)
        processed_batch_files.append(batch_output_path)
        print(f"Processed batch saved to '{batch_output_path}'")

# --- 3. Consolidate Processed Batches into Final Master File (Memory Efficiently) ---
# This part remains the same and will correctly handle the new column.
print("\n--- Step 3: Consolidating all Processed Batches ---")
if processed_batch_files:
    output_path = os.path.join(OUTPUT_DIR, 'consolidated_youbike_data_processed.csv')

    first_batch_df = pd.read_csv(processed_batch_files[0])
    first_batch_df = first_batch_df.rename(columns={'infoTime': 'mday'})
    first_batch_df['mday'] = pd.to_datetime(first_batch_df['mday'])
    first_batch_df.to_csv(output_path, index=False, header=True)

    if len(processed_batch_files) > 1:
        for file in tqdm(processed_batch_files[1:], desc="Appending remaining batches"):
            batch_df = pd.read_csv(file)
            batch_df = batch_df.rename(columns={'infoTime': 'mday'})
            batch_df.to_csv(output_path, mode='a', index=False, header=False)

    print("\nConsolidation of all batches is complete.")
    print("\nPreview of the final consolidated DataFrame (first 5 rows):")
    print(pd.read_csv(output_path, nrows=5))
    print(f"\nMaster dataset has been saved to: '{output_path}'")

    for file in processed_batch_files:
        os.remove(file)
    print("Temporary batch files have been removed.")
else:
    print("No batches were processed. Final file not created.")



--- Step 1: Consolidating and Cleaning Site Information ---
Loaded 391601 records from 261 site files.
Created a clean lookup table with 1613 unique stations.

--- Step 2: Processing Snapshot Data in Batches ---

--- Processing Batch 1/9 ---
Loaded 5248387 records from 50 files.


Resampling stations: 100%|██████████| 1424/1424 [00:07<00:00, 200.78it/s]


Processed batch saved to '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/temp_batch_1.csv'

--- Processing Batch 2/9 ---
Loaded 4886354 records from 50 files.


Resampling stations: 100%|██████████| 1439/1439 [00:08<00:00, 169.93it/s]


Processed batch saved to '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/temp_batch_2.csv'

--- Processing Batch 3/9 ---
Loaded 5580967 records from 50 files.


Resampling stations: 100%|██████████| 1461/1461 [00:11<00:00, 131.37it/s]


Processed batch saved to '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/temp_batch_3.csv'

--- Processing Batch 4/9 ---
Loaded 3899791 records from 50 files.


Resampling stations: 100%|██████████| 1498/1498 [00:09<00:00, 155.13it/s]


Processed batch saved to '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/temp_batch_4.csv'

--- Processing Batch 5/9 ---
Loaded 5562547 records from 50 files.


Resampling stations: 100%|██████████| 1499/1499 [00:13<00:00, 113.21it/s]


Processed batch saved to '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/temp_batch_5.csv'

--- Processing Batch 6/9 ---
Loaded 5502489 records from 50 files.


Resampling stations: 100%|██████████| 1520/1520 [00:16<00:00, 90.53it/s] 


Processed batch saved to '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/temp_batch_6.csv'

--- Processing Batch 7/9 ---
Loaded 5015749 records from 50 files.


Resampling stations: 100%|██████████| 1561/1561 [00:10<00:00, 149.66it/s]


Processed batch saved to '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/temp_batch_7.csv'

--- Processing Batch 8/9 ---
Loaded 3973600 records from 50 files.


Resampling stations: 100%|██████████| 1584/1584 [00:12<00:00, 122.57it/s]


Processed batch saved to '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/temp_batch_8.csv'

--- Processing Batch 9/9 ---
Loaded 1467244 records from 16 files.


Resampling stations: 100%|██████████| 1593/1593 [00:05<00:00, 285.91it/s]


Processed batch saved to '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/temp_batch_9.csv'

--- Step 3: Consolidating all Processed Batches ---


Appending remaining batches: 100%|██████████| 8/8 [19:16<00:00, 144.52s/it]



Consolidation of all batches is complete.

Preview of the final consolidated DataFrame (first 5 rows):
                  mday        sno  total  available_rent_bikes  \
0  2024-05-04 00:00:00  500101001   28.0                   6.0   
1  2024-05-04 00:10:00  500101001   28.0                   6.0   
2  2024-05-04 00:20:00  500101001   28.0                   3.0   
3  2024-05-04 00:30:00  500101001   28.0                   0.0   
4  2024-05-04 00:40:00  500101001   28.0                   1.0   

   available_return_bikes                 sna       lat       lng     sareaen  
0                    22.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  Daan Dist.  
1                    22.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  Daan Dist.  
2                    25.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  Daan Dist.  
3                    28.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  Daan Dist.  
4                    27.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  Daan Dist.  

Master dataset has

In [None]:
!pip install --upgrade translators

Collecting translators
  Downloading translators-6.0.1-py3-none-any.whl.metadata (70 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/70.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m70.6/70.6 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
Collecting niquests>=3.14.0 (from translators)
  Downloading niquests-3.15.2-py3-none-any.whl.metadata (16 kB)
Collecting exejs>=0.0.4 (from translators)
  Downloading exejs-0.0.6-py3-none-any.whl.metadata (5.1 kB)
Collecting pathos>=0.3.4 (from translators)
  Downloading pathos-0.3.4-py3-none-any.whl.metadata (11 kB)
Collecting cloudscraper>=1.2.71 (from translators)
  Downloading cloudscraper-1.2.71-py2.py3-none-any.whl.metadata (19 kB)
Collecting urllib3-future<3,>=2.13.903 (from niquests>=3.14.0->translators)
  Downloading urllib3_future-2.13.906-py3-none-any.whl.metadata (15 kB)
Collecting wassima<3,>=1.0.1 (from niquests>=3.14.0->translators)
  Downloading wassim

In [1]:
import pandas as pd
import glob
import os
from tqdm import tqdm

# --- Configuration ---
# Ensure this path matches the output directory from your previous script
OUTPUT_DIR = '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/'

# --- Step 3: Consolidate Processed Batches into Final Master File (Memory Efficiently) ---
print("--- Step 3: Consolidating all Processed Batches ---")

# Find all the temporary batch files created by the previous step
processed_batch_files = sorted(glob.glob(os.path.join(OUTPUT_DIR, 'temp_batch_*.csv')))

if processed_batch_files:
    output_path = os.path.join(OUTPUT_DIR, 'consolidated_youbike_data_processed.csv')

    # --- Process and write the first batch with a header ---
    print("Processing first batch to create final file with header...")
    first_batch_df = pd.read_csv(processed_batch_files[0])
    first_batch_df = first_batch_df.rename(columns={'infoTime': 'mday'})
    first_batch_df['mday'] = pd.to_datetime(first_batch_df['mday'])
    first_batch_df.to_csv(output_path, index=False, header=True)

    # --- Append the remaining batches without a header ---
    if len(processed_batch_files) > 1:
        for file in tqdm(processed_batch_files[1:], desc="Appending remaining batches"):
            batch_df = pd.read_csv(file)
            batch_df = batch_df.rename(columns={'infoTime': 'mday'})
            # No need to convert mday to datetime here, as it's just being written to CSV
            batch_df.to_csv(output_path, mode='a', index=False, header=False)

    print("\nConsolidation of all batches is complete.")

    print("\nPreview of the final consolidated DataFrame (first 5 rows):")
    # Read just the start of the file for a quick preview
    print(pd.read_csv(output_path, nrows=5))

    print(f"\nMaster dataset has been saved to: '{output_path}'")
    print("NOTE: The final file is not globally sorted to prevent memory crashes.")

    # Clean up temporary batch files
    for file in processed_batch_files:
        os.remove(file)
    print("Temporary batch files have been removed.")
else:
    print("No processed batch files (temp_batch_*.csv) were found. Final file not created.")


--- Step 3: Consolidating all Processed Batches ---
Processing first batch to create final file with header...


Appending remaining batches: 100%|██████████| 8/8 [14:59<00:00, 112.43s/it]


Consolidation of all batches is complete.

Preview of the final consolidated DataFrame (first 5 rows):
                  mday        sno  total  available_rent_bikes  \
0  2024-05-04 00:00:00  500101001   28.0                   6.0   
1  2024-05-04 00:10:00  500101001   28.0                   6.0   
2  2024-05-04 00:20:00  500101001   28.0                   3.0   
3  2024-05-04 00:30:00  500101001   28.0                   0.0   
4  2024-05-04 00:40:00  500101001   28.0                   1.0   

   available_return_bikes                 sna       lat       lng  
0                    22.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  
1                    22.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  
2                    25.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  
3                    28.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  
4                    27.0  YouBike2.0_捷運科技大樓站  25.02605  121.5436  

Master dataset has been saved to: '/content/drive/MyDrive/Youbike_Master_Project/YouBike_D




In [1]:
import pandas as pd
import glob
import os
from tqdm import tqdm

# --- Configuration ---
RAW_DATA_DIR = '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data/'
CLEAN_DATA_DIR = '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/'
PROCESSED_FILE_PATH = os.path.join(CLEAN_DATA_DIR, 'consolidated_youbike_data_processed.csv')
CHUNK_SIZE = 1_000_000 # Process 1 million rows at a time

print("--- Starting Memory-Efficient Data Validation ---")

# --- 1. File Existence Check ---
print(f"\n[1/7] Checking for final processed file: '{PROCESSED_FILE_PATH}'...")
if not os.path.exists(PROCESSED_FILE_PATH):
    print("... FAIL: Processed file not found. Please run the consolidation script first.")
    exit()
print("... PASS: File exists.")

# --- 2. Verify Station Completeness against Raw Data ---
print(f"\n[2/7] Verifying Station Completeness (this step loads raw site files)...")
try:
    site_files = glob.glob(os.path.join(RAW_DATA_DIR, '*_site.csv'))
    if not site_files:
        raise FileNotFoundError("No raw site files found.")

    raw_sites_df = pd.concat((pd.read_csv(file, usecols=['sno']) for file in site_files), ignore_index=True)
    unique_stations_raw = set(raw_sites_df['sno'].unique())
    print(f"      Found {len(unique_stations_raw):,} unique stations in raw site files.")

    # We will build the processed stations set chunk by chunk
    unique_stations_processed = set()
    for chunk in tqdm(pd.read_csv(PROCESSED_FILE_PATH, usecols=['sno'], chunksize=CHUNK_SIZE), desc="      Scanning for stations"):
        unique_stations_processed.update(chunk['sno'].unique())

    missing_stations = unique_stations_raw - unique_stations_processed

    if not missing_stations:
        print("... PASS: All stations from raw data are present in the final processed file.")
    else:
        print(f"... FAIL: {len(missing_stations)} stations from the raw data are missing in the final file.")
        print(f"      Missing station IDs (sno): {list(missing_stations)}")

except Exception as e:
    print(f"... FAIL: An error occurred: {e}")

# --- Initialize variables for chunk-based validation ---
nan_report = pd.Series(dtype=int)
merge_check_report = pd.Series(dtype=int)
total_incorrect_intervals = 0
last_row_of_chunk = None

# --- Perform Chunk-Based Validations (Checks 3, 4, 5, 6, 7) ---
print("\n[3-7] Performing chunk-based validation on the processed file...")
reader = pd.read_csv(PROCESSED_FILE_PATH, chunksize=CHUNK_SIZE, parse_dates=['mday'])

for i, chunk in tqdm(enumerate(reader), desc="Validating chunks"):
    # First chunk checks
    if i == 0:
        # --- 6. Verify Column Schema ---
        print("\n[6/7] Verifying final column schema (on first chunk)...")
        final_cols = set(chunk.columns)
        expected_cols = {'sno', 'mday', 'total', 'available_rent_bikes', 'available_return_bikes', 'sna', 'lat', 'lng', 'sareaen'}
        missing_expected = expected_cols - final_cols
        if not missing_expected:
            print("... PASS: Final column schema is correct.")
        else:
            print(f"... FAIL: Missing expected columns: {missing_expected}")

        # --- 7. Check Data Types ---
        print("\n[7/7] Verifying column data types (on first chunk)...")
        print("      Data types of final DataFrame:")
        print(chunk.dtypes)

    # --- 4. Check for Missing Values (NaNs) ---
    nan_report = nan_report.add(chunk.isnull().sum(), fill_value=0)

    # --- 5. Validate Successful Merging ---
    merge_check_cols = ['sna', 'lat', 'lng', 'sareaen']
    merge_check_report = merge_check_report.add(chunk[merge_check_cols].isnull().sum(), fill_value=0)

    # --- 3. Check for Correct 10-Minute Resampling Interval ---
    chunk = chunk.sort_values(by=['sno', 'mday'])

    # Check intervals *within* the chunk
    chunk['time_diff'] = chunk.groupby('sno')['mday'].diff()
    incorrect_in_chunk = chunk[chunk['time_diff'].notna() & (chunk['time_diff'] != pd.Timedelta('10 minutes'))]
    total_incorrect_intervals += len(incorrect_in_chunk)

    # Check interval *between* the last chunk and this one
    if last_row_of_chunk is not None:
        first_row_of_chunk = chunk.iloc[0]
        if last_row_of_chunk['sno'] == first_row_of_chunk['sno']:
            between_chunk_diff = first_row_of_chunk['mday'] - last_row_of_chunk['mday']
            if between_chunk_diff != pd.Timedelta('10 minutes'):
                total_incorrect_intervals += 1

    last_row_of_chunk = chunk.iloc[-1]

# --- Final Reports for Chunk-Based Checks ---
print("\n--- Final Validation Reports ---")

# Report for Check 4
print("\n[4/7] Final Report: Missing Values (NaNs)...")
nan_in_key_cols = nan_report[nan_report > 0]
if nan_in_key_cols.empty:
    print("... PASS: No missing values found in any column across the entire dataset.")
else:
    print("... FAIL: Missing values were found in the following columns:")
    print(nan_in_key_cols)

# Report for Check 5
print("\n[5/7] Final Report: Merge Success...")
if merge_check_report.sum() == 0:
    print("... PASS: Station name, coordinates, and district were successfully merged for all records.")
else:
    print("... FAIL: Some records are missing site information, indicating a merge issue.")
    print(merge_check_report[merge_check_report > 0])

# Report for Check 3
print("\n[3/7] Final Report: Resampling Interval...")
if total_incorrect_intervals == 0:
    print("... PASS: All time intervals between records for each station are exactly 10 minutes.")
else:
    print(f"... FAIL: Found {total_incorrect_intervals} records with incorrect time intervals.")

print("\n--- Validation Complete ---")



--- Starting Memory-Efficient Data Validation ---

[1/7] Checking for final processed file: '/content/drive/MyDrive/Youbike_Master_Project/YouBike_Demand_Forecast/data_clean/consolidated_youbike_data_processed.csv'...
... PASS: File exists.

[2/7] Verifying Station Completeness (this step loads raw site files)...
      Found 1,613 unique stations in raw site files.


      Scanning for stations: 89it [01:13,  1.21it/s]


... PASS: All stations from raw data are present in the final processed file.

[3-7] Performing chunk-based validation on the processed file...


Validating chunks: 0it [00:00, ?it/s]


[6/7] Verifying final column schema (on first chunk)...
... PASS: Final column schema is correct.

[7/7] Verifying column data types (on first chunk)...
      Data types of final DataFrame:
mday                      datetime64[ns]
sno                                int64
total                            float64
available_rent_bikes             float64
available_return_bikes           float64
sna                               object
lat                              float64
lng                              float64
sareaen                           object
dtype: object


Validating chunks: 89it [02:17,  1.54s/it]


--- Final Validation Reports ---

[4/7] Final Report: Missing Values (NaNs)...
... PASS: No missing values found in any column across the entire dataset.

[5/7] Final Report: Merge Success...
... PASS: Station name, coordinates, and district were successfully merged for all records.

[3/7] Final Report: Resampling Interval...
... PASS: All time intervals between records for each station are exactly 10 minutes.

--- Validation Complete ---



