In [22]:
# Now, from above, integrating partial scanning again!

import sys
import os
import concurrent.futures
sys.path.append(os.path.abspath('../..'))
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from utlis.sync_utlis.sync_df_utlis import find_calib_file
from utlis.scan_engine_utlis.scan_engine_utlis import (
    read_failed_paths,
    match_date_pattern,
    assign_status_codes
)
from status_fields_config import STATUS_FIELDS_CONFIG


# Function to scan an individual folder (for parallel processing)
def scan_folder(folder_name, base_folder, failed_paths, config):
    """Scan one date folder and collect its calibration files and rec-folder statuses.

    Intended to be called once per date folder (e.g. via ``executor.map``).

    Args:
        folder_name: Name of the date folder, relative to ``base_folder``.
        base_folder: Directory that contains the date folders.
        failed_paths: Manually flagged failed paths (the caller passes a set);
            forwarded unchanged to ``assign_status_codes``.
        config: Status-field configuration, forwarded to ``assign_status_codes``.

    Returns:
        dict with keys:
            ``'date_folder'``: ``folder_name``.
            ``'calib_files'``: names of entries directly in the date folder whose
                name starts with ``"calib"``.
            ``'rec_files_data'``: one dict per rec folder (a subdirectory whose
                name starts with a digit), as returned by ``assign_status_codes``
                with an extra ``'rec_file'`` key holding the subfolder name.
    """
    folder_path = os.path.join(base_folder, folder_name)
    calib_files = []     # names of calibration entries at the date-folder level
    rec_files_data = []  # one status dict per rec folder

    # Classify every entry of the date folder in a single directory listing.
    for entry_name in os.listdir(folder_path):
        if entry_name.startswith("calib"):
            calib_files.append(entry_name)

        # Rec folders are subdirectories whose name starts with a digit; test the
        # cheap name condition first so non-candidates never hit the filesystem.
        entry_path = os.path.join(folder_path, entry_name)
        if entry_name[:1].isdigit() and os.path.isdir(entry_path):
            calib_file = find_calib_file(entry_path)

            # Status codes are assigned dynamically from the config.
            rec_file_data = assign_status_codes(
                folder_name, entry_path, calib_file, failed_paths, config
            )
            rec_file_data['rec_file'] = entry_name
            rec_files_data.append(rec_file_data)

    return {
        'date_folder': folder_name,
        'calib_files': calib_files,  # calibration files stored at date_folder level
        'rec_files_data': rec_files_data,  # each rec file with its status fields
    }

# def log_folder_to_parquet(base_folder, parquet_file, failed_paths_file, config):
#     # Read manually inputted failed paths
#     failed_paths = read_failed_paths(failed_paths_file)

#     # Initialize logged folders
#     logged_folders = []

#     # Check if the Parquet file exists
#     if os.path.exists(parquet_file):
#         # Read the existing log to get already processed date folders
#         existing_df = pq.read_table(parquet_file).to_pandas()
#         logged_folders = existing_df['date_folder'].unique() if not existing_df.empty else []
#     else:
#         print("No existing Parquet file found. Running full scan.")

#     # Get the list of current date folders that match the date pattern
#     date_folders = [
#         f for f in os.listdir(base_folder) 
#         if os.path.isdir(os.path.join(base_folder, f)) and match_date_pattern(f)
#     ]
    
#     # Filter for new folders not logged yet
#     new_folders = [f for f in date_folders if f not in logged_folders]

#     # If no new folders are found, print a message and return
#     if not new_folders:
#         print("No new folders to scan.")
#         return

#     # Use ThreadPoolExecutor for parallel folder scanning
#     with concurrent.futures.ThreadPoolExecutor() as executor:
#         # Run scan_folder in parallel for each new date folder
#         log_data = list(executor.map(scan_folder, new_folders, [base_folder] * len(new_folders), [failed_paths] * len(new_folders), [config] * len(new_folders)))

#     # Convert the results into a DataFrame
#     df = pd.json_normalize(log_data, 'rec_files_data', ['date_folder', 'calib_files'])

#     # Dynamically ensure all relevant columns are strings based on config
#     status_columns = list(config.keys())
#     df[status_columns] = df[status_columns].astype(str)

