# Neo4j database creation and data loading script.

This script creates a Neo4j database, populates it with data from CSV files, and runs 5 queries to test the database.


# 1. Requirements

## 1.1 Imports and connection parameters

In [1]:
import csv
import os
import sys # Used for flushing output in Jupyter

import pandas as pd
from neo4j import GraphDatabase, basic_auth, Driver

# !! IMPORTANT: Point these at your Neo4j instance !!
# Every setting can be supplied through an environment variable (NEO4J_URI,
# NEO4J_USER, NEO4J_PASSWORD, NEO4J_DATABASE) so that real credentials do not
# have to be saved inside the notebook. The literals are only fallbacks for a
# default local installation; editing them in place still works as before.
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")  # Or "neo4j://host:7687" for clusters, "neo4j+s://..." for Aura
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "password")  # <--- CHANGE THIS (or set NEO4J_PASSWORD)
DB_NAME = os.environ.get("NEO4J_DATABASE", "neo4j")  # Change if using a different database

## 1.2 Global ID Counters and File Paths


In [2]:
# Next surrogate ID to hand out per node type. The loader functions declare
# these as `global` and increment them each time they meet a new entity, so
# re-running this cell restarts the numbering at 1.
next_fsi_id = 1  # Food_Security_Indicator.ID
next_product_id = 1  # Agricultural_Product.ID
next_census_id = 1  # Agricultural_Census.ID
next_fsr_id = 1  # Farm_Size_Range.ID

# !! IMPORTANT: Update these paths to where your CSV files are located !!
# Relative paths are resolved against the notebook's working directory.
FOOD_SECURITY_CSV = 'preprocessed_datasets/suite_of_food_security_indicators.csv'
PRODUCTION_CSV = 'preprocessed_datasets/production_crops_livestock.csv'
CENSUS_CSV = 'preprocessed_datasets/agricultural_census.csv'

## 1.3 Helper Functions

In [3]:
def safe_to_int(value, default=None):
    """Safely converts a value to an integer.

    A direct ``int()`` conversion is tried first so that large integer
    strings keep full precision; values such as ``"123.0"`` or ``"1e3"`` then
    fall back to conversion through ``float`` (truncating toward zero).

    Args:
        value: The value to convert (typically a CSV cell string).
        default: Returned when ``value`` is ``None``, empty, non-numeric, or
            not representable as an integer (NaN, infinity).

    Returns:
        The converted ``int``, or ``default``.
    """
    if value is None or value == '':
        return default
    try:
        try:
            return int(value)
        except (ValueError, TypeError):
            # Not a plain integer literal: go through float to accept
            # "123.0", "1e3", etc.
            return int(float(value))
    except (ValueError, TypeError, OverflowError):
        # OverflowError is what int() raises for +/-infinity; without it a
        # single "inf" cell would escape this "safe" helper.
        return default

def safe_to_float(value, default=None):
    """Convert ``value`` to ``float``, falling back to ``default``.

    The value is stringified and every comma is replaced by a dot before
    parsing, so decimal-comma input such as ``"1,5"`` is accepted.

    Args:
        value: The value to convert.
        default: Returned when ``value`` is ``None``, empty, or not parseable.

    Returns:
        The parsed ``float``, or ``default``.
    """
    is_missing = value is None or value == ''
    if not is_missing:
        try:
            normalized = str(value).replace(',', '.')
            return float(normalized)
        except (ValueError, TypeError):
            pass
    return default


def create_constraints(tx):
    """
    Issues the uniqueness constraints that back the MERGE keys used by the
    loaders. Every statement carries IF NOT EXISTS, so running this again
    is harmless.

    Args:
        tx: A Neo4j transaction object.
    """
    constraint_statements = (
        # Country: keyed by iso_code
        """
        CREATE CONSTRAINT country_iso_code_unique IF NOT EXISTS
        FOR (c:Country) REQUIRE c.iso_code IS UNIQUE
    """,
        # Food_Security_Indicator: keyed by (name, unit)
        """
        CREATE CONSTRAINT fsi_name_unit_unique IF NOT EXISTS
        FOR (fsi:Food_Security_Indicator) REQUIRE (fsi.name, fsi.unit) IS UNIQUE
    """,
        # Agricultural_Product: keyed by (name, type)
        """
        CREATE CONSTRAINT ap_name_type_unique IF NOT EXISTS
        FOR (ap:Agricultural_Product) REQUIRE (ap.name, ap.type) IS UNIQUE
    """,
        # Farm_Size_Range: keyed by (lower_limit, upper_limit)
        """
        CREATE CONSTRAINT fsr_limits_unique IF NOT EXISTS
        FOR (fsr:Farm_Size_Range) REQUIRE (fsr.lower_limit, fsr.upper_limit) IS UNIQUE
    """,
        # Agricultural_Census: keyed by the synthetic unique_key (country_iso + year)
        """
        CREATE CONSTRAINT ac_unique_key_unique IF NOT EXISTS
        FOR (ac:Agricultural_Census) REQUIRE ac.unique_key IS UNIQUE
    """,
    )

    print("Creating constraints...")
    for statement in constraint_statements:
        tx.run(statement)
    print("Constraints created (or already existed).")

# 2. Importing Data to Neo4j

## 2.1 suite_of_food_security_indicators.csv

In [4]:
def load_food_security_data_neo4j(session, file_path):
    """
    Loads data from the food security CSV into Neo4j.
    Creates Country and Food_Security_Indicator nodes, and HAS_MEASUREMENT relationships.

    Rows are parsed one at a time and written in batches of 500 through
    ``_create_food_security_batch``. A row with an empty or unconvertible
    required field is counted as skipped; a row that cannot be parsed at all
    (missing column, short row) is also printed. Errors raised by the
    database write itself are not treated as row errors: they propagate to
    the caller instead of being logged as a single bad row.

    Args:
        session: A Neo4j driver session.
        file_path (str): Path to suite_of_food_security_indicators.csv.
    """
    print(f"\nLoading Food Security data from {file_path}...")
    global next_fsi_id
    processed_rows = 0
    skipped_rows = 0

    # (indicator name, unit) -> ID, so each distinct indicator seen during
    # this call is assigned exactly one ID.
    fsi_id_map = {}
    batch = []
    batch_size = 500  # Rows sent to Neo4j per write transaction

    # newline='' is what the csv module expects; without it, quoted fields
    # that contain line breaks are not parsed correctly.
    with open(file_path, 'r', encoding='utf-8', newline='') as csvfile:
        for row in csv.DictReader(csvfile):
            # Only the parsing step is guarded. AttributeError covers short
            # rows: DictReader fills their missing columns with None, and
            # None.strip() would otherwise abort the whole load.
            try:
                country_iso_code = row['country_iso_code'].strip()
                country_name = row['country_name'].strip()
                fs_measurement_year = safe_to_int(row['fs_measurement_year'])
                fs_measurement_value = safe_to_float(row['fs_measurement_value'])
                fs_indicator_name = row['fs_indicator_name'].strip()
                fs_indicator_unit = row['fs_indicator_unit'].strip()
            except (KeyError, ValueError, TypeError, AttributeError) as e:
                print(f"Skipping food security row due to data error: {row} - Error: {e}")
                skipped_rows += 1
                continue

            # Every field is mandatory; numeric ones are compared with None
            # so that a legitimate 0 is not rejected.
            if not all([country_iso_code, country_name, fs_indicator_name, fs_indicator_unit,
                        fs_measurement_year is not None, fs_measurement_value is not None]):
                skipped_rows += 1
                continue

            # Allocate an ID the first time an indicator is seen.
            fsi_key = (fs_indicator_name, fs_indicator_unit)
            if fsi_key not in fsi_id_map:
                fsi_id_map[fsi_key] = next_fsi_id
                next_fsi_id += 1

            batch.append({
                "country_iso": country_iso_code,
                "country_name": country_name,
                "country_id": safe_to_int(country_iso_code),
                "fsi_name": fs_indicator_name,
                "fsi_unit": fs_indicator_unit,
                "fsi_id": fsi_id_map[fsi_key],
                "meas_year": fs_measurement_year,
                "meas_value": fs_measurement_value
            })

            if len(batch) >= batch_size:
                session.execute_write(_create_food_security_batch, batch)
                processed_rows += len(batch)
                print(f"  Processed {processed_rows} food security rows...", end='\r')
                sys.stdout.flush()
                batch = []

        if batch:  # Flush the final, partially filled batch
            session.execute_write(_create_food_security_batch, batch)
            processed_rows += len(batch)

    print(f"\nFinished loading {file_path}. Rows processed: {processed_rows}, Rows skipped: {skipped_rows}.")


