# ConcordBroker Property Analysis Pipeline
## Comprehensive Data Analysis using SQLAlchemy, PySpark, OpenCV, and Playwright

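This notebook provides a complete data analysis pipeline for property data integration with the ConcordBroker website. It expects the `POSTGRES_HOST`, `POSTGRES_DATABASE`, `POSTGRES_USER`, and `POSTGRES_PASSWORD` environment variables to be set, and it should be run from top to bottom because later cells reuse the database session and Spark session created by earlier ones.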

In [None]:
# Import required libraries
# Standard library
import json
import os
import sys
from datetime import datetime, timedelta
from io import BytesIO
from typing import List, Dict, Optional, Any
from urllib.parse import quote_plus

# Make the parent directory importable (the `apps` package is imported from there)
sys.path.append('..')

# Core libraries
import numpy as np
import pandas as pd

# SQLAlchemy for database operations
from sqlalchemy import create_engine, func, text
from sqlalchemy.orm import sessionmaker
from apps.api.models.sqlalchemy_models import (
    DatabaseManager, FloridaParcel, TaxDeed,
    SalesHistory, BuildingPermit, SunbizEntity,
    PropertyImage, MarketAnalysis
)

# PySpark for big data processing
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# NOTE: sum, max and min below replace the Python built-ins of the same name
# for the rest of the notebook; prefer the F.<name> spelling in new code.
from pyspark.sql.functions import col, avg, sum, count, max, min
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# OpenCV for image analysis
import cv2
import requests
from PIL import Image

# Playwright for web scraping
from playwright.sync_api import sync_playwright

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Configure display options
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

# The 'seaborn-v0_8-*' style names were introduced in matplotlib 3.6; older
# releases only know the same style sheet as 'seaborn-darkgrid'. An unknown
# style name raises OSError, so fall back instead of aborting the import cell.
try:
    plt.style.use('seaborn-v0_8-darkgrid')
except OSError:
    plt.style.use('seaborn-darkgrid')

print("Libraries imported successfully!")

## 1. Database Connection with SQLAlchemy

In [None]:
# Initialize database connection
db_manager = DatabaseManager()
session = db_manager.get_session()

# Test connection.
# The row total is stored as `total_properties`, not `count`: a module-level
# `count` would overwrite the `count` aggregate imported from
# pyspark.sql.functions, which later cells call.
try:
    total_properties = session.execute(
        text("SELECT COUNT(*) FROM florida_parcels")
    ).scalar()
    print(f"✅ Connected to database. Total properties: {total_properties:,}")
except Exception as e:
    print(f"❌ Database connection error: {e}")
    # Leave the session usable for later cells after a failed statement
    session.rollback()
else:
    # Query sample data (skipped when the connection test failed, since the
    # same query would only fail again)
    sample_properties = session.query(FloridaParcel).limit(5).all()
    for prop in sample_properties:
        # Guard against a missing value: the ',.2f' format spec raises on None
        value_text = f"${prop.just_value:,.2f}" if prop.just_value is not None else "n/a"
        print(f"Property: {prop.parcel_id}, County: {prop.county}, Value: {value_text}")

## 2. PySpark Setup for Big Data Processing

In [None]:
# Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("ConcordBroker_Analytics")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

print(f"Spark Version: {spark.version}")
# Ask Spark for the actual UI address: the port moves off 4040 when it is taken
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

# Fail early with a readable message instead of connecting to
# "jdbc:postgresql://None/None" when the environment is not configured.
required_env = ("POSTGRES_HOST", "POSTGRES_DATABASE", "POSTGRES_USER", "POSTGRES_PASSWORD")
missing_env = [name for name in required_env if os.getenv(name) is None]
if missing_env:
    raise EnvironmentError(
        f"Missing required environment variables: {', '.join(missing_env)}"
    )

# Load property data into Spark DataFrame
jdbc_url = f"jdbc:postgresql://{os.getenv('POSTGRES_HOST')}/{os.getenv('POSTGRES_DATABASE')}"
properties_df = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "florida_parcels")
    .option("user", os.getenv('POSTGRES_USER'))
    .option("password", os.getenv('POSTGRES_PASSWORD'))
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Cache for performance
properties_df.cache()

# Expose the DataFrame under the table name that later Spark SQL queries use
# (e.g. "SELECT * FROM florida_parcels" in the integrated pipeline).
properties_df.createOrReplaceTempView("florida_parcels")

print(f"Total properties in Spark: {properties_df.count():,}")

## 3. Property Market Analysis with PySpark

In [None]:
# Analyze property values by county.
# Aggregates are referenced through the `F` namespace: the bare names `count`,
# `max` and `min` are easy to rebind by accident in a notebook (the database
# cell assigns to `count`), which would make this cell fail.
county_analysis = (
    properties_df
    .groupBy("county")
    .agg(
        F.count("parcel_id").alias("property_count"),
        F.avg("just_value").alias("avg_value"),
        F.avg("land_value").alias("avg_land_value"),
        F.avg("building_value").alias("avg_building_value"),
        F.max("just_value").alias("max_value"),
        F.min("just_value").alias("min_value"),
    )
    .orderBy(F.col("property_count").desc())
)

# Convert to Pandas for visualization (one row per county)
county_stats = county_analysis.toPandas()

# Visualize top counties
fig = px.bar(
    county_stats.head(10),
    x='county', y='property_count',
    title='Top 10 Counties by Property Count',
    labels={'property_count': 'Number of Properties'},
)
fig.show()

# Property value distribution, restricted to properties valued under $1M
value_distribution = properties_df.select("just_value").filter(F.col("just_value") < 1000000)
value_pd = value_distribution.toPandas()

hist_fig, hist_ax = plt.subplots(figsize=(12, 6))
hist_ax.hist(value_pd['just_value'], bins=50, edgecolor='black')
hist_ax.set_xlabel('Property Value ($)')
hist_ax.set_ylabel('Frequency')
hist_ax.set_title('Property Value Distribution (Under $1M)')
plt.show()

## 4. Machine Learning with PySpark - Property Value Prediction

In [None]:
# Prepare data for ML
# Single source of truth for the model inputs; the same list labels the
# feature-importance table at the end of the cell.
FEATURE_COLS = ["land_sqft", "building_sqft", "year_built", "bedrooms", "bathrooms"]
LABEL_COL = "just_value"
ML_SEED = 42

ml_input = (
    properties_df
    .filter(
        F.col("land_sqft").isNotNull()
        & F.col("building_sqft").isNotNull()
        & F.col("year_built").isNotNull()
        & (F.col(LABEL_COL) > 0)
    )
    .select(*FEATURE_COLS, LABEL_COL)
    # Remaining nulls (bedrooms / bathrooms) are treated as 0
    .na.fill(0)
)

# Create feature vector
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
ml_data = assembler.transform(ml_input)

# Split data
train_data, test_data = ml_data.randomSplit([0.8, 0.2], seed=ML_SEED)

# Train Random Forest model (seeded so a re-run reproduces the same forest)
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol=LABEL_COL,
    numTrees=100,
    maxDepth=10,
    seed=ML_SEED,
)

model = rf.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate model
evaluator = RegressionEvaluator(
    labelCol=LABEL_COL,
    predictionCol="prediction",
    metricName="rmse",
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error: ${rmse:,.2f}")

# Feature importance, in the same order as the assembler inputs
feature_importance = pd.DataFrame({
    'feature': FEATURE_COLS,
    'importance': model.featureImportances.toArray(),
}).sort_values('importance', ascending=False)

print("\nFeature Importance:")
print(feature_importance)

## 5. Playwright Web Scraping Integration

In [None]:
class PropertyWebScraper:
    """Web scraper using Playwright for property data.

    Call ``start()`` before any ``scrape_*`` method and ``close()`` when done.
    The ``scrape_*`` methods never raise: on any failure they print the error
    and return an empty dict.
    """

    def __init__(self):
        self.playwright = None
        self.browser = None
        self.page = None
        # PNG bytes captured by the most recent scrape_property_details() call
        self.last_screenshot = None

    def start(self):
        """Initialize Playwright browser.

        If the browser or page cannot be created, everything started so far is
        shut down again before the exception is re-raised.
        """
        self.playwright = sync_playwright().start()
        try:
            self.browser = self.playwright.chromium.launch(
                headless=True,
                args=['--disable-dev-shm-usage']
            )
            self.page = self.browser.new_page()
        except Exception:
            self.close()
            raise

    @staticmethod
    def _text_of(root, selector: str) -> Optional[str]:
        """Return the text of the first element under ``root`` matching
        ``selector``, or None when nothing matches.

        ``query_selector`` returns None for a missing element; Python has no
        ``?.`` operator, so the None check is made explicitly here.
        """
        element = root.query_selector(selector)
        return element.text_content() if element is not None else None

    def scrape_property_details(self, parcel_id: str, county: str) -> Dict:
        """Scrape property details from county website.

        Args:
            parcel_id: Parcel identifier; URL-encoded before being placed in
                the search query.
            county: County name, lower-cased to form the site's sub-domain.

        Returns:
            Dict with keys 'owner', 'address' and 'value' (each a string or
            None when the element is absent), or {} on any error.
        """
        try:
            # Navigate to property appraiser site
            url = (
                f"https://{county.lower()}.county-taxes.com/public/search"
                f"?search_query={quote_plus(parcel_id)}"
            )
            self.page.goto(url, wait_until='networkidle')

            # Wait for content to load
            self.page.wait_for_selector('.property-details', timeout=10000)

            # Extract data
            data = {
                'owner': self._text_of(self.page, '.owner-name'),
                'address': self._text_of(self.page, '.property-address'),
                'value': self._text_of(self.page, '.assessed-value'),
            }

            # Take screenshot for records; kept on the instance so it is not lost
            self.last_screenshot = self.page.screenshot()

            return data

        except Exception as e:
            print(f"Error scraping {parcel_id}: {e}")
            return {}

    def scrape_tax_deed_auction(self, auction_url: str) -> Dict:
        """Scrape tax deed auction information.

        Returns:
            Dict with 'auction_date', 'minimum_bid', 'property_details' (string
            or None) and 'bidders' (list of {'number', 'amount'} dicts), or {}
            on any error.
        """
        try:
            self.page.goto(auction_url, wait_until='networkidle')

            # Extract auction details
            auction_data = {
                'auction_date': self._text_of(self.page, '.auction-date'),
                'minimum_bid': self._text_of(self.page, '.minimum-bid'),
                'property_details': self._text_of(self.page, '.property-info'),
                'bidders': []
            }

            # Get bidder information, one entry per '.bidder-row' element
            for bidder in self.page.query_selector_all('.bidder-row'):
                auction_data['bidders'].append({
                    'number': self._text_of(bidder, '.bidder-number'),
                    'amount': self._text_of(bidder, '.bid-amount')
                })

            return auction_data

        except Exception as e:
            print(f"Error scraping auction: {e}")
            return {}

    def close(self):
        """Close browser and cleanup.

        Safe to call more than once and before ``start()``. Playwright is
        stopped even if closing the browser raises.
        """
        try:
            if self.browser:
                self.browser.close()
        finally:
            try:
                if self.playwright:
                    self.playwright.stop()
            finally:
                # Drop stale handles so a later start() begins from scratch
                self.playwright = None
                self.browser = None
                self.page = None

# Example usage: launch the browser once to confirm Playwright works here
scraper = PropertyWebScraper()
try:
    scraper.start()

    # Scrape sample property
    # property_data = scraper.scrape_property_details("123456789", "BROWARD")
    # print("Scraped property data:", property_data)
finally:
    # Always release the browser process, even when start-up or a scrape fails
    scraper.close()
print("Web scraper initialized successfully!")

## 6. OpenCV Image Analysis for Properties

In [None]:
class PropertyImageAnalyzer:
    """Analyze property images using OpenCV.

    Images are handled as 3-channel BGR ``numpy`` arrays, which is what
    ``cv2.imread`` and ``download_image`` produce.
    """

    def __init__(self):
        self.cascade_path = cv2.data.haarcascades

    def download_image(self, image_url: str, timeout: float = 30.0) -> np.ndarray:
        """Download image from URL and return it as a BGR array.

        Args:
            image_url: HTTP(S) address of the image.
            timeout: Seconds to wait for the server before giving up.

        Raises:
            requests.RequestException: On network errors, timeouts or a
                non-success HTTP status.
        """
        response = requests.get(image_url, timeout=timeout)
        # Do not try to decode an HTML error page as an image
        response.raise_for_status()
        # Normalise palette / grayscale / RGBA inputs to exactly three
        # channels, which COLOR_RGB2BGR requires
        img = Image.open(BytesIO(response.content)).convert('RGB')
        return cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)

    def analyze_property_image(self, image_path: str) -> Dict:
        """Comprehensive image analysis.

        Args:
            image_path: Local file path, or a URL when it starts with 'http'.

        Returns:
            Dict with 'dimensions', 'dominant_colors', 'edge_density',
            'features', 'quality_score', 'has_pool' and 'vegetation_coverage'.

        Raises:
            ValueError: If the local file cannot be read as an image.
        """
        # Read image
        if image_path.startswith('http'):
            image = self.download_image(image_path)
        else:
            image = cv2.imread(image_path)

        # cv2.imread signals failure by returning None instead of raising
        if image is None:
            raise ValueError(f"Could not read image: {image_path}")

        analysis = {}

        # Basic properties: (height, width)
        analysis['dimensions'] = image.shape[:2]

        # Color analysis
        analysis['dominant_colors'] = self.get_dominant_colors(image)

        # Edge detection for structure analysis: share of pixels marked as edge
        edges = cv2.Canny(image, 100, 200)
        analysis['edge_density'] = float(np.count_nonzero(edges) / edges.size)

        # Detect features
        analysis['features'] = self.detect_features(image)

        # Quality assessment
        analysis['quality_score'] = self.assess_image_quality(image)

        # Detect pools/water features
        analysis['has_pool'] = self.detect_pool(image)

        # Vegetation analysis
        analysis['vegetation_coverage'] = self.analyze_vegetation(image)

        return analysis

    def get_dominant_colors(self, image: np.ndarray, k: int = 5) -> List:
        """Extract dominant colors using K-means clustering.

        Returns:
            ``k`` cluster centres as lists of three ints, in the image's
            channel order.
        """
        pixels = np.float32(image.reshape((-1, 3)))

        # Stop after 100 iterations or once centres move by less than 0.2
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 0.2)
        _, _labels, centers = cv2.kmeans(pixels, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)

        return centers.astype(int).tolist()

    def detect_features(self, image: np.ndarray) -> Dict:
        """Detect various features in the image.

        Returns:
            Dict with 'corner_count' and 'line_count'.
        """
        features = {}

        # Convert to grayscale
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Detect corners (buildings usually have many corners); at most 100
        corners = cv2.goodFeaturesToTrack(gray, 100, 0.01, 10)
        features['corner_count'] = len(corners) if corners is not None else 0

        # Detect lines (structural elements)
        edges = cv2.Canny(gray, 50, 150)
        lines = cv2.HoughLinesP(edges, 1, np.pi/180, 100, minLineLength=100, maxLineGap=10)
        features['line_count'] = len(lines) if lines is not None else 0

        return features

    def assess_image_quality(self, image: np.ndarray) -> float:
        """Assess image quality from sharpness.

        Returns:
            Laplacian variance divided by 10, capped at 100.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Calculate Laplacian variance (sharpness)
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()

        # Cap at 100 with a plain comparison rather than min(): the notebook
        # imports `min` from pyspark.sql.functions, which replaces the builtin.
        quality_score = float(laplacian_var / 10)
        return 100.0 if quality_score > 100 else quality_score

    def detect_pool(self, image: np.ndarray, min_area: float = 1000,
                    min_circularity: float = 0.5) -> bool:
        """Detect if image contains a pool.

        A pool is reported when a blue region is larger than ``min_area``
        pixels and rounder than ``min_circularity`` (4*pi*area / perimeter^2).
        """
        # Convert to HSV for better color detection
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

        # Define blue color range for pool water
        lower_blue = np.array([100, 50, 50])
        upper_blue = np.array([130, 255, 255])

        # Create mask
        mask = cv2.inRange(hsv, lower_blue, upper_blue)

        # Find contours
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        # Check for pool-like shapes
        for contour in contours:
            area = cv2.contourArea(contour)
            if area <= min_area:
                continue
            perimeter = cv2.arcLength(contour, True)
            if perimeter == 0:
                # Avoid dividing by zero on a degenerate contour
                continue
            circularity = 4 * np.pi * area / (perimeter * perimeter)
            if circularity > min_circularity:
                return True

        return False

    def analyze_vegetation(self, image: np.ndarray) -> float:
        """Analyze vegetation coverage in the image.

        Returns:
            Percentage (0-100) of pixels inside the green HSV range.
        """
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

        # Define green color range
        lower_green = np.array([35, 40, 40])
        upper_green = np.array([85, 255, 255])

        # Create mask
        mask = cv2.inRange(hsv, lower_green, upper_green)

        # Calculate percentage
        return float((np.count_nonzero(mask) / mask.size) * 100)

# Initialize analyzer
# Notebook-level instance for ad-hoc use; the IntegratedPropertyPipeline class
# defined further down constructs its own PropertyImageAnalyzer.
image_analyzer = PropertyImageAnalyzer()
print("OpenCV image analyzer initialized!")

# Example analysis (using a sample image).
# Replace the placeholder path with a real file path or an http(s) URL before
# uncommenting.
# analysis_result = image_analyzer.analyze_property_image("path_to_image.jpg")
# print("Image analysis results:", analysis_result)

## 7. Integrated Pipeline - Combining All Technologies

In [None]:
class IntegratedPropertyPipeline:
    """Complete pipeline integrating all technologies.

    Combines the SQLAlchemy session, the notebook's Spark session, the
    Playwright scraper and the OpenCV analyzer to build one result dict per
    property.
    """

    def __init__(self):
        self.db_manager = DatabaseManager()
        self.spark = spark  # Use existing Spark session
        self.scraper = PropertyWebScraper()
        self.image_analyzer = PropertyImageAnalyzer()
        self.session = self.db_manager.get_session()

    def process_property(self, parcel_id: str, county: str) -> Dict:
        """Complete property processing pipeline.

        Returns:
            Dict with 'parcel_id', 'county', 'scraped', 'spark_analysis' and
            'investment_score'; 'database' is added when the parcel exists and
            'image_analysis' when it has images.
        """
        result = {'parcel_id': parcel_id, 'county': county}

        # 1. Get property from database (SQLAlchemy)
        property_obj = self.session.query(FloridaParcel).filter_by(
            parcel_id=parcel_id, county=county
        ).first()

        if property_obj:
            just_value = property_obj.just_value
            result['database'] = {
                'owner': property_obj.owner_name,
                # float(None) raises, so a missing value is passed through as None
                'value': float(just_value) if just_value is not None else None,
                'address': property_obj.phy_addr1
            }

        # 2. Scrape additional data (Playwright); the browser is released even
        # when start-up or scraping raises
        try:
            self.scraper.start()
            result['scraped'] = self.scraper.scrape_property_details(parcel_id, county)
        finally:
            self.scraper.close()

        # 3. Analyze with PySpark
        result['spark_analysis'] = self.analyze_property_spark(parcel_id, county)

        # 4. Process images if available (OpenCV)
        if property_obj and property_obj.images:
            # Analyze up to 3 images
            result['image_analysis'] = [
                self.image_analyzer.analyze_property_image(img.image_url)
                for img in property_obj.images[:3]
            ]

        # 5. Generate investment score
        result['investment_score'] = self.calculate_investment_score(result)

        return result

    def analyze_property_spark(self, parcel_id: str, county: str) -> Dict:
        """Analyze property using Spark.

        Looks the parcel up in the ``florida_parcels`` table and averages up to
        ten parcels in the same county whose land area is within 1000 sq ft and
        building area within 500 sq ft of it.

        Returns:
            Dict with 'comparable_avg_value', 'comparable_avg_sale',
            'comparable_count' and 'value_difference', or {} when the parcel is
            not found or has no land/building area to compare on.
        """
        # Column expressions bind parcel_id / county as literals, so their
        # content can never be interpreted as SQL.
        parcels = self.spark.table("florida_parcels")

        prop = parcels.filter(
            (F.col("parcel_id") == parcel_id) & (F.col("county") == county)
        ).first()

        if prop is None:
            return {}

        if prop.land_sqft is None or prop.building_sqft is None:
            # Nothing to match comparables against
            return {}

        # Find comparable properties
        comparables = parcels.filter(
            (F.col("county") == county)
            & (F.abs(F.col("land_sqft") - F.lit(prop.land_sqft)) < 1000)
            & (F.abs(F.col("building_sqft") - F.lit(prop.building_sqft)) < 500)
            & (F.col("parcel_id") != parcel_id)
        ).limit(10)

        # Calculate statistics
        stats = comparables.agg(
            F.avg("just_value").alias("avg_value"),
            F.avg("sale_price").alias("avg_sale_price"),
            F.count(F.lit(1)).alias("comp_count")
        ).first()

        avg_value = stats.avg_value
        avg_sale = stats.avg_sale_price
        can_compare = avg_value is not None and prop.just_value is not None

        return {
            'comparable_avg_value': float(avg_value) if avg_value is not None else 0,
            'comparable_avg_sale': float(avg_sale) if avg_sale is not None else 0,
            'comparable_count': stats.comp_count,
            # Convert each side first: the two values may be of different
            # numeric types (e.g. Decimal and float), which cannot be subtracted
            'value_difference': float(prop.just_value) - float(avg_value) if can_compare else 0
        }

    def calculate_investment_score(self, property_data: Dict) -> float:
        """Calculate investment score based on all data.

        Starts at 50, adds 10 for a value between 0 and 500,000 (exclusive),
        15 when the property is below the comparable average, and per analyzed
        image 5 for a pool, 3 for vegetation above 30% and 2 for a quality
        score above 70. The result is capped at 100.
        """
        score = 50.0  # Base score

        # Database factors
        if 'database' in property_data:
            # A missing value is stored as None; treat it like 0
            value = property_data['database'].get('value') or 0
            if 0 < value < 500000:
                score += 10  # Affordable range

        # Spark analysis factors
        if 'spark_analysis' in property_data:
            if property_data['spark_analysis'].get('value_difference', 0) < 0:
                score += 15  # Below market value

        # Image analysis factors
        if 'image_analysis' in property_data:
            for img in property_data['image_analysis']:
                if img.get('has_pool'):
                    score += 5
                if img.get('vegetation_coverage', 0) > 30:
                    score += 3
                if img.get('quality_score', 0) > 70:
                    score += 2

        # Cap at 100 without calling min(): the notebook imports `min` from
        # pyspark.sql.functions, which replaces the builtin.
        return score if score < 100 else 100

    def batch_process_properties(self, property_list: List[tuple]) -> pd.DataFrame:
        """Process multiple properties.

        Args:
            property_list: Iterable of (parcel_id, county) pairs.

        Returns:
            DataFrame with one row per input; a property that fails gets a row
            holding only 'parcel_id' and 'error'.
        """
        results = []

        for parcel_id, county in property_list:
            try:
                results.append(self.process_property(parcel_id, county))
            except Exception as e:
                print(f"Error processing {parcel_id}: {e}")
                results.append({'parcel_id': parcel_id, 'error': str(e)})

        return pd.DataFrame(results)

# Initialize pipeline
# Constructing the pipeline creates a DatabaseManager session and picks up the
# notebook-level `spark` session, so the Spark setup cell must have run first.
# No browser is launched here; that happens inside process_property().
pipeline = IntegratedPropertyPipeline()
print("Integrated pipeline initialized successfully!")

# Example processing (arguments are parcel_id, county)
# result = pipeline.process_property("123456789", "BROWARD")
# print("Property analysis result:", result)

## 8. API Integration for Website

In [None]:
# FastAPI integration endpoint.
# The module source is kept in a string and written to the API package below.
# Compared with a plain SQL string, the market-analysis endpoint filters through
# the DataFrame API so the URL path parameter is never spliced into SQL text,
# and NULL aggregates (county without rows) are returned as null, not a 500.
api_integration_code = '''
from fastapi import FastAPI, HTTPException, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pyspark.sql import functions as F
from typing import Optional
import asyncio

# NOTE: IntegratedPropertyPipeline, PropertyImageAnalyzer and the `spark`
# session are defined in the analysis notebook, not in this module. They must
# be made importable here before the endpoints below can serve requests.

app = FastAPI(title="ConcordBroker Property API")

class PropertyRequest(BaseModel):
    parcel_id: str
    county: str
    include_images: bool = True
    include_spark: bool = True
    include_scraping: bool = False

class PropertyResponse(BaseModel):
    parcel_id: str
    county: str
    owner: Optional[str]
    value: Optional[float]
    investment_score: float
    analysis: dict
    images: Optional[list]

@app.post("/api/analyze-property", response_model=PropertyResponse)
async def analyze_property(request: PropertyRequest):
    """Comprehensive property analysis endpoint"""
    try:
        # Initialize pipeline
        pipeline = IntegratedPropertyPipeline()

        # Process property in a worker thread so the event loop stays free
        result = await asyncio.to_thread(
            pipeline.process_property,
            request.parcel_id,
            request.county
        )

        # Format response
        return PropertyResponse(
            parcel_id=result["parcel_id"],
            county=result["county"],
            owner=result.get("database", {}).get("owner"),
            value=result.get("database", {}).get("value"),
            investment_score=result.get("investment_score", 0),
            analysis=result.get("spark_analysis", {}),
            images=result.get("image_analysis", [])
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

def _as_float(value):
    """Convert an aggregate to float, keeping NULL as None."""
    return float(value) if value is not None else None

@app.get("/api/market-analysis/{county}")
async def market_analysis(county: str):
    """Get market analysis for a county"""
    try:
        # Use Spark for analysis. The county is compared as a literal value
        # through a column expression instead of being formatted into SQL.
        stats = (
            spark.table("florida_parcels")
            .where(F.col("county") == county)
            .agg(
                F.avg("just_value").alias("avg_value"),
                F.avg("sale_price").alias("avg_sale_price"),
                F.count(F.lit(1)).alias("property_count"),
                F.avg(F.year(F.current_date()) - F.col("year_built")).alias("avg_age"),
            )
            .first()
        )

        return {
            "county": county,
            "average_value": _as_float(stats.avg_value),
            "average_sale_price": _as_float(stats.avg_sale_price),
            "total_properties": stats.property_count,
            "average_age": _as_float(stats.avg_age)
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/image-analysis")
async def analyze_image(image_url: str):
    """Analyze property image using OpenCV"""
    try:
        analyzer = PropertyImageAnalyzer()
        result = await asyncio.to_thread(
            analyzer.analyze_property_image,
            image_url
        )
        return result

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/health")
async def health_check():
    """Check API health and connections"""
    return {
        "status": "healthy",
        "database": "connected",
        "spark": "running",
        "services": {
            "sqlalchemy": True,
            "pyspark": True,
            "opencv": True,
            "playwright": True
        }
    }
'''

# Save API code (overwrites any existing file at this path)
with open('../apps/api/integrated_property_api.py', 'w', encoding='utf-8') as f:
    f.write(api_integration_code)

print("API integration code created successfully!")
print("\nTo run the API:")
print("uvicorn apps.api.integrated_property_api:app --reload --port 8000")

## 9. Interactive Dashboard with Plotly

In [None]:
# Create interactive dashboard

# Get sample data for dashboard.
# The count column is labelled 'property_count' rather than 'count': result
# rows are tuple-like, so `row.count` would resolve to the tuple method
# instead of the column value.
county_data = (
    session.query(
        FloridaParcel.county,
        func.count(FloridaParcel.id).label('property_count'),
        func.avg(FloridaParcel.just_value).label('avg_value'),
    )
    .group_by(FloridaParcel.county)
    .limit(10)
    .all()
)

counties = [row.county for row in county_data]
counts = [row.property_count for row in county_data]
# Convert to plain floats for plotting; a county without values stays None
avg_values = [
    float(row.avg_value) if row.avg_value is not None else None
    for row in county_data
]

# Create dashboard: 2x2 grid of bar / line / pie / line panels
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=('Properties by County', 'Average Values',
                    'Investment Scores', 'Market Trends'),
    specs=[[{'type': 'bar'}, {'type': 'scatter'}],
           [{'type': 'pie'}, {'type': 'scatter'}]]
)

# Properties by County
fig.add_trace(
    go.Bar(x=counties, y=counts, name='Property Count'),
    row=1, col=1
)

# Average Values
fig.add_trace(
    go.Scatter(x=counties, y=avg_values, mode='lines+markers', name='Avg Value'),
    row=1, col=2
)

# Investment Score Distribution (hard-coded sample figures, not query results)
investment_scores = ['Excellent', 'Good', 'Fair', 'Poor']
score_counts = [25, 45, 20, 10]
fig.add_trace(
    go.Pie(labels=investment_scores, values=score_counts),
    row=2, col=1
)

# Market Trends (hard-coded sample figures, not query results)
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun']
trend_values = [100, 105, 103, 108, 112, 115]
fig.add_trace(
    go.Scatter(x=months, y=trend_values, mode='lines+markers', name='Market Trend'),
    row=2, col=2
)

# Update layout
fig.update_layout(
    title_text="ConcordBroker Property Analytics Dashboard",
    showlegend=False,
    height=800
)

fig.show()

print("Dashboard created successfully!")

## 10. Export Results and Save Analysis

In [None]:
# Export analysis results
export_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

# pandas and open() do not create missing directories, so make sure the output
# folder exists on the local filesystem before anything is written to it.
output_dir = 'analysis_results'
os.makedirs(output_dir, exist_ok=True)

# Save Spark analysis to Parquet
properties_df.write.mode('overwrite').parquet(f'{output_dir}/properties_{export_timestamp}.parquet')

# Save SQLAlchemy query results to CSV (first 1000 rows as a sample)
query_results = pd.read_sql(
    "SELECT * FROM florida_parcels LIMIT 1000",
    con=db_manager.engine
)
query_results.to_csv(f'{output_dir}/sample_properties_{export_timestamp}.csv', index=False)

# Save analysis summary
summary = {
    'timestamp': export_timestamp,
    'total_properties_analyzed': properties_df.count(),
    'counties_covered': properties_df.select('county').distinct().count(),
    'technologies_used': [
        'SQLAlchemy',
        'PySpark',
        'OpenCV',
        'Playwright',
        'Jupyter'
    ],
    'api_endpoints': [
        '/api/analyze-property',
        '/api/market-analysis',
        '/api/image-analysis',
        '/api/health'
    ]
}

with open(f'{output_dir}/summary_{export_timestamp}.json', 'w', encoding='utf-8') as f:
    json.dump(summary, f, indent=2)

print(f"✅ Analysis results exported to analysis_results/ directory")
print(f"Timestamp: {export_timestamp}")

# Close connections. After this point the earlier cells must be re-run before
# `session` or `spark` can be used again.
session.close()
spark.stop()
print("\n✅ All connections closed. Analysis complete!")