In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, size, col
import json
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

In [28]:
#Saprksession starts here
spark = SparkSession.builder.appName("FoodML").getOrCreate()

In [29]:
#read json
usda_fnds_json = spark.read.format('json').option('multiline', 'true').load("FoodData_Central_foundation_food_json_2025-04-24.json")

In [30]:
# checking the schema
usda_fnds_json.printSchema()

root
 |-- FoundationFoods: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dataType: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- fdcId: long (nullable = true)
 |    |    |-- foodAttributes: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- foodCategory: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |-- foodClass: string (nullable = true)
 |    |    |-- foodNutrients: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- amount: double (nullable = true)
 |    |    |    |    |-- dataPoints: long (nullable = true)
 |    |    |    |    |-- foodNutrientDerivation: struct (nullable = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |    |-- foodNutrientSo

In [31]:
df_foods = usda_fnds_json.select(explode("FoundationFoods").alias("food"))

# explode nutrients
df_nutrients = df_foods.select(
    col("food.fdcId").alias("fdcId"),
    col("food.description").alias("description"),
    col("food.foodCategory.description").alias("FoodCatDescription"),
    explode("food.foodNutrients").alias("nutrient")
)

df_nutrients.printSchema()


df_nutrients =  df_nutrients.select(
    "fdcId",
    "FoodCatDescription",
    "description",
    col("nutrient.nutrient.name").alias("nutrient_name"),
    col("nutrient.amount").alias("nutrient_value"),
    col("nutrient.nutrient.unitName").alias("nutrient_unit")
)

df_nutrients.printSchema()
df_nutrients.show(200)

root
 |-- fdcId: long (nullable = true)
 |-- description: string (nullable = true)
 |-- FoodCatDescription: string (nullable = true)
 |-- nutrient: struct (nullable = true)
 |    |-- amount: double (nullable = true)
 |    |-- dataPoints: long (nullable = true)
 |    |-- foodNutrientDerivation: struct (nullable = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- foodNutrientSource: struct (nullable = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |-- footnote: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- max: double (nullable = true)
 |    |-- median: double (nullable = true)
 |    |-- min: double (nullable = true)
 |    |-- nutrient: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: stri

In [32]:
# pivot
df_nutrients_pivot = df_nutrients.groupBy("fdcId", "description", "FoodCatDescription")\
    .pivot("nutrient_name")\
    .agg(F.first("nutrient_value"))

df_final = df_nutrients_pivot.fillna(0)

df_final.show(5)

+-------+--------------------+--------------------+------------------------------+-------------------------+-------------------------------------+----------------------------------+-------+--------+-----+-------------+-----------+---------------+---------------+-------+------+--------+--------------+-----------+-----------+-----------+---------------------------+--------------------------+---------------+--------------+---------------+-----------+-------------+-----------------------------------+----------------------------+----------------------------------+---------------------------+--------------+-----------+----------+----------+--------------------+-------------------+--------+-------+--------+-------+-------------------+--------------------+------+--------------------------------+---------------------------------+------------------+-------------------+--------------+----------+-------------+----------------------------------+----------------------------------+-------------------

In [33]:
#some cleaning
from pyspark.sql.functions import col

def sanitize_column_name(name):
    return name.replace(" ", "_") \
               .replace(",", "") \
               .replace("(", "") \
               .replace(")", "") \
               .replace(".", "") \
               .replace("/", "_") \
               .replace("-", "_")

cleaned_cols = [col(f"`{c}`").alias(sanitize_column_name(c)) for c in df_final.columns]
df_final = df_final.select(*cleaned_cols)


In [34]:

# feature + label

#label_indexer = StringIndexer(inputCol="description",outputCol= "label")
label_indexer = StringIndexer(inputCol="FoodCatDescription",outputCol= "label")

nutrient_cols =[col for col in df_final.columns if col not in["FoodCatDescription","fdcId", "description"]]

assembler = VectorAssembler(inputCols=nutrient_cols, outputCol="features")

In [35]:
#pipeline
pipeline = Pipeline(stages =[label_indexer, assembler])

df_rdy = pipeline.fit(df_final).transform(df_final)
df_rdy.select("label", "features").show(truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|fe

In [36]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Train/Test split
train, test = df_rdy.randomSplit([0.8, 0.2], seed=42)

# model
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50)

# train
model = rf.fit(train)

# prediction
predictions = model.transform(test)

# evaluation
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy:.3f}")

Test Accuracy = 0.827


In [37]:
predictions.groupBy("label", "prediction").count().orderBy("label", "prediction").show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|    9|
|  0.0|       1.0|    1|
|  1.0|       0.0|    1|
|  1.0|       1.0|    5|
|  2.0|       0.0|    1|
|  2.0|       2.0|    9|
|  3.0|       3.0|    7|
|  3.0|       5.0|    1|
|  4.0|       4.0|    2|
|  5.0|       5.0|    1|
|  6.0|       6.0|    5|
|  6.0|       7.0|    1|
|  7.0|       7.0|    1|
|  8.0|       2.0|    1|
|  8.0|       8.0|    1|
|  9.0|       9.0|    2|
| 10.0|       2.0|    1|
| 11.0|      11.0|    1|
| 13.0|       3.0|    1|
| 14.0|       0.0|    1|
+-----+----------+-----+