def _create_food_security_batch(tx, batch_data):
    """
    Writes one batch of food security rows to Neo4j with a single UNWIND query.

    For each row the query merges the Country (keyed on iso_code) and the
    Food_Security_Indicator (keyed on name + unit), then merges a
    HAS_MEASUREMENT relationship keyed on year and sets its value. Replaying
    a row therefore overwrites the stored value rather than adding a second
    relationship.

    Args:
        tx: A Neo4j transaction object.
        batch_data: List of row dicts with the keys country_iso, country_name,
            country_id, fsi_name, fsi_unit, fsi_id, meas_year and meas_value.
    """
    query = """
    UNWIND $batch as row
    // MERGE Country node
    MERGE (c:Country {iso_code: row.country_iso})
    ON CREATE SET c.name = row.country_name, c.ID = row.country_id
    ON MATCH SET c.name = row.country_name // Update name if it changed, ID should be constant

    // MERGE Food_Security_Indicator node
    MERGE (fsi:Food_Security_Indicator {name: row.fsi_name, unit: row.fsi_unit})
    ON CREATE SET fsi.ID = row.fsi_id

    // MERGE Relationship (Country)-[:HAS_MEASUREMENT]->(Food_Security_Indicator)
    // This assumes a unique measurement per country, indicator, and year.
    // If multiple measurements are possible, this MERGE might need adjustment or use CREATE.
    MERGE (c)-[r:HAS_MEASUREMENT {year: row.meas_year}]->(fsi)
    ON CREATE SET r.value = row.meas_value
    ON MATCH SET r.value = row.meas_value // Update if value changes for same key
    """
    # The whole batch travels as one $batch parameter; nothing is interpolated
    # into the query text.
    tx.run(query, batch=batch_data)

## 2.2 production_crops_livestock.csv

In [7]:
def load_production_data_neo4j(session, file_path):
    """
    Loads data from the production CSV into Neo4j.
    Creates Country and Agricultural_Product nodes, and PRODUCES relationships.

    Rows are parsed one at a time and written in batches of 500 through
    ``_create_production_batch``. Country, product name/type, year and tons
    are mandatory; number of animals and harvested area may be missing and
    are then stored as null. A row that cannot be parsed at all (missing
    column, short row) is printed and skipped. Errors raised by the database
    write itself propagate to the caller instead of being logged as a single
    bad row.

    Args:
        session: A Neo4j driver session.
        file_path (str): Path to production_crops_livestock.csv.
    """
    print(f"\nLoading Production data from {file_path}...")
    global next_product_id
    processed_rows = 0
    skipped_rows = 0

    # (product name, product type) -> ID for products seen during this call.
    product_id_map = {}
    batch = []
    batch_size = 500  # Rows sent to Neo4j per write transaction

    # newline='' is what the csv module expects; without it, quoted fields
    # that contain line breaks are not parsed correctly.
    with open(file_path, 'r', encoding='utf-8', newline='') as csvfile:
        for row in csv.DictReader(csvfile):
            # Only the parsing step is guarded. AttributeError covers short
            # rows: DictReader fills their missing columns with None, and
            # None.strip() would otherwise abort the whole load.
            try:
                country_iso_code = row['country_iso_code'].strip()
                country_name = row['country_name'].strip()
                prod_year = safe_to_int(row['a_production_year'])
                prod_tons = safe_to_float(row['a_production_tons'])
                prod_n_animals = safe_to_int(row['a_production_n_animals'])
                prod_harvested_area = safe_to_float(row['a_production_harvested_area'])
                product_name = row['a_product_name'].strip()
                product_type = row['a_product_type'].strip()
            except (KeyError, ValueError, TypeError, AttributeError) as e:
                print(f"Skipping production row due to data error: {row} - Error: {e}")
                skipped_rows += 1
                continue

            # Animals/Area can be null, so they are not part of the check.
            if not all([country_iso_code, country_name, product_name, product_type,
                        prod_year is not None, prod_tons is not None]):
                skipped_rows += 1
                continue

            # Allocate an ID the first time a product is seen.
            product_key = (product_name, product_type)
            if product_key not in product_id_map:
                product_id_map[product_key] = next_product_id
                next_product_id += 1

            batch.append({
                "country_iso": country_iso_code,
                "country_name": country_name,
                "country_id": safe_to_int(country_iso_code),
                "prod_name": product_name,
                "prod_type": product_type,
                "prod_id": product_id_map[product_key],
                "prod_year": prod_year,
                "prod_tons": prod_tons,
                "prod_animals": prod_n_animals,
                "prod_area": prod_harvested_area
            })

            if len(batch) >= batch_size:
                session.execute_write(_create_production_batch, batch)
                processed_rows += len(batch)
                print(f"  Processed {processed_rows} production rows...", end='\r')
                sys.stdout.flush()
                batch = []

        if batch:  # Flush the final, partially filled batch
            session.execute_write(_create_production_batch, batch)
            processed_rows += len(batch)

    print(f"\nFinished loading {file_path}. Rows processed: {processed_rows}, Rows skipped: {skipped_rows}.")


def _create_production_batch(tx, batch_data):
    """
    Writes one batch of production rows to Neo4j with a single UNWIND query.

    For each row the query merges the Country (keyed on iso_code) and the
    Agricultural_Product (keyed on name + type), then merges a PRODUCES
    relationship keyed on year and sets production_tons, number_of_animals
    and harvested_area_ha. Replaying a row overwrites those three properties
    rather than adding a second relationship.

    Args:
        tx: A Neo4j transaction object.
        batch_data: List of row dicts with the keys country_iso, country_name,
            country_id, prod_name, prod_type, prod_id, prod_year, prod_tons,
            prod_animals and prod_area.
    """
    query = """
    UNWIND $batch as row
    // MERGE Country node
    MERGE (c:Country {iso_code: row.country_iso})
    ON CREATE SET c.name = row.country_name, c.ID = row.country_id
    ON MATCH SET c.name = row.country_name

    // MERGE Agricultural_Product node
    MERGE (ap:Agricultural_Product {name: row.prod_name, type: row.prod_type})
    ON CREATE SET ap.ID = row.prod_id

    // MERGE Relationship (Country)-[:PRODUCES]->(Agricultural_Product)
    // Assumes unique production event per country, product, year.
    MERGE (c)-[r:PRODUCES {year: row.prod_year}]->(ap)
    ON CREATE SET r.production_tons = row.prod_tons,
                  r.number_of_animals = row.prod_animals,
                  r.harvested_area_ha = row.prod_area
    ON MATCH SET r.production_tons = row.prod_tons, // Update properties if they change
                 r.number_of_animals = row.prod_animals,
                 r.harvested_area_ha = row.prod_area
    """

    # The whole batch travels as one $batch parameter; nothing is interpolated
    # into the query text.
    tx.run(query, batch=batch_data)

