In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [2]:
# Create the Spark session for this notebook; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("MostPopularSuperhero")
    .getOrCreate()
)

In [3]:
# Explicit schema for Marvel-names: an integer hero id followed by the hero's name.
schema = StructType(
    [
        StructField("id", IntegerType(), nullable=True),
        StructField("name", StringType(), nullable=True),
    ]
)

In [4]:
# Load the id -> name lookup table. Fields are space-separated; names such as
# "CAPTAIN AMERICA" contain spaces, so they are presumably double-quoted in the
# file (the CSV reader's default quote character) — TODO confirm against the data.
names = (
    spark.read
    .option("sep", " ")
    .schema(schema)
    .csv("Marvel-names")
)

In [5]:
# Read the co-appearance graph as raw text: one row per file line, held in a
# single string column named "value".
lines = spark.read.format("text").load("Marvel-graph")

In [6]:
# Preview the per-line parse before aggregating: the first token of each trimmed
# line is the hero id, and every remaining token is one connection.
# Split on runs of whitespace rather than a single literal space, so that doubled
# spaces cannot produce empty tokens that inflate the connection count.
previewTokens = func.split(func.trim(func.col("value")), "\\s+")
(
    lines
    .withColumn("id", previewTokens[0])
    .withColumn("connections", func.size(previewTokens) - 1)
    .show()
)

+--------------------+----+-----------+
|               value|  id|connections|
+--------------------+----+-----------+
|5988 748 1722 375...|5988|         48|
|5989 4080 4264 44...|5989|         40|
|5982 217 595 1194...|5982|         42|
|5983 1165 3836 43...|5983|         14|
|5980 2731 3712 15...|5980|         24|
|5981 3569 5353 40...|5981|         17|
|5986 2658 3712 26...|5986|        142|
|5987 2614 5716 17...|5987|         81|
|5984 590 4898 745...|5984|         41|
|5985 3233 2254 21...|5985|         19|
|6294 4898 1127 32...|6294|         13|
|270 2658 3003 380...| 270|         42|
|271 4935 5716 430...| 271|          9|
|272 2717 4363 408...| 272|         45|
|273 1165 5013 511...| 273|         58|
|274 3920 5310 402...| 274|        410|
|275 4366 3373 158...| 275|         47|
|276 2277 5251 480...| 276|         15|
|277 1068 3495 619...| 277|         16|
|278 1145 667 2650...| 278|        123|
+--------------------+----+-----------+
only showing top 20 rows



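Small tweak vs. what's shown in the video: we trim leading and trailing whitespace from each line before splitting it, because a stray trailing space would make `split` emit an extra empty token and inflate that line's connection count by one.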

In [7]:
# Count connections per hero. The first token of each trimmed line is the hero id;
# every remaining token is one connection. Split on runs of whitespace (not a
# single literal space) so doubled spaces cannot create empty tokens.
graphTokens = func.split(func.trim(func.col("value")), "\\s+")

# Sum the per-line counts by id, so a hero whose connections are spread over
# several lines gets a single total.
connections = (
    lines
    .withColumn("id", graphTokens[0])
    .withColumn("connections", func.size(graphTokens) - 1)
    .groupBy("id")
    .agg(func.sum("connections").alias("connections"))
)

# The hero with the highest total wins. If several heroes tie, which one first()
# returns is not deterministic.
mostPopular = connections.sort(func.col("connections").desc()).first()

In [8]:
# Display the winning Row: the hero id (a string, because it was split out of a
# text line) and its summed connection count.
mostPopular

Row(id='859', connections=1933)

In [9]:
# Look up the display name for the winning hero id. The graph id is a string
# (it came from splitting text), while names.id is an IntegerType column, so
# convert it explicitly instead of relying on Spark's implicit cast. Fields are
# accessed by name rather than position. first() returns None if no row matches.
mostPopularName = (
    names
    .filter(func.col("id") == int(mostPopular["id"]))
    .select("name")
    .first()
)

In [15]:
# Sanity-check the lookup: first() returns None when no name matched the id,
# which would otherwise only surface later as a confusing error when printing.
assert mostPopularName is not None, f"no entry in Marvel-names for hero id {mostPopular['id']}"
type(mostPopularName)

pyspark.sql.types.Row

In [11]:
# Report the result using named Row fields. Fall back to the raw id if the name
# lookup found no match, rather than failing with a TypeError on None.
heroName = mostPopularName["name"] if mostPopularName is not None else f"Hero #{mostPopular['id']}"
print(f"{heroName} is the most popular superhero with {mostPopular['connections']} co-appearances.")

CAPTAIN AMERICA is the most popular superhero with 1933 co-appearances.