#     # Create pyarrow Table and save as Parquet
#     table = pa.Table.from_pandas(df)
#     pq.write_table(table, parquet_file)
#     print("all scannning done")

def log_folder_to_parquet_sep(base_folder, failed_paths_file, config):
    """Log folders and save Parquet in subfolders with partial scan support.

    Every date folder under ``base_folder`` is scanned with ``scan_folder``. One
    ``folder_log.parquet`` is then written inside each rec (experiment) folder
    that does not already have one.

    Partial-scan note: every date folder is still scanned, which means
    ``assign_status_codes`` runs for all rec folders. Only the write is skipped
    for rec folders whose log already exists, so existing logs are never
    refreshed by this function.

    Args:
        base_folder: Directory containing the date folders. Only entries that are
            directories and satisfy ``match_date_pattern`` are scanned.
        failed_paths_file: Path to the manually curated failed-paths file read by
            ``read_failed_paths``. A falsy value (e.g. ``""``) means "no failed
            paths".
        config: Status-field configuration forwarded to ``scan_folder``.

    Side effects:
        Writes ``<base_folder>/<date_folder>/<rec_file>/folder_log.parquet`` for
        each rec folder without a log, and prints one line per rec folder
        (skipped or saved).
    """
    # Read manually inputted failed paths
    failed_paths = read_failed_paths(failed_paths_file) if failed_paths_file else set()

    # Get the list of current date folders that match the date pattern
    date_folders = [
        name for name in os.listdir(base_folder)
        if os.path.isdir(os.path.join(base_folder, name)) and match_date_pattern(name)
    ]

    # Bind the per-run constant arguments once, instead of building
    # len(date_folders)-long repeated lists for executor.map.
    def _scan(date_folder):
        return scan_folder(date_folder, base_folder, failed_paths, config)

    # Use ThreadPoolExecutor for parallel folder scanning
    with concurrent.futures.ThreadPoolExecutor() as executor:
        log_data = list(executor.map(_scan, date_folders))

    # Process and save each experiment's log separately
    for folder_log in log_data:
        date_folder = folder_log['date_folder']
        calib_files = folder_log.get('calib_files', [])

        for rec_file_data in folder_log['rec_files_data']:
            rec_file = rec_file_data['rec_file']
            rec_dir = os.path.join(base_folder, date_folder, rec_file)
            subfolder_save_path = os.path.join(rec_dir, "folder_log.parquet")

            # Partial scan: never overwrite an existing log.
            if os.path.exists(subfolder_save_path):
                print(f"Skipping {subfolder_save_path}, Parquet file already exists.")
                continue

            # exist_ok avoids the race of a separate exists() check before creating.
            os.makedirs(rec_dir, exist_ok=True)

            # Add 'date_folder' and 'calib_files' to rec_file_data
            rec_file_data['date_folder'] = date_folder
            rec_file_data['calib_files'] = calib_files

            df = pd.DataFrame([rec_file_data])
            table = pa.Table.from_pandas(df)

            # pyarrow infers an empty calib_files list as list<null>. Pin the
            # column to list<string> (the entries are file names) so that every
            # folder_log.parquet has the same type for it and the logs can be
            # read back together as one dataset.
            calib_idx = table.schema.get_field_index('calib_files')
            table = table.set_column(
                calib_idx,
                'calib_files',
                table.column('calib_files').cast(pa.list_(pa.string())),
            )
            pq.write_table(table, subfolder_save_path)

            print(f"Log for {rec_file} saved at {subfolder_save_path}")




if __name__ == "__main__":
    # Root directory that holds the date folders to scan (replace as needed).
    base_folder = "/home/lq53/mir_data/24summ"

    # Optional text file of manually flagged failed paths; an empty string means
    # "no failed paths". A previously used file was:
    # '/hpc/group/tdunn/Bryan_Rigs/BigOpenField/24summ/mir_bundle_run/synced_folders/240914_failed_sum_test.txt'
    failed_paths_file = ""

    # Per-experiment mode: writes folder_log.parquet inside each rec folder and
    # skips rec folders that already have one. (The older single-file mode,
    # log_folder_to_parquet, is kept only as commented-out code above.)
    log_folder_to_parquet_sep(base_folder, failed_paths_file, STATUS_FIELDS_CONFIG)