## 2.3 agricultural_census.csv

In [8]:
def load_census_data_neo4j(session, file_path):
    """
    Loads data from the agricultural census CSV into Neo4j.
    Creates Country, Agricultural_Census, Farm_Size_Range nodes and
    CONDUCTED_IN, HAS_FARM_SIZE_RANGE relationships.

    Rows are parsed one at a time and written in batches of 500 through
    ``_create_census_batch``. Every column read here is mandatory; a row with
    an empty or unconvertible field is counted as skipped, and a row that
    cannot be parsed at all (missing column, short row) is also printed.
    Errors raised by the database write itself propagate to the caller
    instead of being logged as a single bad row.

    Args:
        session: A Neo4j driver session.
        file_path (str): Path to agricultural_census.csv.
    """
    print(f"\nLoading Agricultural Census data from {file_path}...")
    global next_census_id, next_fsr_id
    processed_rows = 0
    skipped_rows = 0

    # "<iso>_<year>" -> census ID, and (lower, upper) -> farm size range ID,
    # for entities seen during this call.
    census_id_map = {}
    fsr_id_map = {}
    batch = []
    batch_size = 500  # Rows sent to Neo4j per write transaction

    # newline='' is what the csv module expects; without it, quoted fields
    # that contain line breaks are not parsed correctly.
    with open(file_path, 'r', encoding='utf-8', newline='') as csvfile:
        for row in csv.DictReader(csvfile):
            # Only the parsing step is guarded. AttributeError covers short
            # rows: DictReader fills their missing columns with None, and
            # None.strip() would otherwise abort the whole load.
            try:
                country_iso_code = row['country_iso_code'].strip()
                country_name = row['country_name'].strip()
                census_decade = row['agricultural_census_decade'].strip()
                census_year = safe_to_int(row['agricultural_census_year'])
                total_area_ha = safe_to_float(row['total_area_ha'])
                num_properties = safe_to_int(row['number_of_properties'])
                farm_lower = safe_to_int(row['farm_size_lower_limit'])
                farm_upper = safe_to_int(row['farm_size_upper_limit'])
            except (KeyError, ValueError, TypeError, AttributeError) as e:
                print(f"Skipping census row due to data error: {row} - Error: {e}")
                skipped_rows += 1
                continue

            # Numeric fields are compared with None so that a legitimate 0
            # (e.g. a lower limit of 0 ha) is not rejected.
            if not all([country_iso_code, country_name, census_decade,
                        census_year is not None, total_area_ha is not None,
                        num_properties is not None, farm_lower is not None,
                        farm_upper is not None]):
                skipped_rows += 1
                continue

            # ID for Agricultural_Census (unique per country & year)
            census_unique_key_str = f"{country_iso_code}_{census_year}"
            if census_unique_key_str not in census_id_map:
                census_id_map[census_unique_key_str] = next_census_id
                next_census_id += 1

            # ID for Farm_Size_Range
            fsr_key = (farm_lower, farm_upper)
            if fsr_key not in fsr_id_map:
                fsr_id_map[fsr_key] = next_fsr_id
                next_fsr_id += 1

            batch.append({
                "country_iso": country_iso_code,
                "country_name": country_name,
                "country_id": safe_to_int(country_iso_code),
                "census_year": census_year,
                "census_decade": census_decade,
                "census_unique_key": census_unique_key_str,  # For MERGE
                "census_id": census_id_map[census_unique_key_str],  # Property value
                "fsr_lower": farm_lower,
                "fsr_upper": farm_upper,
                "fsr_id": fsr_id_map[fsr_key],
                "area_ha": total_area_ha,
                "num_props": num_properties
            })

            if len(batch) >= batch_size:
                session.execute_write(_create_census_batch, batch)
                processed_rows += len(batch)
                print(f"  Processed {processed_rows} census rows...", end='\r')
                sys.stdout.flush()
                batch = []

        if batch:  # Flush the final, partially filled batch
            session.execute_write(_create_census_batch, batch)
            processed_rows += len(batch)

    print(f"\nFinished loading {file_path}. Rows processed: {processed_rows}, Rows skipped: {skipped_rows}.")

def _create_census_batch(tx, batch_data):
    """
    Writes one batch of census rows to Neo4j with a single UNWIND query.

    For each row the query merges the Country (keyed on iso_code), the
    Agricultural_Census (keyed on unique_key) with a CONDUCTED_IN relationship
    to the country, and the Farm_Size_Range (keyed on lower_limit +
    upper_limit). It then merges a HAS_FARM_SIZE_RANGE relationship from the
    census to the range and sets total_area_ha and number_of_properties on it.

    Args:
        tx: A Neo4j transaction object.
        batch_data: List of row dicts with the keys country_iso, country_name,
            country_id, census_year, census_decade, census_unique_key,
            census_id, fsr_lower, fsr_upper, fsr_id, area_ha and num_props.
    """
    query = """
    UNWIND $batch as row
    // MERGE Country node
    MERGE (c:Country {iso_code: row.country_iso})
    ON CREATE SET c.name = row.country_name, c.ID = row.country_id
    ON MATCH SET c.name = row.country_name

    // MERGE Agricultural_Census node (unique per country-year)
    MERGE (ac:Agricultural_Census {unique_key: row.census_unique_key})
    ON CREATE SET ac.census_year = row.census_year,
                  ac.decade = row.census_decade,
                  ac.ID = row.census_id

    // MERGE Relationship (Agricultural_Census)-[:CONDUCTED_IN]->(Country)
    MERGE (ac)-[:CONDUCTED_IN]->(c)

    // MERGE Farm_Size_Range node
    MERGE (fsr:Farm_Size_Range {lower_limit: row.fsr_lower, upper_limit: row.fsr_upper})
    ON CREATE SET fsr.ID = row.fsr_id

    // MERGE Relationship (Agricultural_Census)-[:HAS_FARM_SIZE_RANGE]->(Farm_Size_Range)
    // This relationship is specific to this census and this farm size range.
    // A single census can have multiple farm size ranges.
    MERGE (ac)-[r:HAS_FARM_SIZE_RANGE {lower_limit:fsr.lower_limit, upper_limit:fsr.upper_limit}]->(fsr) // ensure relationship is unique for this census-fsr pair
    ON CREATE SET r.total_area_ha = row.area_ha, r.number_of_properties = row.num_props
    ON MATCH SET r.total_area_ha = row.area_ha, r.number_of_properties = row.num_props
    """
    # Note on MERGE for HAS_FARM_SIZE_RANGE:
    # The properties {lower_limit:fsr.lower_limit, upper_limit:fsr.upper_limit} in the MERGE
    # for the relationship ensure that if a row for the same census and same farm size range
    # appears twice (e.g. data error or re-run), it updates the existing relationship rather
    # than creating a new one.
    # The whole batch travels as one $batch parameter; nothing is interpolated
    # into the query text.
    tx.run(query, batch=batch_data)


