In [1]:
%load_ext autoreload
%autoreload 2
%load_ext lab_black

In [2]:
from pyspark.sql import functions as F
from manga_recsys.spark import get_spark

spark = get_spark()

In [7]:
manga = spark.read.parquet("../data/processed/2022-12-10-mangadex-manga.parquet")
manga.printSchema()
chapter = spark.read.parquet("../data/processed/2022-12-16-mangadex-chapter.parquet")
chapter.printSchema()

root
 |-- id: string (nullable = true)
 |-- relationships: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- related: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- availableTranslatedLanguages: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- chapterNumbersResetOnNewVolume: boolean (nullable = true)
 |    |-- contentRating: string (nullable = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- isLocked: boolean (nullable = true)
 |    |-- lastChapter: string (nullable = true)
 |    |-- lastVolume: string (nullable = true)
 |    |-- latestUploadedChapter: string (nullable = true)
 |    |-- originalLanguage: string (nullable = true)
 |    |-- publicationDemographic: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- status: 

We're going to build a relationship graph between scan groups and the manga that they translate.
The simplest relationship graph will be directly between scan groups and manga.

In [18]:
chapter.select("relationships.scanlation_group").distinct().count()

15979

In [17]:
manga_scan_freq = (
    chapter.select("relationships.*")
    .select("manga", "scanlation_group")
    .groupBy("manga")
    .agg(F.countDistinct("scanlation_group").alias("n_groups"))
    .orderBy(F.desc("n_groups"))
)
manga_scan_freq.show(10)
manga_scan_freq.where("n_groups > 1").count()

+--------------------+--------+
|               manga|n_groups|
+--------------------+--------+
|32d76d19-8a05-4db...|      78|
|f65444dc-3694-4e3...|      73|
|37f5cce0-8070-4ad...|      62|
|a96676e5-8ae2-425...|      58|
|30838c34-6b26-4b8...|      56|
|a1c7c817-4e59-43b...|      55|
|f7888782-0727-49b...|      55|
|7f30dfc3-0b80-4dc...|      55|
|304ceac3-8cdb-4fe...|      53|
|227e3f72-863f-46f...|      51|
+--------------------+--------+
only showing top 10 rows



21062

In [28]:
scan_manga = (
    chapter.select("attributes.pages", "relationships.*")
    .groupby("manga", "scanlation_group")
    .agg(F.sum("pages").alias("pages"))
    .orderBy(F.desc("pages"))
    .where("manga is not null and scanlation_group is not null")
)
scan_manga.show(10)

+--------------------+--------------------+-----+
|               manga|    scanlation_group|pages|
+--------------------+--------------------+-----+
|b1461071-bfbb-43e...|73206838-6025-4bc...|35255|
|c0ee660b-f9f2-45c...|cc5240d7-9b8f-4af...|33952|
|b1461071-bfbb-43e...|461047ab-d7a7-4bf...|33771|
|3fb3f1c3-153e-400...|cc5240d7-9b8f-4af...|30821|
|f7888782-0727-49b...|81e1f1ee-c8aa-489...|25442|
|b1461071-bfbb-43e...|59a4a5f3-3559-40e...|24609|
|cbf174ca-af25-441...|11fc2776-2413-41f...|21695|
|f7888782-0727-49b...|3ff4911b-f4ef-41d...|19922|
|f7888782-0727-49b...|df2df81e-a4ac-4af...|18962|
|a1c7c817-4e59-43b...|a4653168-f3c0-45e...|18368|
+--------------------+--------------------+-----+
only showing top 10 rows



In [75]:
# create a pyspark pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# create a pipeline that string vectorizes both scanlation_group and manga, then fits an ALS model
pipeline = Pipeline(
    stages=[
        StringIndexer(inputCol="scanlation_group", outputCol="scanlation_group_idx"),
        StringIndexer(inputCol="manga", outputCol="manga_idx"),
        ALS(
            userCol="scanlation_group_idx",
            itemCol="manga_idx",
            ratingCol="pages",
            nonnegative=True,
            coldStartStrategy="drop",
        ),
    ]
)

In [52]:
# fit the pipeline to the data
model = pipeline.fit(scan_manga)

# show off the model
model

PipelineModel_0a82618bdb7e

In [53]:
# predict the top 10 manga for each scanlation group that isn't already something they've done
scan_manga_pred = model.transform(scan_manga)
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="pages", predictionCol="prediction"
)
rmse = evaluator.evaluate(scan_manga_pred)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 112.5917812501897


