In [None]:
import csv

# Create (or truncate) the output file and write the header row once.
# newline='' lets the csv module control row terminators itself (otherwise
# Windows produces "\r\r\n" line endings), and the explicit utf-8 encoding
# matches the encoding used by the cells that later append to this file.
with open("task1.csv", mode='w', encoding='utf-8', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=["latitude", "longitude", "date", "l1_category", "l1_category_id", "l2_category", "l2_category_id", "store_id", "variant_id", "variant_name", "group_id", "selling_price", "mrp", "in_stock", "inventory", "is_sponsored", "image_url", "brand_id", "brand"])
    writer.writeheader()

In [None]:
def _variant_row(vdata, name):
    """Extract the per-variant CSV fields from one product/variant payload dict."""
    selling_price = vdata["normal_price"]["text"]
    return {
        'id': vdata["identity"]["id"],
        'name': name,
        'selling_price': selling_price,
        # When no separate MRP is present, the selling price is reported as MRP.
        'mrp': vdata["mrp"]["text"] if vdata.get("mrp") else selling_price,
        'inventory': vdata["inventory"],
        'in_stock': int(vdata["inventory"]) > 0,
        'img_url': vdata["image"]["url"],
    }


def write_data_to_csv(root_level, l1, l2, l1_cat_name, l2_cat_name, lat, long, date):
    """Append one CSV row per product variant found in a page of snippets.

    Only snippets whose ``data`` has a truthy ``atc_action`` are treated as
    products. Snippets without a ``brand_name`` are skipped individually; the
    rest of the page is still processed. Each product contributes its primary
    variant plus every entry of ``variants_list`` whose id has not been seen
    yet for that product.

    Args:
        root_level: Iterable of snippet dicts, each with ``data`` and
            ``tracking`` keys.
        l1: Value written to the ``l1_category_id`` column.
        l2: Value written to the ``l2_category_id`` column.
        l1_cat_name: Value written to the ``l1_category`` column.
        l2_cat_name: Value written to the ``l2_category`` column.
        lat: Value written to the ``latitude`` column.
        long: Value written to the ``longitude`` column.
        date: Value written to the ``date`` column.

    Returns:
        The number of rows (variants) appended to ``task1.csv``.

    Raises:
        KeyError: If a product snippet lacks one of the required fields
            (e.g. ``identity``, ``name``, ``normal_price``, ``inventory``).
    """
    fieldnames = [
        "latitude", "longitude", "date", "l1_category", "l1_category_id",
        "l2_category", "l2_category_id", "store_id", "variant_id",
        "variant_name", "group_id", "selling_price", "mrp", "in_stock",
        "inventory", "is_sponsored", "image_url", "brand_id", "brand",
    ]
    total_products = 0

    # newline='' is required by the csv module so it can emit its own row
    # terminators without the platform translating them a second time.
    with open("task1.csv", mode='a', encoding='utf-8', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        for product in root_level:
            pdata = product["data"]
            if not pdata.get("atc_action"):
                continue

            is_sponsored = product["tracking"]["common_attributes"].get("badge") == "AD"
            group_id = pdata["group_id"]
            store_id = pdata["merchant_id"]

            if not pdata.get("brand_name"):
                # Skip only this product. The previous `break` abandoned every
                # remaining product on the page and deflated the returned
                # count, which the caller uses to decide when to stop paging.
                continue
            brand = pdata["brand_name"]["text"]
            brand_id = "N/A"

            # Primary variant first, then any additional variants not yet seen.
            variants = [_variant_row(pdata, pdata["name"]["text"])]
            seen_ids = {pdata["identity"]["id"]}
            for var in pdata.get("variants_list") or []:
                v = var["data"]
                variant_id = v["identity"]["id"]
                if variant_id in seen_ids:
                    continue
                seen_ids.add(variant_id)
                variants.append(
                    _variant_row(v, v["name"]["text"] + " " + v["variant"]["text"])
                )

            for var in variants:
                writer.writerow({
                    "latitude": lat,
                    "longitude": long,
                    "date": date,
                    "l1_category": l1_cat_name,
                    "l1_category_id": l1,
                    "l2_category": l2_cat_name,
                    "l2_category_id": l2,
                    "store_id": store_id,
                    "variant_id": var["id"],
                    "variant_name": var["name"],
                    "group_id": group_id,
                    "selling_price": var["selling_price"],
                    "mrp": var["mrp"],
                    "in_stock": var["in_stock"],
                    "inventory": var["inventory"],
                    "is_sponsored": is_sponsored,
                    "image_url": var["img_url"],
                    "brand_id": brand_id,
                    "brand": brand
                })

            total_products += len(variants)

    return total_products


In [None]:
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
import time
import json
import csv
from datetime import datetime

# Day the scrape runs, rendered as DD/MM/YYYY; stamped on every CSV row.
date = f"{datetime.now():%d/%m/%Y}"

# lat = "12.9157604"
# lon = "77.6917672"
# cat_map = {
#     1237: "Munchies",
#     9: "Sweet Tooth",
#     1178: "Bhujia & Mixtures",
#     1694: "Munchies Gift Pack",
#     29: "Namkeen Snacks",
#     80: "Papad & Fryums",
#     940: "Chips & Crisps",
#     943: "Indian Sweets"
# }
# l0_l1_pair = [(1237, 1178), (1237, 1694), (1237, 29), (1237, 80), (1237, 940), (9, 943)]
# data = []

def fetch_page(driver, offset, page_index, l0, l1, lat, lon, timeout=30):
    """Fetch one page of Blinkit listing widgets through the browser session.

    The request is issued with JavaScript ``fetch`` from inside the page that
    ``driver`` controls; the script writes the JSON response (or an
    ``ERROR: ...`` message) into ``document.body``, which is then read back and
    parsed.

    Args:
        driver: WebDriver instance used to run the script.
        offset: Item offset of the page. ``0`` requests the first page using
            the short URL that carries only the category ids.
        page_index: Zero-based page number, sent as ``page_index`` and
            ``total_entities_processed``.
        l0: Value sent as ``l0_cat``.
        l1: Value sent as ``l1_cat``.
        lat: Value sent in the ``Lat`` request header.
        lon: Value sent in the ``Lon`` request header.
        timeout: Maximum number of seconds to keep polling for a response
            after the initial 2-second wait.

    Returns:
        The decoded JSON payload, or ``None`` if no response arrived within
        ``timeout`` or the body was not valid JSON (including the
        ``ERROR: ...`` text written when the fetch fails).
    """
    if offset != 0:
        url = (
            "https://blinkit.com/v1/layout/listing_widgets?"
            f"offset={offset}&limit=15&exclude_combos=false&l0_cat={l0}&l1_cat={l1}"
            "&last_snippet_type=product_card_snippet_type_2"
            "&last_widget_type=product_container"
            f"&oos_visibility=true&page_index={page_index}"
            f"&total_entities_processed={page_index}&total_pagination_items=120"
        )
    else:
        url = f"https://blinkit.com/v1/layout/listing_widgets?l0_cat={l0}&l1_cat={l1}"

    print(url)

    # json.dumps yields properly quoted/escaped JavaScript string literals, so
    # a stray quote or backslash in the inputs cannot break the script.
    script = f"""
    fetch({json.dumps(url)}, {{
    method: "POST",
    headers: {{
        "Lat": {json.dumps(str(lat))},
        "Lon": {json.dumps(str(lon))}
    }}
    }})
    .then(res => res.json())
    .then(data => {{
    document.body.innerText = JSON.stringify(data);
    }})
    .catch(err => {{
    document.body.innerText = "ERROR: " + err;
    }});
    """

    # Start from an empty document so any body text can only be our response.
    driver.get("data:text/html,<html><body></body></html>")
    driver.execute_script(script)
    time.sleep(2)

    # Poll with a short sleep and a deadline instead of spinning forever when
    # the promise never settles.
    deadline = time.monotonic() + timeout
    body = driver.find_element(By.TAG_NAME, "body").text
    while not body:
        if time.monotonic() >= deadline:
            print(f"❌ Timed out waiting for page at offset={offset}")
            return None
        time.sleep(0.25)
        body = driver.find_element(By.TAG_NAME, "body").text

    try:
        return json.loads(body)
    except ValueError:
        # json.JSONDecodeError is a ValueError; this also covers "ERROR: ..." bodies.
        print(f"❌ Failed to parse page at offset={offset}")
        return None

# Pagination settings. `limit` must stay in sync with the `limit=15` query
# parameter hard-coded in fetch_page's URL.
limit = 15
max_pages = 20

l0_l1_pair = []
lat_lon_pair = []
category_map = {}

# Load the inputs before launching the browser, so a missing or malformed
# file fails fast without leaving a Chrome process behind.
with open('blinkit_categories.csv', mode='r', encoding='utf-8', newline='') as file:
    reader = csv.DictReader(file)
    for row in reader:
        l0 = row['l1_category_id']
        l1 = row['l2_category_id']
        l0_l1_pair.append((l0, l1))
        category_map.update({l0 : row['l1_category'], l1 : row["l2_category"]})

with open('blinkit_locations.csv', mode='r', newline='') as file:
    reader = csv.DictReader(file)
    for row in reader:
        lat_lon_pair.append((
            row["latitude"], row["longitude"]
        ))

# Running total of rows written; it must exist before the `+=` in the loop
# (it was previously never initialised, raising NameError on a fresh kernel).
all_products = 0

options = uc.ChromeOptions()
options.headless = False
driver = uc.Chrome(options=options)

try:
    for lat, lon in lat_lon_pair:
        for l0, l1 in l0_l1_pair:
            offset = 0
            page_index = 0
            while page_index < max_pages:
                print(f"📦 Fetching page {page_index + 1}...")
                local_data = fetch_page(driver, offset, page_index, l0=l0, l1=l1, lat=lat, lon=lon)
                if not local_data:
                    break

                # Tolerate payloads without "response"/"snippets" (treated as
                # an empty page) instead of raising KeyError.
                snippets = (local_data.get("response") or {}).get("snippets") or []

                product_count = write_data_to_csv(
                    root_level=snippets,
                    l1=l0,
                    l2=l1,
                    l1_cat_name=category_map[l0],
                    l2_cat_name=category_map[l1],
                    lat=lat,
                    long=lon,
                    date=date
                )

                all_products += product_count

                # Fewer rows than a full page is taken as the last page.
                if product_count < limit:
                    break

                offset += limit
                page_index += 1
finally:
    # Always release the browser, even if scraping fails part-way through.
    driver.quit()

print(f"✅ Done! Total products: {all_products}")



In [None]:
# Legacy export cell: writes rows from an in-memory `data` list (one entry per
# fetched page, with keys 'l0', 'l1' and 'data') using the `cat_map` id -> name
# mapping. Neither name is defined by the active code in this notebook, so the
# cell is skipped unless both exist in the kernel.
legacy_pages = globals().get("data")
legacy_cat_map = globals().get("cat_map")

if legacy_pages is None or legacy_cat_map is None:
    print("Skipping legacy export: `data` and `cat_map` are not defined in this kernel.")
else:
    # newline='' is required by the csv module for correct row terminators.
    with open("task1.csv", mode='a', encoding='utf-8', newline='') as file:
        # Use the same 19 columns as the header row of task1.csv. The legacy
        # pages carry no coordinates, so DictWriter leaves latitude/longitude
        # empty rather than shifting every other value two columns left.
        writer = csv.DictWriter(file, fieldnames=["latitude", "longitude", "date", "l1_category", "l1_category_id", "l2_category", "l2_category_id", "store_id", "variant_id", "variant_name", "group_id", "selling_price", "mrp", "in_stock", "inventory", "is_sponsored", "image_url", "brand_id", "brand"])
        for page_data in legacy_pages:
            l0 = page_data['l0']
            l1 = page_data['l1']

            root_level = page_data['data']
            for product in root_level:
                # Only snippets with an add-to-cart action are treated as products.
                if not product["data"].get("atc_action"):
                    continue

                is_sponsored = product["tracking"]["common_attributes"].get("badge") == "AD"
                pdata = product["data"]

                group_id = pdata["group_id"]
                store_id = pdata["merchant_id"]
                if not pdata.get("brand_name"):
                    # Skip just this product; `break` used to drop the rest of the page.
                    continue
                brand = pdata["brand_name"]["text"]
                brand_id = "N/A"

                l1_category = legacy_cat_map[l0]
                l1_category_id = l0
                l2_category = legacy_cat_map[l1]
                l2_category_id = l1

                var_set = {pdata["identity"]["id"]}

                # Primary variant
                variants = [
                    {
                        'id': pdata["identity"]["id"],
                        'name': pdata["name"]["text"],
                        'selling_price': pdata["normal_price"]["text"],
                        'mrp': pdata["mrp"]["text"] if pdata.get("mrp") else pdata["normal_price"]["text"],
                        'inventory': pdata["inventory"],
                        'in_stock': int(pdata["inventory"]) > 0,
                        'img_url': pdata["image"]["url"]
                    }
                ]

                # Additional variants, de-duplicated by id
                for var in pdata.get("variants_list") or []:
                    v = var["data"]
                    if v["identity"]["id"] not in var_set:
                        var_set.add(v["identity"]["id"])
                        variants.append({
                            'id': v["identity"]["id"],
                            'name': v["name"]["text"] + " " + v["variant"]["text"],
                            'selling_price': v["normal_price"]["text"],
                            'mrp': v["mrp"]["text"] if v.get("mrp") else v["normal_price"]["text"],
                            'inventory': v["inventory"],
                            'in_stock': int(v["inventory"]) > 0,
                            'img_url': v["image"]["url"]
                        })

                for var in variants:
                    writer.writerow({
                        "date": date,
                        "l1_category": l1_category,
                        "l1_category_id": l1_category_id,
                        "l2_category": l2_category,
                        "l2_category_id": l2_category_id,
                        "store_id": store_id,
                        "variant_id": var["id"],
                        "variant_name": var["name"],
                        "group_id": group_id,
                        "selling_price": var["selling_price"],
                        "mrp": var["mrp"],
                        "in_stock": var["in_stock"],
                        "inventory": var["inventory"],
                        "is_sponsored": is_sponsored,
                        "image_url": var["img_url"],
                        "brand_id": brand_id,
                        "brand": brand
                    })