Skipping /home/lq53/mir_data/24summ/2024_08_26/20240730_PMCr2/folder_log.parquet, Parquet file already exists.
Skipping /home/lq53/mir_data/24summ/2024_08_26/20240717_PMCr2/folder_log.parquet, Parquet file already exists.
Skipping /home/lq53/mir_data/24summ/2024_08_26/20240717_PMCr1/folder_log.parquet, Parquet file already exists.
Skipping /home/lq53/mir_data/24summ/2024_06_26/1686940_left/folder_log.parquet, Parquet file already exists.
Skipping /home/lq53/mir_data/24summ/2024_07_15/1691485RMHBN1425/folder_log.parquet, Parquet file already exists.
Skipping /home/lq53/mir_data/24summ/2024_07_15/1691485RMPBF1531/folder_log.parquet, Parquet file already exists.
Skipping /home/lq53/mir_data/24summ/2024_07_15/1691485RMPBS1659/folder_log.parquet, Parquet file already exists.
Skipping /home/lq53/mir_data/24summ/2024_07_16/1691485RMHBN1405/folder_log.parquet, Parquet file already exists.
Skipping /home/lq53/mir_data/24summ/2024_08_16/20240717_PMC_r1_11_50/folder_log.parquet, Parquet file alre

In [21]:
import pyarrow.dataset as ds

def read_all_parquet_files(base_folder, filename="folder_log.parquet"):
    """
    Read every per-experiment Parquet log under ``base_folder`` with PyArrow's
    Dataset API and return one combined DataFrame.

    Only files laid out as ``<base_folder>/<date_folder>/<rec_folder>/<filename>``
    are read, which is the layout written by the per-experiment logger. Pointing
    the dataset at that explicit file list has two benefits:

    * Unrelated or legacy ``.parquet`` files elsewhere in the tree (e.g. old logs
      saved directly under a date folder) are not picked up. Their schemas differ.
    * ``exclude_invalid_files`` does not have to open every file in the whole
      data tree during discovery.

    Args:
        base_folder: Root directory containing the date folders.
        filename: Name of the per-experiment log file. Defaults to
            ``"folder_log.parquet"``.

    Returns:
        pandas.DataFrame with the rows of all matching logs, or an empty
        DataFrame when no matching file exists.
    """
    import glob

    # Escape base_folder so characters such as '[' in the path are taken literally.
    pattern = os.path.join(glob.escape(base_folder), "*", "*", filename)
    # sorted() gives a deterministic row order across runs.
    paths = sorted(glob.glob(pattern))
    if not paths:
        return pd.DataFrame()

    # exclude_invalid_files still skips unreadable (e.g. partially written) logs,
    # now checking only the candidate log files.
    dataset = ds.dataset(paths, format="parquet", exclude_invalid_files=True)
    return dataset.to_table().to_pandas()

# Example usage: merge every per-experiment log found under base_folder.
# Relies on base_folder having been set earlier (in the scan cell's __main__ block).
combined_df = read_all_parquet_files(base_folder)

# Show the merged log table.
print(combined_df)

    mir_generate_param  sync  z_adjusted                             rec_file
0                    1     0           0                         1686940_left
1                    1     0           0                 1686941_left_right_2
2                    1     0           0       1691486_left_right_habituation
3                    1     0           0       1691485_left_hole_saline_10_35
4                    1     0           0    1691485_no_hole_habituation_13_59
5                    1     0           0                     1691485BMCFF1505
6                    1     0           0                     1691485BMCFS1547
7                    1     0           0          240605PMC1_right_hole_11_27
8                    1     0           0                     1691485RMHBN1425
9                    1     0           0                     1691485RMPBF1531
10                   1     0           0                     1691485RMPBS1659
11                   1     0           0                     169

In [14]:
import os
import pandas as pd
import pyarrow.parquet as pq

