In [3]:
from pathlib import Path
import pandas as pd
from pyproj import Transformer
import math

# --------- CONFIG ---------
# Root folder that is searched recursively for Code-Point Open CSV files.
# NOTE(review): this is an absolute, machine-specific path; anyone else running
# the notebook has to point it at their own unzipped copy.
CODEPOINT_DIR = Path(r"/Users/arshad/Documents/prog/python/ArshFinanceCode/University/DIYScripts/codepo_gb")  # folder you unzipped
# Postcode at the centre of the search; it must exist in the loaded data.
SEED_POSTCODE = "G77 5GQ"
RADIUS_KM = 1.5          # change radius to taste (e.g., 0.5, 1, 2, 5)
# Relative path, so the file is written to the kernel's current working directory.
OUT_CSV = Path("nearby_postcodes.csv")

# If you only care about the G area file (Glasgow), try to find it:
# otherwise the script will scan all CSVs under CODEPOINT_DIR.
# The value is compared case-insensitively with each CSV's file stem.
PREFER_AREA_FILE = "G"   # set None to scan all

# --------- HELPERS ---------
def norm_pc(s: str) -> str:
    """Return a postcode in its compact comparison form.

    The value is coerced to ``str``, surrounding whitespace is trimmed, letters
    are upper-cased and every space character is removed, so ``" g77 5gq "``
    becomes ``"G775GQ"``.
    """
    trimmed = str(s).strip()
    shouted = trimmed.upper()
    return shouted.replace(" ", "")

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two points.

    Args:
        lat1, lon1: latitude and longitude of the first point, in degrees.
        lat2, lon2: latitude and longitude of the second point, in degrees.

    Returns:
        The haversine distance in km on a sphere of radius 6371.0088 km.
        NaN inputs propagate to a NaN result.
    """
    R = 6371.0088  # sphere radius in km used for the conversion
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dl = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dl / 2) ** 2
    # Floating-point rounding can leave `a` a hair above 1 for (near-)antipodal
    # points, which would make asin() raise ValueError. The comparison form is
    # used instead of min() so that a NaN stays NaN rather than becoming 1.
    if a > 1.0:
        a = 1.0
    return 2 * R * math.asin(math.sqrt(a))

def iter_codepoint_csvs(base: Path):
    """Collect every ``*.csv`` file found anywhere below ``base``.

    Code-Point Open is delivered as many CSV files, so the search is recursive.

    Returns:
        A list of ``Path`` objects, in the order ``rglob`` yields them.

    Raises:
        FileNotFoundError: if no CSV file exists under ``base``.
    """
    found = [candidate for candidate in base.rglob("*.csv")]
    if len(found) == 0:
        raise FileNotFoundError(f"No CSV files found under {base}")
    return found

def load_codepoint(base: Path, prefer_area: str | None):
    """Read Code-Point CSV files under ``base`` into one headerless DataFrame.

    Args:
        base: folder that is searched recursively for ``*.csv`` files.
        prefer_area: when truthy, only files whose stem equals this value
            (case-insensitive) are read; if no file matches, every CSV found
            is read instead.

    Returns:
        The concatenation of all usable files, with integer column labels
        (files are read with ``header=None``) and a fresh 0..n-1 index.

    Raises:
        FileNotFoundError: propagated from ``iter_codepoint_csvs`` when there
            are no CSV files at all.
        RuntimeError: if no file could be read with at least four columns.
    """
    import warnings  # local import: only needed to report skipped files

    csvs = iter_codepoint_csvs(base)

    if prefer_area:
        # try to prioritise files that look like they correspond to that area
        wanted = prefer_area.upper()
        prefer = [p for p in csvs if p.stem.upper() == wanted]
        if prefer:
            csvs = prefer

    frames = []
    # Code-Point Open format commonly includes:
    # 0: postcode, 2: easting, 3: northing (exact columns can vary by edition)
    # We'll read without headers and pick columns by position.
    for p in csvs:
        try:
            frame = pd.read_csv(p, header=None)
        except Exception as exc:
            # Still best-effort, but say which file was dropped and why so a
            # short result set can be traced back to its cause.
            warnings.warn(f"Skipping unreadable CSV {p}: {exc}", stacklevel=2)
            continue
        # must have at least 4 columns to hold pc/easting/northing in common layouts
        if frame.shape[1] >= 4:
            frames.append(frame)
        else:
            warnings.warn(
                f"Skipping {p}: found {frame.shape[1]} column(s), expected at least 4",
                stacklevel=2,
            )

    if not frames:
        raise RuntimeError("Could not read any Code-Point CSVs. Check file structure/encoding.")
    return pd.concat(frames, ignore_index=True)

# --------- LOAD DATA ---------
df = load_codepoint(CODEPOINT_DIR, PREFER_AREA_FILE)

# Try typical Code-Point Open column positions:
# col0 = postcode, col2 = easting, col3 = northing
df = df.rename(columns={0: "postcode_raw", 2: "easting", 3: "northing"})
df["postcode_norm"] = df["postcode_raw"].map(norm_pc)

seed_norm = norm_pc(SEED_POSTCODE)
seed_row = df.loc[df["postcode_norm"] == seed_norm]

if seed_row.empty:
    raise ValueError(f"Seed postcode {SEED_POSTCODE} not found in Code-Point data you loaded.")

# If the seed appears more than once, the first match is used.
seed_e = float(seed_row.iloc[0]["easting"])
seed_n = float(seed_row.iloc[0]["northing"])

# Convert British National Grid -> WGS84 lat/lon
# always_xy=True fixes the axis order to (easting, northing) in and (lon, lat) out.
transformer = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True)
seed_lon, seed_lat = transformer.transform(seed_e, seed_n)

# Convert all points to lat/lon (vectorised enough for typical use)
# (If performance becomes an issue, we can optimize.)
lons, lats = transformer.transform(df["easting"].astype(float).values,
                                  df["northing"].astype(float).values)
df["lon"] = lons
df["lat"] = lats

# Distance filter
df["dist_km"] = [
    haversine_km(seed_lat, seed_lon, la, lo)
    for la, lo in zip(df["lat"].values, df["lon"].values)
]

near = df.loc[df["dist_km"] <= RADIUS_KM].copy()

# Optional: restrict to G77 outward district
# The inward code is always the last three characters, so the outward code is
# everything before it. A prefix regex on the space-stripped postcode is
# ambiguous: "G1 5AB" normalises to "G15AB" and would be read as "G15".
near["outward"] = near["postcode_norm"].str[:-3]
# .copy() keeps `near` an independent frame so later column writes do not
# trigger SettingWithCopyWarning.
near = near.loc[near["outward"] == "G77"].copy()

# Pretty formatting for output (insert space before last 3 chars)
def format_uk_pc(pc_norm: str) -> str:
    """Return the postcode with one space before its final three characters.

    The input is normalised with ``norm_pc`` first, so spaced or lower-case
    values are accepted as well.
    """
    compact = norm_pc(pc_norm)
    head, tail = compact[:-3], compact[-3:]
    return f"{head} {tail}"

# Add the display column with assign() and sort in one step. assign() returns a
# new frame, so nothing is written into the filtered slice `near` came from
# (a plain near["postcode"] = ... there raises SettingWithCopyWarning).
near = near.assign(postcode=near["postcode_norm"].map(format_uk_pc)).sort_values("dist_km")

# Only the formatted postcode and its distance from the seed are exported.
near[["postcode", "dist_km"]].to_csv(OUT_CSV, index=False)
print(f"Found {len(near)} postcodes within {RADIUS_KM} km in G77. Saved: {OUT_CSV.resolve()}")


Found 187 postcodes within 1.5 km in G77. Saved: /Users/arshad/Documents/prog/python/ArshFinanceCode/University/DIYScripts/nearby_postcodes.csv


In [29]:

# This cell uses csv and Path, so import them here: on a fresh kernel nothing
# earlier in the notebook imports csv and the cell would fail with NameError.
import csv
from pathlib import Path

OUT_CSV = Path("nearby_postcodes.csv")

# Postcodes read back from the CSV's "postcode" column, in file order.
postcodes = []

with OUT_CSV.open(newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        pc = row.get("postcode")
        # Skip rows where the column is missing or empty.
        if pc:
            postcodes.append(pc.strip())

print(f"Loaded {len(postcodes)} postcodes")
print("First 10:", postcodes[:10])

Loaded 187 postcodes
First 10: ['G77 5GQ', 'G77 6XG', 'G77 5GR', 'G77 5UX', 'G77 5GY', 'G77 5GX', 'G77 5GS', 'G77 6RT', 'G77 6YL', 'G77 5GZ']


In [43]:
from urllib.parse import urlencode
from pathlib import Path
import csv

# Read the postcode list back from the CSV, keeping file order.
OUT_CSV = Path("nearby_postcodes.csv")
postcodes = []

with OUT_CSV.open(newline="", encoding="utf-8") as f:
    raw_values = (record.get("postcode") for record in csv.DictReader(f))
    # Missing or empty cells are dropped; the rest are whitespace-trimmed.
    postcodes.extend(value.strip() for value in raw_values if value)

print(f"Loaded {len(postcodes)} postcodes")
print("First 10:", postcodes[:10])

# Put your postcodes here
# NOTE(review): nothing visible in this notebook reads POSTCODES; the request
# list is built from the lowercase `postcodes` loaded from the CSV. Confirm
# whether this manual list is still needed.
POSTCODES = [
    "G77 5GQ",
    "G77 6WX",
    # "G77 6AA",
]

# Endpoint that build_url() appends the query string to.
BASE = "https://scotlis.ros.gov.uk/results"

def build_url(postcode: str) -> str:
    """Return the price-search results URL for ``postcode``.

    The query asks for price results ordered by entry date, newest first.
    ``urlencode`` takes care of escaping, so a space in the postcode is sent
    as ``+``.
    """
    query = urlencode(
        [
            ("postcode", postcode),
            ("searchType", "prices"),
            ("sortBy", "entryDate"),
            ("sortDir", "desc"),
        ]
    )
    return BASE + "?" + query

# One (postcode, results URL) pair per postcode loaded from the CSV.
rows = [(pc, build_url(pc)) for pc in postcodes]

import requests
import time
DELAY_SECONDS = 5        # polite delay between requests
TIMEOUT = 20  # seconds


# NOTE(review): HEADERS is only referenced by the commented-out update below;
# the session actually uses the inline header dict that follows.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36"
}

# Single hard-coded results URL used for the one-off request below.
url = 'https://scotlis.ros.gov.uk/results?postcode=G77+5GQ&searchType=prices&sortBy=entryDate&sortDir=desc'
# Shared session: the fetch helpers defined later in this cell reuse it, so
# the headers set here apply to every request they make.
session = requests.Session()
#session.headers.update(HEADERS)
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en-GB,en;q=0.9',
    'Referer': 'https://scotlis.ros.gov.uk/',
    'DNT': '1'
})

# One-off request issued every time the cell runs, before the main loop.
# NOTE(review): the bare `resp` below is not the last statement of the cell,
# so it displays nothing; this looks like a leftover manual check.
resp = session.get(url, timeout=TIMEOUT)
resp


import json
import re
from datetime import datetime
import csv
from pathlib import Path

# Sale years to keep: the default for filter_by_year(), and also passed
# explicitly by the fetch helpers below.
YEARS = {2025, 2026}

def extract_next_data_json(html: str) -> dict:
    """Pull the embedded ``__NEXT_DATA__`` payload out of a page and decode it.

    The payload is the JSON object inside
    ``<script id="__NEXT_DATA__" type="application/json"> ... </script>``.

    Raises:
        ValueError: if the page has no such script tag (``json.loads`` errors
            for a malformed payload are also ``ValueError`` subclasses).
    """
    pattern = re.compile(
        r'<script[^>]+id="__NEXT_DATA__"[^>]*>(\{.*?\})</script>',
        re.DOTALL,  # the payload may span several lines
    )
    found = pattern.search(html)
    if found is None:
        raise ValueError("Could not find __NEXT_DATA__ JSON in the HTML.")
    payload = found.group(1)
    return json.loads(payload)

def parse_rows_from_next_data(next_data: dict):
    """
    Returns list of dict rows with keys: address, entryDate, year, price, titleNumber.

    The rows come from ``next_data["props"]["pageProps"]["addresses"]``; a
    missing or null ``addresses`` entry yields an empty list. ``year`` is the
    integer formed from the first four characters of ``entryDate``, or ``None``
    when the date is absent or does not start with a number.

    Raises:
        KeyError: if ``props`` or ``pageProps`` is missing from ``next_data``.
    """
    page_props = next_data["props"]["pageProps"]
    # `or []` also covers an explicit JSON null, which .get(..., []) would
    # pass through as None and crash the loop.
    addresses = page_props.get("addresses") or []
    rows = []
    for a in addresses:
        entry_date = a.get("entryDate")  # e.g. "2026-01-06"
        try:
            year = int(entry_date[:4]) if entry_date else None
        except (TypeError, ValueError):
            # One malformed date should not abort parsing of the whole page.
            year = None
        rows.append({
            "address": a.get("prettyPrint"),
            "entryDate": entry_date,
            "year": year,
            "price": a.get("consideration"),
            "titleNumber": a.get("titleNumber"),
        })
    return rows

def filter_by_year(rows, years=YEARS):
    """Keep the rows whose ``year`` is in ``years``, newest ``entryDate`` first.

    Rows without an ``entryDate`` sort as an empty string, i.e. after every
    dated row. A new list is returned; ``rows`` itself is not modified.
    """
    matching = (row for row in rows if row.get("year") in years)
    return sorted(matching, key=lambda row: row.get("entryDate") or "", reverse=True)

def save_csv(rows, path: Path):
    """Write sale rows to ``path`` as a UTF-8 CSV file with a header line.

    Parent directories are created when missing. Only ``entryDate``,
    ``address``, ``price`` and ``titleNumber`` are written, in that column
    order; other keys on a row are ignored and absent keys give empty cells.
    """
    columns = ["entryDate", "address", "price", "titleNumber"]
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows({name: row.get(name) for name in columns} for row in rows)


def get_filtered_rows(url: str):
    """Fetch ``url`` once and return its sales restricted to ``YEARS``.

    The request goes through the shared ``session`` with the module timeout.
    The HTTP status is only logged, not checked, so a page without the
    ``__NEXT_DATA__`` payload surfaces as the ``ValueError`` raised by
    ``extract_next_data_json``.
    """
    print(f"Getting filtered rows for URL: {url}")
    response = session.get(url, timeout=TIMEOUT)
    page = response.text
    print(f"Fetched {url} (html code {response.status_code})")
    parsed = parse_rows_from_next_data(extract_next_data_json(page))
    return filter_by_year(parsed, YEARS)

import time
from requests.exceptions import RequestException

def get_filtered_rows_with_retry(url: str, max_retries: int = 3):
    """Fetch a results page and return its sales filtered to ``YEARS``, retrying on failure.

    Up to ``max_retries`` GET requests are made through the shared ``session``.
    A 200 response is parsed straight away and its filtered rows returned.
    Every other outcome is retried after an exponential backoff:

    * HTTP 403 / 429 (treated as blocking): 2s, 4s, 8s, ...
    * any other status, or a ``RequestException``: 1s, 2s, 4s, ...

    Args:
        url: results URL to fetch.
        max_retries: maximum number of requests to attempt.

    Returns:
        The filtered rows, or ``[]`` once every attempt has failed.

    Raises:
        Exception: only when ``max_retries`` is below 1, so no request is made.
        Errors from parsing a 200 response (for example the ``ValueError``
        raised when the page has no ``__NEXT_DATA__`` payload) propagate
        unchanged and are not retried.
    """
    print(f"Getting filtered rows for URL: {url}")

    for attempt in range(max_retries):
        try:
            resp = session.get(url, timeout=TIMEOUT)

            if resp.status_code == 200:
                html = resp.text
                print(f"Fetched {url} (status code {resp.status_code})")
                next_data = extract_next_data_json(html)
                rows = parse_rows_from_next_data(next_data)
                return filter_by_year(rows, YEARS)

            if resp.status_code in (403, 429):
                # Blocking / bot detection: back off twice as long.
                print(f"Server blocking detected (status {resp.status_code}). Attempt {attempt + 1}/{max_retries}")
                failure = f"Failed after {max_retries} attempts due to blocking. Status: {resp.status_code}"
                backoff_factor = 2
            else:
                print(f"Non-200 status code: {resp.status_code}. Attempt {attempt + 1}/{max_retries}")
                # Not a blocking status, so the final message must not claim blocking.
                failure = f"Failed after {max_retries} attempts. Status: {resp.status_code}"
                backoff_factor = 1

        except RequestException as e:
            print(f"Request exception: {e}. Attempt {attempt + 1}/{max_retries}")
            failure = f"Failed after {max_retries} attempts due to exception: {e}"
            backoff_factor = 1

        # Shared tail for every retryable outcome: give up on the last
        # attempt, otherwise wait and go round again.
        if attempt >= max_retries - 1:
            print(failure)
            return []

        wait_time = (2 ** attempt) * backoff_factor
        print(f"Waiting {wait_time} seconds before retry...")
        time.sleep(wait_time)

    # Only reached when the loop body never ran (max_retries < 1).
    raise Exception(f"Failed to fetch {url} after {max_retries} attempts")


# Flat list of every priced sale row found, across all postcodes.
filtered_rows_all = []
for postcode, url in rows:
    filtered_rows = get_filtered_rows_with_retry(url)
    # Drop rows without a usable price. A missing price (None) is treated the
    # same as the "no price available" placeholder instead of crashing .lower().
    result = [
        x for x in filtered_rows
        if x.get("price") is not None and "no price available" not in str(x["price"]).lower()
    ]

    # extend, not append: the final loop below indexes each element as a row
    # dict, which fails with TypeError on a list of per-postcode lists.
    filtered_rows_all.extend(result)

    if result:
        print(f"Total filtered_rows found: {len(result)} for {postcode}")
        for r in result:
            print(r["entryDate"], "-", r["address"], "-", r["price"])

    # Pause after every request, not only after postcodes that had results,
    # so consecutive requests are always spaced out.
    time.sleep(DELAY_SECONDS)

print(f"Printing all results. {filtered_rows_all}")
for r in filtered_rows_all:
    print(r["entryDate"], "-", r["address"], "-", r["price"])
    




Loaded 187 postcodes
First 10: ['G77 5GQ', 'G77 6XG', 'G77 5GR', 'G77 5UX', 'G77 5GY', 'G77 5GX', 'G77 5GS', 'G77 6RT', 'G77 6YL', 'G77 5GZ']
Getting filtered rows for URL: https://scotlis.ros.gov.uk/results?postcode=G77+5GQ&searchType=prices&sortBy=entryDate&sortDir=desc
Fetched https://scotlis.ros.gov.uk/results?postcode=G77+5GQ&searchType=prices&sortBy=entryDate&sortDir=desc (status code 200)
Getting filtered rows for URL: https://scotlis.ros.gov.uk/results?postcode=G77+6XG&searchType=prices&sortBy=entryDate&sortDir=desc
Fetched https://scotlis.ros.gov.uk/results?postcode=G77+6XG&searchType=prices&sortBy=entryDate&sortDir=desc (status code 200)
Getting filtered rows for URL: https://scotlis.ros.gov.uk/results?postcode=G77+5GR&searchType=prices&sortBy=entryDate&sortDir=desc
Fetched https://scotlis.ros.gov.uk/results?postcode=G77+5GR&searchType=prices&sortBy=entryDate&sortDir=desc (status code 200)
Total filtered_rows found: 1 for G77 5GR
2025-06-25 - 25 EVIE WYND, NEWTON MEARNS, GLAS

KeyboardInterrupt: 

In [19]:
rows[0][1]

'https://scotlis.ros.gov.uk/results?postcode=G77+5GQ&searchType=prices&sortBy=entryDate&sortDir=desc'