In [0]:
%%python
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import round

In [0]:
# (Re)mount the ADLS Gen2 container so the notebook always starts from a freshly configured mount
mount_point = "/mnt/nutritiondata"
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    # Already mounted: unmount first, because mounting onto an existing mount point raises an error
    dbutils.fs.unmount(mount_point)


# Service-principal credentials are read from a Databricks secret scope instead of being
# hard-coded in the notebook (notebooks get shared, exported and committed).
# NOTE(review): the scope and key names below are placeholders - point them at the
# secret scope / keys that actually exist in your workspace.
secret_scope = "nutrition-data-secrets"
client_id = dbutils.secrets.get(scope=secret_scope, key="sp-client-id")
client_secret = dbutils.secrets.get(scope=secret_scope, key="sp-client-secret")
tenant_id = dbutils.secrets.get(scope=secret_scope, key="sp-tenant-id")

# Set up the necessary configurations for OAuth (client-credentials) authentication
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    # The endpoint must be the tenant's full token URL, not the bare tenant ID
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Mount the directory
dbutils.fs.mount(
    source="abfss://nutrition-data@foodnutritionstorage.dfs.core.windows.net",
    mount_point=mount_point,
    extra_configs=configs
)

/mnt/nutritiondata has been unmounted.
Out[2]: True

In [0]:
# List the contents of the mounted directory (via the %fs magic) to check that the files are accessible

%fs
ls "mnt/nutritiondata"

path,name,size,modificationTime
dbfs:/mnt/nutritiondata/raw-data/,raw-data/,0,1740974869000
dbfs:/mnt/nutritiondata/transformed-data/,transformed-data/,0,1740974893000


In [0]:
# Show which top-level folders are available under the mount point
mounted_entries = dbutils.fs.ls("/mnt/nutritiondata")
display(mounted_entries)

path,name,size,modificationTime
dbfs:/mnt/nutritiondata/raw-data/,raw-data/,0,1740974869000
dbfs:/mnt/nutritiondata/transformed-data/,transformed-data/,0,1740974893000


In [0]:
# Show the raw CSV files that are available as input datasets
raw_data_entries = dbutils.fs.ls("/mnt/nutritiondata/raw-data/")
display(raw_data_entries)


path,name,size,modificationTime
dbfs:/mnt/nutritiondata/raw-data/FOOD_DATA_GROUP1.csv,FOOD_DATA_GROUP1.csv,97240,1740985951000
dbfs:/mnt/nutritiondata/raw-data/FOOD_DATA_GROUP3.csv,FOOD_DATA_GROUP3.csv,97803,1740985950000
dbfs:/mnt/nutritiondata/raw-data/FOOD_DATA_GROUP4.csv,FOOD_DATA_GROUP4.csv,38705,1740985950000
dbfs:/mnt/nutritiondata/raw-data/FOOD_DATA_GROUP5.csv,FOOD_DATA_GROUP5.csv,121112,1740985953000
dbfs:/mnt/nutritiondata/raw-data/FOOD_GROUP_DATA2.csv,FOOD_GROUP_DATA2.csv,51706,1740985950000


In [0]:
# Load all five datasets through one shared reader helper
def _read_food_csv(file_name):
    """Read one raw CSV from the mounted raw-data folder, using its header row and inferring column types."""
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("/mnt/nutritiondata/raw-data/" + file_name)
    )


group1 = _read_food_csv("FOOD_DATA_GROUP1.csv")
group2 = _read_food_csv("FOOD_GROUP_DATA2.csv")  # this file follows a different naming pattern
group3 = _read_food_csv("FOOD_DATA_GROUP3.csv")
group4 = _read_food_csv("FOOD_DATA_GROUP4.csv")
group5 = _read_food_csv("FOOD_DATA_GROUP5.csv")
     

