In [38]:
###
# @Author             : Monserrat López
# @Date               : 2025-03-17 
# @Last Modified Date : 2025-04-21
# @Description        : Cleaning and feature extraction pipeline for EU-based data centers
###

In [2]:
# Imports
import numpy as np
import re
import json
import pandas as pd

In [3]:
# Load scraped raw data from JSON cache.
# The encoding is pinned to UTF-8 so the read does not depend on the platform's
# default locale encoding (the CSV export at the end of the notebook is UTF-8 too).
with open("../cache/scrape_cache.json", "r", encoding="utf-8") as f:
    data_raw = json.load(f)

# Convert JSON dictionary to DataFrame: every top-level entry becomes one row;
# the original dictionary keys are discarded by reset_index(drop=True).
df = pd.DataFrame.from_dict(data_raw, orient="index").reset_index(drop=True)

In [4]:
def split_operator_and_clean_address(address):
    """Split a multi-line address into the operator name and a one-line address.

    The first line of the text is taken as the operator; the remaining
    non-empty lines are joined with ", " to form the cleaned address.

    Args:
        address: Raw address text. Values that are not strings (None, NaN, ...)
            are treated as missing.

    Returns:
        pd.Series of two items ``[operator, clean_address]``; either item is
        None when the corresponding part is absent or blank.
    """
    if not isinstance(address, str):
        return pd.Series([None, None])
    # Strip every line individually so Windows line endings ("\r\n") and
    # padding do not leak into the operator name.
    lines = [line.strip() for line in address.strip().split('\n')]
    operator = lines[0] or None
    address_lines = [line for line in lines[1:] if line]
    clean_address = ', '.join(address_lines) if address_lines else None
    return pd.Series([operator, clean_address])

# Derive the operator / cleaned-address pair for every row, then attach both columns
operator_and_address = df['Address'].apply(split_operator_and_clean_address)
df[['operator_name', 'clean_address']] = operator_and_address

In [5]:
# Normalize country names: trim surrounding whitespace first, then lowercase
country_trimmed = df["Country"].str.strip()
df["Country"] = country_trimmed.str.lower()

In [7]:
# Infer missing datacenter names from URLs
def infer_name_from_url(url):
    """Derive a fallback name from the last path segment of a URL.

    Surrounding whitespace, any "#fragment" / "?query" suffix and leading or
    trailing slashes are removed; hyphens in the remaining last segment are
    replaced with underscores.

    Args:
        url: URL text. Non-string values yield None.

    Returns:
        The derived name, or None when ``url`` is not a string or nothing is
        left after cleaning.
    """
    if not isinstance(url, str):
        return None
    # Cut the fragment first, then the query, so neither ends up in the name.
    path = url.strip().split("#", 1)[0].split("?", 1)[0].strip("/")
    name = path.split("/")[-1].replace("-", "_")
    return name or None

def _resolve_datacenter_name(row):
    """Keep the row's existing name; fall back to a URL-derived one when it is missing."""
    current_name = row["DatacenterName"]
    if pd.isna(current_name):
        return infer_name_from_url(row["URL"])
    return current_name


df["DatacenterName"] = df.apply(_resolve_datacenter_name, axis=1)

In [8]:
# Filter out non-operational or planned facilities.
# The description is lowercased and tested against a regex alternation of the
# marker phrases; rows without a description are kept (na=False).
exclusion_pattern = "archived listing|currently listed as: planned|under construction|this data center may not be available"
description_lower = df["Description"].str.lower()
mask_exclude = description_lower.str.contains(exclusion_pattern, na=False)

excluded_count = mask_exclude.sum()
print(f"Entries excluded due to construction/planning/archive: {excluded_count}")

cleaned_df = df.loc[~mask_exclude].copy()

Entries excluded due to construction/planning/archive: 180


In [10]:
# Normalize column names: trim, replace spaces with underscores, lowercase
column_labels = cleaned_df.columns.str.strip()
column_labels = column_labels.str.replace(" ", "_")
cleaned_df.columns = column_labels.str.lower()

#### Feature Extraction

