# 🏡 Buyer Folio Inc. – Real Estate Agent Matching Platform
**End-to-End Data Engineering Pipeline using Python, AWS, and ML**

In [None]:
import pandas as pd
import numpy as np
import requests
import json
import boto3
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

## 🔁 Data Extraction

In [None]:
# Simulating API response
api_response = [
    {"agent_id": 1, "name": "Alice", "specialty": "Luxury Homes", "rating": 4.8, "area": "NY", "transactions": 120},
    {"agent_id": 2, "name": "Bob", "specialty": "First-time Buyers", "rating": 4.5, "area": "CA", "transactions": 80}
]

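# Convert to DataFrame and save the raw snapshot
import os
os.makedirs("data/raw", exist_ok=True)  # to_csv does not create missing directories
data = pd.DataFrame(api_response)
data.to_csv("data/raw/agent_profiles.csv", index=False)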
data.head()
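
The cell above hard-codes the API response. A minimal sketch of what a live fetch could look like follows, assuming a JSON endpoint that returns a list of agent records with the same fields. The function name `fetch_agent_profiles`, the `REQUIRED_FIELDS` set, and the example URL are illustrative assumptions, not part of the original pipeline.

```python
import pandas as pd
import requests

# Columns the later cells rely on (taken from the simulated response above).
REQUIRED_FIELDS = {"agent_id", "name", "specialty", "rating", "area", "transactions"}


def fetch_agent_profiles(url, session=None, timeout=10):
    """Fetch agent records from a JSON endpoint and return them as a DataFrame."""
    session = session or requests.Session()
    response = session.get(url, timeout=timeout)
    response.raise_for_status()  # fail loudly on 4xx/5xx responses
    records = response.json()

    # Reject payloads that are missing any expected field.
    for record in records:
        missing = REQUIRED_FIELDS - set(record)
        if missing:
            raise ValueError(
                f"record {record.get('agent_id')} is missing {sorted(missing)}"
            )
    return pd.DataFrame(records)


# Usage (hypothetical URL, shown for illustration only):
# data = fetch_agent_profiles("https://api.example.com/agents")
```

Passing a `session` makes the function easy to exercise with a stub in tests, without touching the network.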

## 🔄 Data Transformation

In [None]:
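# Load and transform data
import os
df = pd.read_csv("data/raw/agent_profiles.csv")
# Weight the rating by log-scaled transaction volume so very high volumes do not dominate
df['performance_score'] = df['rating'] * np.log1p(df['transactions'])
# One-hot encode the categorical columns
df = pd.get_dummies(df, columns=['specialty', 'area'])
os.makedirs("data/processed", exist_ok=True)  # to_csv does not create missing directories
df.to_csv("data/processed/agent_profiles_clean.csv", index=False)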
df.head()
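
To make the `performance_score` formula concrete, the sketch below recomputes it in memory for the two sample agents. `np.log1p(x)` is `ln(1 + x)`, so Alice scores about 4.8 × ln(121) ≈ 23.02 and Bob about 4.5 × ln(81) ≈ 19.78. The `agents` frame is a stand-in built from the sample records, not a new data source.

```python
import numpy as np
import pandas as pd

# The two sample agents from the extraction step.
agents = pd.DataFrame({
    "name": ["Alice", "Bob"],
    "rating": [4.8, 4.5],
    "transactions": [120, 80],
})

# rating * ln(1 + transactions): the log dampens the effect of raw volume.
agents["performance_score"] = agents["rating"] * np.log1p(agents["transactions"])
print(agents.round(2))
```

Alice has 50% more transactions than Bob, yet the log term differs by only about 0.4 (4.80 versus 4.39), so the rating remains the main driver of the score.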

## 🤖 Machine Learning Model

In [None]:
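# Dummy target variable and model training.
# With only two rows this is a smoke test of the code path; the metrics carry no meaning.
df['target'] = [1, 0]  # 1 = top match, 0 = not top match
X = df.drop(columns=['name', 'agent_id', 'target'])
y = df['target']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)  # fixed seed for reproducible runs
model.fit(X_train, y_train)

preds = model.predict(X_test)
# zero_division=0 avoids undefined-metric warnings when a class is never predicted
print(classification_report(y_test, preds, zero_division=0))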
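
With two rows, the split leaves one training sample and one test sample, so the report above cannot say anything useful. The sketch below runs the same model on a synthetic data set large enough to produce a readable report. Everything about the data is an assumption for illustration: the feature ranges, the 200-agent size, and the labeling rule (top 30% by `performance_score` counts as a top match).

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(42)
n_agents = 200

# Synthetic agent features (illustrative only, not real data).
synthetic = pd.DataFrame({
    "rating": rng.uniform(3.0, 5.0, n_agents),
    "transactions": rng.integers(5, 200, n_agents),
})
synthetic["performance_score"] = (
    synthetic["rating"] * np.log1p(synthetic["transactions"])
)

# Hypothetical labeling rule: top 30% by performance_score are "top matches".
threshold = synthetic["performance_score"].quantile(0.7)
synthetic["target"] = (synthetic["performance_score"] >= threshold).astype(int)

X = synthetic.drop(columns=["target"])
y = synthetic["target"]

# Stratify so both classes appear in the train and test splits.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, stratify=y, random_state=42
)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
print(classification_report(y_test, model.predict(X_test)))
```

Because the label is derived directly from one of the features, high accuracy here only confirms that the code path works; it is not evidence of real matchmaking quality.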

## ☁️ Load Processed Data to AWS S3 (Simulated)

In [None]:
# Simulated upload to S3
# Normally you'd configure AWS credentials here using boto3

# s3 = boto3.client('s3')
# s3.upload_file('data/processed/agent_profiles_clean.csv', 'my-bucket-name', 'clean_data/agent_profiles_clean.csv')
print("✅ Data prepared for S3 upload.")
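
If the upload is later enabled, a small wrapper keeps the key naming in one place and fails early when the file is missing. This is a sketch under assumptions: the helper name `upload_processed_file` and the `clean_data` prefix default are illustrative, and the client is passed in so the function can be tried with a stub before AWS credentials exist.

```python
from pathlib import Path


def upload_processed_file(s3_client, local_path, bucket, key_prefix="clean_data"):
    """Upload one local file to S3 and return the object key that was used."""
    path = Path(local_path)
    if not path.is_file():
        raise FileNotFoundError(f"nothing to upload at {path}")

    key = f"{key_prefix}/{path.name}"
    # upload_file(Filename, Bucket, Key) is the same boto3 S3 client call
    # that appears commented out in the cell above.
    s3_client.upload_file(str(path), bucket, key)
    return key


# Usage (needs AWS credentials; "my-bucket-name" is a placeholder):
# import boto3
# upload_processed_file(
#     boto3.client("s3"),
#     "data/processed/agent_profiles_clean.csv",
#     "my-bucket-name",
# )
```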

## ✅ Summary
We built an end-to-end pipeline that:
- Extracted data from a simulated API
- Cleaned and transformed agent profiles
- Built a basic ML model to simulate matchmaking logic
- Simulated S3 integration for cloud-based storage

## 📊 Power BI / Looker Dashboard Sample
To visualize the processed data in Power BI or Looker:

1. Upload `agent_profiles_clean.csv` to cloud storage such as AWS S3 or Google Cloud Storage.
2. Connect Power BI / Looker to the cloud storage, or load the file into a cloud warehouse like BigQuery or Redshift and connect to that.
3. Build dashboards with KPIs like:
   - Top Rated Agents
   - Match Success Rates
   - Agent Coverage by Region
   - Satisfaction Scores by Specialty

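**Example warehouse SQL queries** (these assume an `agent_profiles` table that keeps the raw `specialty` column and a separate `matches` table, which this notebook does not produce):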
```sql
SELECT specialty, AVG(rating) as avg_rating
FROM agent_profiles
GROUP BY specialty;

SELECT area, COUNT(*) as match_count
FROM matches
GROUP BY area;
```
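
Before a warehouse is available, the first aggregation can be checked locally in pandas. The sketch below mirrors the `AVG(rating) ... GROUP BY specialty` query on the two sample agents. It uses the raw records because the cleaned CSV has `specialty` one-hot encoded; the `agents` frame is an illustrative stand-in.

```python
import pandas as pd

# Raw sample records (specialty is still a single column here).
agents = pd.DataFrame([
    {"agent_id": 1, "name": "Alice", "specialty": "Luxury Homes",
     "rating": 4.8, "area": "NY", "transactions": 120},
    {"agent_id": 2, "name": "Bob", "specialty": "First-time Buyers",
     "rating": 4.5, "area": "CA", "transactions": 80},
])

# Equivalent of: SELECT specialty, AVG(rating) AS avg_rating ... GROUP BY specialty
avg_rating = (
    agents.groupby("specialty", as_index=False)["rating"]
    .mean()
    .rename(columns={"rating": "avg_rating"})
)
print(avg_rating)
```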

## 🛫 Airflow DAG Simulation

In [None]:
# Simulated Airflow DAG (Python representation)
def extract():
    print("Extracting data from API...")

def transform():
    print("Transforming data...")

def load():
    print("Uploading data to S3...")

# DAG definition
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x path; airflow.operators.python_operator is the deprecated 1.x module
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1
}

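# catchup=False stops the scheduler from backfilling a run for every day since start_date
dag = DAG('agent_matching_pipeline', default_args=default_args, schedule_interval='@daily', catchup=False)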

t1 = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
t2 = PythonOperator(task_id='transform', python_callable=transform, dag=dag)
t3 = PythonOperator(task_id='load', python_callable=load, dag=dag)

t1 >> t2 >> t3
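
The placeholder callables in this DAG only print messages. A minimal sketch of what they could do, reusing the extraction and transformation logic from the earlier cells, is shown below as plain Python so it runs without an Airflow installation. The path parameters and the `run_pipeline` helper are assumptions for illustration, and the upload step is left out.

```python
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd


def extract(raw_path):
    """Write the simulated API records to raw_path."""
    records = [
        {"agent_id": 1, "name": "Alice", "specialty": "Luxury Homes",
         "rating": 4.8, "area": "NY", "transactions": 120},
        {"agent_id": 2, "name": "Bob", "specialty": "First-time Buyers",
         "rating": 4.5, "area": "CA", "transactions": 80},
    ]
    Path(raw_path).parent.mkdir(parents=True, exist_ok=True)
    pd.DataFrame(records).to_csv(raw_path, index=False)


def transform(raw_path, clean_path):
    """Add performance_score, one-hot encode categoricals, write clean_path."""
    df = pd.read_csv(raw_path)
    df["performance_score"] = df["rating"] * np.log1p(df["transactions"])
    df = pd.get_dummies(df, columns=["specialty", "area"])
    Path(clean_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(clean_path, index=False)


def run_pipeline(base_dir):
    """Run extract then transform, in the same order as t1 >> t2."""
    raw = Path(base_dir) / "raw" / "agent_profiles.csv"
    clean = Path(base_dir) / "processed" / "agent_profiles_clean.csv"
    extract(raw)
    transform(raw, clean)
    return clean


# Run against a throwaway directory so nothing is left on disk.
with tempfile.TemporaryDirectory() as tmp:
    result = pd.read_csv(run_pipeline(tmp))
    print(result.columns.tolist())
```

In a DAG, the same functions could receive their paths through the `op_kwargs` argument of `PythonOperator`.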

## 🔄 Real-Time Pipeline Simulation with Kafka (Conceptual)

In [None]:
# Sketch of a real-time streaming setup using the kafka-python client
# (needs a Kafka broker reachable at localhost:9092 to actually run)
from kafka import KafkaProducer, KafkaConsumer
import json

# Producer simulating real-time feedback
def send_feedback():
    producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    data = {"agent_id": 1, "rating": 5.0, "feedback": "Excellent service!"}
    producer.send('agent_feedback', data)
    producer.flush()

# Consumer to process streaming data
def process_feedback():
    consumer = KafkaConsumer('agent_feedback', bootstrap_servers='localhost:9092', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    for message in consumer:
        print("Received feedback:", message.value)
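
The consumer above only prints each message. One plausible next step is to fold incoming feedback into a running average rating per agent. The sketch below shows that logic on an in-memory list of events so it runs without a broker. The `RatingAggregator` class and the second and third events are made up for illustration.

```python
from collections import defaultdict


class RatingAggregator:
    """Keep a running average rating per agent from feedback events."""

    def __init__(self):
        self._totals = defaultdict(float)
        self._counts = defaultdict(int)

    def update(self, event):
        """Record one feedback event and return the agent's new average."""
        agent_id = event["agent_id"]
        self._totals[agent_id] += float(event["rating"])
        self._counts[agent_id] += 1
        return self.average(agent_id)

    def average(self, agent_id):
        """Return the current average, or raise KeyError for unseen agents."""
        if agent_id not in self._counts:
            raise KeyError(f"no feedback recorded for agent {agent_id}")
        return self._totals[agent_id] / self._counts[agent_id]


aggregator = RatingAggregator()
events = [
    {"agent_id": 1, "rating": 5.0, "feedback": "Excellent service!"},
    {"agent_id": 1, "rating": 4.0, "feedback": "Good communication."},  # made-up event
    {"agent_id": 2, "rating": 3.0, "feedback": "Slow to respond."},     # made-up event
]
for event in events:
    print(event["agent_id"], aggregator.update(event))
```

Inside `process_feedback`, the loop body could call `aggregator.update(message.value)` instead of printing.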

## 📘 Extended Summary
We have successfully built a simulation of a modern data engineering solution that includes:
- Batch ingestion and transformation
- Basic ML integration
- Cloud-based storage (S3)
- Real-time streaming architecture (Kafka concept)
- Workflow orchestration (Airflow DAG)
- Interactive analytics through Power BI / Looker