# ETL Pipeline: Web Scraping Real Estate Data (BuyRentKenya)

This notebook demonstrates a complete Extract, Transform, Load (ETL) pipeline using Python:

1.  **Extract (Scraping):** Fetches property listings from a real estate website using `requests` and `BeautifulSoup`.
2.  **Transform (Cleaning):** Cleans and converts raw text data (like price, bedrooms, and size) into numeric formats using `pandas` and `re`.
3.  **Load (Database):** Inserts the cleaned data into a PostgreSQL database using `SQLAlchemy`.

In [None]:
# Install necessary libraries
%pip install beautifulsoup4 pandas requests sqlalchemy psycopg2-binary --quiet

In [None]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
from typing import List, Dict
import time
import sqlalchemy
from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import inspect
from datetime import datetime

def scrape_pages(start_page: int, end_page: int) -> pd.DataFrame:
    """Scrapes property listings from a range of BuyRentKenya pages."""
    base_url = 'https://www.buyrentkenya.com/houses-for-sale'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    properties = []
    for page_num in range(start_page, end_page + 1):
        url = f'{base_url}?page={page_num}'
        print(f"\nüîç Scraping page {page_num}: {url}")
        try:
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code != 200:
                print(f"‚ö†Ô∏è  Failed to retrieve page {page_num}. "
                      f"Status code: {response.status_code}")
                continue
            
            soup = BeautifulSoup(response.content, 'html.parser')
            listings = soup.find_all('div', class_='listing-card')
            print(f"    ‚úì Found {len(listings)} listings on page {page_num}")
            
            for listing in listings:
                # Extracting Title, Price, Location
                title_tag = listing.find('h2')
                title = title_tag.get_text(strip=True) if title_tag else 'No title'
                price_tag = listing.find('p', class_='text-xl font-bold leading-7 text-grey-900')
                price = price_tag.get_text(strip=True) if price_tag else 'No price'
                location_tag = listing.find('p', class_='ml-1 truncate text-sm font-normal capitalize text-grey-650')
                location = location_tag.get_text(strip=True) if location_tag else 'No location'

                # Extracting Bedrooms, Bathrooms, Size
                bedrooms = bathrooms = size = 'N/A'
                swiper_div = listing.find('div', class_='scrollable-list')
                if swiper_div:
                    slides = swiper_div.find_all('div', class_='swiper-slide')
                    for slide in slides:
                        text = slide.get_text(strip=True)
                        if 'Bedroom' in text:
                            bedrooms = text
                        elif 'Bathroom' in text:
                            bathrooms = text
                        elif 'm¬≤' in text:
                            size = text

                properties.append({
                    'Title': title,
                    'Price': price,
                    'Location': location,
                    'Bedrooms': bedrooms,
                    'Bathrooms': bathrooms,
                    'Size': size
                })
            
            if page_num < end_page:
                time.sleep(1) # Be polite and wait 1 second between pages
                
        except requests.RequestException as e:
            print(f"‚ùå Error scraping page {page_num}: {str(e)}")
            continue
            
    df = pd.DataFrame(properties)
    print(f"\n‚úÖ Scraping complete! Total properties extracted: {len(df)}")
    return df

print("=" * 60)
print("STARTING WEB SCRAPING PROCESS")
print("=" * 60)
df_all_pages = scrape_pages(start_page=1, end_page=4)

In [None]:
print("\n" + "=" * 60)
print("PREVIEW OF SCRAPED DATA")
print("=" * 60)
display(df_all_pages.head(10))

print("\nüìä DATASET INFORMATION:")
print(f"Total rows: {len(df_all_pages)}")
print(f"Total columns: {len(df_all_pages.columns)}")
print(f"\nColumn names: {list(df_all_pages.columns)}")
print(f"\nData types:\n{df_all_pages.dtypes}")
print(f"\nMissing values:\n{df_all_pages.isnull().sum()}")

# 2. Transform: Data Cleaning and Preprocessing

In [None]:
import re
import numpy as np