In [46]:
# Extract PUE from description
def extract_pue(desc):
    """Pull a PUE (power usage effectiveness) figure out of a free-text description.

    Three phrasings are tried in order and the first hit wins:
      1. a range ("PUE between 1.2 and 1.4") -> mean of the two bounds
      2. an upper bound ("PUE as low as 1.15") -> the bound itself
      3. a plain value ("PUE of 1.3")          -> the value

    Only figures written as one digit, a dot and decimals (e.g. ``1.25``) are
    recognised. Matching is case-insensitive.

    Args:
        desc: Description text. Anything that is not a string (None, NaN, ...)
            is treated as "no PUE mentioned".

    Returns:
        pd.Series with ``pue_estimate`` (float or None), ``has_pue`` (bool) and
        ``pue_description`` (the matched text snippet or None).
    """
    result = {"pue_estimate": None, "has_pue": False, "pue_description": None}
    if not isinstance(desc, str):
        return pd.Series(result)

    # The keyword must start at a word boundary and must not be followed by a
    # letter, so ordinary words that merely contain "pue" (e.g. "puede",
    # "puerto") are not mistaken for the metric.
    keyword = r'\b(?:pue|power usage effectiveness)(?![^\W\d_])'
    # The lookbehind stops the figure from being the tail of a larger number
    # ("12.5" must not be read as 2.5).
    number = r'(?<![\d.])(\d\.\d+)'
    patterns = [
        (keyword + r'.*?(?:between|from)\s*' + number + r'\s*(?:and|to)\s*' + number,
         lambda m: (float(m.group(1)) + float(m.group(2))) / 2),
        (keyword + r'.*?(?:less than|as low as|under)\s*' + number,
         lambda m: float(m.group(1))),
        (keyword + r'.*?(?:is|of|at|=)?\s*' + number,
         lambda m: float(m.group(1))),
    ]
    for pattern, func in patterns:
        match = re.search(pattern, desc, re.IGNORECASE)
        if match:
            result["pue_estimate"] = func(match)
            result["has_pue"] = True
            result["pue_description"] = match.group(0).strip()
            break
    return pd.Series(result)

pue_features = cleaned_df["description"].apply(extract_pue)
cleaned_df[["pue_estimate", "has_pue", "pue_description"]] = pue_features

# Keep only estimates inside the 1.0-2.5 window; anything else becomes missing
pue_in_range = cleaned_df["pue_estimate"].between(1.0, 2.5)
cleaned_df["pue_estimate"] = cleaned_df["pue_estimate"].where(pue_in_range)


In [47]:
# Extract specifications: power, whitespace, building size
def _spec_power_mw(value, unit):
    """Convert a matched power figure to megawatts (kW values are divided by 1000)."""
    val = float(value.replace(',', ''))
    return val if unit.lower() == 'mw' else val / 1000


def _spec_area_sqm(value, unit):
    """Convert a matched area figure to square metres (square feet are scaled by 0.0929)."""
    val = float(value.replace(',', ''))
    # Every foot-based unit spelling accepted by the unit pattern contains an
    # "f" and no metre-based one does, so this is a reliable discriminator.
    return val * 0.0929 if 'f' in unit.lower() else val


def extract_specs(spec):
    """Parse power and floor-area figures out of a free-text specification.

    Args:
        spec: Specification text. Anything that is not a string (None, NaN,
            ...) yields an all-empty result.

    Returns:
        pd.Series with ``power_built_out_mw``, ``live_power_mw``,
        ``whitespace_sqm`` and ``building_size_sqm``. Power is expressed in MW
        and areas in square metres; fields that are not found are None/NaN.
    """
    result = {"power_built_out_mw": None, "live_power_mw": None, "whitespace_sqm": None, "building_size_sqm": None}
    if not isinstance(spec, str):
        return pd.Series(result)

    # A number must start with a digit; a lone "," would otherwise match and
    # crash float() after the thousands separators are removed.
    num_re = r'(\d[\d,]*(?:\.\d+)?)'
    # "ft" is listed before "f" so the full unit is captured.
    unit_re = r'(sq\.?\s?(?:ft|f|m)|m2|m²|ft2|ft²)'

    # Built-out power: the optional first group captures a live/actual/available
    # qualifier so those figures can be skipped here instead of being reported
    # as built-out capacity. "power built-out 10 MW" (qualifier after the
    # keyword, as in the whitespace pattern) is accepted as well.
    built_out_re = (
        r'(?:(live|actual|available)\s+|built[-\s]?out\s+)?power(?:\s+built[-\s]?out)?\s*[:\-]?\s*'
        + num_re + r'\s*(mw|kw)'
    )
    for power in re.finditer(built_out_re, spec, re.IGNORECASE):
        if power.group(1) is None:
            result['power_built_out_mw'] = _spec_power_mw(power.group(2), power.group(3))
            break

    live = re.search(r'(live|actual|available)\s+power\s*[:\-]?\s*' + num_re + r'\s*(mw|kw)', spec, re.IGNORECASE)
    if live:
        result['live_power_mw'] = _spec_power_mw(live.group(2), live.group(3))

    white = re.search(r'whitespace\s+(?:built[-\s]?out\s+)?' + num_re + r'\s*' + unit_re, spec, re.IGNORECASE)
    if white:
        result['whitespace_sqm'] = _spec_area_sqm(white.group(1), white.group(2))

    build = re.search(r'building\s+size\s+' + num_re + r'\s*' + unit_re, spec, re.IGNORECASE)
    if build:
        result['building_size_sqm'] = _spec_area_sqm(build.group(1), build.group(2))

    return pd.Series(result)