In [0]:
# Print every group's schema so column names and inferred types can be compared
for food_group_df in (group1, group2, group3, group4, group5):
    food_group_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Unnamed: 0: integer (nullable = true)
 |-- food: string (nullable = true)
 |-- Caloric Value: integer (nullable = true)
 |-- Fat: double (nullable = true)
 |-- Saturated Fats: double (nullable = true)
 |-- Monounsaturated Fats: double (nullable = true)
 |-- Polyunsaturated Fats: double (nullable = true)
 |-- Carbohydrates: double (nullable = true)
 |-- Sugars: double (nullable = true)
 |-- Protein: double (nullable = true)
 |-- Dietary Fiber: double (nullable = true)
 |-- Cholesterol: double (nullable = true)
 |-- Sodium: double (nullable = true)
 |-- Water: double (nullable = true)
 |-- Vitamin A: double (nullable = true)
 |-- Vitamin B1: double (nullable = true)
 |-- Vitamin B11: double (nullable = true)
 |-- Vitamin B12: double (nullable = true)
 |-- Vitamin B2: double (nullable = true)
 |-- Vitamin B3: double (nullable = true)
 |-- Vitamin B5: double (nullable = true)
 |-- Vitamin B6: double (nullable = true)
 |-- Vitamin C: double (nul

In [0]:
# Verify that all five datasets share an identical schema before combining them

schema_list = [frame.schema for frame in (group1, group2, group3, group4, group5)]
reference_schema = schema_list[0]
schemas_match = all(candidate == reference_schema for candidate in schema_list)
print("All schemas match. Safe to merge!" if schemas_match else "Schemas do NOT match!")


All schemas match. Safe to merge!


In [0]:
# Drop the two leftover index columns ("_c0" and "Unnamed: 0"); they carry no information useful for analysis
index_columns = ("_c0", "Unnamed: 0")
group1 = group1.drop(*index_columns)
group2 = group2.drop(*index_columns)
group3 = group3.drop(*index_columns)
group4 = group4.drop(*index_columns)
group5 = group5.drop(*index_columns)


In [0]:
# Print the schema of 'group1' to verify that the unnecessary columns have been removed
# (only group1 is inspected here)
group1.printSchema()


root
 |-- food: string (nullable = true)
 |-- Caloric Value: integer (nullable = true)
 |-- Fat: double (nullable = true)
 |-- Saturated Fats: double (nullable = true)
 |-- Monounsaturated Fats: double (nullable = true)
 |-- Polyunsaturated Fats: double (nullable = true)
 |-- Carbohydrates: double (nullable = true)
 |-- Sugars: double (nullable = true)
 |-- Protein: double (nullable = true)
 |-- Dietary Fiber: double (nullable = true)
 |-- Cholesterol: double (nullable = true)
 |-- Sodium: double (nullable = true)
 |-- Water: double (nullable = true)
 |-- Vitamin A: double (nullable = true)
 |-- Vitamin B1: double (nullable = true)
 |-- Vitamin B11: double (nullable = true)
 |-- Vitamin B12: double (nullable = true)
 |-- Vitamin B2: double (nullable = true)
 |-- Vitamin B3: double (nullable = true)
 |-- Vitamin B5: double (nullable = true)
 |-- Vitamin B6: double (nullable = true)
 |-- Vitamin C: double (nullable = true)
 |-- Vitamin D: double (nullable = true)
 |-- Vitamin E: double (

In [0]:
# Print group1's column names as a plain Python list
print(group1.columns)

['food', 'Caloric Value', 'Fat', 'Saturated Fats', 'Monounsaturated Fats', 'Polyunsaturated Fats', 'Carbohydrates', 'Sugars', 'Protein', 'Dietary Fiber', 'Cholesterol', 'Sodium', 'Water', 'Vitamin A', 'Vitamin B1', 'Vitamin B11', 'Vitamin B12', 'Vitamin B2', 'Vitamin B3', 'Vitamin B5', 'Vitamin B6', 'Vitamin C', 'Vitamin D', 'Vitamin E', 'Vitamin K', 'Calcium', 'Copper', 'Iron', 'Magnesium', 'Manganese', 'Phosphorus', 'Potassium', 'Selenium', 'Zinc', 'Nutrition Density']


In [0]:
# Show the first 20 rows of group1 for a few key columns, without truncating long food names
group1.select("food", "Caloric Value", "Fat", "Saturated Fats").show(20, truncate=False)



+--------------------------------+-------------+----+--------------+
|food                            |Caloric Value|Fat |Saturated Fats|
+--------------------------------+-------------+----+--------------+
|cream cheese                    |51           |5.0 |2.9           |
|neufchatel cheese               |215          |19.4|10.9          |
|requeijao cremoso light catupiry|49           |3.6 |2.3           |
|ricotta cheese                  |30           |2.0 |1.3           |
|cream cheese low fat            |30           |2.3 |1.4           |
|cream cheese fat free           |19           |0.2 |0.1           |
|gruyere cheese                  |116          |9.1 |5.3           |
|cheddar cheese                  |113          |9.3 |5.3           |
|parmesan cheese                 |71           |4.5 |2.7           |
|romano cheese                   |19           |1.3 |0.9           |
|parmesan cheese grated          |21           |1.4 |0.8           |
|port salut cheese               |

In [0]:

# Function to count missing values in a DataFrame
def count_missing_values(df):
    """Count the null entries in every column of a DataFrame.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        A single-row DataFrame with one column per column of ``df``; each cell
        holds the number of null values found in that column.
    """
    # Each null flag is cast to 0/1 and summed per column. F.sum / F.col are reached
    # through the module alias so this function does not depend on Python's built-in
    # ``sum`` having been shadowed by a pyspark import.
    return df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])

