<a href="https://colab.research.google.com/github/Shantanu-Nagwekar-01/Food-Health-Classifier-Project/blob/main/Food_Health_Classifier_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

print("✅ Google Drive mounted.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Google Drive mounted.


In [None]:
# ============================
# Colab-ready robust pipeline
# - safe reading of OpenFoodFacts parquet (nutriments nested)
# - converts to numeric, imputes, encodes labels
# - trains Dask XGBoost (cpu 'hist' by default; switches to gpu if available)
# - saves final booster to Drive
# ============================

# Install libs (only needed in Colab)
!pip install -q "dask[complete]" dask-ml xgboost==1.7.6 pyarrow joblib

import os, json, time, warnings
warnings.filterwarnings("ignore")

import dask.dataframe as dd
import pandas as pd
import numpy as np
from dask.distributed import Client
from dask_ml.impute import SimpleImputer
from dask_ml.model_selection import train_test_split
from xgboost.dask import DaskXGBClassifier
from dask_ml.preprocessing import OrdinalEncoder
import joblib

# -------------------------
# CONFIG - change these
# -------------------------
PARQUET_PATH = '/content/drive/MyDrive/Datasets/food.parquet'  # update if needed
SAVE_MODEL_PATH = '/content/drive/MyDrive/food_xgb_model.json'
# If you want to keep memory lower, reduce MAX_PARTITIONS_USED
MAX_PARTITIONS_USED = 60   # set lower to reduce memory pressure
# -------------------------

# Mount Drive in Colab (uncomment when in Colab)
# from google.colab import drive
# drive.mount('/content/drive')

# Start Dask client (small cluster tuned for Colab)
client = Client(n_workers=2, threads_per_worker=2, memory_limit='6GB')
print("✅ Dask client:", client)

# Columns to load: we load the nested 'nutriments' and a few meta columns
cols_to_load = ['nutriments', 'nutriscore_grade', 'nova_group', 'ecoscore_score',
                'additives_n', 'ingredients_from_palm_oil_n']

print("📦 Reading parquet metadata (lazy)...")
ddf = dd.read_parquet(PARQUET_PATH, columns=cols_to_load)
print("Initial partitions:", ddf.npartitions, "rows (est):", getattr(ddf, 'shape', 'unknown'))

# Heuristic to reduce partitions if too many (helps memory)
if ddf.npartitions > MAX_PARTITIONS_USED:
    print(f"⚠️ Reducing to first {MAX_PARTITIONS_USED} partitions to limit memory usage.")
    ddf = ddf.partitions[:MAX_PARTITIONS_USED]

# Define nutrient keys we want to extract from the nested dict
nutrient_keys = [
    'energy_100g', 'fat_100g', 'saturated-fat_100g',
    'carbohydrates_100g', 'sugars_100g', 'fiber_100g',
    'proteins_100g', 'salt_100g', 'sodium_100g', 'cholesterol_100g'
]

# helper unpacker (works on Python dict or JSON string)
def _unpack_dict(x, keys=nutrient_keys):
    d = {}
    try:
        if x is None:
            return {k: None for k in keys}
        if isinstance(x, str):
            # many entries are serialized dicts / JSON strings
            try:
                x = json.loads(x)
            except Exception:
                # some items may be plain strings - return missings
                return {k: None for k in keys}
        # x is now expected to be dict-like
        for k in keys:
            # sometimes key exists with nested units, handle numeric-like values
            val = x.get(k, None)
            d[k] = val
        return d
    except Exception:
        return {k: None for k in keys}

# Map partitions to DataFrame of nutrient columns
print("🔧 Expanding 'nutriments' into separate columns (lazy mapping)...")
meta = {k: 'f8' for k in nutrient_keys}   # metadata hints
# map_partitions: apply unpack to series and convert results to dataframe
nutrients_ddf = ddf['nutriments'].map_partitions(
    lambda s: s.apply(lambda r: pd.Series(_unpack_dict(r))),
    meta=meta
)

# join exploded nutrient columns back to main ddf (lazy)
ddf = dd.concat([ddf.drop(columns='nutriments'), nutrients_ddf], axis=1)

# Convert nutrient columns to numeric safely (coerce errors -> NaN)
print("🔁 Converting nutrient columns to numeric (coerce errors -> NaN)...")
for col in nutrient_keys + ['ecoscore_score', 'additives_n', 'ingredients_from_palm_oil_n']:
    if col in ddf.columns:
        ddf[col] = ddf[col].map_partitions(lambda s: pd.to_numeric(s, errors='coerce'), meta=('x', 'f8'))

# Show a small sample to check
print("Sample rows (computed head):")
print(ddf.head(3))

# Keep only rows with valid nutriscore labels a-e
valid_labels = ['a', 'b', 'c', 'd', 'e']
ddf = ddf[ddf['nutriscore_grade'].isin(valid_labels)]

# Now drop rows missing all nutrient columns (we'll impute later; drop rows missing most)
required_min_nonnull = int(len(nutrient_keys) * 0.6)  # require at least 60% nutrients present
print(f"Applying filter: keep rows with at least {required_min_nonnull} nutrients present.")
ddf = ddf.dropna(subset=nutrient_keys, how='all')  # remove rows with all nutrients missing

# OPTIONAL: if dataset is still huge, sample fraction to speed up initial runs
# df_size = ddf.shape[0].compute()  # expensive, avoid unless necessary
# ddf = ddf.sample(frac=0.5, random_state=42)   # uncomment to use sample

# Prepare features & labels
features = nutrient_keys + ['ecoscore_score', 'additives_n', 'ingredients_from_palm_oil_n']
# Ensure features present in ddf (filter)
features = [f for f in features if f in ddf.columns]
print("Final features used:", features)

# Encode label: simpler mapping a->0, b->1, ...
label_map = {'a':0, 'b':1, 'c':2, 'd':3, 'e':4}
ddf['label'] = ddf['nutriscore_grade'].map(label_map)

# Impute missing numeric values using Dask-ML SimpleImputer (mean)
print("🛠 Imputing missing numeric values (means)...")
imputer = SimpleImputer(strategy='mean')
X_ddf = ddf[features]
X_imputed = imputer.fit_transform(X_ddf)

y_ddf = ddf['label'].astype('int64')

# Train/test split (shuffle=True recommended)
print("✂️ Splitting into train/test...")
X_train, X_test, y_train, y_test = train_test_split(X_imputed, y_ddf, test_size=0.2, random_state=42, shuffle=True)

print("✅ Data ready. Starting training...")

# Choose XGBoost config: try GPU if available, else CPU hist
try:
    dask_model = DaskXGBClassifier(tree_method='gpu_hist', n_estimators=150, max_depth=6, learning_rate=0.1, random_state=42)
    print("Using GPU (gpu_hist).")
except Exception:
    dask_model = DaskXGBClassifier(tree_method='hist', n_estimators=150, max_depth=6, learning_rate=0.1, random_state=42)
    print("Using CPU hist fallback.")

# Fit model
start = time.time()
dask_model.fit(X_train, y_train)
fit_time = time.time() - start
print(f"✅ Training finished in {fit_time:.1f}s")

# Score
print("🧪 Evaluating on test set (lazy compute)...")
acc = dask_model.score(X_test, y_test)
print("Raw Dask score object:", acc)
print(f"🎯 Test accuracy: {float(acc.compute())*100:.2f}%")

# Save model booster to disk (XGBoost Booster)
try:
    booster = dask_model.get_booster()
    booster.save_model(SAVE_MODEL_PATH)
    print(f"💾 Booster saved to: {SAVE_MODEL_PATH}")
except Exception as e:
    print("⚠️ Could not save booster via get_booster(); attempting joblib.dump fallback:", e)
    joblib.dump(dask_model, SAVE_MODEL_PATH + ".joblib")
    print("Saved Dask model object via joblib (fallback).")

# Optional: cleanup
client.close()
print("🔚 Done. Dask client closed.")


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.3/200.3 MB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[?25h

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:40629
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:42869'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:39315'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:34903 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:34903
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:40182
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:34111 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:34111
INFO:distributed.core:Starting established connection to tcp://127

✅ Dask client: <Client: 'tcp://127.0.0.1:40629' processes=2 threads=4, memory=11.18 GiB>
📦 Reading parquet metadata (lazy)...
Initial partitions: 51 rows (est): (<dask_expr.expr.Scalar: expr=ReadParquetFSSpec(96ee21b).size() // 6, dtype=int64>, 6)
🔧 Expanding 'nutriments' into separate columns (lazy mapping)...
🔁 Converting nutrient columns to numeric (coerce errors -> NaN)...
Sample rows (computed head):
  nutriscore_grade  nova_group  ecoscore_score  additives_n  \
0                e         NaN             NaN          NaN   
1          unknown         1.0             NaN          0.0   
2          unknown         1.0            80.0          0.0   

   ingredients_from_palm_oil_n  energy_100g  fat_100g  saturated-fat_100g  \
0                          NaN          NaN       NaN                 NaN   
1                          0.0          NaN       NaN                 NaN   
2                          0.0          NaN       NaN                 NaN   

   carbohydrates_100g  sugars

INFO:distributed.core:Connection to tcp://127.0.0.1:40184 has been closed.
INFO:distributed.scheduler:Remove worker addr: tcp://127.0.0.1:34111 name: 1 (stimulus_id='handle-worker-cleanup-1761935696.9230452')
INFO:distributed.nanny:Worker process 10358 was killed by signal 15
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:46087 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:46087
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:36552
INFO:distributed.core:Connection to tcp://127.0.0.1:40182 has been closed.
INFO:distributed.scheduler:Remove worker addr: tcp://127.0.0.1:34903 name: 0 (stimulus_id='handle-worker-cleanup-1761935755.3602762')
INFO:distributed.nanny:Worker process 10355 was killed by signal 15
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:39347 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:39347
INFO:distributed.core:Starting established conn

KilledWorker: Attempted to run task ('read_parquet-fused-getitem-f525f1e9d6a55b20c5c4f8f4c5747613', 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:34613. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

In [None]:
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile('/content/drive/MyDrive/Datasets/food.parquet')

for batch in parquet_file.iter_batches():
    print("RecordBatch")
    batch_df = batch.to_pandas()
    print("batch_df:", batch_df)

RecordBatch
batch_df:        additives_n additives_tags allergens_tags          brands_tags  \
0              NaN           None      [en:nuts]         [xx:bovetti]   
1              0.0             []             []             [lagg-s]   
2              0.0             []             []             [lagg-s]   
3              0.0             []             []          [xx:lagg-s]   
4              0.0             []             []             [lagg-s]   
...            ...            ...            ...                  ...   
65531          0.0             []             []       [duncan-hines]   
65532          0.0             []             []  [kime-s-cider-mill]   
65533          0.0             []             []       [mama-cocco-s]   
65534          0.0             []             []               [swad]   
65535          0.0             []   [en:mustard]   [swad, raja-foods]   

                  brands                                         categories  \
0                Bovet

KeyboardInterrupt: 

In [None]:
!pip install dask[complete] dask-ml xgboost --quiet
# !pip install dask-ml "xgboost>=1.7.0"

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/150.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m150.0/150.0 kB[0m [31m15.2 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m85.2 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/259.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m259.4/259.4 kB[0m [31m26.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
!pip install dask-ml dask-xgboost

Collecting dask-xgboost
  Downloading dask_xgboost-0.2.0-py2.py3-none-any.whl.metadata (3.0 kB)
Collecting xgboost<=0.90 (from dask-xgboost)
  Downloading xgboost-0.90-py2.py3-none-manylinux1_x86_64.whl.metadata (3.8 kB)
Downloading dask_xgboost-0.2.0-py2.py3-none-any.whl (14 kB)
Downloading xgboost-0.90-py2.py3-none-manylinux1_x86_64.whl (142.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m142.8/142.8 MB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: xgboost, dask-xgboost
  Attempting uninstall: xgboost
    Found existing installation: xgboost 3.1.1
    Uninstalling xgboost-3.1.1:
      Successfully uninstalled xgboost-3.1.1
Successfully installed dask-xgboost-0.2.0 xgboost-0.90


In [None]:
# ==========================================================
# 📦 SETUP
# ==========================================================
!pip install dask[complete] dask-ml xgboost --quiet

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
from dask_ml.preprocessing import OrdinalEncoder
from dask_ml.impute import SimpleImputer
from dask_ml.model_selection import train_test_split
from xgboost.dask import DaskXGBClassifier

# ==========================================================
# ⚙️ CONFIGURATION
# ==========================================================
drive_file_path = '/content/drive/MyDrive/Datasets/food.parquet'

cols = [
    'nutriscore_grade', 'nova_group', 'ecoscore_score',
    'additives_n', 'ingredients_from_palm_oil_n',
    'energy_100g', 'fat_100g', 'saturated-fat_100g',
    'carbohydrates_100g', 'sugars_100g', 'fiber_100g',
    'proteins_100g', 'salt_100g', 'sodium_100g', 'cholesterol_100g'
]

# ==========================================================
# 🚀 START DASK CLUSTER
# ==========================================================
client = Client(n_workers=2, threads_per_worker=2, memory_limit='6GB')
print("✅ Dask client started:", client)

# ==========================================================
# 🧩 LOAD PARQUET (LAZY)
# ==========================================================
print("📦 Reading Parquet (lazy mode)...")
ddf = dd.read_parquet(drive_file_path, columns=cols)

# ==========================================================
# 🧮 CLEAN AND CONVERT
# ==========================================================
nutrient_cols = cols[5:]  # numeric nutrient fields

# Convert all numeric columns safely
for col in nutrient_cols:
    ddf[col] = ddf[col].map_partitions(lambda s: pd.to_numeric(s, errors='coerce'))

# Drop rows missing key columns
ddf = ddf.dropna(subset=nutrient_cols + ['nutriscore_grade'])

# ==========================================================
# 🎯 TARGET ENCODING
# ==========================================================
target = 'nutriscore_grade'
features = [c for c in ddf.columns if c != target]

# Filter valid labels (A–E)
valid_labels = ['a', 'b', 'c', 'd', 'e']
ddf = ddf[ddf[target].isin(valid_labels)]

encoder = OrdinalEncoder()
y_ddf_encoded = encoder.fit_transform(ddf[[target]])
X_ddf = ddf[features]

# ==========================================================
# 🔧 IMPUTE + SPLIT
# ==========================================================
imputer = SimpleImputer(strategy='mean')
X_ddf_imputed = imputer.fit_transform(X_ddf)

X_train, X_test, y_train, y_test = train_test_split(
    X_ddf_imputed, y_ddf_encoded, test_size=0.2, random_state=42, shuffle=True
)

print("✅ Data ready for training")

# ==========================================================
# 🌲 TRAIN XGBOOST MODEL
# ==========================================================
print("🚀 Training model (gpu_hist if available)...")

dask_model = DaskXGBClassifier(
    tree_method='hist',  # change to 'gpu_hist' if GPU available
    n_estimators=200,
    max_depth=6,
    learning_rate=0.1,
    random_state=42
)

dask_model.fit(X_train, y_train)
print("✅ Model training complete!")

# ==========================================================
# 📊 EVALUATE
# ==========================================================
accuracy = dask_model.score(X_test, y_test)
print(f"🎯 Model Accuracy: {accuracy.compute()*100:.2f}%")

# ==========================================================
# 💾 SAVE MODEL
# ==========================================================
import joblib
joblib.dump(dask_model.get_booster(), '/content/food_health_xgb_model.bin')
print("✅ Model saved as food_health_xgb_model.bin")


INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:40899
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:42247'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:43403'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:34213 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:34213
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:58104
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:38357 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:38357
INFO:distributed.core:Starting established connection to tcp://127

✅ Dask client started: <Client: 'tcp://127.0.0.1:40899' processes=2 threads=4, memory=11.18 GiB>
📦 Reading Parquet (lazy mode)...


KeyError: 'An error occurred while calling the read_parquet method registered to the pandas backend.\nOriginal Message: "[\'energy_100g\', \'fat_100g\', \'saturated-fat_100g\', \'carbohydrates_100g\', \'sugars_100g\', \'fiber_100g\', \'proteins_100g\', \'salt_100g\', \'sodium_100g\', \'cholesterol_100g\'] not in index"'

In [None]:
from dask_ml.preprocessing import OrdinalEncoder
from dask_ml.model_selection import train_test_split
from xgboost.dask import DaskXGBClassifier
from dask.distributed import Client

client = Client()
print(client)

X = ddf_clean[nutri_keys + ['nova_group', 'ecoscore_score']]
y = ddf_clean[['nutriscore_grade']]

encoder = OrdinalEncoder()
y_encoded = encoder.fit_transform(y)

X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=42)

model = DaskXGBClassifier(tree_method='gpu_hist', random_state=42)
model.fit(X_train, y_train)

acc = model.score(X_test, y_test)
print(f"✅ Model trained successfully with accuracy: {acc.compute() * 100:.2f}%")


INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:37483
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:42197'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:40285'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:35981 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:35981
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:38236
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:42995 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:42995
INFO:distributed.core:Starting established connection to tcp://127

<Client: 'tcp://127.0.0.1:37483' processes=2 threads=2, memory=12.67 GiB>


INFO:distributed.core:Connection to tcp://127.0.0.1:38236 has been closed.
INFO:distributed.scheduler:Remove worker addr: tcp://127.0.0.1:35981 name: 0 (stimulus_id='handle-worker-cleanup-1761934161.2605052')
INFO:distributed.nanny:Worker process 3880 was killed by signal 15
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:39369 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:39369
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:50110
INFO:distributed.worker:Run out-of-band function '_start_tracker'


ValueError: DataFrame.dtypes for data must be int, float, bool or category. When categorical type is supplied, the experimental DMatrix parameter`enable_categorical` must be set to `True`.  Invalid columns:energy_100g: object, fat_100g: object, saturated-fat_100g: object, carbohydrates_100g: object, sugars_100g: object, fiber_100g: object, proteins_100g: object, salt_100g: object, sodium_100g: object, cholesterol_100g: object

In [None]:
# import dask.dataframe as dd

# file_path = '/content/drive/MyDrive/Datasets/food.parquet'
# ddf = dd.read_parquet(file_path)

# print("Number of columns:", len(ddf.columns))
# print("Sample columns:")
# print(ddf.columns[50:110])  # show first 50 column names


Number of columns: 110
Sample columns:
Index(['lang', 'languages_tags', 'last_edit_dates_tags', 'last_editor',
       'last_image_t', 'last_modified_by', 'last_modified_t', 'last_updated_t',
       'link', 'main_countries_tags', 'manufacturing_places_tags',
       'manufacturing_places', 'max_imgid', 'minerals_tags', 'misc_tags',
       'new_additives_n', 'no_nutrition_data', 'nova_group',
       'nova_groups_tags', 'nova_groups', 'nucleotides_tags',
       'nutrient_levels_tags', 'nutriments', 'nutriscore_grade',
       'nutriscore_score', 'nutrition_data_per', 'obsolete', 'origins_tags',
       'origins', 'owner_fields', 'owner', 'packagings_complete',
       'packaging_recycling_tags', 'packaging_shapes_tags', 'packaging_tags',
       'packaging_text', 'packaging', 'packagings', 'photographers',
       'popularity_key', 'popularity_tags', 'product_name',
       'product_quantity_unit', 'product_quantity', 'purchase_places_tags',
       'quantity', 'rev', 'scans_n', 'serving_quantity