def clean_price(price_str: str) -> float:
    """Converts price string (e.g., 'KSh 1,200,000') to a float."""
    if not price_str or price_str in ['No price', 'N/A', '']:
        return np.nan
    try:
        clean_str = price_str.replace('KSh', '').strip()
        clean_str = clean_str.replace(',', '')
        return float(clean_str)
    except (ValueError, AttributeError):
        return np.nan

def extract_number_from_text(text: str) -> float:
    """Extracts the first number from a string for bedrooms/bathrooms."""
    if not text or text in ['N/A', 'No data', '']:
        return np.nan
    if 'studio' in text.lower():
        return 0.0
    try:
        match = re.search(r'\d+', text)
        if match:
            return float(match.group())
        else:
            return np.nan
    except (ValueError, AttributeError):
        return np.nan

def clean_size(size_str: str) -> float:
    """Converts size string (e.g., '100 m¬≤') to a float in sq meters."""
    if not size_str or size_str in ['N/A', 'No size', '']:
        return np.nan
    try:
        clean_str = size_str.replace('m¬≤', '').replace('m2', '').strip()
        clean_str = clean_str.replace(',', '')
        return float(clean_str)
    except (ValueError, AttributeError):
        return np.nan

def clean_location(location_str: str) -> str:
    """Standardizes location strings (title case, handles missing data)."""
    if not location_str or location_str in ['No location', 'N/A', '']:
        return 'Unknown'
    return location_str.strip().title()

