In [11]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, max, count, collect_list, concat_ws, coalesce, upper, lit

# Build (or reuse) the Spark session for this notebook. All tuning options are
# passed as strings on the builder before the session is obtained.
spark = (
    SparkSession.builder
    .appName("NutritionalValuesGenerator")
    # Shuffle parallelism.
    .config("spark.sql.shuffle.partitions", "8")
    # Driver / executor sizing.
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.maxResultSize", "4g")
    # Broadcast-join size threshold (10485760 = 10 * 1024 * 1024).
    .config("spark.sql.autoBroadcastJoinThreshold", "10485760")
    # Memory manager fractions.
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.3")
    .getOrCreate()
)

# Keep notebook output readable: only warnings and errors from Spark.
spark.sparkContext.setLogLevel("WARN")


In [12]:
dataset_dir = "/home/tedy/Git/FII-BDA/sampled_dataset"

# Each source table is stored as its own Parquet file under dataset_dir.
df_food = spark.read.parquet(f"{dataset_dir}/food.parquet")
df_food_nutrient = spark.read.parquet(f"{dataset_dir}/food_nutrient.parquet")
df_nutrient = spark.read.parquet(f"{dataset_dir}/nutrient.parquet")
df_food_portion = spark.read.parquet(f"{dataset_dir}/food_portion.parquet")
df_measure_unit = spark.read.parquet(f"{dataset_dir}/measure_unit.parquet")

# Report the row count of every loaded table (one Spark action per table).
for table_label, table_df in (
    ("Food", df_food),
    ("Food-Nutrient", df_food_nutrient),
    ("Nutrient", df_nutrient),
    ("Food Portion", df_food_portion),
    ("Measure Unit", df_measure_unit),
):
    print(f"{table_label} records: {table_df.count()}")

Food records: 5000
Food-Nutrient records: 65141
Nutrient records: 477
Food Portion records: 104
Measure Unit records: 5


In [13]:
# Per-food nutrient measurements; rows without an amount are discarded.
df_food_nutrient_clean = df_food_nutrient.select(
    col("fdc_id"),
    col("nutrient_id"),
    col("amount"),
    col("data_points"),
    col("min"),
    col("max"),
    col("median")
).filter(col("amount").isNotNull())

# Nutrient lookup table, renamed so its key matches food_nutrient.nutrient_id.
df_nutrient_clean = df_nutrient.select(
    col("id").alias("nutrient_id"),
    col("name").alias("nutrient_name"),
    col("unit_name").alias("nutrient_unit"),
    col("rank")
)

# Only the descriptive food columns are carried forward.
df_food_enriched = df_food.select(
    col("fdc_id"),
    col("description"),
    col("data_type")
)

# Foods without measurements are dropped (inner join); measurements whose
# nutrient_id has no match in the lookup table are kept (left join).
df_merged = df_food_enriched.join(
    df_food_nutrient_clean,
    on="fdc_id",
    how="inner"
).join(
    df_nutrient_clean,
    on="nutrient_id",
    how="left"
)

# Unmatched nutrients get a placeholder name and unit instead of null.
df_merged = df_merged.fillna("Unknown", subset=["nutrient_name", "nutrient_unit"])

# Human-readable label "<name> (<unit>)".
# The previous nested concat_ws(" (", name, concat_ws(")", unit)) never emitted
# the closing parenthesis: concat_ws only places the separator *between*
# values, and the inner call had a single value. The pieces are now spelled
# out explicitly with literals.
df_merged = df_merged.withColumn(
    "nutrient_display",
    when(
        col("nutrient_unit").isNotNull(),
        concat_ws("", col("nutrient_name"), lit(" ("), col("nutrient_unit"), lit(")"))
    ).otherwise(col("nutrient_name"))
)

print(f"Merged records: {df_merged.count()}")

Merged records: 65141


In [14]:
# One row per food: gather every nutrient label and measurement into arrays,
# then flatten the labels into a single "|"-separated string.
# NOTE(review): each collect_list is computed independently; if it skips nulls
# (median/min/max may be null), the arrays are not guaranteed to line up
# position-by-position with `nutrients` -- confirm before relying on that.
df_aggregated = (
    df_merged
    .groupBy("fdc_id", "description", "data_type")
    .agg(
        collect_list("nutrient_display").alias("nutrients"),
        collect_list("amount").alias("amounts"),
        collect_list("median").alias("medians"),
        collect_list("min").alias("mins"),
        collect_list("max").alias("maxs"),
        count("nutrient_id").alias("total_nutrients"),
    )
    .withColumn("nutrients_json", concat_ws("|", col("nutrients")))
)

