In [None]:
!pip install supabase

Collecting supabase
  Downloading supabase-2.15.1-py3-none-any.whl.metadata (11 kB)
Collecting gotrue<3.0.0,>=2.11.0 (from supabase)
  Downloading gotrue-2.12.0-py3-none-any.whl.metadata (6.1 kB)
Collecting postgrest<1.1,>0.19 (from supabase)
  Downloading postgrest-1.0.1-py3-none-any.whl.metadata (3.5 kB)
Collecting realtime<2.5.0,>=2.4.0 (from supabase)
  Downloading realtime-2.4.3-py3-none-any.whl.metadata (6.7 kB)
Collecting storage3<0.12,>=0.10 (from supabase)
  Downloading storage3-0.11.3-py3-none-any.whl.metadata (1.8 kB)
Collecting supafunc<0.10,>=0.9 (from supabase)
  Downloading supafunc-0.9.4-py3-none-any.whl.metadata (1.2 kB)
Collecting pytest-mock<4.0.0,>=3.14.0 (from gotrue<3.0.0,>=2.11.0->supabase)
  Downloading pytest_mock-3.14.0-py3-none-any.whl.metadata (3.8 kB)
Collecting deprecation<3.0.0,>=2.1.0 (from postgrest<1.1,>0.19->supabase)
  Downloading deprecation-2.1.0-py2.py3-none-any.whl.metadata (4.6 kB)