specs_df = cleaned_df["specs"].apply(extract_specs)
# Remove spec columns left behind by an earlier execution of this cell before
# concatenating; otherwise re-running the cell appends duplicate columns.
spec_columns = ["power_built_out_mw", "live_power_mw", "whitespace_sqm", "building_size_sqm"]
cleaned_df = pd.concat(
    [cleaned_df.drop(columns=spec_columns, errors="ignore"), specs_df],
    axis=1,
)

In [48]:
## Extract Tier from description
def extract_tier(text):
    """Find a tier level (1-4) mentioned in free text.

    Recognises "tier" followed by an Arabic digit 1-4 or a Roman numeral I-IV,
    optionally separated by whitespace or hyphens ("Tier 3", "Tier-III",
    "tier iv"). Matching is case-insensitive and the first mention wins.

    Args:
        text: Text to scan. Non-string values (None, NaN, ...) yield None.

    Returns:
        The tier as an int between 1 and 4, or None when no tier is mentioned.
    """
    if not isinstance(text, str):
        return None
    roman_to_int = {'i': 1, 'ii': 2, 'iii': 3, 'iv': 4}
    # "iv" must be tried before "i{1,3}", otherwise "tier iv" is read as tier I.
    # The word boundaries keep "frontier 3", "tier in ..." and "tier 10" from
    # being picked up as tier levels.
    match = re.search(r'\btier[\s\-]*(iv|i{1,3}|[1-4])\b', text.lower())
    if not match:
        return None
    val = match.group(1)
    return int(val) if val.isdigit() else roman_to_int[val]

tier_raw = cleaned_df["description"].apply(extract_tier)
cleaned_df["tier_level"] = tier_raw

# Discard anything outside the 1-4 tier range
tier_in_range = cleaned_df["tier_level"].between(1, 4)
cleaned_df["tier_level"] = cleaned_df["tier_level"].where(tier_in_range)

In [49]:
# List the columns available after feature extraction
column_names = cleaned_df.columns.tolist()
print(column_names)

['url', 'address', 'description', 'website', 'specs', 'status', 'country', 'city', 'datacentername', 'operator_name', 'clean_address', 'pue_estimate', 'has_pue', 'pue_description', 'power_built_out_mw', 'live_power_mw', 'whitespace_sqm', 'building_size_sqm', 'tier_level']