In [70]:
scan_manga_pred.printSchema()

root
 |-- manga: string (nullable = true)
 |-- scanlation_group: string (nullable = true)
 |-- pages: long (nullable = true)
 |-- scanlation_group_idx: double (nullable = false)
 |-- manga_idx: double (nullable = false)
 |-- original_scanlation_group: string (nullable = true)
 |-- original_manga: string (nullable = true)
 |-- prediction: float (nullable = false)



In [54]:
user_recs = model.stages[-1].recommendForAllUsers(10)
user_recs.show()

+--------------------+--------------------+
|scanlation_group_idx|     recommendations|
+--------------------+--------------------+
|                  38|[{1835, 5398.25},...|
|                  57|[{1835, 6268.892}...|
|                  64|[{482, 8963.034},...|
|                  77|[{5348, 12963.519...|
|                  80|[{482, 6116.041},...|
|                  85|[{1130, 8308.689}...|
|                  94|[{91, 5265.7554},...|
|                 104|[{482, 11728.008}...|
|                 111|[{1130, 5391.4146...|
|                 146|[{22, 12386.544},...|
|                 176|[{482, 7069.705},...|
|                 178|[{1835, 5960.0737...|
|                 270|[{482, 9633.656},...|
|                 273|[{7, 6168.337}, {...|
|                 289|[{1835, 8972.2705...|
|                 300|[{1170, 6506.4263...|
|                 334|[{91, 11459.936},...|
|                 341|[{1170, 4287.996}...|
|                 345|[{482, 7155.589},...|
|                 346|[{482, 414

In [55]:
model.stages

[StringIndexerModel: uid=StringIndexer_46d90c6cf1fe, handleInvalid=error,
 StringIndexerModel: uid=StringIndexer_381a5574ec2e, handleInvalid=error,
 IndexToString_fc1ffbccf1ee,
 IndexToString_ecfb42dec8df,
 ALSModel: uid=ALS_4a9acf03a5e4, rank=10]

In [77]:
# lets convert the recommendation back to the original scan id and manga id
exploded_user_rec = user_recs.withColumn(
    "exploded", F.explode("recommendations")
).select("scanlation_group_idx", "exploded.*")
exploded_user_rec.printSchema()

inverse_pipeline = Pipeline(
    stages=[
        IndexToString(
            inputCol="scanlation_group_idx",
            outputCol="scanlation_group",
            labels=model.stages[0].labels,
        ),
        IndexToString(
            inputCol="manga_idx", outputCol="manga", labels=model.stages[1].labels
        ),
    ]
)
# # now go from idx to the origin id
inversed_model = inverse_pipeline.fit(exploded_user_rec)
inversed_recs = inversed_model.transform(exploded_user_rec)
inversed_recs.show()

root
 |-- scanlation_group_idx: integer (nullable = false)
 |-- manga_idx: integer (nullable = true)
 |-- rating: float (nullable = true)

+--------------------+---------+---------+--------------------+--------------------+
|scanlation_group_idx|manga_idx|   rating|    scanlation_group|               manga|
+--------------------+---------+---------+--------------------+--------------------+
|                  38|     1835|  5398.25|6a9a19ee-ebe2-4b5...|aea1aa8b-0dd7-4d3...|
|                  38|    20029| 5279.767|6a9a19ee-ebe2-4b5...|e2959879-a314-4ae...|
|                  38|     1170| 4902.409|6a9a19ee-ebe2-4b5...|de34e04a-a9a4-4d9...|
|                  38|     1236|4763.3516|6a9a19ee-ebe2-4b5...|1d5e923f-71e0-4bb...|
|                  38|     7949|4581.1377|6a9a19ee-ebe2-4b5...|175cf215-2122-465...|
|                  38|      590|4486.0005|6a9a19ee-ebe2-4b5...|c433f163-d610-49b...|
|                  38|    17657|4415.1987|6a9a19ee-ebe2-4b5...|9dee3a07-f3d0-45e...|
|          

In [81]:
(
    inversed_recs.select("scanlation_group", "manga")
    .join(scan_manga, on=["manga", "scanlation_group"], how="left_outer")
    .groupby("scanlation_group")
    .agg(F.countDistinct("manga").alias("n_manga"))
    .where("n_manga < 10")
).show()
# looks like the recommendation pipeline is smart enough to not include manga that's already been done

+----------------+-------+
|scanlation_group|n_manga|
+----------------+-------+
+----------------+-------+