def read_all_parquet_files(base_folder):
    """Read all Parquet files from the date folder structure and return a combined DataFrame.

    Looks for ``<base_folder>/<date_folder>/<rec_file_folder>/folder_log.parquet``.
    Each loaded frame gets ``date_folder`` and ``rec_file_folder`` columns set
    from the directory names it was found under. All frames are then
    concatenated with a fresh integer index.

    Returns:
        The combined DataFrame, or an empty DataFrame (after printing a notice)
        when no log file exists.
    """
    frames = []

    for date_name in os.listdir(base_folder):
        date_path = os.path.join(base_folder, date_name)
        if not os.path.isdir(date_path):
            continue  # not a date folder

        for rec_name in os.listdir(date_path):
            rec_path = os.path.join(date_path, rec_name)
            if not os.path.isdir(rec_path):
                continue  # not a rec_file folder

            log_path = os.path.join(rec_path, "folder_log.parquet")
            if not os.path.exists(log_path):
                continue  # no log written for this experiment yet

            frame = pq.read_table(log_path).to_pandas()
            # Record provenance from the directory names (overwrites any stored value).
            frame['date_folder'] = date_name
            frame['rec_file_folder'] = rec_name
            frames.append(frame)

    if not frames:
        print("No Parquet files found.")
        return pd.DataFrame()  # Return empty DataFrame if no files found

    return pd.concat(frames, ignore_index=True)

# Example usage
# base_folder = "/path/to/your/base_folder"  # Replace with your base folder
# Merge all per-experiment logs; base_folder must already be defined.
combined_df = read_all_parquet_files(base_folder)

# Show the merged log table.
print(combined_df)


    mir_generate_param  sync  z_adjusted                             rec_file  \
0                    0     0           0                       20240730_PMCr2   
1                    0     0           0                       20240717_PMCr2   
2                    0     0           0                       20240717_PMCr1   
3                    1     0           0                         1686940_left   
4                    1     0           0                     1691485RMHBN1425   
5                    1     0           0                     1691485RMPBF1531   
6                    1     0           0                     1691485RMPBS1659   
7                    1     0           0                     1691485RMHBN1405   
8                    0     0           0                20240717_PMC_r1_11_50   
9                    0     0           0                20240717_PMC_r2_11_00   
10                   1     0           0                20240628_PMC_r1_11_43   
11                   1     0

In [25]:
# # # WARNING: running this cell deleted previously generated Parquet logs. Do not run it again unless you intend to remove ALL Parquet files under the base folder.

# # import os

# # def delete_parquet_files_in_date_folders(base_folder):
# #     """Delete Parquet files saved directly under date folders."""
# #     # Loop through each date folder
# #     for date_folder in os.listdir(base_folder):
# #         date_folder_path = os.path.join(base_folder, date_folder)
        
# #         # Check if it's a directory (date folder)
# #         if os.path.isdir(date_folder_path):
# #             # Find and delete all .parquet files directly inside this folder
# #             for file_name in os.listdir(date_folder_path):
# #                 if file_name.endswith(".parquet"):
# #                     file_path = os.path.join(date_folder_path, file_name)
# #                     os.remove(file_path)
# #                     print(f"Deleted: {file_path}")

# # # Usage
# # base_folder = "/home/lq53/mir_data/24summ"  # Replace with your base folder
# # delete_parquet_files_in_date_folders(base_folder)
# import os

# def delete_all_parquet_files(base_folder):
#     """Delete all Parquet files within the base folder and its subfolders."""
#     # Walk through the entire directory tree
#     for dirpath, dirnames, filenames in os.walk(base_folder):
#         # Loop through each file in the current directory
#         for file_name in filenames:
#             if file_name.endswith(".parquet"):
#                 file_path = os.path.join(dirpath, file_name)
#                 os.remove(file_path)
#                 print(f"Deleted: {file_path}")

# # Usage
# base_folder = "/home/lq53/mir_data/24summ"  # Replace with your base folder
# delete_all_parquet_files(base_folder)


Deleted: /home/lq53/mir_data/24summ/2024_08_26/20240730_PMCr2/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_08_26/20240717_PMCr2/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_08_26/20240717_PMCr1/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_06_26/1686940_left/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_07_15/1691485RMHBN1425/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_07_15/1691485RMPBF1531/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_07_15/1691485RMPBS1659/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_07_16/1691485RMHBN1405/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_08_16/20240717_PMC_r1_11_50/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_08_16/20240717_PMC_r2_11_00/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_08_08/20240628_PMC_r1_11_43/folder_log.parquet
Deleted: /home/lq53/mir_data/24summ/2024_08_08/20240702_PMC_r1_12_02/folder_log.pa