# Get Portal Network

In [1]:
from pathlib import Path
from pyspark.sql import SparkSession, functions as F, types as T, Window


def flatten(df, path: list, dtype: str):
    """Flatten a nested ``imgdir`` document along ``path``.

    For each consecutive pair ``(key, subkey)`` in ``path`` the ``imgdir``
    array column is exploded, the current ``_name`` is stored in a column
    called ``key``, and only the child whose ``_name`` equals ``subkey`` is
    kept (its name is stored in a column called ``subkey``).

    Args:
        df: Spark DataFrame exposing a ``_name`` column and an ``imgdir``
            array column whose elements carry their own ``_name``.
        path: Node names to descend through; must be a list with at least
            two elements.
        dtype: Column to extract once the walk is done. ``"imgdir"`` returns
            the nested column untouched.

    Returns:
        When ``dtype == "imgdir"``: the ``path`` columns plus ``imgdir``.
        Otherwise: the ``dtype`` column is expanded (exploded first when it is
        an array) and its ``_name``/``_value`` pairs are pivoted into one
        column per ``_name``, grouped by the second-to-last ``path`` element.

    Raises:
        ValueError: if ``path`` has fewer than two elements.
    """
    if len(path) < 2:
        raise ValueError("path must be at least two elements long")
    sub = df
    # walk the tree
    # TODO: test nested 3 deep
    # NOTE(review): every step reads ``_name`` from ``sub``, but ``_name`` is
    # dropped at the end of the previous step, so paths longer than two
    # elements look like they would fail on the second step -- confirm.
    for i, (key, subkey) in enumerate(zip(path, path[1:])):
        sub = (
            sub.select(F.col("_name").alias(key), F.explode("imgdir").alias("_tmp"))
            .select(*path[: i + 1], "_tmp.*")
            .withColumn(subkey, F.col("_name"))
            # The right-hand side is a plain string, i.e. a literal: keep only
            # the child that is actually named ``subkey``.
            .where(F.col(subkey) == subkey)
            .drop("_name", "_value_tag")
        )
        # Path columns first, then whatever columns the child brought along.
        sub = sub.select(*(path[: i + 2] + [c for c in sub.columns if c not in path]))
    if dtype == "imgdir":
        return sub.select(*path, dtype)
    is_array = sub.schema[dtype].dataType.typeName() == "array"
    subdoc = sub.withColumn("_tmp", F.explode(dtype) if is_array else F.col(dtype))
    # Group on the parent of the leaf node (the key of the last step walked).
    group_key = path[-2]
    return (
        subdoc.select(*path, "_tmp.*")
        .groupBy(group_key)
        .pivot("_name")
        .agg(F.max("_value"))
    )


def get_portals(df):
    """Collect, per map document, the ``tm`` values of its portals.

    Walks ``root -> portal`` with ``flatten``, explodes every portal entry and
    its ``int`` attributes, keeps the attributes named ``tm`` and gathers them
    into a single array per map.

    Args:
        df: DataFrame of map documents in the shape ``flatten`` expects.

    Returns:
        DataFrame with two columns: ``src``, the part of ``root`` before the
        first ``.`` cast to int, and ``dst``, the array of ``tm`` values.
    """
    k = ["root", "portal"]
    # Running list of tm values within each map, ordered by portal name ``i``.
    tm_running = F.collect_list("_tmp._value").over(
        Window.partitionBy(*k).orderBy("i")
    )
    q = (
        flatten(df, k, "imgdir")
        .select(*k, F.explode("imgdir").alias("_tmp"))
        .select(*k, F.col("_tmp._name").alias("i"), "_tmp.*")
        .drop("_name", "_value_tag")
        .select(*k, "i", F.explode("int").alias("_tmp"))
        # NOTE(review): ``tm`` is presumably the portal's target map id -- confirm.
        .where(F.col("_tmp._name") == "tm")
        .select(*k, tm_running.alias("tm"))
        # Collapse the running lists to one row per map; max() presumably
        # selects the complete list since every other one is a prefix of it
        # -- TODO confirm.
        .groupBy(*k)
        .agg(F.max("tm").alias("tm"))
        .select(
            # Raw string avoids the invalid "\." escape; cast before alias so
            # the output column is unambiguously named ``src``.
            F.split(F.col("root"), r"\.")[0].astype("int").alias("src"),
            F.col("tm").alias("dst"),
        )
    )
    return q

