In [1]:
import pandas as pd
from typing import Dict, Callable, List
import ray
# Start the local Ray runtime. ignore_reinit_error=True lets this cell be re-run in a
# live kernel without raising because Ray has already been initialised.
ray.init(num_cpus=32, ignore_reinit_error=True)

2024-04-04 13:31:47,027	INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.13
Ray version:,2.10.0
Dashboard:,http://127.0.0.1:8265


In [2]:
# Descriptive product fields kept from the raw export.
subset_gen_columns = [
    'product_name',
    'countries_en',
    'brands',
    'abbreviated_product_name',
    'generic_name',
    'quantity',
]

# Per-100g nutrient fields, appended to the score columns below.
new_cols = [
    'sodium_100g',
    'fat_100g',
    'saturated-fat_100g',
    'sugars_100g',
    'carbohydrates_100g',
    'proteins_100g',
]

# Score / grade / category fields followed by the nutrient fields.
subset_scores_columns = [
    'ecoscore_score',
    'ecoscore_grade',
    'nutrient_levels_tags',
    'main_category_en',
    'nutriscore_score',
    'nutriscore_grade',
    'nova_group',
    'food_groups_en',
    *new_cols,
]

# Full projection: the product code first, then the two groups above.
columns = ['code', *subset_gen_columns, *subset_scores_columns]

In [3]:
def clean_and_extract_column(df, column_name, columns=None):
    """
    Explode a comma-separated text column into one row per (code, value) pair.

    The values of ``column_name`` are lower-cased, stripped, whitespace-normalised
    and split on commas; each distinct value is then joined back onto the rest of
    the product's columns.

    Args:
    - df (pd.DataFrame): Input frame. Must contain a 'code' column and ``column_name``.
      The frame is NOT modified.
    - column_name (str): Name of the comma-separated text column to split.
    - columns (list, optional): If given and non-empty, only these columns of ``df``
      are used (must include 'code' and ``column_name``).

    Returns:
    - pd.DataFrame: One row per distinct (code, extracted value). Because the merge
      joins two frames that both carry ``column_name``, the cleaned single value is
      in ``<column_name>_x`` and the original unsplit text in ``<column_name>_y``.
      Rows for which nothing could be extracted (e.g. missing values) are absent,
      since the merge is an inner join.
    """
    # Optional projection. Explicit None/length check so array-like inputs
    # (Index, ndarray) do not trip over ambiguous truthiness.
    if columns is not None and len(columns) > 0:
        df = df[columns]

    # Index by product code on a new frame: the original used inplace=True, which
    # silently stripped 'code' from the caller's DataFrame.
    indexed = df.set_index('code')

    # Clean the specified column. regex=True is required: since pandas 2.0 the
    # default is regex=False, which would treat these patterns as literal text.
    # Note that '|' inside the character class is a literal, so pipes are removed too.
    cleaned = (
        indexed[column_name]
        .str.lower()
        .str.strip()
        .str.replace(r"\s+", " ", regex=True)
        .str.replace(r",\s+", ",", regex=True)
        .str.replace(r"[\*|\?|\.]", "", regex=True)
    )

    # One row per comma-separated token; extractall adds a 'match' index level
    # that is not needed once the tokens are de-duplicated per code.
    extracted = (
        cleaned.str.extractall(r"(?P<{0}>[^,]+)".format(column_name))
        .reset_index()
        .drop('match', axis=1)
        .drop_duplicates()
        .set_index('code')
    )

    # Join each extracted value back onto the product's remaining columns.
    merged = extracted.merge(indexed, left_index=True, right_index=True)
    return merged.reset_index()

In [4]:
# df.to_csv('../data/small_subsetV2.csv')

In [5]:
# Will not be changed
@ray.remote
def process_chunk(chunk: pd.DataFrame, dfMethod: Callable, **kwargs) -> pd.DataFrame:
    """
    Apply a DataFrame transformation to a single chunk as a Ray remote task.

    Args:
    - chunk (pd.DataFrame): One chunk of the full DataFrame.
    - dfMethod (Callable): Function called as ``dfMethod(chunk, **kwargs)``.
    - **kwargs: Keyword arguments forwarded unchanged to dfMethod.

    Returns:
    - pd.DataFrame: Whatever dfMethod returns for this chunk.
    """
    transformed = dfMethod(chunk, **kwargs)
    return transformed

In [6]:
# small = pd.read_csv('data/small.csv', delimiter='\t')
# small = small[columns]
# clean_and_extract_column(small, 'countries_en', columns)

In [7]:
# Pass 1: split 'countries_en' of the raw export into one row per (code, country)
# and write the result to data/preprocessed.csv.

# file_path: str = 'data-testing/medium.csv'
file_path: str = 'data-testing/raw.csv'
separator: str = '\t'
chunk_size = 50000  # Define chunk size based on system's memory.

# Upper bound on chunks submitted to Ray but not yet finished. Submitting every chunk
# up front filled the object store (spilling) and got workers OOM-killed. 32 matches
# the num_cpus given to ray.init; lower it if workers are still killed for memory.
max_in_flight = 32

results = []    # ObjectRefs in chunk order, so the concatenated output order is stable
in_flight = []  # subset of `results` that may still be running

