# Testing Processing Functions for Apartment Data

This notebook serves as a standalone testing environment for the data processing functions developed for the streaming ETL application. It allows testing the processing functions without requiring external data files.

## Benefits of This Testing Notebook

1. Test the functionality independently of the main application
2. Show how to integrate the functions into the streaming ETL pipeline
3. Provide a quick start for developers who want to reuse this processing logic in their own applications

## Processing Function Overview

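The `process_row_final` function performs the following operations:
- Converts binary features from strings ('yes'/'no') to boolean values, inferring a missing `hasElevator` from the floor count and building type
- Fills missing (NaN/None) values in known numeric fields with fixed defaults
- Encodes categorical features (`type`, `condition`, `city`) as integers
- Creates new features (floor_ratio, price_per_m2, comfort_score)
- Drops columns not used downstream (build year, building material, ownership, amenity distances, `id`)
- Normalizes numerical features to the range [0, 1] using predefined min/max bounds, clipping out-of-range values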
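
The normalization step can be illustrated in isolation. The following is a minimal sketch of fixed-range min-max scaling with clipping, x' = (clip(x, lo, hi) - lo) / (hi - lo). The helper name `clip_minmax` is hypothetical, and the bounds in the usage lines are copied from the `normalization_ranges` table in `process_row_final`, which is defined below.

```python
def clip_minmax(value: float, lo: float, hi: float) -> float:
    """Clip value to [lo, hi], then rescale it linearly to [0, 1]."""
    if hi <= lo:
        raise ValueError("hi must be greater than lo")
    clipped = max(min(value, hi), lo)
    return (clipped - lo) / (hi - lo)

# squareMeters bounds (20, 200): 45 m2 lands inside the range
print(clip_minmax(45, 20, 200))
# price_per_m2 bounds (20, 500): any value above 500 is clipped to 1.0
print(clip_minmax(7777.78, 20, 500))
```

Fixed bounds keep the scaling identical for every message, which a streaming processor needs because it never sees the whole dataset at once. The trade-off is that values outside the bounds all collapse to 0 or 1.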

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Chart display settings
plt.style.use('seaborn-v0_8')
sns.set(font_scale=1.2)

# Display all DataFrame columns
pd.set_option('display.max_columns', None)

## Main Processing Function

Below is the implementation of the `process_row_final` function that transforms apartment data rows.

In [None]:
def process_row_final(row: pd.Series, normalize=True) -> pd.Series:
    """
    Final function for processing data rows.
    Includes all necessary transformations and optional normalization.
    
    Args:
        row (pd.Series): Row from DataFrame with apartment data
        normalize (bool): Flag indicating whether to normalize numerical features
        
    Returns:
        pd.Series: Processed row with transformed data
    """
    # Create a copy of the row
    processed = row.copy()
    
    # 1. Convert binary string features to boolean values
    binary_columns = ['hasParkingSpace', 'hasBalcony', 'hasElevator', 'hasSecurity', 'hasStorageRoom']
    for col in binary_columns:
        if col in processed:
            # Fill missing values
            if pd.isna(processed[col]):
                processed[col] = 'no'
                
                # For hasElevator, determine by building type and floor count
                if col == 'hasElevator' and 'floorCount' in processed and 'type' in processed:
                    if (not pd.isna(processed['floorCount']) and processed['floorCount'] > 4) or \
                       (not pd.isna(processed['type']) and processed['type'] == 'blockOfFlats'):
                        processed[col] = 'yes'
            
            # Convert yes/no to True/False
            processed[col] = True if processed[col] == 'yes' else False
    
    # 2. Fill missing values in numeric columns
    numeric_columns = {
        'floor': 2,              # Median value
        'floorCount': 5,         # Median value
        'squareMeters': 50,      # Typical apartment size
        'rooms': 2,              # Typical number of rooms
        'centreDistance': 5.0,   # Typical distance from center
        'poiCount': 10           # Average number of POIs
    }
    
    for col, default_value in numeric_columns.items():
        if col in processed and pd.isna(processed[col]):
            processed[col] = default_value
    
    # 3. Convert categorical features to numerical
    # Building type
    if 'type' in processed:
        type_mapping = {
            'blockOfFlats': 0,
            'tenement': 1,
            'apartmentBuilding': 2
        }
        if not pd.isna(processed['type']):
            processed['type_numeric'] = type_mapping.get(processed['type'], 3)
        else:
            processed['type_numeric'] = 0
        
        processed = processed.drop('type')
    
    # Apartment condition
    if 'condition' in processed:
        condition_mapping = {
            'very good': 4,
            'good': 3,
            'average': 2,
            'poor': 1,
            'to renovation': 0
        }
        if not pd.isna(processed['condition']):
            processed['condition_numeric'] = condition_mapping.get(processed['condition'], 2)
        else:
            processed['condition_numeric'] = 2
        
        processed = processed.drop('condition')
    
    # City (if present)
    if 'city' in processed:
        city_mapping = {
            'warszawa': 0,
            'krakow': 1,
            'wroclaw': 2,
            'gdansk': 3,
            'lodz': 4,
            'poznan': 5
        }
        if not pd.isna(processed['city']):
            processed['city_numeric'] = city_mapping.get(processed['city'].lower(), 6)
        else:
            processed['city_numeric'] = 0
        
        processed = processed.drop('city')
    
    # 4. Create new features
    # Floor ratio to total floors
    if 'floor' in processed and 'floorCount' in processed and processed['floorCount'] > 0:
        processed['floor_ratio'] = round(processed['floor'] / processed['floorCount'], 3)
    else:
        processed['floor_ratio'] = 0.5
    
    # Price per square meter
    if 'price' in processed and 'squareMeters' in processed and processed['squareMeters'] > 0:
        processed['price_per_m2'] = round(processed['price'] / processed['squareMeters'], 2)
    
    # Combined comfort score
    comfort_features = ['hasParkingSpace', 'hasBalcony', 'hasElevator', 'hasSecurity', 'hasStorageRoom']
    comfort_score = 0
    for feature in comfort_features:
        if feature in processed and processed[feature]:
            comfort_score += 1
    processed['comfort_score'] = comfort_score
    
    # 5. Remove rarely used or uninformative columns
    columns_to_drop = [
        'buildYear', 'buildingMaterial', 'ownership', 
        'schoolDistance', 'clinicDistance', 'kindergartenDistance', 
        'restaurantDistance', 'collegeDistance', 'pharmacyDistance', 'postOfficeDistance',
        'id'
    ]
    
    for col in columns_to_drop:
        if col in processed:
            processed = processed.drop(col)
    
    # 6. Normalize numerical features (if required)
    if normalize:
        # Select numerical columns, excluding booleans and the target variable 'price'.
        # Row values may be numpy scalars (e.g. np.int64), which are not Python ints,
        # and bool is a subclass of int, so both cases are checked explicitly.
        numeric_cols = [col for col in processed.index
                        if isinstance(processed[col], (int, float, np.integer, np.floating))
                        and not isinstance(processed[col], (bool, np.bool_))
                        and col != 'price']
        
        # Normalization using predefined ranges
        normalization_ranges = {
            'squareMeters': (20, 200),
            'rooms': (1, 6),
            'floor': (0, 20),
            'floorCount': (1, 30),
            'centreDistance': (0, 20),
            'poiCount': (0, 50),
            'type_numeric': (0, 3),
            'condition_numeric': (0, 4),
            'city_numeric': (0, 6),
            'floor_ratio': (0, 1),
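            # NOTE: with the sample prices below, price_per_m2 is about 6,600-7,800,
            # so this range clips every value to 1.0; adjust it to the units of your data.
            'price_per_m2': (20, 500),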
            'comfort_score': (0, 5)
        }
        
        for col in numeric_cols:
            if col in normalization_ranges:
                min_val, max_val = normalization_ranges[col]
                # Limit the value to the range and normalize
                val = max(min(processed[col], max_val), min_val)
                processed[col] = (val - min_val) / (max_val - min_val)
    
    return processed
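
The normalization step decides which entries to scale by inspecting value types. This is subtle for two reasons. First, `bool` is a subclass of `int`, so `True` passes an `isinstance(x, int)` check. Second, numpy integer scalars such as `np.int64` are not instances of Python's `int` (whereas `np.float64` does subclass `float`). Rows taken from a DataFrame may hold numpy scalars, so a check that looks only at `int` and `float` can silently skip integer columns. The following is a minimal sketch of a stricter check; `is_numeric_feature` is a hypothetical helper name.

```python
import numpy as np

def is_numeric_feature(value) -> bool:
    """True for Python or numpy numbers, False for booleans and non-numbers."""
    # bool subclasses int, and np.bool_ is a separate numpy type: exclude both first
    if isinstance(value, (bool, np.bool_)):
        return False
    return isinstance(value, (int, float, np.integer, np.floating))

for v in [3, 2.5, np.int64(3), np.float64(2.5), True, np.bool_(False), 'good', None]:
    print(repr(v), is_numeric_feature(v))
```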

## Test 1: Processing Sample Dataset

Let's create a small sample dataset and apply the processing function to every row.

In [None]:
# Create a sample dataset with representative data
sample_data = {
    'id': [1001, 1002, 1003, 1004, 1005],
    'price': [350000, 420000, 280000, 500000, 330000],
    'squareMeters': [45, 60, 38, 75, 50],
    'rooms': [2, 3, 1, 4, 2],
    'floor': [3, 2, 1, 5, 4],
    'floorCount': [5, 4, 4, 8, 6],
    'hasParkingSpace': ['yes', 'yes', 'no', 'yes', None],
    'hasBalcony': ['no', 'yes', 'no', 'yes', 'yes'],
    'hasElevator': ['yes', 'no', 'no', 'yes', None],
    'hasSecurity': ['no', 'no', 'no', 'yes', 'yes'],
    'hasStorageRoom': ['yes', 'no', 'no', 'yes', 'no'],
    'centreDistance': [4.5, 6.0, 3.0, 8.5, 5.2],
    'poiCount': [12, 8, 15, 7, None],
    'type': ['blockOfFlats', 'tenement', 'blockOfFlats', 'apartmentBuilding', None],
    'condition': ['good', 'very good', 'to renovation', 'good', 'average'],
    'city': ['warszawa', 'krakow', 'warszawa', 'gdansk', 'poznan'],
    'buildYear': [2005, 1970, 1990, 2015, None],
    'buildingMaterial': ['brick', 'concrete', 'concrete', 'brick', None],
    'ownership': ['full ownership', 'cooperative', 'full ownership', 'full ownership', 'cooperative'],
    'schoolDistance': [0.8, 1.2, 0.5, 2.0, 1.5],
    'clinicDistance': [1.5, 0.7, 1.0, 2.5, 1.8],
    'kindergartenDistance': [0.6, 1.0, 0.4, 1.8, 1.2],
    'restaurantDistance': [0.3, 0.5, 0.2, 1.0, 0.7],
    'collegeDistance': [3.5, 2.0, 4.0, 5.5, 3.0],
    'pharmacyDistance': [0.4, 0.8, 0.3, 1.2, 0.9],
    'postOfficeDistance': [0.7, 1.5, 0.6, 2.0, 1.2]
}

# Create a DataFrame from the sample data
sample_df = pd.DataFrame(sample_data)

# Show the sample data
print("Sample Dataset Created:")
print(f"Shape: {sample_df.shape}")
sample_df.head()

In [None]:
# Apply the process_row_final function to the sample dataset
processed_sample_df = sample_df.apply(process_row_final, axis=1)

# Display the processed dataset
print("Processed Sample Dataset:")
processed_sample_df.head()
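
`DataFrame.apply(..., axis=1)` passes each row to the function as a Series. When the function returns a Series, pandas expands the results into the columns of a new DataFrame, which is why the processed sample comes back as a DataFrame that includes the new feature columns. The following is a minimal, self-contained sketch that uses a hypothetical toy row function.

```python
import pandas as pd

def add_price_per_m2(row: pd.Series) -> pd.Series:
    """Toy row function (hypothetical): return a copy of the row with one derived field."""
    out = row.copy()
    out['price_per_m2'] = round(out['price'] / out['squareMeters'], 2)
    return out

df = pd.DataFrame({
    'price': [350000, 420000],
    'squareMeters': [45, 60],
    'city': ['warszawa', 'krakow'],
})

# Each row arrives as a Series; the returned Series are expanded into columns
result = df.apply(add_price_per_m2, axis=1)
print(result)
```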

## Test 2: Individual Row Processing

Let's process a single row to verify that the function works correctly on individual rows (as it would in the streaming pipeline).

In [None]:
# Select and process a single row to verify individual row processing
test_row = sample_df.iloc[2]  # Take the third row
print("Original row:")
print(test_row)

# Process the row with normalization enabled
processed_row = process_row_final(test_row, normalize=True)
print("\nProcessed row with normalization:")
print(processed_row)

# Process the row without normalization to see raw transformations
processed_row_no_norm = process_row_final(test_row, normalize=False)
print("\nProcessed row without normalization:")
print(processed_row_no_norm)
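
In the streaming pipeline, each row arrives as a JSON message rather than through `iloc`. The following is a minimal sketch of that round trip, assuming one JSON object per message. On the producer side, rows are serialized with `DataFrame.to_dict(orient='records')`. On the consumer side, a Series is rebuilt with `pd.Series(json.loads(...))`, which is the same conversion the Processor callback performs.

```python
import json
import pandas as pd

df = pd.DataFrame({
    'id': [1001, 1002],
    'price': [350000, 420000],
    'hasBalcony': ['no', 'yes'],
    'city': ['warszawa', 'krakow'],
})

# Producer side: one JSON message per row (assumed message format)
messages = [json.dumps(record) for record in df.to_dict(orient='records')]

# Consumer side: parse each message back into a Series, as the Processor callback does
rows = [pd.Series(json.loads(m)) for m in messages]
print(messages[0])
print(rows[1])
```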

## Test 3: Testing with Incomplete Data

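This test checks the function's robustness on incomplete data, which is a common scenario in real-world applications. Defaults are only applied to fields that are present in the row with a missing value. Fields that are absent from the input are not added.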

In [None]:
# Test the function with incomplete data to verify robustness
incomplete_data = {
    'price': [400000],
    'squareMeters': [55],
    'rooms': [2],
    # Several fields are omitted to see how absent fields are handled
    'type': ['blockOfFlats']
}

incomplete_df = pd.DataFrame(incomplete_data)
print("Incomplete row:")
print(incomplete_df.iloc[0])

# Process the incomplete row
processed_incomplete = process_row_final(incomplete_df.iloc[0])
print("\nProcessed incomplete row:")
print(processed_incomplete)

# List the fields added by processing (derived features). Defaults are only filled
# for fields present in the input row, so absent fields do not appear here.
print("\nFields added by processing:")
added_fields = [col for col in processed_incomplete.index if col not in incomplete_df.columns]
for col in added_fields:
    print(f"{col}: {processed_incomplete[col]}")
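
Step 2 of `process_row_final` fills defaults only for fields that are present in the row with a missing value. If downstream consumers need a fixed schema, one option is to reindex the row before filling it. The following is a minimal sketch that uses a hypothetical helper, `fill_numeric_defaults`, with the defaults copied from step 2.

```python
import pandas as pd

# Defaults copied from step 2 of process_row_final
NUMERIC_DEFAULTS = {
    'floor': 2, 'floorCount': 5, 'squareMeters': 50,
    'rooms': 2, 'centreDistance': 5.0, 'poiCount': 10,
}

def fill_numeric_defaults(row: pd.Series, add_missing: bool = False) -> pd.Series:
    """Hypothetical helper: fill missing numeric fields, optionally adding absent ones."""
    if add_missing:
        # reindex adds absent labels with NaN values so the loop below fills them
        row = row.reindex(row.index.union(list(NUMERIC_DEFAULTS), sort=False))
    else:
        row = row.copy()
    for col, default in NUMERIC_DEFAULTS.items():
        if col in row.index and pd.isna(row[col]):
            row[col] = default
    return row

incomplete = pd.Series({'price': 400000, 'squareMeters': 55, 'rooms': 2, 'type': 'blockOfFlats'})
print(fill_numeric_defaults(incomplete))                    # absent fields stay absent
print(fill_numeric_defaults(incomplete, add_missing=True))  # absent fields receive defaults
```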

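To visualize the effect of the processing, the next cell plots the distribution of the generated `comfort_score` feature and its relationship to price. Because the sample was processed with `normalize=True`, the score is scaled to [0, 1].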

In [None]:
# Visualize the effect of processing on a specific feature
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
sns.countplot(y=processed_sample_df['comfort_score'])
plt.title('Comfort Score Distribution')
plt.xlabel('Count')
plt.ylabel('Score')

plt.subplot(1, 2, 2)
sns.scatterplot(x='comfort_score', y='price', data=processed_sample_df)
plt.title('Comfort Score vs Price')

plt.tight_layout()
plt.show()
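
The comfort score is the number of amenity flags set to True, so with `normalize=True` each amenity adds 0.2 to the plotted value. For a whole DataFrame of already-converted boolean flags, the same count can be computed in one vectorized step. The following is a minimal sketch on hypothetical flag values.

```python
import pandas as pd

AMENITIES = ['hasParkingSpace', 'hasBalcony', 'hasElevator', 'hasSecurity', 'hasStorageRoom']

flags = pd.DataFrame({
    'hasParkingSpace': [True, False],
    'hasBalcony': [False, True],
    'hasElevator': [True, False],
    'hasSecurity': [False, False],
    'hasStorageRoom': [True, False],
})

# Count True flags per row (True sums as 1), then scale by the number of amenities
comfort_score = flags[AMENITIES].sum(axis=1)
comfort_norm = comfort_score / len(AMENITIES)
print(pd.DataFrame({'comfort_score': comfort_score, 'normalized': comfort_norm}))
```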

## Integration with Streaming ETL Pipeline

This section shows how to integrate the `process_row_final` function into a streaming ETL pipeline with RabbitMQ.

In [None]:
import json
import pandas as pd
import pika

# Import the process_row_final function from this module
# from processing_module import process_row_final

# This cell is intended to run as the Processor script: it needs a reachable
# RabbitMQ host named 'rabbitmq' and blocks in start_consuming() until interrupted.

def callback(ch, method, properties, body):
    """Process incoming messages from the raw_data queue"""
    try:
        # Parse the message body as JSON
        message = json.loads(body)

        # Convert the JSON object to a pandas Series
        row = pd.Series(message)

        # Process the row using our function
        processed_row = process_row_final(row, normalize=True)

        # Convert the processed Series back to a dictionary
        processed_dict = processed_row.to_dict()

        # Send the processed data to the next component on the channel that delivered the message
        ch.basic_publish(
            exchange='',
            routing_key='processed_data',
            body=json.dumps(processed_dict)
        )

        # Acknowledge the message only after it has been published
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print(f"Processed message: {message.get('id', 'unknown')}")

    except Exception as e:
        print(f"Error processing message: {e}")
        # requeue=False: a message that cannot be parsed or processed would
        # otherwise be redelivered indefinitely
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

def main():
    # RabbitMQ connection setup
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
    channel = connection.channel()

    # Define queues
    channel.queue_declare(queue='raw_data')
    channel.queue_declare(queue='processed_data')

    # Set up consumer
    channel.basic_consume(queue='raw_data', on_message_callback=callback)

    print('Processor started. Waiting for messages...')
    channel.start_consuming()

if __name__ == '__main__':
    main()
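
Because the callback only uses a few channel methods, its logic can be exercised without a running broker. The following is a minimal sketch that uses a hypothetical `FakeChannel`, which records `basic_publish`, `basic_ack`, and `basic_nack` calls, with a stub processing step in place of `process_row_final`. The `make_callback` factory is also hypothetical, and real pika channels accept more parameters than these stubs.

```python
import json
from types import SimpleNamespace

class FakeChannel:
    """Hypothetical stand-in for a pika channel that records calls instead of sending them."""
    def __init__(self):
        self.published, self.acked, self.nacked = [], [], []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((routing_key, body))

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    def basic_nack(self, delivery_tag, requeue=True):
        self.nacked.append((delivery_tag, requeue))

def make_callback(process):
    """Build a consumer callback around any dict -> dict processing function."""
    def callback(ch, method, properties, body):
        try:
            result = process(json.loads(body))
            ch.basic_publish(exchange='', routing_key='processed_data', body=json.dumps(result))
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:
            # Drop (do not requeue) messages that cannot be processed
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    return callback

# Stub processing step standing in for process_row_final
cb = make_callback(lambda msg: {**msg, 'processed': True})
ch = FakeChannel()
cb(ch, SimpleNamespace(delivery_tag=1), None, b'{"id": 1006}')
cb(ch, SimpleNamespace(delivery_tag=2), None, b'not json')
print(ch.published, ch.acked, ch.nacked)
```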

## Test 4: JSON Message Processing Simulation

Simulate processing of JSON messages as they would appear in the RabbitMQ message queue.

In [None]:
# Example of converting between JSON and pandas Series
# (which is what would happen in the streaming pipeline)

# Sample input JSON that might come from RabbitMQ
sample_json = {
    "id": 1006,
    "price": 450000,
    "squareMeters": 65,
    "rooms": 3,
    "floor": 4,
    "floorCount": 7,
    "hasParkingSpace": "yes",
    "hasBalcony": "yes",
    "hasElevator": "yes",
    "hasSecurity": "no",
    "hasStorageRoom": "yes",
    "centreDistance": 3.2,
    "poiCount": 18,
    "type": "blockOfFlats",
    "condition": "good",
    "city": "warszawa"
}

# Convert JSON to pandas Series
input_series = pd.Series(sample_json)
print("Input Series (from JSON):")
print(input_series)

# Process the row
processed_series = process_row_final(input_series)
print("\nProcessed Series:")
print(processed_series)

# Convert back to JSON (dict) for sending to the next component
output_json = processed_series.to_dict()
print("\nOutput JSON (to be sent to next component):")
import json
print(json.dumps(output_json, indent=2))
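
A processed row can still contain a missing value, for example `price_per_m2` when `price` is NaN. By default, Python's `json.dumps` writes this as the bare token `NaN`, which is not valid JSON under RFC 8259 and may be rejected by non-Python consumers. The following is a minimal sketch that maps NaN floats to `null` before publishing; `nan_to_none` is a hypothetical helper name.

```python
import json
import math

def nan_to_none(record: dict) -> dict:
    """Replace float NaN values (including np.float64 NaN) with None so json.dumps emits null."""
    return {k: None if isinstance(v, float) and math.isnan(v) else v for k, v in record.items()}

record = {'id': 1006, 'price_per_m2': float('nan'), 'city_numeric': 0}
print(json.dumps(record))                               # contains the non-standard token NaN
print(json.dumps(nan_to_none(record), allow_nan=False))  # NaN replaced by null
```

Passing `allow_nan=False` makes `json.dumps` raise `ValueError` instead of emitting `NaN`, so any value that the cleanup missed fails loudly.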

## Conclusion

The tests in this notebook exercise the `process_row_final` function on a sample dataset, individual rows, incomplete data, and JSON input, and indicate that it is ready to be integrated into the streaming ETL pipeline. The function:

1. Properly transforms binary fields from 'yes'/'no' to boolean values
2. Fills missing values with reasonable defaults
3. Converts categorical features to numerical values
4. Creates new features (floor_ratio, price_per_m2, comfort_score)
5. Removes unnecessary columns
6. Normalizes numerical features when requested
7. Processes incomplete rows without errors
8. Can be easily integrated into a streaming ETL pipeline

This implementation can be used in the Processor component of the streaming ETL pipeline to prepare apartment data for analysis and machine learning applications.