In [50]:
# Normalize country/city names and add ISO and region labels
def normalize_location(df, country_col='country', city_col='city'):
    """Add normalized country, city, ISO-code and region columns to ``df``.

    Countries are recognised by looking for a known EU-27 name (or alias)
    inside the lowercased country text; aliases of the same country map to one
    canonical display name. Unrecognised values become
    ``('Unknown', 'XX', 'XXX')`` with region ``'Other'``.

    City names are capitalised word by word with hyphens replaced by spaces;
    common particles ("de", "am", "sur", ...) are kept lowercase unless they
    are the first word.

    Args:
        df: DataFrame to annotate. It is modified in place.
        country_col: Name of the column holding the country text.
        city_col: Name of the column holding the city text.

    Returns:
        The same DataFrame, with ``country_normalized``, ``country_iso2``,
        ``country_iso3``, ``city_normalized`` and ``region`` added.
    """
    EU27 = {
        'austria': ('AT', 'AUT'), 'belgium': ('BE', 'BEL'), 'bulgaria': ('BG', 'BGR'),
        'croatia': ('HR', 'HRV'), 'cyprus': ('CY', 'CYP'), 'czechia': ('CZ', 'CZE'),
        'czech republic': ('CZ', 'CZE'), 'denmark': ('DK', 'DNK'), 'estonia': ('EE', 'EST'),
        'finland': ('FI', 'FIN'), 'france': ('FR', 'FRA'), 'germany': ('DE', 'DEU'),
        'deutschland': ('DE', 'DEU'), 'greece': ('GR', 'GRC'), 'hungary': ('HU', 'HUN'),
        'ireland': ('IE', 'IRL'), 'italy': ('IT', 'ITA'), 'latvia': ('LV', 'LVA'),
        'lithuania': ('LT', 'LTU'), 'luxembourg': ('LU', 'LUX'), 'malta': ('MT', 'MLT'),
        'netherlands': ('NL', 'NLD'), 'poland': ('PL', 'POL'), 'portugal': ('PT', 'PRT'),
        'romania': ('RO', 'ROU'), 'slovakia': ('SK', 'SVK'), 'slovenia': ('SI', 'SVN'),
        'spain': ('ES', 'ESP'), 'españa': ('ES', 'ESP'), 'sweden': ('SE', 'SWE')
    }
    REGION_MAP = {
        'Western Europe': ['FR', 'BE', 'LU', 'NL', 'DE', 'AT'],
        'Northern Europe': ['SE', 'FI', 'DK', 'IE', 'EE', 'LV', 'LT'],
        'Southern Europe': ['IT', 'ES', 'PT', 'GR', 'CY', 'MT'],
        'Eastern Europe': ['PL', 'CZ', 'SK', 'HU', 'RO', 'BG', 'SI', 'HR']
    }

    # One display name per country: the first name listed for each ISO code.
    # Without this, aliases such as "czech republic" / "deutschland" / "españa"
    # would each produce their own "normalized" name.
    canonical_name = {}
    for name, (iso2, _iso3) in EU27.items():
        canonical_name.setdefault(iso2, name.title())

    # Inverted region table for direct lookup by ISO-2 code.
    region_by_iso2 = {iso2: region for region, codes in REGION_MAP.items() for iso2 in codes}

    def norm_country(c):
        c = str(c).strip().lower().replace('-', ' ')
        # Substring match so values like "the netherlands" are still recognised.
        for name, (iso2, iso3) in EU27.items():
            if name in c:
                return (canonical_name[iso2], iso2, iso3)
        return ('Unknown', 'XX', 'XXX')

    def norm_city(c):
        if pd.isna(c):
            return ""
        lowers = {'de', 'la', 'le', 'du', 'van', 'von', 'am', 'im', 'des', 'sur', 'el', 'di', 'del', 'della'}
        words = str(c).strip().replace('-', ' ').split()
        # Compare particles case-insensitively so "FRANKFURT AM MAIN" and
        # "Frankfurt Am Main" both become "Frankfurt am Main".
        return ' '.join(
            w.lower() if i > 0 and w.lower() in lowers else w.capitalize()
            for i, w in enumerate(words)
        )

    norm = df[country_col].apply(norm_country)
    df['country_normalized'] = norm.apply(lambda x: x[0])
    df['country_iso2'] = norm.apply(lambda x: x[1])
    df['country_iso3'] = norm.apply(lambda x: x[2])
    df['city_normalized'] = df[city_col].apply(norm_city)
    df['region'] = df['country_iso2'].apply(lambda code: region_by_iso2.get(code, 'Other'))
    return df


# Add the normalized country/city, ISO-code and region columns
cleaned_df = normalize_location(cleaned_df, country_col="country", city_col="city")

# Columns kept in the exported dataset, in output order
columns = [
    "url",
    "address",
    "clean_address",
    "country_iso2",
    "country_normalized",
    "city_normalized",
    "website",
    "description",
    "specs",
    "region",
    "datacentername",
    "operator_name",
    "pue_estimate",
    "power_built_out_mw",
    "live_power_mw",
    "whitespace_sqm",
    "building_size_sqm",
    "tier_level",
]

output_csv_path = "../output/04eu_datacenters_cleaned_features.csv"
cleaned_df = cleaned_df.loc[:, columns].copy()
cleaned_df.to_csv(output_csv_path, index=False, encoding="utf-8")

In [51]:
# Report how many rows the cleaned dataset holds
entry_count = len(cleaned_df)
print(f"Saved cleaned_df data with {entry_count} entries.")

Saved cleaned_df data with 1615 entries.
