# Green Purchasing Behavior Cube - NoSQL Project

## Project Overview
This project implements a custom JSON parser and a set of NoSQL-style collection operations (filter, project, group by, aggregate, join), then uses them to analyze how consumer spending on sustainable foods relates to economic factors such as income and employment.

## Team: Individual Project (Shamik Basu)


## Part 1: Extended JSON Parser

Extending the sample code to handle:
- Arrays
- Nested objects and arrays
- Boolean and null values
- Complex JSON structures
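
Each parser function in the cell below takes the remaining input text and returns a `(value, rest)` pair, so the caller can continue from wherever the callee stopped. The following is a minimal sketch of that convention, limited to nested lists of non-negative integers. The names `parse_int` and `parse_list` are illustrative and not part of the project code.

```python
def parse_int(text):
    """Consume leading digits and return (int_value, remaining_text)."""
    text = text.lstrip()
    i = 0
    while i < len(text) and text[i].isdigit():
        i += 1
    if i == 0:
        raise ValueError(f"Expected digit but found {text[:1]!r}")
    return int(text[:i]), text[i:]


def parse_list(text):
    """Consume a bracketed list and return (list_value, remaining_text)."""
    text = text.lstrip()
    if not text.startswith("["):
        raise ValueError("Expected '['")
    text = text[1:].lstrip()
    items = []
    if text.startswith("]"):  # empty list
        return items, text[1:]
    while True:
        # Recurse for a nested list, otherwise read an integer
        if text.lstrip().startswith("["):
            value, text = parse_list(text)
        else:
            value, text = parse_int(text)
        items.append(value)
        text = text.lstrip()
        if text.startswith(","):
            text = text[1:]
        elif text.startswith("]"):
            return items, text[1:]
        else:
            raise ValueError(f"Expected ',' or ']' but found {text[:1]!r}")


value, rest = parse_list("[1, [2, 3], []] tail")
print(value)       # [1, [2, 3], []]
print(repr(rest))  # ' tail'
```

Because every function hands back the unconsumed text, nesting needs no extra bookkeeping: the recursive call simply returns the text that follows the inner closing bracket.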


In [13]:
# Extended JSON Parser Implementation
# Based on sample code, extended to handle arrays, nested structures, booleans, and null

import re

def parse_string(str):
    """Parse a string value from JSON"""
    str = str.lstrip()
    assert(str[0] == '"'), f"Expected '\"' but found '{str[0]}'"
    str = str[1:]  # skip the start quote
    
    # Handle escaped characters (basic support)
    mystr = ""
    i = 0
    while i < len(str):
        if str[i] == '\\' and i + 1 < len(str):
            # Handle escape sequences defined by JSON (\uXXXX is not supported)
            escapes = {'n': '\n', 't': '\t', 'r': '\r', 'b': '\b', 'f': '\f',
                       '\\': '\\', '"': '"', '/': '/'}
            if str[i+1] in escapes:
                mystr += escapes[str[i+1]]
                i += 2
            else:
                # Unknown escape: keep the backslash and continue with the next character
                mystr += str[i]
                i += 1
        elif str[i] == '"':
            # End of string
            rest = str[i + 1:]
            return mystr, rest
        else:
            mystr += str[i]
            i += 1
    
    raise ValueError('Unterminated string')

def parse_number(str):
    """Parse a number (int or float) from JSON"""
    str = str.lstrip()
    
    chs = ''
    is_float = False
    i = 0
    for ch in str:
        if (ch.isdigit() or ch == '.' or ch == '-' or ch == '+' or ch == 'e' or ch == 'E'):
            if ch in ('.', 'e', 'E'):  # a fraction or exponent makes the value a float
                is_float = True
            chs += ch
            i += 1
        else:
            break
    
    if len(chs) == 0:
        raise ValueError('Expected number but found nothing')
    
    str = str[i:]
    value = float(chs) if is_float else int(chs)
    return value, str

def parse_boolean(str):
    """Parse boolean values (true/false)"""
    str = str.lstrip()
    if str.startswith('true'):
        return True, str[4:]
    elif str.startswith('false'):
        return False, str[5:]
    else:
        raise ValueError('Expected boolean but found something else')

def parse_null(str):
    """Parse null value"""
    str = str.lstrip()
    if str.startswith('null'):
        return None, str[4:]
    else:
        raise ValueError('Expected null but found something else')

def parse_colon(str):
    """Consume a colon ':'"""
    str = str.lstrip()
    assert(str[0] == ':'), f"Expected ':' but found '{str[0]}'"
    return str[1:]

def parse_value(str):
    """Parse any JSON value (object, array, string, number, boolean, null)"""
    str = str.lstrip()
    
    if len(str) == 0:
        raise ValueError('Unexpected end of string')
    
    if str[0] == '{':
        return parse_object(str)
    elif str[0] == '[':
        return parse_array(str)
    elif str[0] == '"':
        return parse_string(str)
    elif str[0] == '-' or str[0].isdigit():
        return parse_number(str)
    elif str.startswith('true') or str.startswith('false'):
        return parse_boolean(str)
    elif str.startswith('null'):
        return parse_null(str)
    else:
        raise ValueError(f'Unexpected character: {str[0]}')

def parse_object(str):
    """Parse a JSON object (dictionary) - extended to handle nested structures"""
    str = str.lstrip()
    assert(str[0] == '{'), f"Expected '{{' but found '{str[0]}'"
    str = str[1:]  # skip {
    
    obj = {}
    
    while True:
        str = str.lstrip()
        
        if len(str) == 0:
            raise ValueError('Expecting "}" but reached the end of string!')
        elif str[0] == '}':  # end of json object
            str = str[1:]  # consume '}'
            return obj, str
        elif str[0] == ',':
            str = str[1:]  # skip ','
        else:  # ready for a new key-value pair
            key, str = parse_string(str)
            str = parse_colon(str)  # skip colon
            value, str = parse_value(str)  # parse any type of value
            obj[key] = value

def parse_array(str):
    """Parse a JSON array (list) - handles nested structures"""
    str = str.lstrip()
    assert(str[0] == '['), f"Expected '[' but found '{str[0]}'"
    str = str[1:]  # skip [
    
    arr = []
    
    while True:
        str = str.lstrip()
        
        if len(str) == 0:
            raise ValueError('Expecting "]" but reached the end of string!')
        elif str[0] == ']':  # end of array
            str = str[1:]  # consume ']'
            return arr, str
        elif str[0] == ',':
            str = str[1:]  # skip ','
        else:  # ready for a new value
            value, str = parse_value(str)  # parse any type of value
            arr.append(value)

def json_load(json_str):
    """Main function to load JSON string into Python object"""
    json_str = json_str.strip()
    value, rest = parse_value(json_str)
    rest = rest.strip()
    if len(rest) > 0:
        raise ValueError(f'Unexpected content after JSON: {rest[:20]}')
    return value

# Test the extended parser
print("Testing Extended JSON Parser:")
print("=" * 50)

# Test 1: Simple object
test1 = '{"name": "john", "age": 25.3, "gender": "male"}'
result1 = json_load(test1)
print("Test 1 - Simple object:", result1)

# Test 2: Nested object
test2 = '{"person": {"name": "john", "age": 25}, "city": "LA"}'
result2 = json_load(test2)
print("Test 2 - Nested object:", result2)

# Test 3: Array
test3 = '[1, 2, 3, "hello", true, null]'
result3 = json_load(test3)
print("Test 3 - Array:", result3)

# Test 4: Array of objects
test4 = '[{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]'
result4 = json_load(test4)
print("Test 4 - Array of objects:", result4)

# Test 5: Complex nested structure
test5 = '{"data": [{"county": "LA", "spend": 1000}, {"county": "NY", "spend": 2000}], "year": 2023}'
result5 = json_load(test5)
print("Test 5 - Complex nested:", result5)


Testing Extended JSON Parser:
==================================================
Test 1 - Simple object: {'name': 'john', 'age': 25.3, 'gender': 'male'}
Test 2 - Nested object: {'person': {'name': 'john', 'age': 25}, 'city': 'LA'}
Test 3 - Array: [1, 2, 3, 'hello', True, None]
Test 4 - Array of objects: [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
Test 5 - Complex nested: {'data': [{'county': 'LA', 'spend': 1000}, {'county': 'NY', 'spend': 2000}], 'year': 2023}
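
The tests above cover integers and simple decimals. JSON numbers may also carry an exponent (for example `1e3` or `2.5E-2`), and such values have to be read as floats. One way to recognize the whole token is a regular expression that follows the JSON number grammar. This is a sketch only; `scan_number` is a hypothetical helper, not part of the parser above.

```python
import re

# JSON number grammar: optional minus, integer part, optional fraction, optional exponent
NUMBER_RE = re.compile(r'-?(?:0|[1-9]\d*)(\.\d+)?([eE][+-]?\d+)?')


def scan_number(text):
    """Return (number, remaining_text) for a JSON number at the start of text."""
    text = text.lstrip()
    match = NUMBER_RE.match(text)
    if match is None:
        raise ValueError(f"Expected number but found {text[:10]!r}")
    token = match.group(0)
    # A fraction or an exponent makes the value a float; otherwise it is an int
    is_float = match.group(1) is not None or match.group(2) is not None
    value = float(token) if is_float else int(token)
    return value, text[match.end():]


print(scan_number("25.3}"))    # (25.3, '}')
print(scan_number("-12, 7]"))  # (-12, ', 7]')
print(scan_number("1e3"))      # (1000.0, '')
```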


## Part 2: Collection/DataFrame Structure

Implementing a collection structure to store JSON documents (similar to MongoDB collections)


In [14]:
# Collection class to store JSON documents (similar to MongoDB collections)

class Collection:
    """A collection class to store and manipulate JSON documents"""
    
    def __init__(self, name):
        self.name = name
        self.documents = []  # List of dictionaries (JSON objects)
    
    def insert(self, document):
        """Insert a document (dictionary) into the collection"""
        if isinstance(document, dict):
            self.documents.append(document)
        else:
            raise TypeError("Document must be a dictionary")
    
    def insert_many(self, documents):
        """Insert multiple documents into the collection"""
        for doc in documents:
            self.insert(doc)
    
    def __len__(self):
        return len(self.documents)
    
    def __getitem__(self, index):
        return self.documents[index]
    
    def __iter__(self):
        return iter(self.documents)
    
    def __repr__(self):
        return f"Collection(name='{self.name}', documents={len(self.documents)})"
    
    def to_list(self):
        """Return all documents as a list"""
        return self.documents.copy()

# Test Collection
print("Testing Collection Class:")
print("=" * 50)

collection = Collection("test_collection")
collection.insert({"name": "Alice", "age": 30})
collection.insert({"name": "Bob", "age": 25})
print(f"Collection: {collection}")
print(f"Documents: {collection.to_list()}")


Testing Collection Class:
==================================================
Collection: Collection(name='test_collection', documents=2)
Documents: [{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}]
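
Note that `to_list()` returns `self.documents.copy()`, which is a shallow copy: the list is new, but the dictionaries inside it are the same objects the collection holds. The sketch below shows the effect with a plain list of dictionaries standing in for `Collection.documents`, and uses `copy.deepcopy` as one option when fully independent documents are needed.

```python
import copy

documents = [{"name": "Alice", "tags": ["organic"]}]

shallow = documents.copy()       # new list, same dict objects
deep = copy.deepcopy(documents)  # new list and new dicts, recursively

shallow[0]["name"] = "Alicia"    # visible through `documents` as well
deep[0]["tags"].append("local")  # does not touch `documents`

print(documents)  # [{'name': 'Alicia', 'tags': ['organic']}]
print(deep)       # [{'name': 'Alice', 'tags': ['organic', 'local']}]
```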


## Part 3: Core Operations

Implementing filtering, projection, group by, aggregation, and join operations
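
Group-by followed by aggregation is the core of the analysis. As a compact illustration of the idea, independent of the `Collection` class defined in this notebook, the sketch below groups plain dictionaries with `collections.defaultdict` and sums one field per group. Skipping `None` values is an assumption about how missing data should be treated, and `sum_by` is an illustrative name.

```python
from collections import defaultdict


def sum_by(docs, group_key, field):
    """Sum `field` per distinct `group_key`, ignoring missing or None values."""
    totals = defaultdict(int)
    for doc in docs:
        value = doc.get(field)
        if group_key in doc and value is not None:
            totals[doc[group_key]] += value
    return dict(totals)


docs = [
    {"county": "LA", "spend": 1000},
    {"county": "NY", "spend": 2000},
    {"county": "LA", "spend": 1500},
    {"county": "NY", "spend": None},  # missing measurement, skipped
]
print(sum_by(docs, "county", "spend"))  # {'LA': 2500, 'NY': 2000}
```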


In [15]:
# Operation 1: Filtering
def filter_collection(collection, condition_func):
    """
    Filter documents in a collection based on a condition function
    
    Args:
        collection: Collection object
        condition_func: Function that takes a document and returns True/False
    
    Returns:
        New Collection with filtered documents
    """
    filtered = Collection(f"{collection.name}_filtered")
    for doc in collection:
        if condition_func(doc):
            filtered.insert(doc.copy())
    return filtered

# Operation 2: Projection
def project_collection(collection, fields):
    """
    Project (select) specific fields from documents
    
    Args:
        collection: Collection object
        fields: List of field names to select
    
    Returns:
        New Collection with projected documents
    """
    projected = Collection(f"{collection.name}_projected")
    for doc in collection:
        new_doc = {}
        for field in fields:
            if field in doc:
                new_doc[field] = doc[field]
        projected.insert(new_doc)
    return projected

# Operation 3: Group By
def group_by(collection, group_key):
    """
    Group documents by a key
    
    Args:
        collection: Collection object
        group_key: Field name to group by
    
    Returns:
        Dictionary where keys are group values and values are lists of documents
    """
    groups = {}
    for doc in collection:
        if group_key in doc:
            key_value = doc[group_key]
            if key_value not in groups:
                groups[key_value] = []
            groups[key_value].append(doc)
    return groups

# Operation 4: Aggregation
def aggregate(collection, group_key, agg_field, agg_func):
    """
    Group by a key and apply an aggregation function to a field
    
    Args:
        collection: Collection object
        group_key: Field name to group by
        agg_field: Field name to aggregate
        agg_func: Aggregation function (e.g., 'sum', 'avg', 'max', 'min', 'count')
    
    Returns:
        List of dictionaries with group_key and aggregated value
    """
    groups = group_by(collection, group_key)
    results = []
    
    for key_value, docs in groups.items():
        values = [doc[agg_field] for doc in docs if doc.get(agg_field) is not None]  # skip missing and null values
        
        if len(values) == 0:
            continue
            
        if agg_func == 'sum':
            agg_value = sum(values)
        elif agg_func == 'avg':
            agg_value = sum(values) / len(values)
        elif agg_func == 'max':
            agg_value = max(values)
        elif agg_func == 'min':
            agg_value = min(values)
        elif agg_func == 'count':
            agg_value = len(values)
        else:
            raise ValueError(f"Unknown aggregation function: {agg_func}")
        
        results.append({group_key: key_value, f"{agg_func}({agg_field})": agg_value})
    
    return results

# Operation 5: Join
def join_collections(collection1, collection2, key1, key2):
    """
    Join two collections on specified keys
    
    Args:
        collection1: First Collection object
        collection2: Second Collection object
        key1: Key in collection1 to join on
        key2: Key in collection2 to join on
    
    Returns:
        New Collection with joined documents
    """
    joined = Collection(f"{collection1.name}_join_{collection2.name}")
    
    # Build index on collection2 for faster lookup
    index = {}
    for doc2 in collection2:
        if key2 in doc2:
            key_value = doc2[key2]
            if key_value not in index:
                index[key_value] = []
            index[key_value].append(doc2)
    
    # Perform join
    for doc1 in collection1:
        if key1 in doc1:
            key_value = doc1[key1]
            if key_value in index:
                for doc2 in index[key_value]:
                    # Merge documents
                    merged = doc1.copy()
                    # Add fields from doc2, avoiding conflicts by prefixing
                    for k, v in doc2.items():
                        if k != key2:  # Don't duplicate the join key
                            if k in merged:
                                merged[f"{collection2.name}_{k}"] = v
                            else:
                                merged[k] = v
                    joined.insert(merged)
    
    return joined

# Test operations
print("Testing Core Operations:")
print("=" * 50)

# Create test collection
test_coll = Collection("test")
test_coll.insert_many([
    {"county": "LA", "spend": 1000, "year": 2023},
    {"county": "NY", "spend": 2000, "year": 2023},
    {"county": "LA", "spend": 1500, "year": 2024},
    {"county": "NY", "spend": 2500, "year": 2024},
])

print("Original collection:")
for doc in test_coll:
    print(f"  {doc}")

# Test filtering
print("\n1. Filtering (spend > 1500):")
filtered = filter_collection(test_coll, lambda doc: doc.get("spend", 0) > 1500)
for doc in filtered:
    print(f"  {doc}")

# Test projection
print("\n2. Projection (county, spend):")
projected = project_collection(test_coll, ["county", "spend"])
for doc in projected:
    print(f"  {doc}")

# Test group by
print("\n3. Group by county:")
groups = group_by(test_coll, "county")
for key, docs in groups.items():
    print(f"  {key}: {len(docs)} documents")

# Test aggregation
print("\n4. Aggregation (sum of spend by county):")
agg_result = aggregate(test_coll, "county", "spend", "sum")
for result in agg_result:
    print(f"  {result}")

# Test join
print("\n5. Join:")
coll1 = Collection("coll1")
coll1.insert_many([
    {"county": "LA", "population": 10000000},
    {"county": "NY", "population": 8000000},
])

coll2 = Collection("coll2")
coll2.insert_many([
    {"county_code": "LA", "unemployment": 5.2},
    {"county_code": "NY", "unemployment": 4.8},
])

joined = join_collections(coll1, coll2, "county", "county_code")
for doc in joined:
    print(f"  {doc}")


Testing Core Operations:
==================================================
Original collection:
  {'county': 'LA', 'spend': 1000, 'year': 2023}
  {'county': 'NY', 'spend': 2000, 'year': 2023}
  {'county': 'LA', 'spend': 1500, 'year': 2024}
  {'county': 'NY', 'spend': 2500, 'year': 2024}

1. Filtering (spend > 1500):
  {'county': 'NY', 'spend': 2000, 'year': 2023}
  {'county': 'NY', 'spend': 2500, 'year': 2024}

2. Projection (county, spend):
  {'county': 'LA', 'spend': 1000}
  {'county': 'NY', 'spend': 2000}
  {'county': 'LA', 'spend': 1500}
  {'county': 'NY', 'spend': 2500}

3. Group by county:
  LA: 2 documents
  NY: 2 documents

4. Aggregation (sum of spend by county):
  {'county': 'LA', 'sum(spend)': 2500}
  {'county': 'NY', 'sum(spend)': 4500}

5. Join:
  {'county': 'LA', 'population': 10000000, 'unemployment': 5.2}
  {'county': 'NY', 'population': 8000000, 'unemployment': 4.8}
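
The join above is an inner join: a document from the first collection with no match in the second is dropped from the result. If unmatched documents should be kept, a left outer join is one option. The following is a sketch on plain lists of dictionaries. `left_join` is a hypothetical name, and two behaviors are simplifying assumptions rather than project behavior: unmatched left documents are copied unchanged, and on a field-name clash the left value is kept instead of prefixing the right one.

```python
def left_join(left_docs, right_docs, left_key, right_key):
    """Left outer join of two lists of dicts using a hash index on right_docs."""
    index = {}
    for right in right_docs:
        if right_key in right:
            index.setdefault(right[right_key], []).append(right)

    joined = []
    for left in left_docs:
        matches = index.get(left[left_key], []) if left_key in left else []
        if not matches:
            joined.append(left.copy())  # keep the unmatched left document
            continue
        for right in matches:
            merged = left.copy()
            for key, value in right.items():
                if key != right_key:
                    # Left-side fields win on a name clash
                    merged.setdefault(key, value)
            joined.append(merged)
    return joined


counties = [{"county": "LA", "population": 10000000},
            {"county": "SF", "population": 800000}]
rates = [{"county_code": "LA", "unemployment": 5.2}]
for doc in left_join(counties, rates, "county", "county_code"):
    print(doc)
# {'county': 'LA', 'population': 10000000, 'unemployment': 5.2}
# {'county': 'SF', 'population': 800000}
```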


## Part 4: JSON File Loading

Functions to load JSON files (a single object or an array of objects), CSV files, and Excel files into collections
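
For CSV input, one detail worth keeping in mind is that a quoted field may legally contain a comma or even a line break, so splitting a file into lines before parsing can break such records. Python's standard `csv` module handles both cases. A small sketch, using an in-memory string in place of a file (the sample data is made up for illustration):

```python
import csv
import io

raw = 'county,note\n"Los Angeles, CA","line one\nline two"\nKings,plain\n'

# newline='' lets the csv module handle line breaks inside quoted fields itself
reader = csv.DictReader(io.StringIO(raw, newline=''))
rows = list(reader)

print(len(rows))              # 2
print(rows[0]["county"])      # Los Angeles, CA
print(repr(rows[0]["note"]))  # 'line one\nline two'
```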


In [16]:
# Function to load JSON file into a collection
def load_json_file(filename):
    """
    Load a JSON file (array of objects) into a Collection
    
    Args:
        filename: Path to JSON file
    
    Returns:
        Collection object
    """
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            content = f.read().strip()
        
        # Parse JSON
        data = json_load(content)
        
        # Create collection (handle both Windows and Unix paths)
        collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
        collection = Collection(collection_name)
        
        # Handle both array of objects and single object
        if isinstance(data, list):
            for doc in data:
                if isinstance(doc, dict):
                    collection.insert(doc)
        elif isinstance(data, dict):
            collection.insert(data)
        else:
            raise ValueError("JSON file must contain an object or array of objects")
        
        return collection
    except FileNotFoundError:
        print(f"File {filename} not found. Creating empty collection.")
        collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
        return Collection(collection_name)
    except Exception as e:
        print(f"Error loading {filename}: {e}")
        collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
        return Collection(collection_name)

# Helper to split one CSV line into fields (used by load_csv_file below)
def parse_csv_line(line):
    """Parse a single CSV line, handling quoted fields"""
    fields = []
    current_field = ""
    in_quotes = False
    i = 0
    
    while i < len(line):
        char = line[i]
        
        if char == '"':
            if in_quotes and i + 1 < len(line) and line[i + 1] == '"':
                # Escaped quote
                current_field += '"'
                i += 2
            else:
                # Toggle quote state
                in_quotes = not in_quotes
                i += 1
        elif char == ',' and not in_quotes:
            # End of field
            fields.append(current_field)
            current_field = ""
            i += 1
        else:
            current_field += char
            i += 1
    
    # Add last field
    fields.append(current_field)
    return fields

def load_csv_file(filename):
    """
    Load a CSV file into a Collection
    
    Args:
        filename: Path to CSV file
    
    Returns:
        Collection object
    """
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        
        if len(lines) == 0:
            raise ValueError("CSV file is empty")
        
        # Parse header
        header = parse_csv_line(lines[0].strip())
        
        # Create collection
        collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
        collection = Collection(collection_name)
        
        # Parse data rows
        for line in lines[1:]:
            line = line.strip()
            if not line:  # Skip empty lines
                continue
            
            fields = parse_csv_line(line)
            if len(fields) != len(header):
                # Skip malformed rows
                continue
            
            # Create document
            doc = {}
            for i, field in enumerate(fields):
                # Try to convert to number if possible
                field = field.strip()
                if field == '' or field == 'N/A' or field == '-9999' or field == '-8888':
                    doc[header[i]] = None
                else:
                    try:
                        # Values containing '.' are parsed as float, everything else as int
                        if '.' in field:
                            doc[header[i]] = float(field)
                        else:
                            doc[header[i]] = int(field)
                    except ValueError:
                        # Keep as string
                        doc[header[i]] = field
            
            collection.insert(doc)
        
        return collection
    except FileNotFoundError:
        print(f"File {filename} not found. Creating empty collection.")
        collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
        return Collection(collection_name)
    except Exception as e:
        print(f"Error loading {filename}: {e}")
        collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
        return Collection(collection_name)

# Function to load Excel file into a collection
def load_excel_file(filename, sheet_name=None, header_row=None):
    """
    Load an Excel file into a Collection using openpyxl
    
    Args:
        filename: Path to Excel file
        sheet_name: Name of sheet to load (None for first sheet)
        header_row: Row number containing headers (0-indexed). If None, auto-detects the first non-empty row.
    
    Returns:
        Collection object
    """
    try:
        # Try to import openpyxl
        try:
            from openpyxl import load_workbook
        except ImportError:
            print("Warning: openpyxl not available. Install with: pip install openpyxl")
            print(f"Creating empty collection for {filename}")
            collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
            return Collection(collection_name)
        
        # Load workbook
        wb = load_workbook(filename, data_only=True)
        
        # Get sheet
        if sheet_name:
            ws = wb[sheet_name]
        else:
            ws = wb.active
        
        # Determine header row (auto-detect if not provided)
        headers = None
        current_row_index = -1
        
        def row_has_data(row):
            return any(cell is not None and str(cell).strip() != "" for cell in row)
        
        # Create collection
        collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
        collection = Collection(collection_name)
        
        # Parse rows
        for row in ws.iter_rows(values_only=True):
            current_row_index += 1
            
            # Skip rows before the specified header_row
            if header_row is not None and current_row_index < header_row:
                continue
            
            if headers is None:
                if header_row is None and not row_has_data(row):
                    continue  # still searching for header row
                headers = [str(cell).strip() if cell is not None else "" for cell in row]
                continue  # move to next row for data
            
            # Skip empty rows
            if all(cell is None or (isinstance(cell, str) and cell.strip() == '') for cell in row):
                continue
            
            # Create document
            doc = {}
            for i, cell_value in enumerate(row):
                if i >= len(headers):
                    continue
                field_name = headers[i].strip()
                if not field_name:
                    continue
                
                # Convert cell value
                if cell_value is None or cell_value == '':
                    doc[field_name] = None
                elif isinstance(cell_value, (int, float)):
                    doc[field_name] = cell_value
                else:
                    # Try to convert string to number
                    cell_str = str(cell_value).strip()
                    if cell_str == '' or cell_str == 'N/A' or cell_str == '-9999' or cell_str == '-8888':
                        doc[field_name] = None
                    else:
                        try:
                            if '.' in cell_str:
                                doc[field_name] = float(cell_str)
                            else:
                                doc[field_name] = int(cell_str)
                        except ValueError:
                            doc[field_name] = cell_str
            
            if doc:  # Only insert non-empty documents
                collection.insert(doc)
        
        return collection
    except FileNotFoundError:
        print(f"File {filename} not found. Creating empty collection.")
        collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
        return Collection(collection_name)
    except Exception as e:
        print(f"Error loading {filename}: {e}")
        collection_name = filename.replace('\\', '/').split('/')[-1].split('.')[0]
        return Collection(collection_name)

# Test file loading
print("Testing File Loading:")
print("=" * 50)


Testing File Loading:
==================================================
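
Both `load_csv_file` and `load_excel_file` apply the same rule to raw text: empty strings and the markers `N/A`, `-9999`, and `-8888` become `None`, numeric text becomes `int` or `float`, and anything else stays a string. A shared helper could express that rule once. The sketch below is one possible shape; `coerce_cell` is a hypothetical name, and trying `int` before `float` (so that text such as `1e3` also parses) is a design choice that differs slightly from the loaders above.

```python
MISSING_MARKERS = {"", "N/A", "-9999", "-8888"}


def coerce_cell(text):
    """Convert raw cell text to None, int, float, or a stripped string."""
    text = text.strip()
    if text in MISSING_MARKERS:
        return None
    for convert in (int, float):
        try:
            return convert(text)
        except ValueError:
            continue
    return text


print([coerce_cell(v) for v in ["42", " 3.5 ", "1e3", "N/A", "-9999", "Kings"]])
# [42, 3.5, 1000.0, None, None, 'Kings']
```

Note that `float` also accepts text such as `nan` and `inf`, so labels of that form would need an extra check if they can occur in the data.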


## Part 5: Sample Data Generation

Transforming the loaded source data (Food Environment Atlas and Excel files) into the collections used for the Green Purchasing Behavior analysis
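
The transformation functions below start from long-format records (one document per county and variable, with `County`, `Variable_Code`, and `Value` fields) and pivot them into one dictionary of variables per county. The sketch below shows that pivot in isolation. It keys each group on a `(State, County)` pair so that same-named counties in different states stay separate. The `State` field is an assumption about the input and is not used by the project code, `pivot_by_county` is an illustrative name, and the sample values are made up.

```python
def pivot_by_county(records, wanted_codes):
    """Pivot long-format records into {(state, county): {code: value}}."""
    wide = {}
    for rec in records:
        code = rec.get("Variable_Code")
        value = rec.get("Value")
        if code in wanted_codes and value is not None:
            key = (rec.get("State"), rec.get("County"))
            wide.setdefault(key, {})[code] = value
    return wide


records = [
    {"State": "CA", "County": "Orange", "Variable_Code": "FMRKT18", "Value": 30},
    {"State": "FL", "County": "Orange", "Variable_Code": "FMRKT18", "Value": 12},
    {"State": "CA", "County": "Orange", "Variable_Code": "DIRSALES17", "Value": 500},
    {"State": "CA", "County": "Orange", "Variable_Code": "POVRATE21", "Value": None},
]
wide = pivot_by_county(records, {"FMRKT18", "DIRSALES17", "POVRATE21"})
print(wide[("CA", "Orange")])  # {'FMRKT18': 30, 'DIRSALES17': 500}
print(wide[("FL", "Orange")])  # {'FMRKT18': 12}
```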


In [None]:
# Transform Food Environment Atlas data for Green Purchasing Behavior analysis

def transform_food_data(food_data_collection):
    """
    Transform Food Environment Atlas data into format suitable for green purchasing analysis.
    Focus on variables related to local foods, farmers markets, and direct sales.
    
    Args:
        food_data_collection: Collection with Food Environment Atlas data
    
    Returns:
        Collection with transformed data focused on green purchasing indicators
    """
    transformed = Collection("food_spending")
    
    # Variables related to green/sustainable purchasing behavior
    green_variables = [
        'DIRSALES17',  # Direct farm sales, 2017
        'DIRSALES12',  # Direct farm sales, 2012
        'FMRKT18',     # Farmers' markets, 2018
        'FMRKT13',     # Farmers' markets, 2013
        'VEG_ACRES17', # Vegetable acres harvested, 2017
        'VEG_ACRES12', # Vegetable acres harvested, 2012
        'FRESHVEG_ACRES17',  # Fresh vegetable acres, 2017
        'FRESHVEG_ACRES12',  # Fresh vegetable acres, 2012
        'GROC20',      # Grocery stores, 2020
        'GROC16',      # Grocery stores, 2016
        'SPECS20',     # Specialized food stores, 2020
        'SPECS16',     # Specialized food stores, 2016
    ]
    
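    # Group data by county and variable.
    # NOTE: the key is the county name alone, so same-named counties in
    # different states overwrite each other's values.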
    county_vars = {}
    for doc in food_data_collection:
        county = doc.get('County')
        var_code = doc.get('Variable_Code')
        value = doc.get('Value')
        
        if county and var_code and var_code in green_variables and value is not None:
            if county not in county_vars:
                county_vars[county] = {}
            county_vars[county][var_code] = value
    
    # Create transformed documents
    for county, vars_dict in county_vars.items():
        # Create documents for different categories
        if 'DIRSALES17' in vars_dict:
            # Direct sales as proxy for local/organic food spending
            transformed.insert({
                "county": county,
                "category": "direct_sales",
                "spend": vars_dict.get('DIRSALES17', 0) * 1000,  # Convert from thousands
                "year": 2017
            })
        
        if 'FMRKT18' in vars_dict:
            # Farmers markets as indicator of green purchasing
            transformed.insert({
                "county": county,
                "category": "farmers_markets",
                "spend": vars_dict.get('FMRKT18', 0) * 100,  # Scale for analysis
                "year": 2018
            })
        
        if 'FRESHVEG_ACRES17' in vars_dict:
            # Fresh vegetable production as green indicator
            transformed.insert({
                "county": county,
                "category": "fresh_vegetables",
                "spend": vars_dict.get('FRESHVEG_ACRES17', 0) * 10,  # Scale for analysis
                "year": 2017
            })
        
        if 'SPECS20' in vars_dict:
            # Specialized food stores (often organic/natural)
            transformed.insert({
                "county": county,
                "category": "specialized_stores",
                "spend": vars_dict.get('SPECS20', 0) * 50,  # Scale for analysis
                "year": 2020
            })
    
    return transformed

def transform_socioeconomic_data(food_data_collection):
    """
    Extract socioeconomic data (income, poverty) from Food Environment Atlas
    
    Args:
        food_data_collection: Collection with Food Environment Atlas data
    
    Returns:
        Collection with socioeconomic data
    """
    jobs = Collection("jobs")
    
    # Variables for income/jobs
    income_vars = ['MEDHHINC21']  # Median household income, 2021
    
    county_income = {}
    for doc in food_data_collection:
        county = doc.get('County')
        var_code = doc.get('Variable_Code')
        value = doc.get('Value')
        
        if county and var_code in income_vars and value is not None:
            county_income[county] = value
    
    # Create job/income documents
    for county, income in county_income.items():
        jobs.insert({
            "county": county,
            "occupation": "general",
            "median_income": int(income) if income else None,
            "year": 2021
        })
    
    return jobs

def transform_unemployment_data(food_data_collection):
    """
    Extract unemployment/poverty data from Food Environment Atlas
    
    Args:
        food_data_collection: Collection with Food Environment Atlas data
    
    Returns:
        Collection with unemployment/poverty data
    """
    unemployment = Collection("unemployment")
    
    # Variables for economic indicators
    poverty_vars = ['POVRATE21']  # Poverty rate, 2021
    
    county_poverty = {}
    for doc in food_data_collection:
        county = doc.get('County')
        var_code = doc.get('Variable_Code')
        value = doc.get('Value')
        
        if county and var_code in poverty_vars and value is not None:
            county_poverty[county] = value
    
    # Create unemployment documents (using poverty rate as proxy)
    for county, poverty_rate in county_poverty.items():
        unemployment.insert({
            "county": county,
            "rate": float(poverty_rate) if poverty_rate else None,
            "year": 2021
        })
    
    return unemployment

def transform_employment_data(food_data_collection):
    """
    Extract and transform employment-related data from Food Environment Atlas.
    Since the Atlas doesn't have direct employment data, we use income and economic indicators
    as proxies for employment levels.
    
    Args:
        food_data_collection: Collection with Food Environment Atlas data
    
    Returns:
        Collection with employment data
    """
    employment = Collection("employment")
    
    # Extract income and poverty data to create employment indicators
    county_data = {}
    for doc in food_data_collection:
        county = doc.get('County')
        var_code = doc.get('Variable_Code')
        value = doc.get('Value')
        
        if county and value is not None:
            if county not in county_data:
                county_data[county] = {}
            
            if var_code == 'MEDHHINC21':
                county_data[county]['median_income'] = value
            elif var_code == 'POVRATE21':
                county_data[county]['poverty_rate'] = value
            elif var_code == 'METRO23':
                county_data[county]['metro'] = value
    
    # Create employment documents with employment categories based on income
    for county, data in county_data.items():
        median_income = data.get('median_income')
        poverty_rate = data.get('poverty_rate')
        metro = data.get('metro', 'Nonmetro')
        
        if median_income is not None:
            # Categorize employment level based on income
            # Higher income typically correlates with better employment opportunities
            if median_income >= 75000:
                employment_level = "high"
                occupation = "professional"
            elif median_income >= 50000:
                employment_level = "medium_high"
                occupation = "skilled"
            elif median_income >= 35000:
                employment_level = "medium"
                occupation = "service"
            else:
                employment_level = "low"
                occupation = "retail"
            
            # Employment rate proxy: 100 - 1.5 * poverty_rate, clamped to [0, 100].
            # This is a simplified proxy since we don't have actual employment data.
            if poverty_rate is not None:
                # Lower poverty = higher proxy; e.g. poverty_rate 12.0 -> 100 - 18.0 = 82.0
                employment_rate = max(0, min(100, 100 - poverty_rate * 1.5))
            else:
                # Fallback when poverty rate is unavailable: 0.8 points per $1,000 of
                # median income, clamped to [60, 95]; e.g. 90000 -> 72.0
                employment_rate = min(95, max(60, (median_income / 1000) * 0.8))
            
            employment.insert({
                "county": county,
                "occupation": occupation,
                "median_income": int(median_income) if median_income else None,
                "employment_level": employment_level,
                "employment_rate": round(employment_rate, 2),
                # Compare against None so a genuine 0.0 poverty rate is kept
                "poverty_rate": float(poverty_rate) if poverty_rate is not None else None,
                "metro_status": metro,
                "year": 2021
            })
    
    return employment

print("Data transformation functions created for Food Environment Atlas data")

# Transform Excel data files for Green Purchasing Behavior analysis

# Helper utilities for Excel -> Collection transformations
def get_field_value(doc, keywords):
    for key, value in doc.items():
        if key is None:
            continue
        key_lower = str(key).strip().lower()
        for keyword in keywords:
            if keyword in key_lower:
                if isinstance(value, str):
                    return value.strip()
                return value
    return None

def to_float(value):
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    value_str = str(value).strip()
    if value_str == "":
        return None
    value_str = value_str.replace(',', '')
    if value_str.endswith('%'):
        value_str = value_str[:-1]
    try:
        return float(value_str)
    except ValueError:
        return None

def to_int(value):
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return int(value)
    value_str = str(value).strip()
    if value_str == "":
        return None
    value_str = value_str.replace(',', '')
    match = re.search(r'-?\d+', value_str)
    if match:
        try:
            return int(match.group(0))
        except ValueError:
            return None
    return None

def infer_year(value, default_year):
    if value is None:
        return default_year
    if isinstance(value, (int, float)):
        year = int(value)
        if 1900 <= year <= 2100:
            return year
    value_str = str(value)
    match = re.search(r'(19|20)\d{2}', value_str)
    if match:
        return int(match.group(0))
    return default_year

def transform_consumer_data(consumer_collection):
    """
    Transform consumer spending data from Excel into food spending format
    """
    food_spending = Collection("food_spending")
    county_keywords = ['county', 'area', 'region', 'location', 'metro', 'city', 'borough']
    category_keywords = ['category', 'item', 'product', 'series', 'description', 'class']
    spend_keywords = ['spend', 'expenditure', 'value', 'amount', 'dollar', 'sales', 'cost', 'price']
    year_keywords = ['year', 'date', 'period', 'month', 'time']
    
    for doc in consumer_collection:
        county = get_field_value(doc, county_keywords)
        spend = to_float(get_field_value(doc, spend_keywords))
        if not county or spend is None:
            continue
        category = get_field_value(doc, category_keywords) or "general"
        year = infer_year(get_field_value(doc, year_keywords), 2023)
        
        food_spending.insert({
            "county": str(county),
            "category": str(category).strip().lower().replace(' ', '_'),
            "spend": spend,
            "year": year
        })
    
    return food_spending

def transform_employment_excel_data(employment_collection):
    """
    Transform employment data from Excel into jobs/employment format
    """
    jobs = Collection("jobs")
    county_keywords = ['county', 'area', 'region', 'location', 'metro', 'city', 'borough']
    income_keywords = ['median_income', 'income', 'wage', 'salary', 'earnings', 'pay']
    occupation_keywords = ['occupation', 'job', 'title', 'category', 'sector', 'industry', 'class']
    year_keywords = ['year', 'date', 'period', 'month']
    
    for doc in employment_collection:
        county = get_field_value(doc, county_keywords)
        if not county:
            continue
        median_income = to_int(get_field_value(doc, income_keywords))
        occupation = get_field_value(doc, occupation_keywords) or "general"
        year = infer_year(get_field_value(doc, year_keywords), 2024)
        
        jobs.insert({
            "county": str(county),
            "occupation": str(occupation).strip().lower().replace(' ', '_'),
            "median_income": median_income,
            "year": year
        })
    
    return jobs

def transform_unemployment_excel_data(unemployment_collection):
    """
    Transform unemployment data from Excel into unemployment format
    """
    unemployment = Collection("unemployment")
    county_keywords = ['county', 'area', 'region', 'location', 'metro', 'city', 'borough']
    rate_keywords = ['unemployment', 'jobless', 'rate', 'percent', 'pct', 'labor', 'employment_rate']
    year_keywords = ['year', 'date', 'period', 'month']
    
    for doc in unemployment_collection:
        county = get_field_value(doc, county_keywords)
        rate = to_float(get_field_value(doc, rate_keywords))
        if not county or rate is None:
            continue
        year = infer_year(get_field_value(doc, year_keywords), 2024)
        
        unemployment.insert({
            "county": str(county),
            "rate": rate,
            "year": year
        })
    
    return unemployment

print("Excel data transformation functions created")


SyntaxError: invalid syntax (3491599650.py, line 233)
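
The helpers above pick the first column whose lowercased name contains any keyword, so the chosen column depends on both the header wording and the column order in the spreadsheet. The following is a minimal sketch for checking this on a single row before running a full transform. The header names in `sample_row` are hypothetical, and `first_match` is an illustrative helper, not part of the notebook.

```python
def first_match(doc, keywords):
    """Return (key, value) for the first key containing any keyword, else None.

    Mirrors the substring matching of get_field_value, but also reports
    which column was chosen so unexpected matches are visible.
    """
    for key, value in doc.items():
        if key is None:
            continue
        key_lower = str(key).strip().lower()
        if any(keyword in key_lower for keyword in keywords):
            return key, value
    return None

# Hypothetical row; real spreadsheets will have different headers.
sample_row = {"AREA_TITLE": "Boston", "Unemployment Rate": "4.1%", "Year": 2024}

county_hit = first_match(sample_row, ["county", "area"])
rate_hit = first_match(sample_row, ["unemployment", "rate"])
spend_hit = first_match(sample_row, ["spend", "expenditure"])

print(county_hit)  # ('AREA_TITLE', 'Boston')
print(rate_hit)    # ('Unemployment Rate', '4.1%')
print(spend_hit)   # None: no column name contains a spending keyword
```

A `None` result for a required field means the transform skips that row, so this is one thing worth checking when a transform returns far fewer records than were loaded.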

## Part 6: Green Purchasing Behavior Application

An application that uses all of the functions implemented above to analyze sustainable food purchasing behavior.
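
The analysis follows a join, categorize, aggregate pattern. The sketch below shows that pattern on plain lists of dicts. The sample records and the helper name `join_on` are hypothetical stand-ins, not the notebook's `Collection` API, and the income thresholds are the ones used in the application cell.

```python
# Hypothetical sample records; the notebook builds these from Excel files.
food_spending = [
    {"county": "A", "category": "organic", "spend": 1200.0},
    {"county": "A", "category": "local", "spend": 800.0},
    {"county": "B", "category": "organic", "spend": 300.0},
]
jobs = [
    {"county": "A", "median_income": 52000},
    {"county": "B", "median_income": 38000},
]

def join_on(left, right, key):
    """Inner join two lists of dicts on a shared key (nested-loop join)."""
    joined = []
    for l_doc in left:
        for r_doc in right:
            if l_doc.get(key) == r_doc.get(key):
                joined.append({**l_doc, **r_doc})
    return joined

def income_category(doc):
    """Bucket a document by median income; missing income counts as 0."""
    income = doc.get("median_income") or 0
    if income >= 45000:
        return "high"
    if income >= 40000:
        return "medium"
    return "low"

joined = join_on(food_spending, jobs, "county")

# Collect spend values per income category, then average each bucket.
buckets = {}
for doc in joined:
    buckets.setdefault(income_category(doc), []).append(doc["spend"])
avg_by_income = {cat: sum(vals) / len(vals) for cat, vals in buckets.items()}

print(len(joined))     # 3
print(avg_by_income)   # {'high': 1000.0, 'low': 300.0}
```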


In [None]:
# Green Purchasing Behavior Analysis Application

print("=" * 70)
print("GREEN PURCHASING BEHAVIOR ANALYSIS APPLICATION")
print("=" * 70)

# Load data from Excel files
print("\n1. Loading Data from Excel Files:")
print("-" * 70)

# Load consumer spending data
print("Loading consumer spending data...")
consumer_data_raw = load_excel_file('consumer-data/cu-all-detail-2023.xlsx')
print(f"Loaded {len(consumer_data_raw)} records from consumer data")

# Load employment data
print("Loading employment data...")
employment_data_raw = load_excel_file('employement_data/all_data_M_2024.xlsx')
print(f"Loaded {len(employment_data_raw)} records from employment data")

# Load unemployment data
print("Loading unemployment data...")
unemployment_data_raw = load_excel_file('unemployment-rate-data/metro-annual-unemployment-rates.xlsx')
print(f"Loaded {len(unemployment_data_raw)} records from unemployment data")

# Load Food Environment Atlas data (for supplementary analysis)
print("\nLoading Food Environment Atlas data (supplementary)...")
food_data_raw = load_csv_file('food_data/StateAndCountyData.csv')
print(f"Loaded {len(food_data_raw)} records from Food Environment Atlas")

# Transform Excel data for analysis
print("\n2. Transforming Data for Green Purchasing Analysis:")
print("-" * 70)

# Transform consumer data to food spending format
food_spending = transform_consumer_data(consumer_data_raw)
print(f"Transformed to {len(food_spending)} food spending records from consumer data")

# Transform employment data
jobs = transform_employment_excel_data(employment_data_raw)
print(f"Transformed to {len(jobs)} job/employment records from employment data")

# Transform unemployment data
unemployment = transform_unemployment_excel_data(unemployment_data_raw)
print(f"Transformed to {len(unemployment)} unemployment records from unemployment data")

# Also create employment data from Food Environment Atlas (for additional insights)
employment = transform_employment_data(food_data_raw)
print(f"Transformed to {len(employment)} employment records from Food Environment Atlas")

# Application Question 1: Who is buying sustainable food? (Demographics, Geography)
print("\n3. Question 1: Who is buying sustainable food? (Geography Analysis)")
print("-" * 70)

# Filter: High spending counties (spend > 1000)
print("\n2a. Filtering: Counties with spending > $1000")
high_spending = filter_collection(food_spending, lambda doc: doc.get("spend", 0) > 1000)
print(f"Found {len(high_spending)} records with spending > $1000")

# Projection: County and spend
print("\n2b. Projection: County and spending amounts")
county_spending = project_collection(high_spending, ["county", "spend"])
for doc in county_spending:
    print(f"  {doc['county']}: ${doc['spend']:.2f}")

# Group by: County
print("\n2c. Group By: Spending by county")
county_groups = group_by(food_spending, "county")
for county, docs in county_groups.items():
    total = sum(doc.get("spend", 0) for doc in docs)
    print(f"  {county}: ${total:.2f} total spending ({len(docs)} records)")

# Aggregation: Total spending by county
print("\n2d. Aggregation: Total spending by county")
total_by_county = aggregate(food_spending, "county", "spend", "sum")
for result in total_by_county:
    print(f"  {result['county']}: ${result['sum(spend)']:.2f}")

# Application Question 2: How income influences sustainable purchasing
print("\n4. Question 2: How income influences sustainable purchasing behavior")
print("-" * 70)

# Join: Food spending with jobs data
print("\n4a. Join: Food spending with jobs data (on county)")
spending_jobs = join_collections(food_spending, jobs, "county", "county")
print(f"Joined collection has {len(spending_jobs)} records")

# Join: Food spending with employment data
print("\n4a2. Join: Food spending with employment data (on county)")
spending_employment = join_collections(food_spending, employment, "county", "county")
print(f"Joined collection has {len(spending_employment)} records")

# Filter: High income areas (median_income > 40000)
print("\n4b. Filtering: High income areas (median_income > $40,000)")
# median_income is None when no income could be parsed, so fall back to 0
high_income_spending = filter_collection(spending_jobs,
                                        lambda doc: (doc.get("median_income") or 0) > 40000)
print(f"Found {len(high_income_spending)} records in high-income areas")

# Filter: High employment areas
print("\n4b2. Filtering: High employment areas (employment_rate > 85%)")
high_employment_spending = filter_collection(spending_employment, 
                                            lambda doc: doc.get("employment_rate", 0) > 85.0)
print(f"Found {len(high_employment_spending)} records in high-employment areas")

# Aggregation: Average spending by income level
print("\n4c. Aggregation: Average spending by income level")
# First, categorize income levels
def categorize_income(doc):
    # median_income may be present but None, so fall back to 0 before comparing
    income = doc.get("median_income") or 0
    if income >= 45000:
        return "high"
    elif income >= 40000:
        return "medium"
    else:
        return "low"

# Add income category to documents
spending_with_category = Collection("spending_categorized")
for doc in spending_jobs:
    new_doc = doc.copy()
    new_doc["income_category"] = categorize_income(doc)
    spending_with_category.insert(new_doc)

avg_by_income = aggregate(spending_with_category, "income_category", "spend", "avg")
print("Average spending by income category:")
for result in avg_by_income:
    print(f"  {result['income_category']}: ${result['avg(spend)']:.2f}")

# Aggregation: Average spending by employment level
print("\n4c2. Aggregation: Average spending by employment level")
avg_by_employment = aggregate(spending_employment, "employment_level", "spend", "avg")
print("Average spending by employment level:")
for result in avg_by_employment:
    print(f"  {result['employment_level']}: ${result['avg(spend)']:.2f}")

# Aggregation: Average spending by occupation type
print("\n4c3. Aggregation: Average spending by occupation type")
avg_by_occupation = aggregate(spending_employment, "occupation", "spend", "avg")
print("Average spending by occupation type:")
for result in avg_by_occupation:
    print(f"  {result['occupation']}: ${result['avg(spend)']:.2f}")

# Aggregation: Average spending by metro status
print("\n4c4. Aggregation: Average spending by metro/nonmetro status")
avg_by_metro = aggregate(spending_employment, "metro_status", "spend", "avg")
print("Average spending by metro status:")
for result in avg_by_metro:
    print(f"  {result['metro_status']}: ${result['avg(spend)']:.2f}")

# Application Question 3: Economic shocks and spending habits
print("\n5. Question 3: Do economic shocks (unemployment) change spending habits?")
print("-" * 70)

# Join: Food spending with unemployment data
print("\n5a. Join: Food spending with unemployment data")
spending_unemployment = join_collections(food_spending, unemployment, "county", "county")
print(f"Joined collection has {len(spending_unemployment)} records")

# Filter: High unemployment areas (rate > 15%)
print("\n5b. Filtering: High unemployment areas (rate > 15%)")
high_unemployment = filter_collection(spending_unemployment,
                                     lambda doc: doc.get("rate", 0) > 15.0)
print(f"Found {len(high_unemployment)} records in high-unemployment areas")

# Aggregation: Average spending by unemployment level
print("\n5c. Aggregation: Spending comparison by unemployment level")
def categorize_unemployment(doc):
    rate = doc.get("rate", 0)
    if rate >= 20.0:
        return "very_high"
    elif rate >= 15.0:
        return "high"
    elif rate >= 10.0:
        return "medium"
    else:
        return "low"

spending_with_unemp_cat = Collection("spending_unemp_categorized")
for doc in spending_unemployment:
    new_doc = doc.copy()
    new_doc["unemployment_category"] = categorize_unemployment(doc)
    spending_with_unemp_cat.insert(new_doc)

avg_by_unemployment = aggregate(spending_with_unemp_cat, "unemployment_category", "spend", "avg")
print("Average spending by unemployment category:")
for result in avg_by_unemployment:
    print(f"  {result['unemployment_category']}: ${result['avg(spend)']:.2f}")

# Year-over-year analysis
print("\n5d. Spending by year and unemployment rate")
year_groups = group_by(spending_unemployment, "year")
for year, docs in sorted(year_groups.items()):
    avg_spend = sum(doc.get("spend", 0) for doc in docs) / len(docs) if docs else 0
    avg_unemployment = sum(doc.get("rate", 0) for doc in docs) / len(docs) if docs else 0
    print(f"  {year}: Avg spending=${avg_spend:.2f}, Avg unemployment rate={avg_unemployment:.2f}%")

# Application Question 4: Employment and green purchasing relationship
print("\n6. Question 4: How does employment status affect green purchasing behavior?")
print("-" * 70)

# Join employment with food spending for detailed analysis
print("\n6a. Join: Employment data with food spending")
employment_food = join_collections(employment, food_spending, "county", "county")
print(f"Joined collection has {len(employment_food)} records")

# Filter: High employment rate areas
print("\n6b. Filtering: Counties with employment rate > 90%")
high_emp_areas = filter_collection(employment_food, 
                                   lambda doc: doc.get("employment_rate", 0) > 90.0)
print(f"Found {len(high_emp_areas)} records in high-employment areas")

# Aggregation: Spending by employment rate ranges
print("\n6c. Aggregation: Spending by employment rate ranges")
def categorize_employment_rate(doc):
    rate = doc.get("employment_rate", 0)
    if rate >= 90:
        return "very_high"
    elif rate >= 85:
        return "high"
    elif rate >= 75:
        return "medium"
    else:
        return "low"

spending_with_emp_rate = Collection("spending_emp_rate")
for doc in employment_food:
    new_doc = doc.copy()
    new_doc["emp_rate_category"] = categorize_employment_rate(doc)
    spending_with_emp_rate.insert(new_doc)

avg_by_emp_rate = aggregate(spending_with_emp_rate, "emp_rate_category", "spend", "avg")
print("Average spending by employment rate category:")
for result in avg_by_emp_rate:
    print(f"  {result['emp_rate_category']}: ${result['avg(spend)']:.2f}")

# Complex analysis: Employment level and food category
print("\n6d. Complex Analysis: Employment level by food category")
emp_category_groups = {}
for doc in employment_food:
    emp_level = doc.get("employment_level")
    category = doc.get("category")
    if emp_level and category:
        key = (emp_level, category)
        if key not in emp_category_groups:
            emp_category_groups[key] = []
        emp_category_groups[key].append(doc)

print("Top combinations of employment level and food category:")
sorted_emp_cat = sorted(emp_category_groups.items(), 
                        key=lambda x: sum(d.get("spend", 0) for d in x[1]), 
                        reverse=True)
for (emp_level, category), docs in sorted_emp_cat[:8]:
    total = sum(d.get("spend", 0) for d in docs)
    avg_emp_rate = sum(d.get("employment_rate", 0) for d in docs) / len(docs) if docs else 0
    print(f"  {emp_level} employment - {category}: ${total:.2f} (avg emp rate: {avg_emp_rate:.1f}%)")

# Application Question 5: Category analysis
print("\n7. Question 5: Spending patterns by food category")
print("-" * 70)

# Aggregation: Total spending by category
print("\n7a. Aggregation: Total spending by category")
total_by_category = aggregate(food_spending, "category", "spend", "sum")
for result in total_by_category:
    print(f"  {result['category']}: ${result['sum(spend)']:.2f}")

# Aggregation: Average spending by category
print("\n7b. Aggregation: Average spending by category")
avg_by_category = aggregate(food_spending, "category", "spend", "avg")
for result in avg_by_category:
    print(f"  {result['category']}: ${result['avg(spend)']:.2f}")

# Complex query: Category spending by county
print("\n7c. Complex Query: Category spending by county (using group by and aggregation)")
county_category_groups = {}
for doc in food_spending:
    county = doc.get("county")
    category = doc.get("category")
    key = (county, category)
    if key not in county_category_groups:
        county_category_groups[key] = []
    county_category_groups[key].append(doc)

print("Top spending combinations:")
sorted_combos = sorted(county_category_groups.items(), 
                      key=lambda x: sum(d.get("spend", 0) for d in x[1]), 
                      reverse=True)
for (county, category), docs in sorted_combos[:5]:
    total = sum(d.get("spend", 0) for d in docs)
    print(f"  {county} - {category}: ${total:.2f}")

print("\n" + "=" * 70)
print("APPLICATION ANALYSIS COMPLETE")
print("=" * 70)


GREEN PURCHASING BEHAVIOR ANALYSIS APPLICATION

1. Loading Data from Excel Files:
----------------------------------------------------------------------
Loading consumer spending data...
Loaded 4460 records from consumer data
Loading employment data...
Loaded 414437 records from employment data
Loading unemployment data...
Loaded 392 records from unemployment data

Loading Food Environment Atlas data (supplementary)...
Loaded 957753 records from Food Environment Atlas

2. Transforming Data for Green Purchasing Analysis:
----------------------------------------------------------------------
Transformed to 0 food spending records from consumer data
Transformed to 414437 job/employment records from employment data
Transformed to 0 unemployment records from unemployment data
Transformed to 1833 employment records from Food Environment Atlas

3. Question 1: Who is buying sustainable food? (Geography Analysis)
----------------------------------------------------------------------

2a. Filter

## Summary

This project implements:

1. **Extended JSON Parser**: Handles objects, arrays, nested structures, booleans, and null values
2. **CSV Parser**: Custom CSV parser that handles quoted fields and converts data types appropriately
3. **Excel Parser**: Custom Excel file parser using openpyxl to load .xlsx files into collections
4. **Collection Class**: Stores and manages JSON documents (similar to MongoDB collections)
5. **Core Operations**:
   - **Filtering**: Select documents based on conditions
   - **Projection**: Select specific fields from documents
   - **Group By**: Group documents by a key
   - **Aggregation**: Compute aggregates (sum, avg, max, min, count) on grouped data
   - **Join**: Join two collections on specified keys

6. **Data Transformation**: Functions to transform real-world data into format suitable for green purchasing analysis:
   - **Consumer Data**: Transforms consumer spending Excel data into food spending format
   - **Employment Data**: Transforms employment Excel data into jobs/employment format
   - **Unemployment Data**: Transforms unemployment Excel data into unemployment rate format
   - **Food Environment Atlas**: Extracts variables related to local foods, farmers markets, direct sales, and specialized stores
   - **Employment Indicators**: Creates employment data using income and economic indicators as proxies (employment levels, rates, occupation types, metro status)
   - Maps the transformed fields to green purchasing behavior indicators

7. **Application**: Green Purchasing Behavior analysis using real data from multiple sources:
   - **Consumer spending data** from Excel files (cu-all-detail-2023.xlsx)
   - **Employment data** from Excel files (all_data_M_2024.xlsx)
   - **Unemployment data** from Excel files (metro-annual-unemployment-rates.xlsx)
   - **Food Environment Atlas** data for supplementary analysis
   - Analyzes who buys sustainable food (geography)
   - Examines income influence on purchasing behavior
   - **Analyzes employment status impact** on green purchasing (employment levels, rates, occupation types, metro/nonmetro)
   - Studies economic shocks (unemployment/poverty) impact on spending
   - Analyzes spending patterns by food category
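
The employment indicators in item 6 are proxies, not measured rates. The sketch below reproduces the proxy arithmetic from the Food Environment Atlas transformation so its clamping behavior can be checked in isolation. The function name `employment_rate_proxy` is introduced here for illustration only.

```python
def employment_rate_proxy(median_income, poverty_rate=None):
    """Proxy used when no measured employment rate is available.

    With a poverty rate: 100 - 1.5 * poverty_rate, clamped to [0, 100].
    Without one: 0.8 * (income in thousands), clamped to [60, 95].
    """
    if poverty_rate is not None:
        return max(0, min(100, 100 - poverty_rate * 1.5))
    return min(95, max(60, (median_income / 1000) * 0.8))

print(employment_rate_proxy(50000, poverty_rate=10.0))  # 85.0
print(employment_rate_proxy(50000, poverty_rate=80.0))  # 0 (clamped at the lower bound)
print(employment_rate_proxy(50000))                     # 60 (40.0 raised to the floor)
print(employment_rate_proxy(150000))                    # 95 (120.0 capped at the ceiling)
```

Under the income-only fallback, every median income at or below $75,000 maps to the floor of 60, so that branch separates only higher-income counties.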

The parsers and collection operations are implemented from scratch without using the pandas, json, or csv libraries; Excel files are read with openpyxl. The project now uses real data from Excel files and the USDA Food Environment Atlas instead of synthetic sample data.
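
As an illustration of what "from scratch" can look like for the aggregation step, here is a minimal group-and-reduce sketch over plain dicts. It is not the notebook's `aggregate` function; `aggregate_sketch` is a hypothetical name, and only the result key format (for example `sum(spend)`) is taken from how the application reads its results.

```python
def aggregate_sketch(docs, group_key, value_key, op):
    """Group docs by group_key and reduce value_key with op.

    Returns a list of dicts shaped like
    {group_key: <group>, "<op>(<value_key>)": <result>}.
    """
    reducers = {
        "sum": sum,
        "avg": lambda values: sum(values) / len(values),
        "max": max,
        "min": min,
        "count": len,
    }
    if op not in reducers:
        raise ValueError(f"Unsupported aggregate: {op}")

    groups = {}
    for doc in docs:
        value = doc.get(value_key)
        if value is None:
            continue  # skip documents without a usable value
        groups.setdefault(doc.get(group_key), []).append(value)

    label = f"{op}({value_key})"
    return [{group_key: group, label: reducers[op](values)}
            for group, values in groups.items()]

# Hypothetical sample rows.
rows = [
    {"county": "A", "spend": 100.0},
    {"county": "A", "spend": 300.0},
    {"county": "B", "spend": 50.0},
]
sum_rows = aggregate_sketch(rows, "county", "spend", "sum")
avg_rows = aggregate_sketch(rows, "county", "spend", "avg")
print(sum_rows)  # [{'county': 'A', 'sum(spend)': 400.0}, {'county': 'B', 'sum(spend)': 50.0}]
print(avg_rows)  # [{'county': 'A', 'avg(spend)': 200.0}, {'county': 'B', 'avg(spend)': 50.0}]
```

Skipping `None` values keeps a single unparsed cell from raising inside `sum`, at the cost of silently shrinking the group, so reporting `count` alongside `avg` can help when the input is sparse.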