# Wide table: one column per distinct nutrient name. Each cell holds the
# largest amount for that food/nutrient, falling back to the largest median
# when no amount is available.
has_nutrient_name = col("nutrient_name").isNotNull()
amount_or_median = coalesce(
    max(when(has_nutrient_name, col("amount"))),
    max(when(has_nutrient_name, col("median"))),
).alias("amount")

df_pivot = df_merged.groupBy("fdc_id").pivot("nutrient_name").agg(amount_or_median)

print(f"Aggregated foods: {df_aggregated.count()}")
print(f"Pivot table columns: {len(df_pivot.columns)}")
print(f"Foods with nutrients: {df_pivot.count()}")

Aggregated foods: 4688
Pivot table columns: 180
Foods with nutrients: 4688


In [15]:
# Source nutrient name (exactly as it appears in `nutrient_name`) -> output
# column name. The dict order defines the column order of df_metrics, and the
# keys are the only values the pivot below materialises.
nutrient_columns = {
    # Macronutrients
    "Energy": "energy",
    "Protein": "protein",
    "Carbohydrate, by difference": "carbs",
    "Total lipid (fat)": "total_fat",
    # Food composition
    "Water": "water",
    "Ash": "ash",
    "Alcohol, ethyl": "alcohol",
    "Caffeine": "caffeine",
    # Fiber and Sugars
    "Fiber, total dietary": "fiber",
    "Total Sugars": "sugars",
    "Glucose": "glucose",
    "Fructose": "fructose",
    "Sucrose": "sucrose",
    "Lactose": "lactose",
    # Fats
    "Fatty acids, total saturated": "saturated_fat",
    "Fatty acids, total monounsaturated": "monounsaturated_fat",
    "Fatty acids, total polyunsaturated": "polyunsaturated_fat",
    "Fatty acids, total trans": "trans_fat",
    "Cholesterol": "cholesterol",
    # Vitamins
    "Vitamin A, RAE": "vitamin_a",
    "Vitamin C, total ascorbic acid": "vitamin_c",
    "Vitamin D (D2 + D3)": "vitamin_d",
    "Vitamin E (alpha-tocopherol)": "vitamin_e",
    "Vitamin K (phylloquinone)": "vitamin_k",
    # B Vitamins
    "Thiamin": "thiamin_b1",
    "Riboflavin": "riboflavin_b2",
    "Niacin": "niacin_b3",
    "Vitamin B-6": "vitamin_b6",
    "Folate, total": "folate",
    "Vitamin B-12": "vitamin_b12",
    # Additional
    "Choline, total": "choline",
    # Minerals
    "Calcium, Ca": "calcium",
    "Iron, Fe": "iron",
    "Magnesium, Mg": "magnesium",
    "Phosphorus, P": "phosphorus",
    "Potassium, K": "potassium",
    "Sodium, Na": "sodium",
    "Zinc, Zn": "zinc",
    "Copper, Cu": "copper",
    "Manganese, Mn": "manganese",
    "Selenium, Se": "selenium",
    # Carotenoids
    "Carotene, beta": "beta_carotene",
    "Lycopene": "lycopene",
    "Lutein + zeaxanthin": "lutein_zeaxanthin",
}

df_merged_clean = df_merged.filter(col("amount").isNotNull())

# Instead of 40+ sequential joins, pivot all required nutrients at once.
# Passing the value list explicitly means Spark does not have to scan for the
# distinct nutrient names first, only the needed columns are produced, and a
# nutrient that is absent from the data yields an all-null column instead of
# making the select below fail on a missing column.
# NOTE(review): if several nutrient rows share one name (e.g. the same
# nutrient reported in different units), max() merges them into a single
# column -- confirm this is intended for "Energy".
df_nutrients_pivot = (
    df_merged_clean
    .groupBy("fdc_id")
    .pivot("nutrient_name", list(nutrient_columns))
    .agg(max(col("amount")))
)

# Now join with food info - just ONE join instead of 40+
df_metrics = df_food_enriched.join(df_nutrients_pivot, on="fdc_id", how="left")

# Rename every pivoted nutrient column to its short snake_case alias.
df_metrics = df_metrics.select(
    "fdc_id",
    "description",
    "data_type",
    *[col(source_name).alias(alias) for source_name, alias in nutrient_columns.items()]
)

# Checkpoint to break the lineage and avoid huge query plans
df_metrics = df_metrics.localCheckpoint()