## 2.4 Importing data from the CSV files

In [9]:
# Import driver cell: connect, create constraints, load the three CSV files,
# and always close the driver afterwards.
# `driver` starts as None so the `finally` clause can tell whether a driver
# object was ever created before trying to close it.
driver = None
try:
    # Establish Neo4j database connection
    print("Connecting to Neo4j database...")
    driver = GraphDatabase.driver(NEO4J_URI, auth=basic_auth(NEO4J_USER, NEO4J_PASSWORD))
    driver.verify_connectivity() # Checks if connection is valid
    print("Neo4j connection successful.")

    # Create constraints (run once or ensure they are idempotent)
    # They are created in their own session before any data is written, so
    # the loaders' MERGE keys are already backed by the constraints.
    with driver.session(database=DB_NAME) as session:
        session.execute_write(create_constraints)

    # Load data from CSV files
    # One session is shared by the three loaders; each loader issues its own
    # batched write transactions on it.
    with driver.session(database=DB_NAME) as session:
        load_food_security_data_neo4j(session, FOOD_SECURITY_CSV)
        load_production_data_neo4j(session, PRODUCTION_CSV)
        load_census_data_neo4j(session, CENSUS_CSV)

    print("\nAll data loading processes completed.")

except Exception as e:
    # Top-level boundary of the cell: report the failure with its traceback
    # instead of re-raising, so the `finally` clean-up message still follows.
    print(f"\nAn error occurred: {e}")
    import traceback
    traceback.print_exc()
finally:
    if driver:
        driver.close()
        print("Neo4j connection closed.")

Connecting to Neo4j database...
Neo4j connection successful.
Creating constraints...
Constraints created (or already existed).

Loading Food Security data from preprocessed_datasets/suite_of_food_security_indicators.csv...
  Processed 139500 food security rows...
Finished loading preprocessed_datasets/suite_of_food_security_indicators.csv. Rows processed: 139538, Rows skipped: 0.

Loading Production data from preprocessed_datasets/production_crops_livestock.csv...
  Processed 1523500 production rows...
Finished loading preprocessed_datasets/production_crops_livestock.csv. Rows processed: 1523718, Rows skipped: 3220.

Loading Agricultural Census data from preprocessed_datasets/agricultural_census.csv...
  Processed 2000 census rows...
Finished loading preprocessed_datasets/agricultural_census.csv. Rows processed: 2278, Rows skipped: 0.

All data loading processes completed.
Neo4j connection closed.


# 3. Defining Queries

## 3.1 Query 1: High Land Concentration & Persistent Food Insecurity
This query aims to find countries where the latest agricultural census indicates significant land concentration (a substantial share of the census area, at least 20% in the query below, held by farms of 100 ha or more) AND where recent food security data shows persistently high levels of undernourishment or severe food insecurity.

Neo4j is well-suited because it can efficiently traverse from Country to its latest Agricultural_Census, then to Farm_Size_Range details, while simultaneously exploring paths to multiple Food_Security_Indicator measurements over time, allowing complex conditional pattern matching across these different domains.


In [22]:
# Cypher text for Query 1 (land concentration vs. food insecurity).
# Per country it takes the census with the highest census_year, computes the
# share of total_area_ha that lies in farm size ranges whose lower_limit is
# >= 100, and keeps countries where that share is >= 0.20. For those it reads
# the two most recent non-null values of the undernourishment and severe food
# insecurity indicators and keeps countries where either one is high
# (>= 5.0 / >= 1.0) on the latest value, as spelled out in the query comments.
# Returns at most 25 rows, highest concentration first.
# NOTE(review): the `CALL (c, ...) { ... }` subquery form appears to need a
# recent Neo4j 5 release; confirm against the target server version.
query1_cypher = """ 
// Define thresholds
WITH 0.20 AS largeFarmAreaConcentrationThreshold, // e.g., 20% of land in large farms (>=100ha)
     5.0 AS highUndernourishmentThreshold,      // e.g., 5% prevalence
     1.0 AS highSevereInsecurityThreshold,      // e.g., 1% prevalence
     'Prevalence of undernourishment (percent) (3-year average)' AS undernourishmentIndicatorName,
     'Prevalence of severe food insecurity in the total population (percent) (3-year average)' AS severeInsecurityIndicatorName

MATCH (c:Country)<-[:CONDUCTED_IN]-(ac:Agricultural_Census)
// Get the latest census for each country
WITH c, max(ac.census_year) AS latest_census_year,
     largeFarmAreaConcentrationThreshold, highUndernourishmentThreshold, highSevereInsecurityThreshold,
     undernourishmentIndicatorName, severeInsecurityIndicatorName

MATCH (c)<-[:CONDUCTED_IN]-(latest_ac:Agricultural_Census {census_year: latest_census_year})
MATCH (latest_ac)-[r_fsr:HAS_FARM_SIZE_RANGE]->(fsr:Farm_Size_Range)

// Calculate land concentration for the latest census
WITH c, latest_census_year,
     sum(CASE WHEN fsr.lower_limit >= 100 THEN r_fsr.total_area_ha ELSE 0 END) AS area_large_farms,
     sum(r_fsr.total_area_ha) AS total_census_area,
     largeFarmAreaConcentrationThreshold, highUndernourishmentThreshold, highSevereInsecurityThreshold,
     undernourishmentIndicatorName, severeInsecurityIndicatorName
WHERE total_census_area > 0 // Avoid division by zero
WITH c, latest_census_year,
     (1.0 * area_large_farms / total_census_area) AS concentration_ratio,
     largeFarmAreaConcentrationThreshold, highUndernourishmentThreshold, highSevereInsecurityThreshold,
     undernourishmentIndicatorName, severeInsecurityIndicatorName
WHERE concentration_ratio >= largeFarmAreaConcentrationThreshold // Filter by land concentration first

// For these countries, get the two most recent Undernourishment measurements
CALL (c, undernourishmentIndicatorName) {
    OPTIONAL MATCH (c)-[r_und:HAS_MEASUREMENT]->(fsi_und:Food_Security_Indicator {name: undernourishmentIndicatorName})
    WHERE r_und.value IS NOT NULL
    WITH r_und.value AS val, r_und.year AS yr ORDER BY yr DESC
    RETURN collect(val)[0..2] AS undernourishment_data_raw // Collect up to 2 most recent values
}

// For these countries, get the two most recent Severe Food Insecurity measurements
CALL (c, severeInsecurityIndicatorName) {
    OPTIONAL MATCH (c)-[r_sev:HAS_MEASUREMENT]->(fsi_sev:Food_Security_Indicator {name: severeInsecurityIndicatorName})
    WHERE r_sev.value IS NOT NULL
    WITH r_sev.value AS val, r_sev.year AS yr ORDER BY yr DESC
    RETURN collect(val)[0..2] AS severe_insecurity_data_raw // Collect up to 2 most recent values
}

// Collect the measurements and prepare for condition checking
WITH c, latest_census_year, concentration_ratio,
     undernourishment_data_raw, severe_insecurity_data_raw,
     highUndernourishmentThreshold, highSevereInsecurityThreshold

// Define default values for easier logic if lists are short or metrics are missing
WITH c, latest_census_year AS census_year, concentration_ratio, // census_year aliased here
     undernourishment_data_raw,
     severe_insecurity_data_raw,
     // Handle cases where fewer than 2 measurements exist by accessing list elements carefully
     // Use a value like -1.0 (or any value guaranteed to be below threshold) if data is missing,
     // to simplify boolean logic later. Null could also be used with COALESCE.
     CASE WHEN size(undernourishment_data_raw) > 0 THEN undernourishment_data_raw[0] ELSE -1.0 END AS latest_und,
     CASE WHEN size(undernourishment_data_raw) > 1 THEN undernourishment_data_raw[1] ELSE -1.0 END AS prev_und,
     CASE WHEN size(severe_insecurity_data_raw) > 0 THEN severe_insecurity_data_raw[0] ELSE -1.0 END AS latest_sev,
     CASE WHEN size(severe_insecurity_data_raw) > 1 THEN severe_insecurity_data_raw[1] ELSE -1.0 END AS prev_sev,
     highUndernourishmentThreshold, highSevereInsecurityThreshold

// Determine if conditions are met (persistently high or worsening)
WITH c, census_year, concentration_ratio,
     undernourishment_data_raw, severe_insecurity_data_raw, // Keep original lists for RETURN
     latest_und, prev_und, latest_sev, prev_sev,
     // Undernourishment concern
     (
       (latest_und >= highUndernourishmentThreshold AND prev_und >= highUndernourishmentThreshold) OR // Persistently high
       (latest_und > prev_und AND latest_und >= highUndernourishmentThreshold) OR                     // Worsening and high
       (latest_und >= highUndernourishmentThreshold AND prev_und = -1.0)                               // Single recent high point (no prev_und to compare)
     ) AS undernourishment_concern,
     // Severe insecurity concern
     (
       (latest_sev >= highSevereInsecurityThreshold AND prev_sev >= highSevereInsecurityThreshold) OR // Persistently high
       (latest_sev > prev_sev AND latest_sev >= highSevereInsecurityThreshold) OR                     // Worsening and high
       (latest_sev >= highSevereInsecurityThreshold AND prev_sev = -1.0)                               // Single recent high point
     ) AS severe_insecurity_concern

WHERE undernourishment_concern OR severe_insecurity_concern // Filter for countries meeting at least one food security concern

RETURN
    c.name AS country,
    census_year,
    round(concentration_ratio * 100, 2) AS land_concentration_percent,
    // Use original lists for display, which might contain nulls if data was missing initially from DB
    // The CASE WHEN size > X THEN list[Y] ELSE null END is a robust way to access
    CASE WHEN size(undernourishment_data_raw) > 0 THEN undernourishment_data_raw[0] ELSE null END AS latest_undernourishment,
    CASE WHEN size(undernourishment_data_raw) > 1 THEN undernourishment_data_raw[1] ELSE null END AS previous_undernourishment,
    CASE WHEN size(severe_insecurity_data_raw) > 0 THEN severe_insecurity_data_raw[0] ELSE null END AS latest_severe_insecurity,
    CASE WHEN size(severe_insecurity_data_raw) > 1 THEN severe_insecurity_data_raw[1] ELSE null END AS previous_severe_insecurity,
    undernourishment_concern,
    severe_insecurity_concern
ORDER BY land_concentration_percent DESC, c.name
LIMIT 25;"""

