In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, round, lower

# Set JAVA_HOME if needed (uncomment and adjust if Spark complains)
# os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"

# 1. Start Spark (no external packages needed)
spark = SparkSession.builder.appName("Local CSV Reader").getOrCreate()

# 2. Locate the FoodData Central CSV export.
# Set the FDC_DATA_DIR environment variable to point at your own copy of the
# export folder; when it is unset, the original download location is used.
# Easiest way to get the path on macOS: right-click the folder in Finder →
# hold Option → "Copy [name] as Pathname".
FDC_DATA_DIR = os.environ.get(
    "FDC_DATA_DIR",
    "/Users/billyheidel/Downloads/FoodData_Central_csv_2025-04-24",
)
csv_path_branded_food = os.path.join(FDC_DATA_DIR, "branded_food.csv")
csv_path_food = os.path.join(FDC_DATA_DIR, "food.csv")
csv_path_food_nutrient = os.path.join(FDC_DATA_DIR, "food_nutrient.csv")
csv_path_nutrient = os.path.join(FDC_DATA_DIR, "nutrient.csv")


def read_fdc_csv(path, multi_line=False):
    """Load one CSV file into a Spark DataFrame.

    Args:
        path: Location of the CSV file to read.
        multi_line: Pass True if the file contains quoted fields that span
            several lines.

    Returns:
        A DataFrame whose column names come from the header row and whose
        column types are inferred from the data.
    """
    return spark.read.csv(
        path,
        header=True,           # first row holds the column names
        inferSchema=True,      # infer column types (costs an extra pass over the file)
        multiLine=multi_line,
        escape='"',            # a doubled quote inside a quoted field is an escaped quote
    )


# 3. Read each CSV into its own DataFrame
df_branded_food = read_fdc_csv(csv_path_branded_food)
df_food = read_fdc_csv(csv_path_food)
df_food_nutrient = read_fdc_csv(csv_path_food_nutrient)
df_nutrient = read_fdc_csv(csv_path_nutrient)

# 4. Take a peek (e.g., first 10 rows)
df_branded_food.show(10)
df_food.show(10)
df_food_nutrient.show(10)
df_nutrient.show(10, truncate=False)

# 5. (Optional) Select just some columns
# df = df.select("column1", "column2")

# 6. (Optional) Filter to limit data
# df = df.filter(df["some_numeric_column"] > 100)

