###  SocioTech Index

The **SocioTech Index** captures how each district balances its social and technological dimensions —
combining mobility, activity rhythm, digital intensity, green access, and safety into a single multidimensional profile.
Using an **autoencoder neural network**, the model compresses these features into a compact embedding that reveals structural similarities between districts.
Districts with high reconstruction errors stand out as **social–technical anomalies**, showing unique lifestyles or infrastructure patterns.
This index visualizes Warsaw’s socio-technical landscape — where the city’s digital, social, and environmental layers intersect.

In [2]:
import pandas as pd
import numpy as np
from pathlib import Path

In [5]:
RAW_CSV = Path("./data/hackplay_warszawa_with_districts.csv")
OPTIONAL_GREEN = Path("./data/warsaw_green_places.csv")
OPTIONAL_INCIDENTS = Path("./data/danger_stat_from_geoportal.csv")

In [6]:
df = pd.read_csv(RAW_CSV, usecols=["start_dttm","user_id","technology","district"], low_memory=False)
df["start_dttm"] = pd.to_datetime(df["start_dttm"], errors="coerce")
df = df.dropna(subset=["start_dttm","district","user_id"]).copy()
df["hour"] = df["start_dttm"].dt.hour

In [7]:
def scale_0_100(s: pd.Series):
    s = s.astype(float)
    mn, mx = s.min(), s.max()
    if pd.isna(mn) or mx == mn:
        return pd.Series(100.0, index=s.index)
    return (s - mn) / (mx - mn) * 100


In [8]:
def time_bucket(h: int) -> str:
    if 5 <= h < 11:  return "morning"
    if 11 <= h < 17: return "noon"
    if 17 <= h < 23: return "evening"
    return "night"

In [11]:
df["time_bucket"] = df["hour"].map(time_bucket)
traffic = (
    df[df["time_bucket"].isin(["morning","noon","evening"])]
      .groupby(["district","time_bucket"])["user_id"].nunique()
      .reset_index(name="unique_users")
)
traffic_score = (
    traffic.groupby("district")["unique_users"].sum()
           .pipe(scale_0_100)
           .rename("city_traffic")
)

In [13]:
df["slot_30"] = df["start_dttm"].dt.floor("30min")
dt = df.groupby(["district","slot_30"])["user_id"].nunique().rename("n").reset_index()
dt["copres"] = dt["n"] * (dt["n"] - 1) / 2
social_life = (
    dt.groupby("district")["copres"].median()
      .pipe(scale_0_100)
      .rename("social_life")
)


In [14]:
hourly = df.groupby(["district","hour"])["user_id"].nunique().unstack(fill_value=0)
hourly_norm = hourly.div(hourly.sum(axis=1).replace(0,np.nan), axis=0).fillna(0)
amplitude = (hourly_norm.max(axis=1) - hourly_norm.min(axis=1))
avg_act = hourly_norm.mean(axis=1)
rhythm_raw = 0.5*amplitude + 0.5*avg_act
rhythm = scale_0_100(rhythm_raw).rename("rhythm")

In [15]:
active_hours = (hourly_norm > 0.3).sum(axis=1)
social_availability = scale_0_100(active_hours).rename("social_availability")

In [16]:
tech_w = {"5G":1.3, "4G":1.0, "3G":0.6, "2G":0.3}
df["tech_weight"] = df["technology"].map(tech_w).fillna(1.0)
noise_agg = (df.groupby("district")
               .agg(total_obs=("user_id","count"),
                    unique_users=("user_id","nunique"),
                    avg_tech_weight=("tech_weight","mean")))
noise_raw = (noise_agg["total_obs"] / noise_agg["unique_users"].replace(0,np.nan)) * noise_agg["avg_tech_weight"]
digital_noise = scale_0_100(noise_raw.fillna(0)).rename("digital_noise")

In [17]:
presence_ratio = (noise_agg["unique_users"] / noise_agg["total_obs"].replace(0,np.nan)).fillna(0)
presence_rank = (presence_ratio.rank(pct=True)*100)
inverse_noise = 100 - digital_noise
life_balance = scale_0_100(0.6*presence_rank + 0.4*inverse_noise).rename("life_balance")

In [18]:
green = pd.Series(dtype=float, name="green_life")
if OPTIONAL_GREEN.exists():
    g = pd.read_csv(OPTIONAL_GREEN)
    green = g.set_index("district")["green_life_score"]