# Coverage report: number of foods with a non-null value per key nutrient.
# count(column) only counts non-null rows, so a single aggregation replaces
# one filter+count job per nutrient.
coverage_labels = {
    "energy": "energy",
    "protein": "protein",
    "carbs": "carbs",
    "total_fat": "total fat",
    "water": "water",
    "fiber": "fiber",
    "vitamin_c": "vitamin C",
    "calcium": "calcium",
    "iron": "iron",
}
coverage = df_metrics.agg(
    *[count(col(column)).alias(column) for column in coverage_labels]
).first()

for column, label in coverage_labels.items():
    print(f"Foods with {label}: {coverage[column]}")


                                                                                

Foods with energy: 4485
Foods with protein: 4550
Foods with carbs: 4513
Foods with total fat: 4505
Foods with water: 50
Foods with fiber: 3827
Foods with vitamin C: 2136
Foods with calcium: 3741
Foods with iron: 3747


In [16]:
# Nutrient columns taken from df_metrics, in the order they appear in the
# final profile.
profile_metric_columns = [
    # Macronutrients
    "energy", "protein", "carbs", "total_fat",
    # Food composition
    "water", "ash", "alcohol", "caffeine",
    # Fiber and Sugars
    "fiber", "sugars", "glucose", "fructose", "sucrose", "lactose",
    # Fats breakdown
    "saturated_fat", "monounsaturated_fat", "polyunsaturated_fat", "trans_fat", "cholesterol",
    # Vitamins
    "vitamin_a", "vitamin_c", "vitamin_d", "vitamin_e", "vitamin_k",
    # B Vitamins
    "thiamin_b1", "riboflavin_b2", "niacin_b3", "vitamin_b6", "folate", "vitamin_b12",
    # Additional nutrients
    "choline",
    # Minerals
    "calcium", "iron", "magnesium", "phosphorus", "potassium", "sodium",
    "zinc", "copper", "manganese", "selenium",
    # Carotenoids
    "beta_carotene", "lycopene", "lutein_zeaxanthin",
]

# Descriptive side: one row per aggregated food, with friendlier column names.
food_summary = df_aggregated.select(
    "fdc_id",
    col("description").alias("food_description"),
    col("data_type").alias("food_type"),
    "total_nutrients",
)

# Measurement side: the pivoted nutrient values keyed by fdc_id.
food_measurements = df_metrics.select("fdc_id", *profile_metric_columns)

# Left join keeps every aggregated food; the trailing select pins the final
# column order.
df_final_profile = (
    food_summary
    .join(food_measurements, on="fdc_id", how="left")
    .select(
        "fdc_id",
        "food_description",
        "food_type",
        "total_nutrients",
        *profile_metric_columns,
    )
)

print(f"Final nutritional profiles: {df_final_profile.count()}")
print(f"Total columns: {len(df_final_profile.columns)}")
print(f"\nSample of comprehensive nutritional data:")
df_final_profile.show(5, truncate=False)

Final nutritional profiles: 4688
Total columns: 48

Sample of comprehensive nutritional data:
+-------+------------------------------------------------------------------------------------------+------------+---------------+------+-------+-----+---------+-----+----+-------+--------+-----+------+-------+--------+-------+-------+-------------+-------------------+-------------------+---------+-----------+---------+---------+---------+---------+---------+----------+-------------+---------+----------+------+-----------+-------+-------+----+---------+----------+---------+-------+----+------+---------+--------+-------------+--------+-----------------+
|fdc_id |food_description                                                                          |food_type   |total_nutrients|energy|protein|carbs|total_fat|water|ash |alcohol|caffeine|fiber|sugars|glucose|fructose|sucrose|lactose|saturated_fat|monounsaturated_fat|polyunsaturated_fat|trans_fat|cholesterol|vitamin_a|vitamin_c|vitamin_d|vitamin_

In [None]:
output_dir = "/home/tedy/Git/FII-BDA/output"
# Make sure the target directory exists before Spark writes into it.
os.makedirs(output_dir, exist_ok=True)

# Collapse to a single partition, then write the profiles as Parquet,
# replacing any earlier export at the same location.
profiles_path = f"{output_dir}/nutritional_profiles"
single_partition_profiles = df_final_profile.repartition(1)
single_partition_profiles.write.mode("overwrite").parquet(profiles_path)

# Summary of what was written.
for summary_line in (
    f"Results exported to {output_dir}",
    "Generated files:",
    "  - nutritional_profiles/",
):
    print(summary_line)

Results exported to /home/tedy/Git/FII-BDA/output
Generated files:
  - nutritional_profiles/