def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Applies all cleaning functions and filters the resulting DataFrame."""
    print("\nüßπ STARTING DATA CLEANING PROCESS...")
    print("=" * 60)
    df_clean = df.copy()
    initial_rows = len(df_clean)
    print(f"Initial number of records: {initial_rows}")

    print("\n1Ô∏è‚É£ Cleaning Price column...")
    df_clean['Price_Numeric'] = df_clean['Price'].apply(clean_price)
    valid_prices = df_clean['Price_Numeric'].notna().sum()
    print(f"    ‚úì Converted {valid_prices}/{initial_rows} prices to numeric")
    
    print("\n2Ô∏è‚É£ Cleaning Bedrooms column...")
    df_clean['Bedrooms_Numeric'] = df_clean['Bedrooms'].apply(extract_number_from_text)
    valid_bedrooms = df_clean['Bedrooms_Numeric'].notna().sum()
    print(f"    ‚úì Extracted {valid_bedrooms}/{initial_rows} bedroom counts")

    print("\n3Ô∏è‚É£ Cleaning Bathrooms column...")
    df_clean['Bathrooms_Numeric'] = df_clean['Bathrooms'].apply(extract_number_from_text)
    valid_bathrooms = df_clean['Bathrooms_Numeric'].notna().sum()
    print(f"    ‚úì Extracted {valid_bathrooms}/{initial_rows} bathroom counts")
    
    print("\n4Ô∏è‚É£ Cleaning Size column...")
    df_clean['Size_SqM'] = df_clean['Size'].apply(clean_size)
    valid_sizes = df_clean['Size_SqM'].notna().sum()
    print(f"    ‚úì Extracted {valid_sizes}/{initial_rows} size values")

    print("\n5Ô∏è‚É£ Cleaning Location column...")
    df_clean['Location_Clean'] = df_clean['Location'].apply(clean_location)
    print(f"    ‚úì Standardized all location names")

    print("\n6Ô∏è‚É£ Removing incomplete records...")
    df_clean = df_clean.dropna(subset=['Price_Numeric', 'Bedrooms_Numeric'])
    final_rows = len(df_clean)
    removed_rows = initial_rows - final_rows
    print(f"    ‚úì Removed {removed_rows} records with missing critical data")
    print(f"    ‚úì Final dataset: {final_rows} records")

    print("\n7Ô∏è‚É£ Adding metadata columns...")
    df_clean['Scraped_Date'] = pd.Timestamp.now()
    df_clean['Source'] = 'buyrentkenya.com'
    print(f"    ‚úì Added Scraped_Date and Source columns")

    print("\n" + "=" * 60)
    print("‚úÖ DATA CLEANING COMPLETE!")
    print("=" * 60)
    return df_clean

df_cleaned = clean_dataframe(df_all_pages)

In [None]:
print("\nüìä CLEANED DATA PREVIEW:")
print("\nFirst 5 rows:")
display(df_cleaned.head())

print("\nüîç COMPARISON: RAW vs CLEANED DATA")
print("=" * 60)
print("\nüìã RAW DATA (first row):")
print(df_all_pages.iloc[0].to_string())

print("\n\n‚ú® CLEANED DATA (first row):\n")
print(df_cleaned.iloc[0].to_string())

print("\n\nüìà DATA QUALITY SUMMARY:")
print(f"Raw data columns: {len(df_all_pages.columns)}")
print(f"Cleaned data columns: {len(df_cleaned.columns)}")
print(f"\nNew columns added: {set(df_cleaned.columns) - set(df_all_pages.columns)}")

# 3. Load: Database Integration (PostgreSQL via SQLAlchemy)

**Note:** This section requires a running PostgreSQL instance with the specified database name (`house_prices`) created, and the credentials must be correct for successful execution.

In [None]:
# --- Database Configuration ---
DB_USERNAME = 'postgres'
DB_PASSWORD = '7510'
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'house_prices'
DATABASE_URL = f"postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

print("üîó Database Connection String Built:")
print(f"    Connecting to: {DB_HOST}:{DB_PORT}")
print(f"    Database: {DB_NAME}")
print(f"    Username: {DB_USERNAME}")
print("    (Password hidden for security)")

print("\nüîß Creating Database Engine...")
try:
    engine = create_engine(DATABASE_URL, echo=False) # echo=True for verbose SQL logs
    with engine.connect() as connection:
        print("‚úÖ Successfully connected to PostgreSQL database!")
except Exception as e:
    print(f"‚ùå Failed to connect to database: {e}")
    print("\nüí° TROUBLESHOOTING TIPS:")
    print("    1. Is PostgreSQL running? Check with: `sudo service postgresql status`")
    print("    2. Does the database exist? Create it with: `createdb house_prices`")
    print("    3. Are username/password/host correct?")
    # Stop execution if connection fails, as subsequent cells will fail
    raise
    
# --- SQLAlchemy Model Definition ---
Base = declarative_base()
class HouseProperty(Base):
    __tablename__ = 'properties'
    
    # Primary Key and Raw Data Columns
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(Text, nullable=False)
    price_text = Column(String(50))
    location = Column(String(200))
    bedrooms_text = Column(String(50))
    bathrooms_text = Column(String(50))
    size_text = Column(String(50))
    
    # Cleaned Data Columns
    price_numeric = Column(Float)
    bedrooms_numeric = Column(Integer)
    bathrooms_numeric = Column(Integer)
    size_sqm = Column(Float)
    location_clean = Column(String(200))
    
    # Metadata
    source = Column(String(100))
    scraped_date = Column(DateTime)
    inserted_date = Column(DateTime, default=datetime.utcnow)
    
    def __repr__(self):
        return f"<Property(id={self.id}, title='{self.title[:30]}...', price={self.price_numeric})>"
        
print("\nüìã Database Schema Defined:")
print(f"    Table Name: {HouseProperty.__tablename__}")
for column in HouseProperty.__table__.columns:
    print(f"      - {column.name}: {column.type}")
    
# --- Create Table ---
print("\nüèóÔ∏è  Creating table in database (if it does not exist)...")
try:
    Base.metadata.create_all(engine)
    print("‚úÖ Table 'properties' ensured to exist successfully!")
    
    # Verification check
    inspector = inspect(engine)
    if 'properties' in inspector.get_table_names():
        columns = inspector.get_columns('properties')
        print(f"    Verified: Table exists with {len(columns)} columns.")
except Exception as e:
    print(f"‚ùå Error creating table: {e}")

In [None]:
def prepare_data_for_db(df: pd.DataFrame) -> list:
    """Converts a pandas DataFrame into a list of SQLAlchemy HouseProperty objects."""
    print(f"\nüì¶ Preparing {len(df)} records for database insertion...")
    property_objects = []
    for index, row in df.iterrows():
        property_obj = HouseProperty(
            title=row['Title'],
            price_text=row['Price'],
            location=row['Location'],
            bedrooms_text=row['Bedrooms'],
            bathrooms_text=row['Bathrooms'],
            size_text=row['Size'],
            price_numeric=row['Price_Numeric'] if pd.notna(row['Price_Numeric']) else None,
            bedrooms_numeric=int(row['Bedrooms_Numeric']) if pd.notna(row['Bedrooms_Numeric']) else None,
            bathrooms_numeric=int(row['Bathrooms_Numeric']) if pd.notna(row['Bathrooms_Numeric']) else None,
            size_sqm=row['Size_SqM'] if pd.notna(row['Size_SqM']) else None,
            location_clean=row['Location_Clean'],
            source=row['Source'],
            scraped_date=row['Scraped_Date']
        )
        property_objects.append(property_obj)
    print(f"‚úÖ Prepared {len(property_objects)} property objects")
    return property_objects

property_records = prepare_data_for_db(df_cleaned)
print("\nüìã Example Property Object:")
print(property_records[0])

def insert_data_to_db(property_objects: list, engine):
    """Inserts property objects into the database using SQLAlchemy sessions."""
    Session = sessionmaker(bind=engine)
    session = Session()
    print(f"\nüíæ Inserting {len(property_objects)} records into database...")
    try:
        session.add_all(property_objects)
        session.commit()
        print(f"‚úÖ Successfully inserted {len(property_objects)} records!")
        return len(property_objects)
    except Exception as e:
        session.rollback()
        print(f"‚ùå Error inserting data: {e}")
        return 0
    finally:
        session.close()
        
records_inserted = insert_data_to_db(property_records, engine)
print(f"\nüìä DATABASE INSERTION SUMMARY:")
print(f"    Total records processed: {len(df_cleaned)}")
print(f"    Successfully inserted: {records_inserted}")

In [None]:
print("\nüîç VERIFYING DATA IN DATABASE...")
print("=" * 60)

# Query 1: Top 5 Records
query = """
    SELECT 
        id, 
        title, 
        price_numeric, 
        bedrooms_numeric, 
        bathrooms_numeric, 
        location_clean, 
        scraped_date
    FROM properties
    ORDER BY id DESC -- Show the most recently inserted data first
    LIMIT 5