In [19]:
# Root directory of the exported WZ XML files, relative to this notebook.
wzpath = "../../wzexports"
spark = SparkSession.builder.getOrCreate()

# Read every map export; each <imgdir> element becomes one row.
df = (
    spark.read.format("xml")
    .option("rowTag", "imgdir")
    .option("valueTag", "_value_tag")
    .load(f"{wzpath}/Map.wz/Map/*/*.xml")
)

In [20]:
# One row per (src, dst) pair; ``weight`` is how many portals share that pair.
portals = (
    get_portals(df)
    .select("src", F.explode("dst").alias("dst"))
    .groupBy("src", "dst")
    .count()
    .withColumnRenamed("count", "weight")
)

In [21]:
# Peek at the first few weighted edges.
portals.show(5)

+---------+---------+------+
|      src|      dst|weight|
+---------+---------+------+
|    40000|    40001|     1|
|100040003|999999999|     3|
|101000003|999999999|     8|
|101000200|101000200|     6|
|101030102|101030101|     1|
+---------+---------+------+
only showing top 5 rows



In [22]:
# Header-less edge list (src, dst).
# NOTE(review): 999999999 presumably marks a portal without a real
# destination map -- confirm; edges touching it are excluded here.
(
    portals.drop("weight")
    .where("src <> 999999999 and dst <> 999999999")
    .toPandas()
    .to_csv("../data/processed/maps.csv", index=False, header=False)
)

# Get string data

In [6]:
# Rebinds ``df`` to the string table export; cells below operate on this frame.
df = (
    spark.read.format("xml")
    .option("rowTag", "imgdir")
    .option("valueTag", "_value_tag")
    .load(f"{wzpath}/String.wz/Map.img.xml")
)

In [9]:
def flat_region(df, region):
    """Pivot the string attributes of every entry under ``root/<region>``.

    Args:
        df: DataFrame in the shape ``flatten`` expects.
        region: Name of the child of ``root`` to descend into.

    Returns:
        DataFrame with one row per ``uid`` (the entry's ``_name``) and one
        column per string attribute name, holding that attribute's value.
    """
    # explode() without an alias yields a column named ``col``.
    entries = flatten(df, ["root", region], "imgdir").select(F.explode("imgdir"))
    strings = entries.selectExpr("col._name as uid", "explode(col.string)")
    return strings.groupBy("uid").pivot("col._name").agg(F.max("col._value"))

# Build the victoria table once and keep it cached for the cells below.
q = flat_region(df, "victoria").cache()
q.show(3, 20)
q.printSchema()

+---------+-----+-----+-------+--------------------+-------------+
|      uid|help0|help1|mapDesc|             mapName|   streetName|
+---------+-----+-----+-------+--------------------+-------------+
|100000004| null| null|   null|         Pig Park II|Hidden Street|
|100000102| null| null|   null|Henesys Departmen...|Victoria Road|
|100000116| null| null|   null|Henesys Free Mark...|Victoria Road|
+---------+-----+-----+-------+--------------------+-------------+
only showing top 3 rows

root
 |-- uid: long (nullable = true)
 |-- help0: string (nullable = true)
 |-- help1: string (nullable = true)
 |-- mapDesc: string (nullable = true)
 |-- mapName: string (nullable = true)
 |-- streetName: string (nullable = true)



In [10]:
# Number of rows in the victoria table (one row per uid).
q.count()

387

In [11]:
# Node table with a header row: Id, Label, streetName.
(
    q.select(
        F.col("uid").alias("Id"),
        F.col("mapName").alias("Label"),
        "streetName",
    )
    .toPandas()
    .to_csv("../data/processed/victoriaLabels.csv", index=False)
)

In [17]:
# Edge list restricted to maps that appear in the victoria table on both ends.
(
    portals.join(q.selectExpr("uid as src"), how="inner", on="src")
    .join(q.selectExpr("uid as dst"), how="inner", on="dst")
    # A join on a column name moves that column to the front, so after the
    # second join the order is (dst, src, weight). Select explicitly so the
    # header-less CSV is written as src,dst like maps.csv.
    .select("src", "dst")
    .toPandas()
    .to_csv("../data/processed/victoria.csv", index=False, header=False)
)