else:
    green = pd.Series(np.nan, index=hourly_norm.index, name="green_life")

In [19]:
safety = pd.Series(dtype=float, name="safety_index")
if OPTIONAL_INCIDENTS.exists():
    inc = pd.read_csv(OPTIONAL_INCIDENTS)
    inc = inc.set_index("district")["incidents"]
    inc_norm = (inc - inc.min()) / (inc.max() - inc.min() + 1e-9)
    safety = ((1 - inc_norm)*100).rename("safety_index")
else:
    safety = pd.Series(np.nan, index=hourly_norm.index, name="safety_index")

In [20]:
ind = pd.DataFrame(index=hourly_norm.index)
ind["city_traffic"] = traffic_score
ind["social_life"] = social_life
ind["rhythm"] = rhythm
ind["green_life"] = green.reindex(ind.index)
ind["digital_noise"] = digital_noise
ind["life_balance"] = life_balance
ind["social_availability"] = social_availability
ind["safety_index"] = safety.reindex(ind.index)

ind.reset_index(names="district", inplace=True)
ind.to_csv("./data/district_indicators.csv", index=False)

In [22]:
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [23]:
features_cols = ["city_traffic","social_life","rhythm","green_life",
                 "digital_noise","life_balance","social_availability","safety_index"]
X = ind[features_cols].astype(float).copy()

In [24]:
X = X.apply(lambda col: col.fillna(col.median()))
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X.values)

inp = keras.Input(shape=(X_scaled.shape[1],))
latent_dim = 3
h = layers.Dense(16, activation="relu")(inp)
h = layers.Dense(8, activation="relu")(h)
z = layers.Dense(latent_dim, activation="linear", name="latent")(h)
h2 = layers.Dense(8, activation="relu")(z)
h2 = layers.Dense(16, activation="relu")(h2)
out = layers.Dense(X_scaled.shape[1], activation="sigmoid")(h2)

autoenc = keras.Model(inp, out)
enc = keras.Model(inp, z)
autoenc.compile(optimizer=keras.optimizers.Adam(1e-3), loss="mse")
cb = [keras.callbacks.EarlyStopping(monitor="loss", patience=50, restore_best_weights=True)]
autoenc.fit(X_scaled, X_scaled, epochs=1000, batch_size=len(X_scaled), verbose=0, callbacks=cb)

Z = enc.predict(X_scaled, verbose=0)
X_rec = autoenc.predict(X_scaled, verbose=0)
recon_err = ((X_scaled - X_rec)**2).sum(axis=1)

  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  return xp.asarray(numpy.nanmin(X, axis=axis))
  return xp.asarray(numpy.nanmax(X, axis=axis))


In [25]:
from scipy.stats import zscore
err_z = zscore(recon_err)
anom = np.clip((err_z - err_z.min())/(err_z.max()-err_z.min()+1e-9), 0, 1)*100
ind["soc_tech_emb_1"] = Z[:,0]
ind["soc_tech_emb_2"] = Z[:,1] if Z.shape[1] > 1 else 0.0
ind["soc_tech_emb_3"] = Z[:,2] if Z.shape[1] > 2 else 0.0
ind["sociotech_anomaly_score"] = np.round(anom, 1)
ind["sociotech_index"] = np.round(100 - ind["sociotech_anomaly_score"], 1)

ind.to_csv("./data/sociotech_index.csv", index=False)

In [26]:
try:
    import umap
    reducer = umap.UMAP(n_neighbors=8, min_dist=0.2, random_state=42)
    Z2 = reducer.fit_transform(Z)
except Exception:
    from sklearn.decomposition import PCA
    Z2 = PCA(n_components=2, random_state=42).fit_transform(Z)
pd.DataFrame({"district": ind["district"], "soc_tech_2d_x": Z2[:,0], "soc_tech_2d_y": Z2[:,1]}).to_csv("./data/sociotech_embedding_2d.csv", index=False)

ValueError: Input X contains NaN.
PCA does not accept missing values encoded as NaN natively. For supervised learning, you might want to consider sklearn.ensemble.HistGradientBoostingClassifier and Regressor which accept missing values encoded as NaNs natively. Alternatively, it is possible to preprocess the data, for instance by using an imputer transformer in a pipeline or drop samples with missing values. See https://scikit-learn.org/stable/modules/impute.html You can find a list of all estimators that handle NaN values at the following page: https://scikit-learn.org/stable/modules/impute.html#estimators-that-handle-nan-values