## 3.2 Query 2: Resilient Food Systems (Staples)
This query identifies countries demonstrating a resilient food system for staple crops, characterized by high domestic production of key staples, low dependency on cereal imports, and high average dietary energy supply adequacy, all within the same recent year.
Neo4j excels at finding nodes (Country) that satisfy multiple distinct relational conditions simultaneously, allowing the definition and discovery of such complex profiles by matching paths to different related entities (Agricultural_Product, Food_Security_Indicator) with specific properties on those relationships.


In [28]:
# Cypher text for Query 2 (resilient staple food systems).
# Per country it picks the most recent year that has production data for at
# least one of the listed staple crops together with both the cereal import
# dependency and the dietary energy adequacy indicators. For that year it sums
# production_tons over the staples and keeps countries with
# >= 10,000,000 tons, import dependency <= 25.0 and adequacy >= 100.0.
# Returns at most 25 rows, highest adequacy first.
# NOTE(review): the `CALL (c, ...) { ... }` subquery form appears to need a
# recent Neo4j 5 release; confirm against the target server version.
query2_cypher = """ 
// Define thresholds and target staple crops
WITH ['Rice', 'Wheat', 'Maize (corn)'] AS stapleCrops,
     10000000 AS minStapleProductionTons, // Minimum total tons for "high production"
     25.0 AS maxImportDependencyRatio,    // Max % for "low dependency"
     100.0 AS minEnergyAdequacyPercent   // Min % for "good adequacy"

MATCH (c:Country)
// Find the latest year per country where all three data types are available

CALL (c, stapleCrops) {
    MATCH (c)-[r_prod:PRODUCES]->(ap:Agricultural_Product)
    WHERE r_prod.year IS NOT NULL AND ap.name IN stapleCrops
    WITH c, stapleCrops, r_prod.year AS common_year, collect(DISTINCT ap.name) AS products_in_year
    WHERE size(products_in_year) >= 1 // At least one staple product data for that year

    MATCH (c)-[r_imp:HAS_MEASUREMENT {year: common_year}]->(fsi_imp:Food_Security_Indicator {name: 'Cereal import dependency ratio (percent) (3-year average)'})
    MATCH (c)-[r_adeq:HAS_MEASUREMENT {year: common_year}]->(fsi_adeq:Food_Security_Indicator {name: 'Average dietary energy supply adequacy (percent) (3-year average)'})
    RETURN common_year AS qualifying_year
    ORDER BY common_year DESC
    LIMIT 1
}
// If a qualifying_year is found, get the data for that year
WITH c, qualifying_year, stapleCrops, minStapleProductionTons, maxImportDependencyRatio, minEnergyAdequacyPercent
WHERE qualifying_year IS NOT NULL

MATCH (c)-[r_prod_final:PRODUCES {year: qualifying_year}]->(ap_final:Agricultural_Product)
WHERE ap_final.name IN stapleCrops
WITH c, qualifying_year, sum(r_prod_final.production_tons) AS total_staple_prod,
     minStapleProductionTons, maxImportDependencyRatio, minEnergyAdequacyPercent, stapleCrops

MATCH (c)-[r_imp_final:HAS_MEASUREMENT {year: qualifying_year}]->(fsi_imp_final:Food_Security_Indicator {name: 'Cereal import dependency ratio (percent) (3-year average)'})
MATCH (c)-[r_adeq_final:HAS_MEASUREMENT {year: qualifying_year}]->(fsi_adeq_final:Food_Security_Indicator {name: 'Average dietary energy supply adequacy (percent) (3-year average)'})

WITH c.name AS country,
     qualifying_year AS data_year,
     round(total_staple_prod) AS total_staple_production_tons,
     r_imp_final.value AS cereal_import_dependency_ratio,
     r_adeq_final.value AS energy_adequacy_percent,
     minStapleProductionTons, maxImportDependencyRatio, minEnergyAdequacyPercent 
WHERE total_staple_production_tons >= minStapleProductionTons
  AND cereal_import_dependency_ratio <= maxImportDependencyRatio
  AND energy_adequacy_percent >= minEnergyAdequacyPercent
RETURN country, data_year, total_staple_production_tons, cereal_import_dependency_ratio, energy_adequacy_percent
ORDER BY energy_adequacy_percent DESC, total_staple_production_tons DESC
LIMIT 25;"""