"""
verification_data = pd.read_sql(query, engine)
print("Top 5 Most Recently Inserted Records:")
display(verification_data)

print("\nüìä DATABASE STATISTICS")
print("=" * 60)

# Query 2: Average Price
avg_price_query = """
    SELECT AVG(price_numeric) as avg_price
    FROM properties
    WHERE price_numeric IS NOT NULL
"""
avg_price = pd.read_sql(avg_price_query, engine)
print(f"    Average Price (All Data): KSh {avg_price['avg_price'].iloc[0]:,.2f}")

# Query 3: Price Range
price_range_query = """
    SELECT 
        MIN(price_numeric) as min_price, 
        MAX(price_numeric) as max_price
    FROM properties
    WHERE price_numeric IS NOT NULL
"""
price_range = pd.read_sql(price_range_query, engine)
print(f"    Cheapest: KSh {price_range['min_price'].iloc[0]:,.2f}")
print(f"    Most Expensive: KSh {price_range['max_price'].iloc[0]:,.2f}")

# Query 4: Bedroom Distribution
bedroom_dist_query = """
    SELECT 
        bedrooms_numeric,
        COUNT(*) as count
    FROM properties
    WHERE bedrooms_numeric IS NOT NULL
    GROUP BY bedrooms_numeric
    ORDER BY bedrooms_numeric
"""
bedroom_dist = pd.read_sql(bedroom_dist_query, engine)
print(f"\nüõèÔ∏è  Bedroom Distribution (All Data):")
display(bedroom_dist)

# Query 5: Top 5 Locations
location_dist_query = """
    SELECT 
        location_clean,
        COUNT(*) as count
    FROM properties
    GROUP BY location_clean
    ORDER BY count DESC
    LIMIT 5
"""
location_dist = pd.read_sql(location_dist_query, engine)
print(f"\nüìç Top 5 Locations (All Data):")
display(location_dist)

print("\n" + "=" * 60)
print("‚úÖ STEP 3: DATABASE STORAGE COMPLETE!")
print("=" * 60)