# The five group DataFrames to inspect
groups = [group1, group2, group3, group4, group5]

# Print a heading for each group (numbered from 1) and display its per-column null counts
for i, group in enumerate(groups, start=1):
    print(f"Missing values in group{i}:")
    display(count_missing_values(group))

Missing values in group1:


food,Caloric Value,Fat,Saturated Fats,Monounsaturated Fats,Polyunsaturated Fats,Carbohydrates,Sugars,Protein,Dietary Fiber,Cholesterol,Sodium,Water,Vitamin A,Vitamin B1,Vitamin B11,Vitamin B12,Vitamin B2,Vitamin B3,Vitamin B5,Vitamin B6,Vitamin C,Vitamin D,Vitamin E,Vitamin K,Calcium,Copper,Iron,Magnesium,Manganese,Phosphorus,Potassium,Selenium,Zinc,Nutrition Density
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


Missing values in group2:


_c0,Unnamed: 0,food,Caloric Value,Fat,Saturated Fats,Monounsaturated Fats,Polyunsaturated Fats,Carbohydrates,Sugars,Protein,Dietary Fiber,Cholesterol,Sodium,Water,Vitamin A,Vitamin B1,Vitamin B11,Vitamin B12,Vitamin B2,Vitamin B3,Vitamin B5,Vitamin B6,Vitamin C,Vitamin D,Vitamin E,Vitamin K,Calcium,Copper,Iron,Magnesium,Manganese,Phosphorus,Potassium,Selenium,Zinc,Nutrition Density
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


Missing values in group3:


_c0,Unnamed: 0,food,Caloric Value,Fat,Saturated Fats,Monounsaturated Fats,Polyunsaturated Fats,Carbohydrates,Sugars,Protein,Dietary Fiber,Cholesterol,Sodium,Water,Vitamin A,Vitamin B1,Vitamin B11,Vitamin B12,Vitamin B2,Vitamin B3,Vitamin B5,Vitamin B6,Vitamin C,Vitamin D,Vitamin E,Vitamin K,Calcium,Copper,Iron,Magnesium,Manganese,Phosphorus,Potassium,Selenium,Zinc,Nutrition Density
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


Missing values in group4:


_c0,Unnamed: 0,food,Caloric Value,Fat,Saturated Fats,Monounsaturated Fats,Polyunsaturated Fats,Carbohydrates,Sugars,Protein,Dietary Fiber,Cholesterol,Sodium,Water,Vitamin A,Vitamin B1,Vitamin B11,Vitamin B12,Vitamin B2,Vitamin B3,Vitamin B5,Vitamin B6,Vitamin C,Vitamin D,Vitamin E,Vitamin K,Calcium,Copper,Iron,Magnesium,Manganese,Phosphorus,Potassium,Selenium,Zinc,Nutrition Density
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


Missing values in group5:


