# Bronze Ingestion Pipeline: Social Trading Risk Analysis Platform

**Purpose:** Bronze layer ingestion - Extract raw trader profile data from social trading platforms

**Architecture Decision:** 
- Store raw HTML responses in Delta Lake for full reprocessability
- Immutable bronze layer - never update, only append
- Preserves source data even if platform changes structure

**Key Technical Challenges:**
1. **Rate Limiting:** Respectful scraping with proxy rotation + random delays
2. **Idempotency:** Anti-join pattern prevents duplicate processing
3. **Error Handling:** Store HTTP status for debugging failed requests
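
The anti-join behind the idempotency guarantee can be sketched without Spark. This is a minimal illustration in plain Python, not the notebook's implementation: `pending_users`, its arguments, and the sample data are hypothetical, and the cutoff is a simple datetime offset rather than the date-based `date_sub(current_date(), 5)` used in the pipeline.

```python
from datetime import datetime, timedelta

def pending_users(source_users, ingestions, now, window_days=5):
    """Return source users with no ingestion inside the refresh window."""
    cutoff = now - timedelta(days=window_days)
    # Users that already have a recent row in the bronze table
    recent = {user for user, ts in ingestions if ts > cutoff}
    # Anti-join: keep source users absent from the recent set, preserving order
    return [user for user in source_users if user not in recent]

now = datetime(2024, 1, 10, 12, 0)
ingestions = [
    ("alice", datetime(2024, 1, 9)),  # inside the window, so skipped
    ("bob", datetime(2024, 1, 1)),    # outside the window, so due for refresh
]
print(pending_users(["alice", "bob", "carol"], ingestions, now))
```

Because the check reads from the append-only table itself, rerunning the notebook after a crash resumes where it stopped instead of fetching the same profiles again.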

**Output:** `bronze_trader_profiles` - Delta table containing raw data (Bronze Layer)
- Schema: `user`, `timestamp`, `key`, `url`, `status`, `response`
- Partitioned by date for efficient time-travel queries

## 1. Environment Setup

In [0]:
%pip install python-dotenv
%restart_python

## 2. Proxy Configuration

**Technical Decision: Why Proxy Rotation?**

When extracting data from web sources at scale, rate limiting is the primary technical challenge:
- Single IP: Limited to ~50-100 requests before throttling
- Solution: Rotate through a pool of 10 proxy IPs

This enables respectful data collection while maintaining reliability:
- Random delays (5-20s) between requests
- Distributed load across proxy pool
- HTTP status tracking for monitoring
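
The rotation and delay logic described above can be sketched as two small helpers. This is an illustrative sketch only: `build_proxy_config`, `next_delay`, and the pool entries are hypothetical, while the dictionary keys (`username`, `password`, `proxy_address`, `port`) follow the fields used later in this notebook.

```python
import random

def build_proxy_config(proxy):
    """Build a requests-style proxies dict plus a credential-free label for logs."""
    proxy_url = (
        f"http://{proxy['username']}:{proxy['password']}"
        f"@{proxy['proxy_address']}:{proxy['port']}"
    )
    safe_label = f"{proxy['proxy_address']}:{proxy['port']}"
    return {"http": proxy_url, "https": proxy_url}, safe_label

def next_delay(rng, low=5, high=20):
    """Pick a random pause in seconds, inclusive of both bounds."""
    return rng.randint(low, high)

# Hypothetical pool entries for illustration
pool = [
    {"username": "u1", "password": "secret", "proxy_address": "10.0.0.1", "port": 8080},
    {"username": "u2", "password": "secret", "proxy_address": "10.0.0.2", "port": 8080},
]
rng = random.Random(42)
proxies, label = build_proxy_config(rng.choice(pool))
print(f"Using proxy {label}, then waiting {next_delay(rng)}s")
```

Returning a separate label keeps the proxy password out of notebook output, which is easy to leak when the full proxy URL is printed.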

In [0]:
import requests
import os
import json
from dotenv import load_dotenv

env_path = os.path.join(
    os.path.dirname(os.getcwd()),
    '.env'
)
load_dotenv(env_path)

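# Proxy configuration
API_TOKEN = os.getenv("PROXY_API_TOKEN")
# Fall back to empty params if the variable is unset (json.loads(None) would raise TypeError)
PARAMS = json.loads(os.getenv("PROXY_API_PARAMS", "{}"))
URL = os.getenv("PROXY_API_URL")

# Request the list of proxies; the timeout keeps the cell from hanging indefinitely
response = requests.get(URL, headers={"Authorization": API_TOKEN}, params=PARAMS, timeout=30)

if response.status_code != 200:
    raise RuntimeError(f"Proxy API request failed with HTTP status {response.status_code}")

# Parse the JSON body once and reuse it
payload = response.json()
print(f"Identified {payload['count']} proxies")

proxy_list = payload['results']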

## 3. Scraping Functions

**Design Pattern: Separation of Concerns**

This module handles ONLY data acquisition:
- `fetch_trader_profile_data()`: HTTP request with proxy rotation
- `get_pending_traders()`: Determines which users need processing (idempotency)
- `get_downloaded_traders()`: Metrics for monitoring progress
- `get_url_list()`: Builds the endpoint URLs to fetch for a given trader

**Parsing logic is deliberately separated** - Bronze layer stores raw HTML, 
Silver layer handles parsing. This allows:
- Reprocessing if parsing logic changes
- A/B testing different extraction strategies
- Full audit trail of source data
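
To make the Bronze/Silver split concrete, here is a small sketch of how a raw record might be stored and parsed later. It is an assumption-laden illustration: `make_bronze_record`, `parse_title`, and the example URL are hypothetical, and the title extraction merely stands in for whatever parsing the Silver layer actually performs. The record keys match the Bronze table schema.

```python
import re
from datetime import datetime

def make_bronze_record(user, key, url, status, html):
    """Bronze: keep the response body untouched, alongside request metadata."""
    return {
        "user": user,
        "timestamp": datetime.now(),
        "key": key,
        "url": url,
        "status": status,
        "response": html,
    }

def parse_title(record):
    """Hypothetical Silver step: extract the page title from the stored HTML."""
    match = re.search(r"<title>(.*?)</title>", record["response"], re.IGNORECASE | re.DOTALL)
    return match.group(1).strip() if match else None

record = make_bronze_record(
    "alice", "factsheet", "https://example.com/factsheet/alice", 200,
    "<html><head><title> Alice </title></head></html>",
)
print(parse_title(record))
```

Since `parse_title` only reads the stored record, it can be rewritten and rerun over historical rows without issuing a single new HTTP request.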

In [0]:
import pandas as pd
import requests
import time
import random

BRONZE_TRADER_PROFILES = os.getenv("BRONZE_TRADER_PROFILES")
PLATFORM_BASE_URL = os.getenv("PLATFORM_BASE_URL")

def fetch_trader_profile_data(url, use_proxy=True, proxy_list=None):
    """
    Fetch trader profile data from social trading platform
    
    Args:
        url (str): Profile page URL
        use_proxy (bool): Enable proxy rotation for rate limit management
        proxy_list (list): Pool of proxy configurations
        
    Returns:
        requests.Response: HTTP response containing profile HTML
        
    Note: This implementation uses web scraping as the platform does not
    provide a public API for bulk data extraction. In production systems,
    official API access is always preferred when available.
    """
    
    # Add headers to mimic a real browser
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    
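    # Proxy configuration
    if use_proxy:
        # Fail early with a clear message instead of a TypeError from random.choice
        if not proxy_list:
            raise ValueError("proxy_list must be a non-empty list when use_proxy=True")

        # Randomly select a proxy
        selected_proxy = random.choice(proxy_list)
        
        # Format proxy URL
        proxy_url = f"http://{selected_proxy['username']}:{selected_proxy['password']}@{selected_proxy['proxy_address']}:{selected_proxy['port']}"
        
        proxies = {
            'http': proxy_url,
            'https': proxy_url,
        }
        # Log only the address; the full proxy URL embeds the credentials
        print(f"Using proxy: {selected_proxy['proxy_address']}:{selected_proxy['port']}")
        response = requests.get(url, headers=headers, proxies=proxies, timeout=30)
    else:
        response = requests.get(url, headers=headers, timeout=30)
        
    return response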

def get_pending_traders():
    """
    Identify traders pending data collection (idempotency logic)
    
    Returns:
        pd.DataFrame: Trader usernames not yet processed in last 5 days
        
    Implementation:
        - LEFT ANTI JOIN: source list NOT IN recent ingestions
        - 5-day refresh window for profile updates
        - Prevents duplicate processing within time window
    """
    users = pd.read_csv('../data/traders/users_tier1.txt', sep='\r', header=None)
    users.columns = ['user']
    
    existing_users = spark.table(BRONZE_TRADER_PROFILES) \
    .select(['user', 'timestamp']) \
    .where('timestamp > date_sub(current_date(), 5)') \
    .select(['user']) \
    .distinct() \
    .toPandas()
    
    return users[~users['user'].isin(existing_users['user'])]

def get_downloaded_traders(days=0):
    """
    Count distinct users processed in last N days (monitoring metric)
    
    Args:
        days (int): Lookback window (0 = today only)
        
    Returns:
        int: Count of distinct users downloaded
    """

    return spark.table(BRONZE_TRADER_PROFILES) \
    .select(['user', 'timestamp']) \
    .where(f'timestamp > date_sub(current_date(), {days})') \
    .select(['user']) \
    .distinct() \
    .count()

def get_url_list(username):
    """
    Generate URLs for data extraction endpoints
    
    Note: Currently only fetching 'factsheet' endpoint
    Future: Can enable overview, trades, history for richer dataset
    """
    return {
        "factsheet": f"{PLATFORM_BASE_URL}/factsheet/{username}"
    }
    
    # Commented: Additional endpoints available but not used yet
    # return {
    #     "overview": f"{PLATFORM_BASE_URL}/etoro/{username}",
    #     "trades": f"{PLATFORM_BASE_URL}/etoro/{username}/trades",
    #     "history": f"{PLATFORM_BASE_URL}/etoro/{username}/history",
    #     "factsheet": f"{PLATFORM_BASE_URL}/factsheet/{username}"
    # }

## 4. Main Ingestion Loop

**Control Flow Logic:**

```
WHILE (0.2 * pending_users - users_downloaded_today > 1):
  1. Sample 20 random users (prevents sequential patterns)
  2. For each user:
     - Scrape data with random proxy
     - Store raw HTML in Delta Lake (append-only)
     - Sleep 5-20 seconds (respectful rate limiting)
  3. Sleep 1-3 minutes between batches (avoid burst patterns)
```

**Why This Design?**
- **Random sampling:** Distributes load, avoids detection patterns
- **Random delays:** Mimics human browsing behavior
- **Batch processing:** Efficient while maintaining rate limits
- **Delta append:** Immutable bronze layer, no updates
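
One detail of the random-sampling step deserves care: pandas `DataFrame.sample(n)` raises a `ValueError` when `n` exceeds the number of rows (unless sampling with replacement), so the batch size should be capped once the pending list gets short. The sketch below shows the idea on a plain list; `sample_batch` is a hypothetical helper, not a function from this notebook.

```python
import random

def sample_batch(pending, batch_size=20, rng=random):
    """Pick up to batch_size users at random without failing on a short list."""
    return rng.sample(pending, min(batch_size, len(pending)))

pending = [f"user{i}" for i in range(50)]
batch = sample_batch(pending, rng=random.Random(0))
print(len(batch), "users in a full batch")

# With fewer pending users than the batch size, everything left is returned
print(sample_batch(["a", "b"], rng=random.Random(0)))
```

The same cap carries over to a DataFrame as `pending.sample(min(20, len(pending)))`.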

**Monitoring:**
- Target: Download roughly 20% of the pending user list per day; the loop stops once today's downloads reach that share
- Progress: `get_downloaded_traders(days=0)` tracks daily progress
- Logs: Print statements for debugging (can be enhanced with MLflow)
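
The stopping rule can be isolated into a small predicate, which makes the target easier to reason about and test. This is a sketch under assumptions: `should_continue` and its parameter names are hypothetical, and the formula simply mirrors the loop condition used in this notebook.

```python
def should_continue(pending_count, downloaded_today, target_fraction=0.2, margin=1):
    """Keep going while the remaining daily quota exceeds the margin."""
    remaining_quota = pending_count * target_fraction - downloaded_today
    return remaining_quota > margin

# 1000 pending, 50 done today: quota of 200 not yet reached
print(should_continue(pending_count=1000, downloaded_today=50))

# 800 pending, 160 done today: quota of 160 reached, so stop
print(should_continue(pending_count=800, downloaded_today=160))
```

Note that the pending count shrinks as users are downloaded, so the quota is measured against the users still pending rather than against the full source list.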

In [0]:
import datetime
import random
import time

while True:
    # Query the pending list once per batch and reuse it for both the check and the sample
    pending = get_pending_traders()
    if len(pending) * 0.2 - get_downloaded_traders(days=0) <= 1:
        break

    # Cap the sample size: DataFrame.sample raises ValueError if n exceeds the row count
    users = pending.sample(min(20, len(pending)))

    for user in users['user']:
        print(user)
        urls = get_url_list(user)
        print(urls)
        for key, url in urls.items():
            response = fetch_trader_profile_data(url, proxy_list=proxy_list)
            timestamp = datetime.datetime.now()
            
            # One row per request, stored with its HTTP status for later debugging
            data = [{
                'user': user,
                'timestamp': timestamp,
                'key': key,
                'url': url,
                'status': response.status_code,
                'response': response.text
            }]
            
            # Log the endpoint and its HTTP status
            print(key)
            print(response.status_code)
            
            # Convert the list of dictionaries to a Spark DataFrame
            df = spark.createDataFrame(data)

            # Append the row to the bronze Delta table (never overwrite)
            df.write.format("delta").mode("append").saveAsTable(BRONZE_TRADER_PROFILES)
            sleep_duration = random.randint(5, 20)
            print(f"Sleeping for {sleep_duration} seconds")
            time.sleep(sleep_duration)
            print("-"*10)

    sleep_duration = random.randint(60, 3*60)
    print(f"Sleeping for {sleep_duration} seconds between batches")
    print("*"*10)
    time.sleep(sleep_duration)