Collecting aiohttp<4.0.0,>=3.11.18 (from realtime<2.5.0,>=2.4.0-

# Multithreaded Supabase Data Processing

This notebook demonstrates how to speed up a set of analytical queries using multithreading.
It first fetches the entire `cars_clean` table from Supabase up front (paginated, 1,000 rows per request),
then uses a thread pool to run five Pandas aggregations concurrently over that in-memory DataFrame.

## I. Setup and Supabase Connection

First, we'll import all necessary libraries and initialize our Supabase client.

In [None]:
# Import required libraries
import concurrent.futures
import functools
import os
import time
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import psutil
from supabase import Client, create_client

In [None]:
# Supabase connection settings.
# Values are read from the SUPABASE_URL / SUPABASE_KEY environment variables when set,
# so credentials need not live in the notebook. The fallbacks preserve the original
# project settings; the fallback key's JWT payload declares role "anon" (a public,
# client-side key). Still, prefer the environment variables and rotate the key
# before sharing this notebook.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "https://ugjwigpcopmtjgylopwf.supabase.co")
SUPABASE_KEY = os.environ.get(
    "SUPABASE_KEY",
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InVnandpZ3Bjb3BtdGpneWxvcHdmIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NDU4MjgxMjIsImV4cCI6MjA2MTQwNDEyMn0.oFcP1wCt1upByqTU8NgD4FpJUdv9I8sG1ECWMX1wz8I",
)

In [None]:
# Create the Supabase client from the connection settings above; the fetch step in the
# main cell passes this object to fetch_all_data_from_supabase.
# NOTE: this binds the name `supabase` (same as the package name). The package was
# imported with `from supabase import ...`, so no module binding is shadowed here.
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

## II. Global Data Fetching and Processing Functions

In [None]:
def fetch_all_data_from_supabase(
    client: "Client",
    table_name: str = "cars_clean",
    page_size: int = 1000,
) -> Optional[pd.DataFrame]:
    """
    Fetches every row of a Supabase table using range-based pagination.

    Args:
        client: The Supabase client instance
        table_name: Name of the table to read (defaults to "cars_clean")
        page_size: Number of rows requested per round-trip (defaults to 1000)

    Returns:
        A pandas DataFrame containing all fetched rows (empty if the table returned
        no rows), or None if any request raises an exception.

    Raises:
        ValueError: If page_size is not a positive integer.
    """
    if page_size < 1:
        raise ValueError("page_size must be a positive integer")

    try:
        all_data = []
        start_range = 0

        # NOTE(review): no .order(...) is applied, so page boundaries rely on the
        # database returning rows in a stable order; consider ordering by a unique key.
        while True:
            # Request the inclusive row window [start_range, start_range + page_size - 1]
            response = (
                client.table(table_name)
                .select("*")
                .range(start_range, start_range + page_size - 1)
                .execute()
            )
            batch_data = response.data

            # An empty page means we have read past the last row
            if not batch_data:
                break

            all_data.extend(batch_data)

            # Advance by the rows actually returned, not by page_size. If the server
            # returns fewer rows than requested (e.g. a per-request row cap below
            # page_size), stepping by page_size would silently skip the rows in between.
            start_range += len(batch_data)
            print(f"Fetched {len(all_data)} rows so far...")

        df = pd.DataFrame(all_data)
        print(f"✅ Done. Total rows fetched: {len(df)}")
        return df

    except Exception as e:
        # Best-effort: report the failure and let the caller decide what to do
        print(f"Error fetching data from Supabase: {e}")
        return None

In [None]:
def execute_with_metrics(func):
    """Decorator that times a processing function and samples system resource usage.

    The wrapped function receives the input DataFrame as its first positional
    argument, or as the keyword ``all_data_df``. Its return value must support len().

    Returns:
        A wrapper that returns ``{"data": <result>, "metrics": {...}}``, where the
        metrics dict holds ``duration`` (seconds), ``cpu_percent`` and
        ``memory_percent`` (mean of the readings taken before and after the call),
        ``throughput`` (input rows per second, 0 if the duration is 0) and
        ``result_count`` (len of the result).
    """
    @functools.wraps(func)  # preserve the wrapped function's __name__ / __doc__
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution, unlike wall-clock time.time()
        start_time = time.perf_counter()
        # NOTE: per psutil docs, cpu_percent(interval=None) compares against the previous
        # call (the very first call returns a meaningless 0.0), so these are coarse samples.
        start_cpu = psutil.cpu_percent(interval=None)
        start_memory = psutil.virtual_memory().percent

        result_df = func(*args, **kwargs)

        end_time = time.perf_counter()
        end_cpu = psutil.cpu_percent(interval=None)
        end_memory = psutil.virtual_memory().percent

        duration = end_time - start_time
        avg_cpu = (start_cpu + end_cpu) / 2
        avg_memory = (start_memory + end_memory) / 2

        # Throughput is measured against the input size. The processing functions in
        # this notebook name their input `all_data_df`, so accept it by keyword too
        # instead of failing with IndexError on args[0].
        input_all_data_df = args[0] if args else kwargs.get("all_data_df")
        input_rows = len(input_all_data_df) if input_all_data_df is not None else 0
        throughput = input_rows / duration if duration > 0 else 0

        return {
            "data": result_df,
            "metrics": {
                "duration": duration,
                "cpu_percent": avg_cpu,
                "memory_percent": avg_memory,
                "throughput": throughput,
                "result_count": len(result_df)
            }
        }
    return wrapper

In [None]:
@execute_with_metrics
def process_most_expensive_car_per_location(all_data_df: pd.DataFrame):
    """
    Process 1: Find the most expensive car (name and price) in each distinct c_location.
    Returns DataFrame with columns: c_location, c_name, c_price, sorted by c_location.

    Rows with a missing c_price are ignored. A location whose prices are all missing
    is therefore omitted. Otherwise groupby-idxmax would produce a NaN label (or raise,
    in newer pandas) and the .loc lookup would fail.
    """
    # Keep only the needed columns and discard rows without a price
    df = all_data_df[['c_location', 'c_name', 'c_price']].dropna(subset=['c_price'])

    # Index label of the highest-priced row within each location
    top_price_idx = df.groupby('c_location')['c_price'].idxmax()

    result_df = df.loc[top_price_idx].sort_values('c_location')

    return result_df

In [None]:
@execute_with_metrics
def process_total_cars_per_year(all_data_df: pd.DataFrame):
    """
    Process 2: Count how many cars are listed for each c_year.

    Produces a DataFrame with columns c_year and total_cars, ordered by year
    ascending and truncated to the five earliest years.
    """
    # Row count per year, turned back into a two-column frame
    year_counts = (
        all_data_df[['c_year']]
        .groupby('c_year')
        .size()
        .reset_index(name='total_cars')
    )

    # Earliest five years only
    earliest_years = year_counts.sort_values('c_year').head(5)
    return earliest_years

In [None]:
@execute_with_metrics
def process_average_price_by_engine_group(all_data_df: pd.DataFrame):
    """
    Process 3: Calculate the average c_price grouped by c_engine size in 500cc intervals.
    Returns DataFrame with columns: engine_group_start_cc, average_price (rounded to 2 decimal places),
    limited to top 5 engine groups with highest average prices.
    """
    # Derive the 500cc bucket start with .assign, which returns a new frame.
    # Writing the column directly into the two-column slice triggered pandas'
    # SettingWithCopyWarning.
    df = all_data_df[['c_engine', 'c_price']].assign(
        engine_group_start_cc=lambda frame: (frame['c_engine'] // 500) * 500
    )

    # Mean price per engine bucket, rounded to 2 decimals and named average_price
    result_df = (
        df.groupby('engine_group_start_cc')['c_price']
        .mean()
        .round(2)
        .rename('average_price')
        .reset_index()
    )

    # Highest average prices first, top 5 buckets only
    result_df = result_df.sort_values('average_price', ascending=False).head(5)

    return result_df

In [None]:
@execute_with_metrics
def process_total_cars_by_location(all_data_df: pd.DataFrame):
    """
    Process 4: Count the cars listed in each distinct c_location.

    Produces a DataFrame with columns c_location and total_cars, ordered by location.
    """
    location_counts = (
        all_data_df[['c_location']]
        .groupby('c_location')
        .size()
        .reset_index(name='total_cars')
    )
    return location_counts.sort_values('c_location')

In [None]:
@execute_with_metrics
def process_average_mileage_by_condition(all_data_df: pd.DataFrame):
    """
    Process 5: Mean c_mileage_min for each c_condition value.

    Produces a DataFrame with columns c_condition and average_min_mileage,
    ordered by condition.
    """
    mileage_by_condition = (
        all_data_df[['c_condition', 'c_mileage_min']]
        .groupby('c_condition')['c_mileage_min']
        .mean()
        .reset_index()
        .rename(columns={'c_mileage_min': 'average_min_mileage'})
    )
    return mileage_by_condition.sort_values('c_condition')

## III. Output Display and Multithreaded Execution

In [None]:
def display_query_output(query_title, results):
    """
    Print one query's result table followed by its performance metrics.

    Args:
        query_title: Heading printed in square brackets above the table
        results: Mapping with a 'data' DataFrame and a 'metrics' dict, in the shape
            returned by the execute_with_metrics decorator
    """
    perf = results['metrics']
    table = results['data']

    # Heading and the table itself (index column suppressed)
    print(f"\n[{query_title}]")
    print(table.to_string(index=False))

    # Timing / resource summary
    print("\nQuery Performance:")
    print(f"  Query Time: {perf['duration']:.4f} seconds")
    print(f"  Average CPU Usage: {perf['cpu_percent']:.2f}%")
    print(f"  Average Memory Usage: {perf['memory_percent']:.2f}%")
    print(f"  Throughput: {perf['throughput']:.2f} records/second")
    print("--------------------------------------------------")

In [None]:
def run_multithreaded(all_data_df: pd.DataFrame, max_workers: Optional[int] = None):
    """
    Execute all data processing functions concurrently on a thread pool and print
    their results in a fixed order.

    Args:
        all_data_df: The pre-fetched DataFrame containing all data
        max_workers: Thread-pool size; None (the default) lets ThreadPoolExecutor
            choose its own default
    """
    print("\n===== MULTITHREADED DATA PROCESSING =====\n")

    # (title, function) pairs; this order is also the display order below
    processing_functions_list = [
        ("Query 1: Most Expensive by Location", process_most_expensive_car_per_location),
        ("Query 2: Total Cars Per Year", process_total_cars_per_year),
        ("Query 3: Average Price By Engine Group", process_average_price_by_engine_group),
        ("Query 4: Total Cars By Location", process_total_cars_by_location),
        ("Query 5: Average Mileage By Condition", process_average_mileage_by_condition)
    ]

    # perf_counter is monotonic, so the elapsed time cannot be skewed by clock changes
    multithreaded_start = time.perf_counter()

    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Every task receives the same pre-fetched DataFrame
        future_to_query = {
            executor.submit(func, all_data_df): title
            for title, func in processing_functions_list
        }

        # Collect results as they finish; one failing query does not stop the others
        for future in concurrent.futures.as_completed(future_to_query):
            query_title = future_to_query[future]
            try:
                results[query_title] = future.result()
            except Exception as e:
                print(f"{query_title} generated an exception: {e}")

    # Measured after the executor has shut down, i.e. once all tasks are done
    multithreaded_duration = time.perf_counter() - multithreaded_start

    # Display results in the predefined order, regardless of completion order
    for query_title, _ in processing_functions_list:
        if query_title in results:
            display_query_output(query_title, results[query_title])
        else:
            print(f"\n[{query_title}]")
            print("Error: This query did not complete successfully.")
            print("--------------------------------------------------")

    print("\n===== MULTITHREADED PROCESSING COMPLETED =====")
    print(f"Total Execution Time: {multithreaded_duration:.4f} seconds")

In [None]:
if __name__ == "__main__":
    # Fetch the whole table once up front; the processing functions then work in memory.
    print("Fetching all data from Supabase...")
    complete_df = fetch_all_data_from_supabase(supabase)

    if complete_df is None:
        # fetch_all_data_from_supabase returns None after printing the underlying error
        print("Error: Unable to fetch data from Supabase. Please check your connection and credentials.")
    elif complete_df.empty:
        # The requests succeeded but returned no rows, which is not a connection failure
        print("Error: No rows were returned from Supabase (the table is empty or not readable with these credentials).")
    else:
        print(f"Total records processed: {len(complete_df)}")

        # Run multithreaded processing on the pre-fetched data
        run_multithreaded(complete_df)

Fetching all data from Supabase...
Fetched 1000 rows so far...
Fetched 2000 rows so far...
Fetched 3000 rows so far...
Fetched 4000 rows so far...
Fetched 5000 rows so far...
Fetched 6000 rows so far...
Fetched 7000 rows so far...
Fetched 8000 rows so far...
Fetched 9000 rows so far...
Fetched 10000 rows so far...
Fetched 11000 rows so far...
Fetched 12000 rows so far...
Fetched 13000 rows so far...
Fetched 14000 rows so far...
Fetched 15000 rows so far...
Fetched 16000 rows so far...
Fetched 17000 rows so far...
Fetched 18000 rows so far...
Fetched 19000 rows so far...
Fetched 20000 rows so far...
Fetched 21000 rows so far...
Fetched 22000 rows so far...
Fetched 23000 rows so far...
Fetched 24000 rows so far...
Fetched 25000 rows so far...
Fetched 26000 rows so far...
Fetched 27000 rows so far...
Fetched 28000 rows so far...
Fetched 29000 rows so far...
Fetched 30000 rows so far...
Fetched 31000 rows so far...
Fetched 32000 rows so far...
Fetched 33000 rows so far...
Fetched 34000 row

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['engine_group_start_cc'] = (df['c_engine'] // 500) * 500