## 3.3 Query 3: Animal Product Self-Sufficiency Profile
This query seeks countries that show strong domestic capacity in animal agriculture by having high production of key animal products ('Meat, Total', 'Milk, Total', 'Eggs Primary') and simultaneously achieving a high average supply of animal protein per capita in the same recent year.
Neo4j's graph model allows for efficient querying of countries based on combined criteria from their production activities (PRODUCES relationships to Agricultural_Product nodes) and their nutritional outcomes (HAS_MEASUREMENT relationships to Food_Security_Indicator nodes), identifying patterns of successful domestic provision.


In [29]:
# Cypher text for Query 3 (animal product self-sufficiency).
# Per country it picks the most recent year that has production data for at
# least one of the listed animal products together with the animal protein
# supply indicator. For that year it sums production_tons over those products
# and keeps countries with >= 1,000,000 tons and a protein supply >= 50.0.
# Returns at most 25 rows, highest protein supply first.
# NOTE(review): the final WITH projects only country, data_year and the two
# computed values, yet the WHERE that follows still reads
# minAnimalProductionTons and minAnimalProteinSupply. Query 2 carries its
# thresholds through the equivalent WITH; confirm this form is accepted by
# the target Neo4j version, or project the two thresholds here as well.
# NOTE(review): the `CALL (c, ...) { ... }` subquery form appears to need a
# recent Neo4j 5 release; confirm against the target server version.
query3_cypher = """ 
// Define thresholds and target animal products
WITH ['Meat, Total', 'Milk, Total', 'Eggs Primary'] AS animalProducts,
     1000000 AS minAnimalProductionTons, // Minimum total tons for "high production"
     50.0 AS minAnimalProteinSupply      // Min g/cap/day for "high supply"

MATCH (c:Country)
// Find the latest year per country where both production and supply data are available
WITH c, animalProducts, minAnimalProductionTons, minAnimalProteinSupply
CALL (c, animalProducts) {
    MATCH (c)-[r_prod:PRODUCES]->(ap:Agricultural_Product)
    WHERE r_prod.year IS NOT NULL AND ap.name IN animalProducts
    WITH c, animalProducts, r_prod.year AS common_year, collect(DISTINCT ap.name) AS products_in_year
    WHERE size(products_in_year) >= 1 // At least one animal product data for that year

    MATCH (c)-[r_prot:HAS_MEASUREMENT {year: common_year}]->(fsi_prot:Food_Security_Indicator {name: 'Average supply of protein of animal origin (g/cap/day) (3-year average)'})
    RETURN common_year AS qualifying_year
    ORDER BY common_year DESC
    LIMIT 1
}
// If a qualifying_year is found, get the data for that year
WITH c, qualifying_year, animalProducts, minAnimalProductionTons, minAnimalProteinSupply
WHERE qualifying_year IS NOT NULL

MATCH (c)-[r_prod_final:PRODUCES {year: qualifying_year}]->(ap_final:Agricultural_Product)
WHERE ap_final.name IN animalProducts
WITH c, qualifying_year, sum(r_prod_final.production_tons) AS total_animal_prod,
     minAnimalProductionTons, minAnimalProteinSupply

MATCH (c)-[r_prot_final:HAS_MEASUREMENT {year: qualifying_year}]->(fsi_prot_final:Food_Security_Indicator {name: 'Average supply of protein of animal origin (g/cap/day) (3-year average)'})

WITH c.name AS country,
     qualifying_year AS data_year,
     round(total_animal_prod) AS total_animal_production_tons,
     r_prot_final.value AS animal_protein_supply_g_cap_day
WHERE total_animal_production_tons >= minAnimalProductionTons
  AND animal_protein_supply_g_cap_day >= minAnimalProteinSupply
RETURN country, data_year, total_animal_production_tons, animal_protein_supply_g_cap_day
ORDER BY animal_protein_supply_g_cap_day DESC, total_animal_production_tons DESC
LIMIT 25;"""

## 3.4 Query 4: Crop Specialization Shifts & Protein Supply Dynamics
This query investigates countries that have significantly increased production of a major export-oriented crop (e.g., 'Soya beans') between an earlier and a more recent period, and examines the concurrent change in their domestic average protein supply.
Neo4j facilitates such temporal pattern analysis by allowing queries to MATCH and aggregate data across different time-bound relationships (production and measurements in specific year ranges) connected to the same Country node, revealing correlated trends over time.