_c0,Unnamed: 0,food,Caloric Value,Fat,Saturated Fats,Monounsaturated Fats,Polyunsaturated Fats,Carbohydrates,Sugars,Protein,Dietary Fiber,Cholesterol,Sodium,Water,Vitamin A,Vitamin B1,Vitamin B11,Vitamin B12,Vitamin B2,Vitamin B3,Vitamin B5,Vitamin B6,Vitamin C,Vitamin D,Vitamin E,Vitamin K,Calcium,Copper,Iron,Magnesium,Manganese,Phosphorus,Potassium,Selenium,Zinc,Nutrition Density
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:

# Function to clean data
def clean_data(group):
    """Normalise the key nutrient columns and drop rows where they are missing.

    Args:
        group: Spark DataFrame containing 'Caloric Value' and 'Protein' columns.

    Returns:
        A new DataFrame in which 'Caloric Value' and 'Protein' are cast to double
        and every row with a null in either column has been removed.
    """
    key_columns = ['Caloric Value', 'Protein']

    group_clean = group
    for column_name in key_columns:
        # Cast to double, not float: Spark's 'float' is 32-bit, so casting the
        # double 'Protein' column to it would silently lose precision.
        group_clean = group_clean.withColumn(column_name, F.col(column_name).cast('double'))

    # Drop nulls AFTER casting so values that could not be converted (cast -> null)
    # are removed together with the originally missing ones.
    return group_clean.dropna(subset=key_columns)

# Function to filter high-protein and low-calorie foods (both conditions)
def filter_high_protein_and_low_calorie(group, min_protein=10, max_calories=300):
    """Keep only the foods that are both high in protein and low in calories.

    Args:
        group: DataFrame with 'Protein' and 'Caloric Value' columns.
        min_protein: Exclusive lower bound for 'Protein' (default 10).
        max_calories: Exclusive upper bound for 'Caloric Value' (default 300).

    Returns:
        The rows of ``group`` where Protein > min_protein AND
        Caloric Value < max_calories.
    """
    is_high_protein = group['Protein'] > min_protein
    is_low_calorie = group['Caloric Value'] < max_calories
    return group.filter(is_high_protein & is_low_calorie)

# List of all five group DataFrames (they must already have been loaded by the earlier cells)
groups = [group1, group2, group3, group4, group5]

# Function to process all groups
def process_group_data(groups):
    """Clean every group and keep only its high-protein, low-calorie foods.

    Args:
        groups: Iterable of group DataFrames.

    Returns:
        A list with one cleaned-and-filtered DataFrame per input group,
        in the same order as ``groups``.
    """
    return [
        filter_high_protein_and_low_calorie(clean_data(group))
        for group in groups
    ]

# Run the clean-and-filter pipeline on every group
combined_results = process_group_data(groups)

# Stack the per-group results into one DataFrame, matching columns by name
# (allowMissingColumns=True tolerates a column that is absent from one side)
first_result, remaining_results = combined_results[0], combined_results[1:]
final_combined = first_result
for result in remaining_results:
    final_combined = final_combined.unionByName(result, allowMissingColumns=True)

# Show the full-width result first, then a narrow view with untruncated food names
final_combined.show()
final_combined.select("food", "Caloric Value", "Fat").show(20, truncate=False)


+--------------------+-------------+----+--------------+--------------------+--------------------+-------------+------+-------+-------------+-----------+------+-----+---------+----------+-----------+-----------+----------+----------+----------+----------+---------+---------+---------+---------+-------+------+----+---------+---------+----------+---------+--------+-----+-----------------+
|                food|Caloric Value| Fat|Saturated Fats|Monounsaturated Fats|Polyunsaturated Fats|Carbohydrates|Sugars|Protein|Dietary Fiber|Cholesterol|Sodium|Water|Vitamin A|Vitamin B1|Vitamin B11|Vitamin B12|Vitamin B2|Vitamin B3|Vitamin B5|Vitamin B6|Vitamin C|Vitamin D|Vitamin E|Vitamin K|Calcium|Copper|Iron|Magnesium|Manganese|Phosphorus|Potassium|Selenium| Zinc|Nutrition Density|
+--------------------+-------------+----+--------------+--------------------+--------------------+-------------+------+-------+-------------+-----------+------+-----+---------+----------+-----------+-----------+---------

In [0]:
# Preview the combined result for the three key columns (show() with its default settings)
final_combined.select("food", "Caloric Value", "Protein").show()