# 7. Count rows, labelling each count with the table it belongs to
fdc_tables = {
    "branded_food": df_branded_food,
    "food": df_food,
    "food_nutrient": df_food_nutrient,
    "nutrient": df_nutrient,
}
for table_name, table_df in fdc_tables.items():
    print(f"Row count ({table_name}): {table_df.count()}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/02 13:28:37 WARN Utils: Your hostname, Billys-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 10.194.168.162 instead (on interface en0)
25/08/02 13:28:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/02 13:28:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------+--------------------+----------+-------------+--------------+--------------------+---------------------------+------------+-----------------+--------------------------+---------------------+-----------+--------------+-------------+--------------+--------------+-----------------+----------------------+-------------+-----------------+-------------+
| fdc_id|         brand_owner|brand_name|subbrand_name|      gtin_upc|         ingredients|not_a_significant_source_of|serving_size|serving_size_unit|household_serving_fulltext|branded_food_category|data_source|package_weight|modified_date|available_date|market_country|discontinued_date|preparation_state_code|trade_channel|short_description|material_code|
+-------+--------------------+----------+-------------+--------------+--------------------+---------------------------+------------+-----------------+--------------------------+---------------------+-----------+--------------+-------------+--------------+--------------+--------------



Row count: 26805037
Row count: 477


                                                                                

In [2]:
# Register each DataFrame as a temporary SQL view so it can be queried by name.
df_branded_food.createOrReplaceTempView("branded_food")
df_food.createOrReplaceTempView("food")
df_food_nutrient.createOrReplaceTempView("food_nutrient")
df_nutrient.createOrReplaceTempView("nutrient")


def search_branded_foods(term):
    """Find branded foods whose brand owner, brand name or description mentions a term.

    Args:
        term: Text to look for. Matching is case-insensitive and by substring;
            `%` and `_` inside the term keep their SQL LIKE wildcard meaning.

    Returns:
        A DataFrame holding every column of `food` followed by every column of
        `branded_food` for the matching rows (so `fdc_id` appears twice).
    """
    # The pattern is bound as a named parameter instead of being pasted into the
    # SQL text, so a quote in the term cannot break the query.
    # Named parameters need PySpark 3.4 or newer.
    # NOTE(review): the original WHERE clause tested brand_owner twice; the
    # duplicate is dropped here. Possibly subbrand_name was intended — confirm.
    return spark.sql(
        """
        SELECT * FROM food f
        JOIN branded_food b ON CAST(f.fdc_id AS string) = CAST(b.fdc_id AS string)
        WHERE lower(b.brand_owner) LIKE :pattern
            OR lower(b.brand_name) LIKE :pattern
            OR lower(f.description) LIKE :pattern
        """,
        args={"pattern": f"%{term.lower()}%"},
    )


# Now you can run the same search for several terms:
anabar_string = "anabar"
anabar = search_branded_foods(anabar_string)
anabar.show(20, truncate=False)
print(f"Row count for '{anabar_string}': {anabar.count()}")

# Another query
quest_string = "quest"
quest = search_branded_foods(quest_string)
quest.show(10)
print(f"Row count for '{quest_string}': {quest.count()}")

# 8. Stop Spark when done (left disabled because the cells below still use the session)
#spark.stop()

25/08/02 13:28:54 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------+-----------------------------------------------------------------------------------------------------+----------------------------+----------------+-------+----------------------+----------------------+-------------+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

Row count for 'anabar': 19


                                                                                

+-------+------------+--------------------+--------------------+----------------+-------+--------------------+----------+-------------+--------------+--------------------+---------------------------+------------+-----------------+--------------------------+---------------------+-----------+--------------+-------------+--------------+--------------+-----------------+----------------------+-------------+-----------------+-------------+
| fdc_id|   data_type|         description|    food_category_id|publication_date| fdc_id|         brand_owner|brand_name|subbrand_name|      gtin_upc|         ingredients|not_a_significant_source_of|serving_size|serving_size_unit|household_serving_fulltext|branded_food_category|data_source|package_weight|modified_date|available_date|market_country|discontinued_date|preparation_state_code|trade_channel|short_description|material_code|
+-------+------------+--------------------+--------------------+----------------+-------+--------------------+----------+---

[Stage 50:>                                                         (0 + 8) / 8]

Row count for 'quest': 2377


                                                                                

In [3]:
nutrient_name = "copper"

# Lower-cased search text, computed once and shared by both queries below.
name_pattern = nutrient_name.lower()

# Keep rows for the requested nutrient, plus energy rows measured in kcal.
nutrient_filter = (
    f"lower(n.name) LIKE '%{name_pattern}%' "
    "OR (lower(n.name) LIKE '%energy%' AND lower(n.unit_name) LIKE '%kcal%')"
)

# Query 1: every nutrient column next to every food_nutrient column,
# ordered by food and then by the nutrient's rank.
full_query = f"""
    SELECT n.*, fn.* FROM food_nutrient fn
    JOIN nutrient n ON CAST(n.id AS string) = CAST(fn.nutrient_id AS string)
    WHERE {nutrient_filter}
    ORDER BY fn.fdc_id, n.rank
"""
nutrients_full = spark.sql(full_query)
nutrients_full.show(10)
full_row_count = nutrients_full.count()
print(f"Row count for '{nutrient_name}': {full_row_count}")

# Query 2: only the identifying columns and the amount. Grouping by every
# selected column collapses exact duplicate rows.
distinct_query = f"""
    SELECT n.id, n.name, n.unit_name, fn.fdc_id, fn.amount
    FROM food_nutrient fn
    JOIN nutrient n ON CAST(n.id AS string) = CAST(fn.nutrient_id AS string)
    WHERE {nutrient_filter}
    GROUP BY n.id, n.name, n.unit_name, fn.fdc_id, fn.amount
"""
nutrients = spark.sql(distinct_query)
nutrients.show(10)
distinct_row_count = nutrients.count()
print(f"Row count for '{nutrient_name}': {distinct_row_count}")

                                                                                

+----+----------+---------+------------+------+-------+------+-----------+------+-----------+-------------+-----+-----+------+----+--------+-----------------+-------------------+
|  id|      name|unit_name|nutrient_nbr|  rank|     id|fdc_id|nutrient_id|amount|data_points|derivation_id|  min|  max|median| loq|footnote|min_year_acquired|percent_daily_value|
+----+----------+---------+------------+------+-------+------+-----------+------+-----------+-------------+-----+-----+------+----+--------+-----------------+-------------------+
|1008|    Energy|     KCAL|       208.0| 300.0|1283685|167512|       1008| 307.0|          0|           49| NULL| NULL|  NULL|NULL|    NULL|             NULL|               NULL|
|1008|    Energy|     KCAL|       208.0| 300.0|1283702|167513|       1008| 330.0|          1|           47| NULL| NULL|  NULL|NULL|    NULL|             NULL|               NULL|
|1008|    Energy|     KCAL|       208.0| 300.0|1283710|167514|       1008| 377.0|          0|           4

                                                                                

Row count for 'copper': 1895642


                                                                                

+----+------+---------+-------+------+
|  id|  name|unit_name| fdc_id|amount|
+----+------+---------+-------+------+
|1008|Energy|     KCAL|1106452| 286.0|
|1008|Energy|     KCAL|1106457| 333.0|
|1008|Energy|     KCAL|1106791|  75.0|
|1008|Energy|     KCAL|1106809| 357.0|
|1008|Energy|     KCAL|1106880| 464.0|
|1008|Energy|     KCAL|1107135|  45.0|
|1008|Energy|     KCAL|1107288| 170.0|
|1008|Energy|     KCAL|1107367|  43.0|
|1008|Energy|     KCAL|1108212| 375.0|
|1008|Energy|     KCAL|1108359| 357.0|
+----+------+---------+-------+------+
only showing top 10 rows




Row count for 'copper': 1895619


                                                                                

In [4]:
# Rank branded foods by how much of `nutrient_name` they contain per kcal.
# `nutrient_name` is a trusted constant defined earlier in the notebook; it is
# interpolated into the SQL text, so do not feed it untrusted input.

# Lower-cased text used in the LIKE patterns.
nutrient_pattern = nutrient_name.lower()

# Column alias derived from the nutrient name. Anything that is not an ASCII
# letter or digit becomes "_" so names with spaces, commas, hyphens or
# parentheses still produce a valid SQL identifier.
nutrient_col = "".join(
    ch if (ch.isascii() and ch.isalnum()) else "_" for ch in nutrient_pattern
)
per_kcal_col = f"{nutrient_col}_per_kcal"

# Foods whose nutrient-per-kcal ratio exceeds this value get flag = 1.
PER_KCAL_FLAG_THRESHOLD = 0.25

nutrients_grouped = spark.sql(f"""
    -- One row per food: its kcal amount and its amount of the requested
    -- nutrient (0 when the food has no row for that nutrient), plus the
    -- matched names and units.
    WITH GROUPED_NUTRIENTS AS (
        SELECT fn.fdc_id
            ,MAX(CASE WHEN lower(n.name) LIKE '%energy%' THEN fn.amount ELSE NULL END) AS kcal
            ,COALESCE(MAX(CASE WHEN lower(n.name) LIKE '%{nutrient_pattern}%' THEN fn.amount ELSE NULL END),0) AS {nutrient_col}
            ,MAX(CASE WHEN lower(n.name) LIKE '%energy%' THEN n.name ELSE NULL END) AS kcal_name
            ,MAX(CASE WHEN lower(n.name) LIKE '%energy%' THEN n.unit_name ELSE NULL END) AS kcal_unit_name
            ,MAX(CASE WHEN lower(n.name) LIKE '%{nutrient_pattern}%' THEN n.name ELSE NULL END) AS nutrient_name
            ,MAX(CASE WHEN lower(n.name) LIKE '%{nutrient_pattern}%' THEN n.unit_name ELSE NULL END) AS nutrient_unit_name
        FROM food_nutrient fn
        JOIN nutrient n ON CAST(n.id AS string) = CAST(fn.nutrient_id AS string)
        WHERE lower(n.name) LIKE '%{nutrient_pattern}%' OR (lower(n.name) LIKE '%energy%' AND lower(n.unit_name) LIKE '%kcal%')
        GROUP BY fn.fdc_id
    ),

    -- Nutrient amount per kcal; NULLIF turns a zero kcal into NULL so the
    -- division yields NULL instead of failing or producing infinity.
    ADDED_COLUMNS AS (
        SELECT *, {nutrient_col}/NULLIF(kcal,0) AS {per_kcal_col}
        FROM GROUPED_NUTRIENTS gn
    ),

    -- Attach the food and branded_food columns and flag high-ratio foods.
    -- The inner joins keep only foods present in both tables; because of
    -- the `*`, fdc_id appears once per joined table in the result.
    COMBINED AS (
        SELECT CASE WHEN {per_kcal_col} > {PER_KCAL_FLAG_THRESHOLD} THEN 1 ELSE 0 END AS flag
            , *
        FROM ADDED_COLUMNS a
        JOIN food f ON CAST(f.fdc_id AS string) = CAST(a.fdc_id AS string)
        JOIN branded_food b ON CAST(f.fdc_id AS string) = CAST(b.fdc_id AS string)
        --WHERE COALESCE({per_kcal_col}, 0) > 0
    )

    SELECT *
    FROM COMBINED
    ORDER BY {per_kcal_col} DESC
""")
nutrients_grouped.show(100, truncate=False)
print(f"Row count for '{nutrient_name}': {nutrients_grouped.count()}")

                                                                                

+----+-------+------+-------------------+---------+--------------+-------------+------------------+--------------------+-------+------------+-----------------------------------------------------------------------------------------+----------------------------------------+----------------+-------+----------------------------------------------+------------------+-------------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------



Row count for 'copper': 1841663


                                                                                

In [5]:
# Narrow the ranked table to foods whose description mentions "cocoa powder"
# (compared in lower case) and print up to 1000 of them without truncating
# any column.
description_lower = lower(col("description"))
cocoa_powder_rows = nutrients_grouped.filter(description_lower.contains("cocoa powder"))
cocoa_powder_rows.show(1000, truncate=False)

                                                                                

+----+-------+------+------+---------+--------------+-------------+------------------+---------------+-------+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------+----------------+-------+--------------------------------------+-------------------------+--------------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

25/08/02 14:12:50 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2002148 ms exceeds timeout 120000 ms
25/08/02 14:12:50 WARN SparkContext: Killing executors is not supported by current scheduler.
25/08/02 14:12:51 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:81)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:669)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1296)
	at 