In [6]:
# Cypher text for Query 4 (crop specialization shifts vs. protein supply).
#
# What the query does, step by step:
#   1. Defines the target crop ('Soya beans'), an early period (2000-2004),
#      a recent period (2015-2019), the minimum increase factor (1.5) and the
#      protein indicator name.
#   2. For every Country, four CALL subqueries each return a single average:
#      production_tons of the target crop in the early and recent periods,
#      and the protein indicator value in the early and recent periods.
#      avg() over no rows yields null, so countries without data still pass
#      through the subqueries.
#   3. Keeps countries where both production averages are non-null, the early
#      average is > 0, and recent / early >= the minimum increase factor.
#   4. Returns rounded averages, the increase factor and the protein supply
#      change (null when either protein average is null); at most 25 rows,
#      ordered by increase factor descending, then country name.
#
# The "THIS IS THE KEY CHANGE" line inside the string is a leftover editing
# note; it is a Cypher comment and is sent to the server as part of the query.
# NOTE(review): the `CALL (vars) { ... }` variable scope clause presumably
# needs a recent Neo4j release -- confirm the minimum server version.
query4_cypher = """ 
// Define periods, target crop, and change threshold
WITH 'Soya beans' AS targetCrop,
     2000 AS earlyPeriodStart, 2004 AS earlyPeriodEnd,
     2015 AS recentPeriodStart, 2019 AS recentPeriodEnd,
     1.5 AS minProductionIncreaseFactor, // e.g., recent production is 1.5x early production
     'Average protein supply (g/cap/day) (3-year average)' AS proteinIndicatorName

MATCH (c:Country)

// Average production of targetCrop in early period
CALL (c, targetCrop, earlyPeriodStart, earlyPeriodEnd) {
    MATCH (c)-[r:PRODUCES]->(ap:Agricultural_Product {name: targetCrop})
    WHERE r.year >= earlyPeriodStart AND r.year <= earlyPeriodEnd AND r.production_tons IS NOT NULL
    RETURN avg(r.production_tons) AS avg_early_crop_prod
}
// Average production of targetCrop in recent period
CALL (c, targetCrop, recentPeriodStart, recentPeriodEnd) {
    MATCH (c)-[r:PRODUCES]->(ap:Agricultural_Product {name: targetCrop})
    WHERE r.year >= recentPeriodStart AND r.year <= recentPeriodEnd AND r.production_tons IS NOT NULL
    RETURN avg(r.production_tons) AS avg_recent_crop_prod
}

// Average protein supply in early period
CALL (c, proteinIndicatorName, earlyPeriodStart, earlyPeriodEnd) {
    MATCH (c)-[r:HAS_MEASUREMENT]->(fsi:Food_Security_Indicator {name: proteinIndicatorName})
    WHERE r.year >= earlyPeriodStart AND r.year <= earlyPeriodEnd AND r.value IS NOT NULL
    RETURN avg(r.value) AS avg_early_protein
}
// Average protein supply in recent period
CALL (c, proteinIndicatorName, recentPeriodStart, recentPeriodEnd) {
    MATCH (c)-[r:HAS_MEASUREMENT]->(fsi:Food_Security_Indicator {name: proteinIndicatorName})
    WHERE r.year >= recentPeriodStart AND r.year <= recentPeriodEnd AND r.value IS NOT NULL
    RETURN avg(r.value) AS avg_recent_protein
}

// THIS IS THE KEY CHANGE: Add a WITH clause here
WITH c, targetCrop, minProductionIncreaseFactor, // Carry forward necessary variables
     avg_early_crop_prod, avg_recent_crop_prod,
     avg_early_protein, avg_recent_protein
WHERE avg_early_crop_prod IS NOT NULL AND avg_recent_crop_prod IS NOT NULL AND avg_early_crop_prod > 0 // Ensure data exists and avoid division by zero
  AND (avg_recent_crop_prod / avg_early_crop_prod) >= minProductionIncreaseFactor

RETURN
    c.name AS country,
    targetCrop,
    round(avg_early_crop_prod) AS avg_crop_prod_early_period,
    round(avg_recent_crop_prod) AS avg_crop_prod_recent_period,
    round((avg_recent_crop_prod / avg_early_crop_prod), 2) AS crop_prod_increase_factor,
    round(avg_early_protein, 2) AS avg_protein_supply_early_period,
    round(avg_recent_protein, 2) AS avg_protein_supply_recent_period,
    CASE
        WHEN avg_early_protein IS NOT NULL AND avg_recent_protein IS NOT NULL THEN round(avg_recent_protein - avg_early_protein, 2)
        ELSE null
    END AS protein_supply_change
ORDER BY crop_prod_increase_factor DESC, c.name
LIMIT 25;"""

## 3.5 Query 5: Comparative Land Profiles of Countries with Extreme Food Insecurity
This query identifies groups of countries with the best (lowest) and worst (highest) recent 'Prevalence of severe food insecurity', and then for each group, it retrieves their latest agricultural census data to present a summarized land distribution profile (e.g., percentage of area in small, medium, large farms).
Neo4j is effective here for first identifying groups of Country nodes based on an outcome indicator (severe food insecurity), and then efficiently traversing to related Agricultural_Census and Farm_Size_Range data to aggregate and compare structural characteristics (land distribution) across these distinct groups.


In [26]:
# Cypher text for Query 5 (land profiles of countries at both extremes of
# severe food insecurity).
#
# What the query does, step by step:
#   1. For every Country with the severe-food-insecurity indicator, takes the
#      value at the country's max measurement year and drops null values.
#   2. Sorts ascending, collects into one list, and slices the first 10
#      ("best") and the last 10 ("worst") entries.
#   3. Inside the CALL, each UNION ALL branch unwinds one slice, finds the
#      country's latest census (max census_year), and sums total_area_ha over
#      HAS_FARM_SIZE_RANGE into three brackets:
#        small  : upper_limit < 10
#        medium : lower_limit >= 10 AND upper_limit < 100
#        large  : lower_limit >= 100
#      Countries with no census or a zero/null total area are dropped.
#   4. Returns each bracket as a percentage of the total census area, ordered
#      by group name and then insecurity prevalence.
#
# The two branches are the same apart from the input list, the group label
# and an extra leading WITH in the second branch.
#
# NOTE(review): a range that matches none of the three CASE conditions (for
# example upper_limit exactly 10 or 100 with a smaller lower_limit, or a null
# upper_limit with lower_limit < 100) counts toward total_census_area but
# toward no bracket, so the three percentages need not sum to 100 -- confirm
# against how Farm_Size_Range limits are loaded.
# NOTE(review): if fewer than 20 countries have a non-null latest value the
# two slices overlap and a country can appear in both groups.
# NOTE(review): the `CALL (vars) { ... }` variable scope clause presumably
# needs a recent Neo4j release -- confirm the minimum server version.
query5_cypher = """ 
// Define number of countries for extreme groups and insecurity indicator
WITH 10 AS N_countries_per_group, // Number of countries in "best" and "worst" groups
     'Prevalence of severe food insecurity in the total population (percent) (3-year average)' AS insecurityIndicatorName

// Get latest severe food insecurity value for all countries
MATCH (c:Country)-[r_fsm:HAS_MEASUREMENT]->(fsi:Food_Security_Indicator {name: insecurityIndicatorName})
WITH c, fsi, N_countries_per_group, insecurityIndicatorName, max(r_fsm.year) AS latest_fsm_year
MATCH (c)-[r_fsm_latest:HAS_MEASUREMENT {year: latest_fsm_year}]->(fsi_latest:Food_Security_Indicator {name: insecurityIndicatorName})
WITH c, r_fsm_latest.value AS latest_insecurity_value, N_countries_per_group
WHERE latest_insecurity_value IS NOT NULL
ORDER BY latest_insecurity_value ASC // For collecting both best and worst

// Collect all countries with their insecurity values
WITH collect({country: c, insecurity: latest_insecurity_value}) AS all_countries_sorted_insecurity, N_countries_per_group

// Extract best N and worst N countries
// ***** DEFINE GROUP NAME LITERALS HERE *****
WITH
    all_countries_sorted_insecurity[0..N_countries_per_group] AS best_outcome_countries_data,
    all_countries_sorted_insecurity[(size(all_countries_sorted_insecurity)-N_countries_per_group)..size(all_countries_sorted_insecurity)] AS worst_outcome_countries_data,
    "Best Outcome (Lowest Insecurity)" AS best_group_name,  // Define literal for best group
    "Worst Outcome (Highest Insecurity)" AS worst_group_name // Define literal for worst group
    // N_countries_per_group is implicitly carried if not re-listed, but it's not used inside the CALL block directly

// Unwind best and worst, tag them, and process
// Variables available to the CALL block: best_outcome_countries_data, worst_outcome_countries_data, best_group_name, worst_group_name
CALL (best_outcome_countries_data, worst_outcome_countries_data, best_group_name, worst_group_name) {
    // Branch for "best_outcome_countries"
    // ***** CORRECTED IMPORTING WITH: Simple references only *****
    UNWIND best_outcome_countries_data AS country_data // No need for country_data_list alias, use original name
    // ***** ALIASING and property access in a subsequent WITH *****
    WITH country_data.country AS c, country_data.insecurity AS insecurity_value, best_group_name AS outcome_group // Alias group name here
    // Get latest census for these countries
    MATCH (c)<-[:CONDUCTED_IN]-(ac:Agricultural_Census)
    WITH c, insecurity_value, outcome_group, max(ac.census_year) AS latest_census_year
    MATCH (c)<-[:CONDUCTED_IN]-(latest_ac:Agricultural_Census {census_year: latest_census_year})
    MATCH (latest_ac)-[r_fsr:HAS_FARM_SIZE_RANGE]->(fsr:Farm_Size_Range)

    // Aggregate land distribution into defined brackets
    WITH c.name AS country_name, outcome_group, insecurity_value, latest_census_year,
         sum(r_fsr.total_area_ha) AS total_census_area,
         sum(CASE WHEN fsr.upper_limit < 10 THEN r_fsr.total_area_ha ELSE 0 END) AS area_small_farms,
         sum(CASE WHEN fsr.lower_limit >= 10 AND fsr.upper_limit < 100 THEN r_fsr.total_area_ha ELSE 0 END) AS area_medium_farms,
         sum(CASE WHEN fsr.lower_limit >= 100 THEN r_fsr.total_area_ha ELSE 0 END) AS area_large_farms
    WHERE total_census_area > 0
    RETURN country_name,
           outcome_group,
           round(insecurity_value,2) AS insecurity_prevalence,
           latest_census_year,
           round(100.0 * area_small_farms / total_census_area, 1) AS pct_area_small_farms,
           round(100.0 * area_medium_farms / total_census_area, 1) AS pct_area_medium_farms,
           round(100.0 * area_large_farms / total_census_area, 1) AS pct_area_large_farms

    UNION ALL // Combine with worst outcome countries

    // Branch for "worst_outcome_countries"
    // ***** CORRECTED IMPORTING WITH: Simple references only *****
    WITH worst_outcome_countries_data, worst_group_name
    UNWIND worst_outcome_countries_data AS country_data // Use original name
    // ***** ALIASING and property access in a subsequent WITH *****
    WITH country_data.country AS c, country_data.insecurity AS insecurity_value, worst_group_name AS outcome_group // Alias group name here
    MATCH (c)<-[:CONDUCTED_IN]-(ac:Agricultural_Census)
    WITH c, insecurity_value, outcome_group, max(ac.census_year) AS latest_census_year
    MATCH (c)<-[:CONDUCTED_IN]-(latest_ac:Agricultural_Census {census_year: latest_census_year})
    MATCH (latest_ac)-[r_fsr:HAS_FARM_SIZE_RANGE]->(fsr:Farm_Size_Range)
    WITH c.name AS country_name, outcome_group, insecurity_value, latest_census_year,
         sum(r_fsr.total_area_ha) AS total_census_area,
         sum(CASE WHEN fsr.upper_limit < 10 THEN r_fsr.total_area_ha ELSE 0 END) AS area_small_farms,
         sum(CASE WHEN fsr.lower_limit >= 10 AND fsr.upper_limit < 100 THEN r_fsr.total_area_ha ELSE 0 END) AS area_medium_farms,
         sum(CASE WHEN fsr.lower_limit >= 100 THEN r_fsr.total_area_ha ELSE 0 END) AS area_large_farms
    WHERE total_census_area > 0
    RETURN country_name,
           outcome_group,
           round(insecurity_value,2) AS insecurity_prevalence,
           latest_census_year,
           round(100.0 * area_small_farms / total_census_area, 1) AS pct_area_small_farms,
           round(100.0 * area_medium_farms / total_census_area, 1) AS pct_area_medium_farms,
           round(100.0 * area_large_farms / total_census_area, 1) AS pct_area_large_farms
}
RETURN country_name, outcome_group, insecurity_prevalence, latest_census_year, pct_area_small_farms, pct_area_medium_farms, pct_area_large_farms
ORDER BY outcome_group, insecurity_prevalence; // For best, lower is better; for worst, higher is worse, but group name sorts first"""