+--------------------+-------------+-------+
|                food|Caloric Value|Protein|
+--------------------+-------------+-------+
|mozzarella cheese...|        159.0|   35.8|
|      chicken spread|         88.0|   10.1|
|       beef empanada|        298.0|   10.1|
|frijoles with cheese|        225.0|   11.4|
|    turkey and gravy|        161.0|   14.1|
|           crab cake|        160.0|   11.3|
|taco with chicken...|        185.0|   13.0|
|     spinach souffle|        230.0|   10.7|
|     chili con carne|        271.0|   14.7|
|       tamale navajo|        285.0|   11.7|
|   pupusas del cerdo|        283.0|   14.0|
|         pork tamale|        247.0|   10.4|
|taco salad with c...|        193.0|   11.6|
|  vegetarian fillets|        247.0|   19.6|
|spaghetti with me...|        255.0|   14.3|
|pasta with meatba...|        273.0|   10.6|
|pasta with sliced...|        227.0|   11.0|
|vegetable chicken...|        166.0|   12.3|
|       bean ham soup|        198.0|   10.3|
|        s

In [0]:
# Count the rows of final_combined; num_rows is reused later to show every row
num_rows = final_combined.count()

# Report the count
print(f"The number of rows in the final combined DataFrame is: {num_rows}")


The number of rows in the final combined DataFrame is: 280


In [0]:
# Display every row of final_combined (n=num_rows) for the three key columns, without truncating food names
final_combined.select("food", "Caloric Value", "Protein").show(truncate=False, n=num_rows)


+------------------------------------------------+-------------+-------+
|food                                            |Caloric Value|Protein|
+------------------------------------------------+-------------+-------+
|mozzarella cheese fat free                      |159.0        |35.8   |
|chicken spread                                  |88.0         |10.1   |
|beef empanada                                   |298.0        |10.1   |
|frijoles with cheese                            |225.0        |11.4   |
|turkey and gravy                                |161.0        |14.1   |
|crab cake                                       |160.0        |11.3   |
|taco with chicken cheese lettuce                |185.0        |13.0   |
|spinach souffle                                 |230.0        |10.7   |
|chili con carne                                 |271.0        |14.7   |
|tamale navajo                                   |285.0        |11.7   |
|pupusas del cerdo                               |2

In [0]:
from pyspark.sql.functions import round

# Shared denominator for the three macronutrient percentage columns
macro_total = F.col("Protein") + F.col("Fat") + F.col("Carbohydrates")

# Weighted score: protein (x2) and fiber (x1.5) add to it, fat (x1) and sugars (x2) subtract
health_score = (
    (F.col("Protein") * 2)
    + (F.col("Dietary Fiber") * 1.5)
    - (F.col("Fat") * 1)
    - (F.col("Sugars") * 2)
)

# Add the derived columns, each rounded to 2 decimals
final_combined = (
    final_combined
    .withColumn("Protein %", F.round((F.col("Protein") / macro_total) * 100, 2))
    .withColumn("Fat %", F.round((F.col("Fat") / macro_total) * 100, 2))
    .withColumn("Carbohydrates %", F.round((F.col("Carbohydrates") / macro_total) * 100, 2))
    .withColumn("Calorie Density", F.round(F.col("Caloric Value") / 100, 2))
    .withColumn("Health Score", F.round(health_score, 2))
)



In [0]:
# Label each food by its dominant macronutrient share; the rules are evaluated in order
# and the first one that matches wins, with "Balanced" as the fallback
food_category_rule = (
    F.when(F.col("Protein %") > 40, "High Protein")
    .when(F.col("Carbohydrates %") > 50, "High Carb")
    .when(F.col("Fat %") > 50, "High Fat")
    .otherwise("Balanced")
)
final_combined = final_combined.withColumn("Food Category", food_category_rule)


In [0]:
# Preview the derived metric columns next to the source 'Nutrition Density' for the first 20 rows
final_combined.select("food", "Protein %", "Fat %", "Carbohydrates %","Calorie Density","Health Score","Food Category", "Nutrition Density" ).show(20, truncate=False)

