In [1]:
import csv
import json

JSONL for single collection schema

In [2]:
def create_country_code_map(country_file_path):
    """
    Reads the Country-Code.csv file and returns a dictionary
    mapping country codes (as integers) to country names.
    """
    country_map = {}
    try:
        with open(country_file_path, mode='r', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                try:
                    # Assuming column names are 'Country Code' and 'Country'
                    country_code = int(row['country_code'])
                    country_map[country_code] = row['country']
                except ValueError:
                    print(f"Warning: Could not parse country code in {country_file_path}: {row['country_code']}")
                except KeyError:
                    print(f"Warning: 'Country Code' or 'Country' column missing in {country_file_path}.")
                    return None # Indicate critical error
    except FileNotFoundError:
        print(f"Error: Country code file not found at {country_file_path}")
        return None
    return country_map

In [3]:
def transform_data(data_file_path, country_map, output_file_path):
    """
    Reads new_dataset.csv, transforms each row into the desired
    MongoDB document structure, and writes it to an output JSON Lines file.
    """
    if country_map is None:
        print("Error: Country map is not available. Aborting transformation.")
        return

    transformed_count = 0
    try:
        with open(data_file_path, mode='r', encoding='utf-8') as infile, \
             open(output_file_path, mode='w', encoding='utf-8') as outfile:
            
            reader = csv.DictReader(infile)
            
            # Verify expected columns (adjust based on your actual new_dataset.csv headers)
            expected_headers = [
                'restaurant_id', 'restaurant_name', 'country_code', 'city', 'address',
                'locality', 'locality_verbose', 'longitude', 'latitude', 'cuisines',
                'average_cost_for_two', 'currency', 'has_table_booking', 'has_online_delivery',
                'is_delivering_now', 'switch_to_order_menu', 'price_range', 'aggregate_rating',
                'rating_color', 'rating_text', 'votes'
            ]
            missing_headers = [h for h in expected_headers if h not in reader.fieldnames]
            if missing_headers:
                print(f"Warning: The following expected columns are missing from {data_file_path}: {', '.join(missing_headers)}")
                print("Please ensure your CSV column names match. Script will attempt to proceed but may fail or produce incomplete documents.")

            for row in reader:
                try:
                    country_code_val = int(row.get('country_code', 0)) # Default to 0 if missing, handle error below
                    
                    # Handle cuisines: split by comma, strip whitespace
                    cuisines_str = row.get('cuisines', '')
                    cuisines_list = [c.strip() for c in cuisines_str.split(',') if c.strip()] if cuisines_str else []

                    mongo_doc = {
                        # Using original 'Restaurant ID' as a field, MongoDB will generate its own _id
                        "restaurant_id": int(row.get('restaurant_id')),
                        "restaurant_name": row.get('restaurant_name', ''),
                        "address": {
                            "street": row.get('address', ''),
                            "city": row.get('city', ''),
                            "locality": row.get('locality', ''),
                            "locality_verbose": row.get('locality_verbose', ''),
                            "country_code": country_code_val,
                            "country_name": country_map.get(country_code_val, "Unknown Country") # Get country name from map
                        },
                        "location": {
                            "type": "Point",
                            "coordinates": [
                                float(row.get('longitude', 0.0)),
                                float(row.get('latitude', 0.0))
                            ]
                        },
                        "cuisines": cuisines_list,
                        "average_cost_for_two": int(row.get('average_cost_for_two', 0)),
                        "rating_details": {
                            "aggregate_rating": float(row.get('aggregate_rating', 0.0)),
                            "rating_color": row.get('rating_color', ''),
                            "rating_text": row.get('rating_text', ''),
                            "votes": int(row.get('votes', 0)),
                            "price_range": int(row.get('price_range', 0)),
                            "currency": row.get('currency', '')
                        }
                        # The fields 'Has Table booking' and 'Has Online delivery' were noted
                        # as dropped in your Dataset_EDA.ipynb. If they exist in your
                        # new_dataset.csv and you want to include them, add them here.
                        # Example:
                        # "has_table_booking": True if row.get('Has Table booking', 'No').lower() == 'yes' else False,
                        # "has_online_delivery": True if row.get('Has Online delivery', 'No').lower() == 'yes' else False,
                    }
                    
                    # Write the JSON document to the output file, one per line
                    outfile.write(json.dumps(mongo_doc) + '\n')
                    transformed_count += 1
                except ValueError as ve:
                    print(f"Warning: Skipping row due to data conversion error (ValueError): {ve} - Row: {row}")
                except KeyError as ke:
                    print(f"Warning: Skipping row due to missing key (KeyError): {ke} - Row: {row}")
                except Exception as e:
                    print(f"Warning: Skipping row due to unexpected error: {e} - Row: {row}")
            
            print(f"Successfully transformed {transformed_count} documents.")

    except FileNotFoundError:
        print(f"Error: Data file not found at {data_file_path}")
    except Exception as e:
        print(f"An unexpected error occurred during transformation: {e}")


In [4]:
country_csv_file = 'Country-Code.csv' # Ensure this file is in the same directory or provide full path
data_csv_file = 'new_dataset.csv'     # Ensure this file is in the same directory or provide full path
output_jsonl_file = 'restaurants_mongo.jsonl'

print("Starting data migration process...")

country_mapping = create_country_code_map(country_csv_file)

if country_mapping:
    print(f"Country code map created with {len(country_mapping)} entries.")
    transform_data(data_csv_file, country_mapping, output_jsonl_file)
    print(f"Transformation complete. Output written to {output_jsonl_file}")
    print(f"You can now import '{output_jsonl_file}' into MongoDB.")
else:
    print("Could not create country mapping. Please check 'Country-Code.csv'.")

Starting data migration process...
Country code map created with 15 entries.


Successfully transformed 9551 documents.
Transformation complete. Output written to restaurants_mongo.jsonl
You can now import 'restaurants_mongo.jsonl' into MongoDB.


JSONL for double collection schema

In [2]:
import csv
import json
from datetime import datetime, timezone

def create_country_code_map(country_file_path):
    """
    Reads the Country-Code.csv file and returns a dictionary
    mapping country codes (as integers) to country names.
    """
    country_map = {}
    try:
        with open(country_file_path, mode='r', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            # Assuming 'Country-Code.csv' headers are 'Country Code' and 'Country'
            # Adjust if your Country-Code.csv has different headers (e.g., lowercase with underscore)
            csv_country_code_header = 'country_code' # Standard Zomato CSV header for this file
            csv_country_name_header = 'country'      # Standard Zomato CSV header for this file

            if csv_country_code_header not in reader.fieldnames or csv_country_name_header not in reader.fieldnames:
                print(f"Error: Expected columns '{csv_country_code_header}' or '{csv_country_name_header}' missing in {country_file_path}.")
                print(f"Actual headers: {reader.fieldnames}")
                return None
            for row in reader:
                try:
                    country_code = int(row[csv_country_code_header])
                    country_map[country_code] = row[csv_country_name_header]
                except ValueError:
                    print(f"Warning: Could not parse country code in {country_file_path} for row: {row}")
    except FileNotFoundError:
        print(f"Error: Country code file not found at {country_file_path}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred while reading {country_file_path}: {e}")
        return None
    return country_map

def is_valid_longitude(lon_str):
    try:
        lon = float(lon_str)
        return -180 <= lon <= 180
    except (ValueError, TypeError):
        return False

def is_valid_latitude(lat_str):
    try:
        lat = float(lat_str)
        return -90 <= lat <= 90
    except (ValueError, TypeError):
        return False

def transform_data_to_two_collections(data_file_path, country_map, output_main_file, output_ratings_file):
    """
    Reads new_dataset.csv, transforms each row into two document structures
    (for restaurants_main and restaurant_live_ratings),
    and writes them to their respective output JSON Lines files.
    Uses the provided csv_headers mapping.
    """
    if country_map is None:
        print("Error: Country map is not available. Aborting transformation.")
        return

    count_main = 0
    count_ratings = 0
    
    # Corrected csv_headers based on your input
    csv_headers = {
        'id': 'restaurant_id',
        'name': 'restaurant_name',
        'country_code_csv': 'country_code', # CSV header for country code in new_dataset.csv
        'city': 'city', # Corrected: Assuming 'city' not 'City' based on other keys
        'address_street_csv': 'address',
        'locality': 'locality',
        'locality_verbose': 'locality_verbose',
        'longitude_csv': 'longitude',
        'latitude_csv': 'latitude',
        'cuisines_csv': 'cuisines',
        'avg_cost': 'average_cost_for_two',
        'currency_csv': 'currency',
        'price_range_csv': 'price_range',
        'agg_rating_csv': 'aggregate_rating',
        'rating_color_csv': 'rating_color',
        'rating_text_csv': 'rating_text',
        'votes_csv': 'votes' # Note: your example had 'votes_csv': 'votes', I kept 'votes_csv' as the key
                               # for consistency, its value is 'votes' which is the CSV header.
        # Optional fields (if they exist in your new_dataset.csv)
        # 'has_table_booking_csv': 'has_table_booking',
        # 'has_online_delivery_csv': 'has_online_delivery',
    }


    try:
        with open(data_file_path, mode='r', encoding='utf-8') as infile, \
             open(output_main_file, mode='w', encoding='utf-8') as outfile_main, \
             open(output_ratings_file, mode='w', encoding='utf-8') as outfile_ratings:

            reader = csv.DictReader(infile)
            
            # Verify that all keys used from csv_headers actually exist in the reader's fieldnames
            missing_csv_headers_in_file = [header_name for key, header_name in csv_headers.items() if header_name not in reader.fieldnames]
            if missing_csv_headers_in_file:
                print(f"Critical Warning: The following CSV column names specified in 'csv_headers' were NOT FOUND in '{data_file_path}': {', '.join(missing_csv_headers_in_file)}")
                print(f"Actual headers found in CSV: {reader.fieldnames}")
                print("Please correct the 'csv_headers' dictionary in the script to match your CSV file EXACTLY or ensure your CSV file has these columns.")
                # Decide if you want to abort or proceed with potential errors
                # return # Uncomment to abort if headers are missing

            for row_num, row in enumerate(reader, 1):
                try:
                    restaurant_id_val = int(row.get(csv_headers['id']))
                    
                    lon_str = row.get(csv_headers['longitude_csv'])
                    lat_str = row.get(csv_headers['latitude_csv'])
                    
                    coordinates_val = [400, 400] 
                    if is_valid_longitude(lon_str) and is_valid_latitude(lat_str):
                        coordinates_val = [float(lon_str), float(lat_str)]
                    else:
                        print(f"Info: Row {row_num} (ID: {restaurant_id_val}) - Invalid coordinates ('{lon_str}', '{lat_str}'). Using marker [400, 400].")

                    cuisines_str = row.get(csv_headers['cuisines_csv'], '')
                    cuisines_list = [c.strip() for c in cuisines_str.split(',') if c.strip()] if cuisines_str else []
                    
                    country_code_val = 0
                    try:
                        # Use the mapping for country_code from csv_headers
                        country_code_from_csv_str = row.get(csv_headers['country_code_csv'], '0')
                        if country_code_from_csv_str and country_code_from_csv_str.strip():
                             country_code_val = int(float(country_code_from_csv_str)) # Handle potential float like "1.0"
                        else:
                            country_code_val = 0 # Default if empty or missing
                    except ValueError:
                        print(f"Warning: Row {row_num} (ID: {restaurant_id_val}) - Invalid Country Code '{row.get(csv_headers['country_code_csv'])}'. Using 0.")
                    country_name_val = country_map.get(country_code_val, "Unknown Country")

                    main_doc = {
                        "restaurant_id": restaurant_id_val,
                        "restaurant_name": row.get(csv_headers['name'], ''),
                        "address": {
                            "street": row.get(csv_headers['address_street_csv'], ''),
                            "city": row.get(csv_headers['city'], ''),
                            "locality": row.get(csv_headers['locality'], ''),
                            "locality_verbose": row.get(csv_headers['locality_verbose'], ''),
                            "country_code": country_code_val,
                            "country_name": country_name_val
                        },
                        "location": {
                            "type": "Point",
                            "coordinates": coordinates_val
                        },
                        "cuisines": cuisines_list,
                        "average_cost_for_two": int(row.get(csv_headers['avg_cost'], 0)) if row.get(csv_headers['avg_cost']) else None,
                        "rating_details": {
                            "rating_color": row.get(csv_headers['rating_color_csv'], ''),
                            "rating_text": row.get(csv_headers['rating_text_csv'], ''),
                            "price_range": int(row.get(csv_headers['price_range_csv'], 0)) if row.get(csv_headers['price_range_csv']) else 0,
                            "currency": row.get(csv_headers['currency_csv'], '')
                        }
                        # Optional fields:
                        # "has_table_booking": True if row.get(csv_headers.get('has_table_booking_csv'), 'No').lower() == 'yes' else False,
                        # "has_online_delivery": True if row.get(csv_headers.get('has_online_delivery_csv'), 'No').lower() == 'yes' else False,
                    }
                    # Add optional fields only if their CSV header name is defined in csv_headers
                    if 'has_table_booking_csv' in csv_headers and csv_headers['has_table_booking_csv'] in row:
                         main_doc["has_table_booking"] = True if row.get(csv_headers['has_table_booking_csv'], 'No').lower() == 'yes' else False
                    if 'has_online_delivery_csv' in csv_headers and csv_headers['has_online_delivery_csv'] in row:
                         main_doc["has_online_delivery"] = True if row.get(csv_headers['has_online_delivery_csv'], 'No').lower() == 'yes' else False
                        
                    outfile_main.write(json.dumps(main_doc) + '\n')
                    count_main += 1

                    ratings_doc = {
                        "restaurant_id": restaurant_id_val,
                        "aggregate_rating": float(row.get(csv_headers['agg_rating_csv'], 0.0)) if row.get(csv_headers['agg_rating_csv']) else 0.0,
                        "votes": int(row.get(csv_headers['votes_csv'], 0)) if row.get(csv_headers['votes_csv']) else 0,
                        "last_updated_timestamp": datetime.now(timezone.utc).isoformat()
                    }
                    outfile_ratings.write(json.dumps(ratings_doc) + '\n')
                    count_ratings += 1

                except ValueError as ve:
                    print(f"Warning: Row {row_num} (ID: {row.get(csv_headers.get('id', 'UNKNOWN_ID'))}) - Skipping due to data conversion error (ValueError): {ve} - Row data excerpt: { {k: row.get(v) for k,v in list(csv_headers.items())[:3]} }...")
                except KeyError as ke: # Should be caught by the initial header check now
                    print(f"Warning: Row {row_num} (ID: {row.get(csv_headers.get('id', 'UNKNOWN_ID'))}) - Skipping due to missing key (KeyError): {ke}. Check csv_headers dictionary and CSV file headers. Row data excerpt: { {k: row.get(v) for k,v in list(csv_headers.items())[:3]} }...")
                except Exception as e:
                    print(f"Warning: Row {row_num} (ID: {row.get(csv_headers.get('id', 'UNKNOWN_ID'))}) - Skipping due to unexpected error: {e} - Row data excerpt: { {k: row.get(v) for k,v in list(csv_headers.items())[:3]} }...")
            
            print(f"\nSuccessfully transformed {count_main} documents for 'restaurants_main'.")
            print(f"Successfully transformed {count_ratings} documents for 'restaurant_live_ratings'.")

    except FileNotFoundError:
        print(f"Error: Data file '{data_file_path}' not found.")
    except Exception as e:
        print(f"An unexpected error occurred during transformation: {e}")

# --- Main execution ---
if __name__ == "__main__":
    country_csv_file = 'Country-Code.csv' 
    data_csv_file = 'new_dataset.csv'     
    
    output_main_jsonl = 'restaurants_main.jsonl'
    output_ratings_jsonl = 'restaurant_live_ratings.jsonl'

    print("Starting data migration process for two collections...")
    
    country_mapping = create_country_code_map(country_csv_file)
    
    if country_mapping:
        print(f"Country code map created with {len(country_mapping)} entries.")
        transform_data_to_two_collections(data_csv_file, country_mapping, output_main_jsonl, output_ratings_jsonl)
        print(f"\nTransformation complete.")
        print(f"Output for main data: {output_main_jsonl}")
        print(f"Output for live ratings data: {output_ratings_jsonl}")
        print(f"\nYou can now import these JSONL files into their respective MongoDB collections.")
        print(f"Remember to use the correct database and collection names in your 'mongoimport' commands.")
    else:
        print("Could not create country mapping. Please check 'Country-Code.csv' and its headers.")

Starting data migration process for two collections...
Country code map created with 15 entries.

Successfully transformed 9551 documents for 'restaurants_main'.
Successfully transformed 9551 documents for 'restaurant_live_ratings'.

Transformation complete.
Output for main data: restaurants_main.jsonl
Output for live ratings data: restaurant_live_ratings.jsonl

You can now import these JSONL files into their respective MongoDB collections.
Remember to use the correct database and collection names in your 'mongoimport' commands.