# 4. Running Queries

## 4.1 Defining the Function to Run Queries

In [8]:
def execute_neo4j_query(driver: Driver, query_n, query_title, cypher_query,
                        output_dir="results/Neo4j_results"):
    """
    Executes a given Cypher query, prints the results using pandas and saves
    them to a CSV file.

    Args:
        driver: An open Neo4j driver; a session is opened on DB_NAME.
        query_n: Query number, used in the printed header and the CSV file name.
        query_title: Human-readable title printed in the header.
        cypher_query: The Cypher text to run.
        output_dir: Directory that receives ``query_<query_n>_result.csv``.
            It is created if it does not exist yet.

    Returns:
        None. Any exception is caught and reported with a traceback instead
        of being re-raised, so a failing query does not stop the caller.
    """
    # Local import, in the same spirit as the traceback import below.
    from pathlib import Path

    print(f"\n--- Query {query_n}: {query_title} ---")
    print("\nResults:")

    try:
        with driver.session(database=DB_NAME) as session:
            result = session.run(cypher_query)
            data = [record.data() for record in result]
            # pandas reads a tuple as ONE column label, so always hand it a
            # list of column names to keep the RETURN order of the query.
            columns = list(result.keys() or [])

            if data:
                df = pd.DataFrame(data)
                if columns:
                    df = df[columns]
                print(df.to_string(index=False))

                # Make sure the target directory exists before writing.
                target_dir = Path(output_dir)
                target_dir.mkdir(parents=True, exist_ok=True)
                df.to_csv(target_dir / f"query_{query_n}_result.csv", index=False)
            else:
                print("Query executed successfully, but no rows returned.")

    except Exception as e:
        print(f"\nError executing query '{query_title}': {e}")
        import traceback
        traceback.print_exc()
    print("-" * (len(query_title) + 6))

## 4.2 Running the Queries

In [30]:
# Connect once, run the five analysis queries in order, and always release
# the driver afterwards.
driver = None
try:
    print("Connecting to Neo4j database...")
    credentials = basic_auth(NEO4J_USER, NEO4J_PASSWORD)
    driver = GraphDatabase.driver(NEO4J_URI, auth=credentials)
    driver.verify_connectivity()
    print("Neo4j connection successful.")

    # (number, title, cypher) for every query, in execution order.
    query_plan = (
        (1, "High Land Concentration & Persistent Food Insecurity", query1_cypher),
        (2, "Resilient Food Systems (Staples)", query2_cypher),
        (3, "Animal Product Self-Sufficiency Profile", query3_cypher),
        (4, "Crop Specialization Shifts & Protein Supply Dynamics", query4_cypher),
        (5, "Comparative Land Profiles of Countries with Extreme Food Insecurity", query5_cypher),
    )
    for number, title, cypher in query_plan:
        execute_neo4j_query(driver, number, title, cypher)

except Exception as exc:
    # Top-level boundary: report the failure with its traceback.
    print(f"\nAn main script error occurred: {exc}")
    import traceback
    traceback.print_exc()
finally:
    # Only close a driver that was actually created.
    if driver is not None:
        driver.close()
        print("\nNeo4j connection closed.")


Connecting to Neo4j database...
Neo4j connection successful.

--- Query 1: High Land Concentration & Persistent Food Insecurity ---

Results:
                                             country  census_year  land_concentration_percent  latest_undernourishment  previous_undernourishment  latest_severe_insecurity  previous_severe_insecurity  undernourishment_concern  severe_insecurity_concern
                                           Australia         2015                       99.78                      0.0                        0.0                       4.2                         3.4                     False                       True
                                        South Africa         2017                       99.01                      8.1                        7.7                       8.4                         8.4                      True                       True
                                             Iceland         2010                       98.64      