In [0]:
%pip install pymongo certifi pandas


In [0]:
# Restart the Python interpreter after the %pip install in the previous cell so the
# freshly installed packages are picked up by the cells that follow.
dbutils.library.restartPython()

In [0]:
# Connect to the MongoDB Atlas cluster and verify that the connection works.
from pymongo import MongoClient
import certifi

# SECURITY: the connection string embeds the database user and password, so it must
# not live in the notebook source. It is read from a Databricks secret instead
# (scope "sporty", key "mongodb-uri" -- create these once via the Databricks CLI/API).
# The credential that used to be hardcoded here should be treated as leaked and rotated.
MONGODB_URI = dbutils.secrets.get(scope="sporty", key="mongodb-uri")

# certifi supplies the CA bundle used to validate the server's TLS certificate.
client = MongoClient(MONGODB_URI, tlsCAFile=certifi.where())

# Handle to the application database.
db = client["sporty"]

# Round-trip to the server so a bad URI or unreachable cluster fails here, not later.
client.admin.command("ping")
print("Connected to MongoDB Atlas!")

In [0]:
from pyspark.sql import functions as F

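# Offline sample of the people data (columns: user_id, age, gender, city).
# Kept commented out for reference only; the real rows are read over JDBC below.
# people = spark.createDataFrame([
#    ("u1", 16, "F", "Stockholm"),
#    ("u2", 27, "M", "Göteborg"),
#    ("u3", 42, "X", "Malmö"),
#    ("u4", 35, "M", "Uppsala"),
#    ("u5", 58, "F", "Linköping"),
#], ["user_id", "age", "gender", "city"])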

# Read the people table from the PostgreSQL database over JDBC.
#
# The Spark JDBC source needs a URL of the form "jdbc:postgresql://host:port/database".
# Credentials are passed through the separate "user"/"password" options rather than
# being embedded in the URL (the old password contained '#', '+' and '!', which are
# not safe inside a URL anyway).
#
# SECURITY: user and password come from a Databricks secret scope so nothing sensitive
# is stored in the notebook. The password that used to be hardcoded here should be
# treated as leaked and rotated.
jdbc_url = "jdbc:postgresql://aws-1-eu-west-1.pooler.supabase.com:5432/postgres"

df_people = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "public.decoded_users")
    .option("user", dbutils.secrets.get(scope="sporty", key="postgres-user"))
    .option("password", dbutils.secrets.get(scope="sporty", key="postgres-password"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

# NOTE(review): this renders rows of a user table -- check the output for PII before sharing.
display(df_people)


# Hard-coded survey answers, one row per user: the list of interests, a numeric
# intensity, and a flag telling whether the user wants sustainable products.
survey = spark.createDataFrame(
    data=[
        ("u1", ["löpning", "gym", "vandring"], 4, True),
        ("u2", ["vandring", "camping", "yoga"], 5, True),
        ("u3", ["yoga", "promenad"], 2, False),
        ("u4", ["cykling", "vandring"], 4, True),
        ("u5", ["fiske", "promenad"], 2, False),
    ],
    schema=["user_id", "interests", "intensity", "wants_sustainable"],
)

# Hard-coded product catalogue, one row per product: id, display name, the sports it
# is meant for, its feature tags, the season label and the price in SEK.
products = spark.createDataFrame(
    data=[
        ("p1", "Löparskor Aero", ["löpning"], ["lätt", "ventilerande"], "sommar", 1200),
        ("p2", "Vandringskänga Pro", ["vandring"], ["vattentät", "stabil"], "helår", 1900),
        ("p3", "Campingkök Mini", ["camping"], ["lätt", "kompakt"], "helår", 600),
        ("p4", "Yogamatta Eco", ["yoga", "gym"], ["grepp", "hållbar"], "helår", 400),
        ("p5", "Regnjacka Trail", ["vandring", "cykling"], ["vattentät", "andningsbar"], "helår", 1500),
        ("p6", "Fiskeset Basic", ["fiske"], ["prisvärd", "nybörjare"], "sommar", 800),
    ],
    schema=["product_id", "product_name", "sports", "features", "season", "price_sek"],
)


In [0]:
# One row per (user, interest): explode the "interests" array into a scalar
# "interest" column and drop the original array.
survey_long = (
    survey
    .withColumn("interest", F.explode("interests"))
    .drop("interests")
)

# show() prints the table itself and returns None, so wrapping it in print()
# only appended a stray "None" line to the output.
survey_long.show()




In [0]:
# One row per (product, sport): explode the "sports" array into a scalar
# "sport" column and drop the original array.
products_long = (
    products
    .withColumn("sport", F.explode("sports"))
    .drop("sports")
)

# show() prints the table itself and returns None, so wrapping it in print()
# only appended a stray "None" line to the output.
products_long.show()


In [0]:
# Match every user to the products whose sport equals one of the user's interests.
#
# The people data is loaded into `df_people` by the JDBC cell; the name `people` only
# exists in commented-out sample code, so referencing it here raised a NameError.
# NOTE(review): assumes public.decoded_users exposes user_id, age and gender (the
# columns used by this and the following cells) -- confirm against the table schema.
joined = (
    df_people.join(survey_long, on="user_id", how="inner")
    .join(products_long, survey_long["interest"] == products_long["sport"], how="inner")
    .drop(products_long["sport"])  # equal to "interest" after the join, so redundant
)

joined.display()

In [0]:
# Segment + score
#
# Adds two columns to every (user, interest, product) row:
#   age_band - coarse age bucket of the user
#   score    - intensity, plus a bonus when a sustainability-minded user meets a
#              product tagged "hållbar", minus a price penalty; rounded to 3 decimals

SUSTAINABLE_BONUS = 1.5   # added when wants_sustainable is true and the product is "hållbar"
PRICE_SCALE_SEK = 3000.0  # price_sek is divided by this before being subtracted

segmented = (
    joined
    .withColumn(
        "age_band",
        # Branches are evaluated in order, so each upper bound already implies the
        # previous branch's lower bound. A NULL age fails every comparison; it is
        # labelled explicitly instead of silently falling into the "50+" bucket.
        F.when(F.col("age") < 20, "0-19")
         .when(F.col("age") < 35, "20-34")
         .when(F.col("age") < 50, "35-49")
         .when(F.col("age") >= 50, "50+")
         .otherwise("unknown")
    )
    .withColumn(
        "score",
        F.round(
            F.col("intensity").cast("double")
            + F.when(
                F.col("wants_sustainable") & F.array_contains(F.col("features"), "hållbar"),
                F.lit(SUSTAINABLE_BONUS),
            ).otherwise(F.lit(0.0))
            - (F.col("price_sek") / F.lit(PRICE_SCALE_SEK)),
            3,
        )
    )
)

segmented.display()

In [0]:
# Top features per segment
#
# For every (age band, gender, interest, feature) combination: how many distinct
# users and products it covers and the mean score, most-covered segments first.

feature_stats = (
    segmented
    # One row per individual feature tag so each tag can be aggregated on its own.
    .withColumn("feature", F.explode(F.col("features")))
    # gender is part of the segment key
    .groupBy("age_band", "gender", "interest", "feature")
    .agg(
        F.countDistinct(F.col("user_id")).alias("unique_users"),
        F.countDistinct(F.col("product_id")).alias("unique_products"),
        F.round(F.avg(F.col("score")), 2).alias("avg_score"),
    )
    .sort(F.col("unique_users").desc(), F.col("avg_score").desc())
)

display(feature_stats)

In [0]:
# Top products per segment
#
# For every (age band, gender, interest, product) combination: the mean score and
# the number of distinct users, best-scoring products first.

top_products = (
    segmented
    # gender is part of the segment key
    .groupBy("age_band", "gender", "interest", "product_id", "product_name")
    .agg(
        F.round(F.avg(F.col("score")), 2).alias("avg_score"),
        F.countDistinct(F.col("user_id")).alias("unique_users"),
    )
    .sort(F.col("avg_score").desc(), F.col("unique_users").desc())
)

display(top_products)