+----------------------------------------+---------+-----+---------------+---------------+------------+-------------+-----------------+
|food                                    |Protein %|Fat %|Carbohydrates %|Calorie Density|Health Score|Food Category|Nutrition Density|
+----------------------------------------+---------+-----+---------------+---------------+------------+-------------+-----------------+
|mozzarella cheese fat free              |89.95    |0.0  |10.05          |1.59           |71.2        |High Protein |1128.2           |
|chicken spread                          |45.5     |44.14|10.36          |0.88           |10.1        |High Protein |31.972           |
|beef empanada                           |18.63    |30.07|51.29          |2.98           |3.4         |High Carb    |76.2             |
|frijoles with cheese                    |23.8     |16.28|59.92          |2.25           |15.0        |High Carb    |240.305          |
|turkey and gravy                        |44.76 

In [0]:
# Write the result with a header row as a single partition (coalesce(1)),
# overwriting whatever is already in the transformed-data folder
final_combined.coalesce(1).write.mode("overwrite").option("header", 'true').csv("/mnt/nutritiondata/transformed-data/")


In [0]:
# Display the files in the transformed-data folder to confirm what the write above produced
display(dbutils.fs.ls("/mnt/nutritiondata/transformed-data/"))

path,name,size,modificationTime
dbfs:/mnt/nutritiondata/transformed-data/_SUCCESS,_SUCCESS,0,1741067976000
dbfs:/mnt/nutritiondata/transformed-data/_committed_3956674624863665394,_committed_3956674624863665394,113,1741067975000
dbfs:/mnt/nutritiondata/transformed-data/_started_3956674624863665394,_started_3956674624863665394,0,1741067975000
dbfs:/mnt/nutritiondata/transformed-data/part-00000-tid-3956674624863665394-99e42b2f-375f-493b-8e5a-d4c49b1c60e9-103-1-c000.csv,part-00000-tid-3956674624863665394-99e42b2f-375f-493b-8e5a-d4c49b1c60e9-103-1-c000.csv,61703,1741067975000


In [0]:
# Rename the Spark part file in DBFS to a stable, human-friendly name.
# The part file's name embeds a per-write transaction id, so it is looked up at run time
# instead of being hard-coded (a hard-coded name stops working once the write is re-run).
transformed_dir = "/mnt/nutritiondata/transformed-data/"
part_files = [
    entry.path
    for entry in dbutils.fs.ls(transformed_dir)
    if entry.name.startswith("part-") and entry.name.endswith(".csv")
]
if len(part_files) != 1:
    # Fail loudly rather than renaming an arbitrary file
    raise RuntimeError(
        f"Expected exactly one part-*.csv file in {transformed_dir}, found {len(part_files)}"
    )
dbutils.fs.mv(part_files[0], transformed_dir + "my_transformed_data.csv")


Out[68]: True

In [0]:
# Round the columns as required
final_combined_rounded = final_combined.withColumn("Caloric Value", F.round("Caloric Value", 2)) \
                                       .withColumn("Protein", F.round("Protein", 2)) \
                                       .withColumn("Fat", F.round("Fat", 2))

# Save the rounded DataFrame as a single file named my_transformed_data.csv.
# Spark's csv() writer always creates a *directory* of part files at the given path, so writing
# straight to ".../my_transformed_data.csv" would replace that file with a directory.
# Instead: write to a staging directory, then move the single part file to the final path.
target_path = "/mnt/nutritiondata/transformed-data/my_transformed_data.csv"
staging_dir = "/mnt/nutritiondata/transformed-data/_my_transformed_data_staging"

final_combined_rounded.repartition(1).write.mode("overwrite").option("header", "true").csv(staging_dir)

staged_parts = [
    entry.path
    for entry in dbutils.fs.ls(staging_dir)
    if entry.name.startswith("part-") and entry.name.endswith(".csv")
]
if len(staged_parts) != 1:
    # Fail loudly rather than publishing an arbitrary file
    raise RuntimeError(
        f"Expected exactly one part-*.csv file in {staging_dir}, found {len(staged_parts)}"
    )

# Remove any previous output at the target (file or directory), publish the new file, clean up
dbutils.fs.rm(target_path, True)
dbutils.fs.mv(staged_parts[0], target_path)
dbutils.fs.rm(staging_dir, True)
