Set up spark:

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=f88e75afe5e140ac6b310075c234bc228317b5a4c159ef5babfaa95595824e3c
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)
spark = SparkSession.builder.getOrCreate()

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType

Read and process data:

In [4]:
# Process graph
lines = spark.read.text('/content/drive/MyDrive/spark_tutorials/spark_datasets/MarvelGraph.txt')


In [5]:
hero_connections = lines.withColumn('hero_id', F.split(F.col('value'), ' ')[0])\
                        .withColumn('connections', F.size(F.split(F.col('value'), ' '))-1)\
                        .groupBy('hero_id')\
                        .agg(F.sum('connections').alias('n_connections'))\
                        .sort(F.desc('n_connections'))

hero_connections.show()

+-------+-------------+
|hero_id|n_connections|
+-------+-------------+
|    859|         1937|
|   5306|         1745|
|   2664|         1532|
|   5716|         1429|
|   6306|         1397|
|   3805|         1389|
|   2557|         1374|
|   4898|         1348|
|   5736|         1292|
|    403|         1283|
|   6066|         1266|
|   2650|         1247|
|   2399|         1179|
|   1289|         1107|
|   5467|         1098|
|    133|         1097|
|   6148|         1096|
|    154|         1095|
|   5046|         1083|
|   1602|         1082|
+-------+-------------+
only showing top 20 rows



In [6]:
hero_schema = StructType([
    StructField('id', IntegerType(), nullable=False),
    StructField('name', StringType(), nullable=False)
])

hero_names = spark.read.schema(hero_schema).option('sep', ' ').csv('/content/drive/MyDrive/spark_tutorials/spark_datasets/MarvelNames')

Show most popular heros:

In [7]:
named_hero_connections = hero_connections.join(hero_names, hero_connections.hero_id == hero_names.id, 'inner')\
                                         .sort(F.desc('n_connections'))

named_hero_connections.show()

+-------+-------------+----+--------------------+
|hero_id|n_connections|  id|                name|
+-------+-------------+----+--------------------+
|    859|         1937| 859|     CAPTAIN AMERICA|
|   5306|         1745|5306|SPIDER-MAN/PETER PAR|
|   2664|         1532|2664|IRON MAN/TONY STARK |
|   5716|         1429|5716|THING/BENJAMIN J. GR|
|   6306|         1397|6306|    WOLVERINE/LOGAN |
|   3805|         1389|3805|MR. FANTASTIC/REED R|
|   2557|         1374|2557|HUMAN TORCH/JOHNNY S|
|   4898|         1348|4898|SCARLET WITCH/WANDA |
|   5736|         1292|5736|THOR/DR. DONALD BLAK|
|    403|         1283| 403|BEAST/HENRY &HANK& P|
|   6066|         1266|6066|             VISION |
|   2650|         1247|2650|INVISIBLE WOMAN/SUE |
|   2399|         1179|2399|                HAWK|
|   1289|         1107|1289|CYCLOPS/SCOTT SUMMER|
|   5467|         1098|5467|STORM/ORORO MUNROE S|
|    133|         1097| 133|ANGEL/WARREN KENNETH|
|   6148|         1096|6148|WASP/JANET VAN DYNE |


Show most obscure heros:

In [8]:
named_hero_connections.sort(F.asc('n_connections')).show()

+-------+-------------+----+--------------------+
|hero_id|n_connections|  id|                name|
+-------+-------------+----+--------------------+
|   4517|            1|4517|              RANDAK|
|    577|            1| 577|              BLARE/|
|   3490|            1|3490|MARVEL BOY II/MARTIN|
|   3489|            1|3489|MARVEL BOY/MARTIN BU|
|   2139|            1|2139|      GIURESCU, RADU|
|   1089|            1|1089|       CLUMSY FOULUP|
|   1841|            1|1841|              FENRIS|
|    467|            1| 467|        BERSERKER II|
|   5028|            1|5028|           SHARKSKIN|
|    835|            1| 835|     CALLAHAN, DANNY|
|   1408|            1|1408|         DEATHCHARGE|
|   4784|            1|4784|                RUNE|
|   4945|            1|4945|         SEA LEOPARD|
|   4602|            1|4602|         RED WOLF II|
|   6411|            1|6411|              ZANTOR|
|   3014|            1|3014|JOHNSON, LYNDON BAIN|
|   3298|            1|3298|          LUNATIK II|


# BFS search for degrees of separation between two characters:

In [9]:
START_CHARACTER_ID = 5306  # SpiderMan
TARGET_CHARACTER_ID = 14  # Some rando i've never heard of: https://marvel.fandom.com/wiki/Adam_3,031_(Earth-691)

class NodeState():
    UNVISTED = 0
    VISITED = 1
    EXPANDED = 2

hitCounter = sc.accumulator(0)

In [12]:
def prepare_for_mapreduce(line: str):
    fields = line.split()
    heroID = int(fields[0])
    hero_connections = []

    for hero_connection in fields[1:]:
        hero_connections.append(int(hero_connection))

    if heroID == START_CHARACTER_ID:
        state = NodeState.VISITED
        distance = 0
    else:
        state = NodeState.UNVISTED
        distance = 9999

    return (heroID, (hero_connections, distance, state))


def create_starting_rdd():
    input_file = sc.textFile('/content/drive/MyDrive/spark_tutorials/spark_datasets/MarvelGraph.txt')
    return input_file.map(prepare_for_mapreduce)


def bfs_map(node):
    character_id = node[0]
    character_data = node[1]
    character_connections = character_data[0]
    character_root_disance = character_data[1]
    character_node_state = character_data[2]

    results = []

    if character_node_state == NodeState.VISITED:
        for character_connection in character_connections:
            expanded_node_id = character_connection
            expanded_node_root_distance = character_root_disance+1
            expanded_node_state = NodeState.VISITED

            if expanded_node_id == TARGET_CHARACTER_ID:
                hitCounter.add(1)

            expanded_node_entry = (
                expanded_node_id, (
                    [],
                    expanded_node_root_distance,
                    expanded_node_state
                )
            )

            results.append(expanded_node_entry)

        character_node_state = NodeState.EXPANDED

    results.append((
        character_id, (
            character_connections,
            character_root_disance,
            character_node_state
        )
    ))

    return results


def bfs_reduce(node_data1, node_data2):
    edges1 = node_data1[0]
    edges2 = node_data2[0]
    distance1 = node_data1[1]
    distance2 = node_data2[1]
    state1 = node_data1[2]
    state2 = node_data2[2]

    edges = []

    edges.extend(edges1)
    edges.extend(edges2)

    distance = min(distance1, distance2)

    state = max(state1, state2)

    return (edges, distance, state)

In [13]:
rdd = create_starting_rdd()

# We expect that the maximum distance between nodes is at most 10
# Since it is a social graph: See max degrees of separation
for iteration in range(1, 10):
    print(f'Running BFS iteration #{iteration}')

    mapped = rdd.flatMap(bfs_map)

    # Since spark is lazy evaluated, this invokes evaluation
    # evaluation is necessary for the accumulator to be updated on time
    print(f'Processing {mapped.count()} nodes')

    if (hitCounter.value > 0):
        print(f'Hit target character from {hitCounter.value} directions')
        break

    rdd = mapped.reduceByKey(bfs_reduce)


Running BFS iteration #1
Processing 8330 nodes
Running BFS iteration #2
Processing 220615 nodes
Hit target character from 1 directions