reader = pd.read_csv(
    file_path,
    chunksize=chunk_size,
    sep=separator,
    low_memory=False,
    usecols=columns,       # read only the columns that are kept anyway
    dtype={'code': str},   # barcodes are identifiers: keep leading zeros in every chunk
)
for chunk in reader:
    # Backpressure: block until at least one running task has finished.
    if len(in_flight) >= max_in_flight:
        _finished, in_flight = ray.wait(in_flight, num_returns=1)
    result = process_chunk.remote(chunk, clean_and_extract_column, column_name='countries_en', columns=columns)
    results.append(result)
    in_flight.append(result)

# Retrieve and combine results from all chunks.
combined_results = pd.concat(ray.get(results))

combined_results.to_csv("data/preprocessed.csv", index=False)
# food_groups_en

[36m(raylet)[0m Spilled 3374 MiB, 54 objects, write throughput 6040 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
[36m(raylet)[0m Spilled 6621 MiB, 104 objects, write throughput 5777 MiB/s.
[33m(raylet)[0m [2024-04-04 13:37:46,973 E 120648 120648] (raylet) node_manager.cc:2967: 3 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 814ddb47bc96b4923873d915b14ad10bb3b317007a5de389f7adbbe3, IP: 172.25.33.30) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.25.33.30`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_thresh

In [8]:
# Pass 2: split 'food_groups_en' of the already country-expanded file into one row
# per (code, food group). NOTE: this cell reads and then overwrites the same file,
# so it can only be run once per run of the previous pass.

# file_path: str = 'data-testing/medium.csv'
file_path: str = 'data/preprocessed.csv'
separator: str = ','
chunk_size = 50000  # Define chunk size based on system's memory.
target_column = 'food_groups_en'

# Upper bound on chunks submitted to Ray but not yet finished; keeps the object store
# from filling up. 32 matches the num_cpus given to ray.init; lower it if needed.
max_in_flight = 32

# Fail fast with a clear message if this cell was already run: after one run the
# file contains food_groups_en_x / food_groups_en_y instead of food_groups_en.
header = pd.read_csv(file_path, sep=separator, nrows=0).columns
if target_column not in header:
    raise KeyError(
        f"'{target_column}' not found in {file_path}; "
        "re-run the countries_en pass to regenerate the file before running this cell."
    )

results = []    # ObjectRefs in chunk order, so the concatenated output order is stable
in_flight = []  # subset of `results` that may still be running

reader = pd.read_csv(
    file_path,
    chunksize=chunk_size,
    sep=separator,
    low_memory=False,
    dtype={'code': str},   # barcodes are identifiers: keep leading zeros in every chunk
)
for chunk in reader:
    # Backpressure: block until at least one running task has finished.
    if len(in_flight) >= max_in_flight:
        _finished, in_flight = ray.wait(in_flight, num_returns=1)
    result = process_chunk.remote(chunk, clean_and_extract_column, column_name=target_column)
    results.append(result)
    in_flight.append(result)

# Retrieve and combine results from all chunks.
combined_results = pd.concat(ray.get(results))

combined_results.to_csv("data/preprocessed.csv", index=False)
# food_groups_en

In [9]:
# Preview the combined frame from the food_groups_en pass (one row per code / food
# group / country); pandas elides the middle rows and columns of a large frame.
combined_results

Unnamed: 0,code,food_groups_en_x,countries_en_x,product_name,countries_en_y,brands,abbreviated_product_name,generic_name,quantity,ecoscore_score,...,nutriscore_score,nutriscore_grade,nova_group,food_groups_en_y,sodium_100g,fat_100g,saturated-fat_100g,sugars_100g,carbohydrates_100g,proteins_100g
0,00000000000000225,fruits and vegetables,france,jeunes pousses,France,endives,,,,79.0,...,,unknown,,"Fruits and vegetables,Vegetables",,,,,,
1,00000000000000225,vegetables,france,jeunes pousses,France,endives,,,,79.0,...,,unknown,,"Fruits and vegetables,Vegetables",,,,,,
2,00000000000026772226,milk and dairy products,france,Skyr,France,Danone,,,480 g,67.0,...,-5.0,a,,"Milk and dairy products,Dairy desserts",0.036,0.2,0.1,3.9,3.9,10.0
3,00000000000026772226,dairy desserts,france,Skyr,France,Danone,,,480 g,67.0,...,-5.0,a,,"Milk and dairy products,Dairy desserts",0.036,0.2,0.1,3.9,3.9,10.0
4,0000000000100,fats and sauces,france,moutarde au moût de raisin,France,courte paille,,,100g,54.0,...,18.0,d,,"Fats and sauces,Dressings and sauces",1.840,8.2,2.2,22.0,29.0,5.1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2761,99999995,meat other than poultry,france,Steak haché 5%,France,,,,,-16.0,...,-2.0,a,,"Fish‚ Meat‚ Eggs,Meat,Meat other than poultry",0.072,5.0,2.3,0.0,0.0,21.5
2762,9999999900686,sugary snacks,belgium,Marrons glacés,Belgium,,,,,,...,,unknown,,"Sugary snacks,Sweets",,,,,,
2763,9999999900686,sweets,belgium,Marrons glacés,Belgium,,,,,,...,,unknown,,"Sugary snacks,Sweets",,,,,,
2764,9999999999994,milk and dairy products,france,Light & Free SKYR A BOIRE,France,,,,,,...,25.0,e,,"Milk and dairy products,Dairy desserts",1.200,28.0,13.0,49.0,70.0,2.0
