# Big Data and Graph Data

In this module, we'll take what we learned about indices and generalize!

Apache Spark is a big data engine that runs on compute clusters, including on the cloud.  This notebook is set up assuming that (1) Spark is running on an AWS server that is public [this may **not** be true at the time you look at this!] and (2) we need to run the actual Python commands on that server, requiring us to put `%%spark` "magic" commands at the start of each cell.

You may need to look at this notebook without directly running it, until we give you specific instructions on launching your own Spark cluster.


In [None]:
!apt install -y libkrb5-dev
!pip install sparkmagic

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following package was automatically installed and is no longer required:
  libnvidia-common-430
Use 'apt autoremove' to remove it.
The following additional packages will be installed:
  comerr-dev krb5-multidev libcom-err2 libgssrpc4 libkadm5clnt-mit11
  libkadm5srv-mit11 libkdb5-9
Suggested packages:
  doc-base krb5-doc krb5-user
The following NEW packages will be installed:
  comerr-dev krb5-multidev libgssrpc4 libkadm5clnt-mit11 libkadm5srv-mit11
  libkdb5-9 libkrb5-dev
The following packages will be upgraded:
  libcom-err2
1 upgraded, 7 newly installed, 0 to remove and 24 not upgraded.
Need to get 358 kB of archives.
After this operation, 1,992 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic-updates/main amd64 libcom-err2 amd64 1.44.1-1ubuntu1.3 [8,848 B]
Get:2 http://archive.ubuntu.com/ubuntu bionic-updates/main amd64 libgssrpc4 amd64 1.16-2ub

In [None]:
%load_ext sparkmagic.magics

The following line connects to Spark running remotely (you'll first need to start an Amazon AWS Elastic MapReduce (EMR) instance). You will likely need to change the URL after `-u` so that it points to an active server.

In [None]:
%spark add -s my_session -l python -u http://ec2-3-91-74-45.compute-1.amazonaws.com -a cis545-livy -p password1 -t Basic_Access
# The above can connect to an EMR node running Spark + Livy, assuming the firewall is set to let anyone in

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1581350900124_0001,pyspark,idle,Link,Link,✔



SparkSession available as 'spark'.
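sparkmagic does not run Spark itself; it talks to Apache Livy's REST API on the cluster, roughly by creating a session (`POST /sessions`) and then submitting the body of each `%%spark` cell as a statement (`POST /sessions/{id}/statements`). The sketch below only *builds* such requests with the standard library and never sends them. The endpoint URL, the credentials, and the helper names are hypothetical placeholders, not what sparkmagic literally executes.

```python
import base64
import json
import urllib.request

LIVY_URL = "http://livy.example.com"   # hypothetical Livy endpoint
USER, PASSWORD = "user", "secret"      # hypothetical Basic-auth credentials


def _auth_header(user, password):
    """Build an HTTP Basic Authorization header value."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return "Basic " + token


def livy_request(path, payload):
    """Prepare (but do not send) a JSON POST request to a Livy endpoint."""
    return urllib.request.Request(
        LIVY_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": _auth_header(USER, PASSWORD),
        },
        method="POST",
    )


# 1. Ask Livy for a new PySpark session.
create_session = livy_request("/sessions", {"kind": "pyspark"})

# 2. Submit the code of one %%spark cell as a statement in session 0.
run_cell = livy_request("/sessions/0/statements", {"code": "print(spark.version)"})

# Actually sending would use urllib.request.urlopen(create_session), then polling
# GET /sessions/0/statements/<statement id> until the result is available.
```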


In [None]:
# Uncomment this and run if you need to restart the session
#%spark delete -s my_session

In [None]:
%%spark

import sys
print("Python version")
print(sys.version)


Python version
3.6.8 (default, Oct 14 2019, 21:22:53) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)]

## Example of Loading Sharded Data

First let's do our preliminaries.  **Every** cell in this notebook will need `%%spark` at the start so it runs on the remote machine with Spark on it, instead of on the machine with Jupyter.

In [None]:
%%spark
import json
import requests



In [None]:
%%spark

# 10K records from LinkedIn
linked_in = requests.get('https://www.cis.upenn.edu/~cis545/xaa')

my_list = [json.loads(line) for line in linked_in.iter_lines()]
len(my_list)



10000

## Load the list into Spark

Spark needs to know the structure of the data in its dataframes, i.e., their schemas. Spark can sometimes infer a schema, but our LinkedIn JSON is nested, so we define the schema explicitly.

There are some basic types:
  * Each row is described by a `StructType`, which holds a list of `StructField`s (one per column, each with a name, a type, and a nullable flag).
  * Most fields, in our case, are `StringType`.
  * The name is a nested dictionary, which we model as a `MapType` from `StringType` keys to `StringType` values.
  * `skills` is an `ArrayType` since it's a list, and it contains `StringType`s.
  * `also_view` is an `ArrayType` of `StructType`s, each with `url` and `id` string fields.

See the PySpark documentation on `StructType` and examples such as https://www.programcreek.com/python/example/104715/pyspark.sql.types.StructType.
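To see how these Spark types line up with the parsed JSON, here is a plain-Python sketch that checks one record against the same shape: `StringType` corresponds to `str`, `MapType(StringType(), StringType())` to a `dict` of strings, `ArrayType` to a `list`, and a nested `StructType` to a `dict` with fixed keys. `None` is accepted everywhere because every field is declared nullable. The helper names and the sample record are hypothetical, made up for illustration.

```python
def is_str(v):
    # StringType (nullable)
    return v is None or isinstance(v, str)


def is_str_map(v):
    # MapType(StringType(), StringType()): dict with string keys and string values
    return v is None or (isinstance(v, dict)
                         and all(isinstance(k, str) and is_str(x) for k, x in v.items()))


def is_str_array(v):
    # ArrayType(StringType())
    return v is None or (isinstance(v, list) and all(is_str(x) for x in v))


def is_also_view(v):
    # ArrayType(StructType([url: string, id: string]))
    return v is None or (isinstance(v, list) and all(
        isinstance(s, dict) and is_str(s.get("url")) and is_str(s.get("id")) for s in v))


CHECKS = {
    "_id": is_str, "name": is_str_map, "locality": is_str,
    "skills": is_str_array, "industry": is_str, "summary": is_str,
    "url": is_str, "also_view": is_also_view,
}


def matches_schema(record):
    """True if every schema field has the expected shape (a missing field counts as null)."""
    return all(check(record.get(field)) for field, check in CHECKS.items())


# Hypothetical record shaped like the LinkedIn data
sample = {
    "_id": "in-example",
    "name": {"given_name": "Ada"},
    "locality": "Greater Boston Area",
    "skills": ["Python", "SQL"],
    "industry": "Internet",
    "summary": None,
    "url": "http://www.linkedin.com/in/example",
    "also_view": [{"url": "http://www.linkedin.com/in/other", "id": "in-other"}],
}
```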

In [None]:
%%spark

# Define an explicit schema for the (nested) LinkedIn data
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, MapType
schema = StructType([
        StructField("_id", StringType(), True),
        StructField("name", MapType(StringType(), StringType()), True),
        StructField("locality", StringType(), True),
        StructField("skills", ArrayType(StringType()), True),
        StructField("industry", StringType(), True),
        StructField("summary", StringType(), True),
        StructField("url", StringType(), True),
        # also_view: a list of {url, id} structs
        StructField("also_view", ArrayType(
                    StructType([
                      StructField("url", StringType(), True),
                      StructField("id", StringType(), True)])
                    ), True)
         ])


In [None]:
%%spark
# Build a dataframe from the list of dictionaries, hash-partitioned by _id
linked_df = sqlContext.createDataFrame(my_list, schema).\
  repartition('_id')

linked_df.show(5)


+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            _id|                name|            locality|              skills|            industry|             summary|                 url|           also_view|
+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     in-2432006|[given_name -> La...|   Genoa Area, Italy|[HR Consulting, E...|       Risorse umane|Ottobre 2012 - Ce...|http://it.linkedi...|[[http://it.linke...|
|      in-261076|[given_name -> Re...|London, United Ki...|[Information Secu...|  Financial Services|                null|http://uk.linkedi...|[[http://ch.linke...|
|in-4mikesandahl|[given_name -> Mi...| Greater Boston Area|[Lean Manufacturi...|              Design|                null|http://www.linked...|[[http://www.link...|
|       in

In [None]:
%%spark
linked_df.rdd.getNumPartitions()


200
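The 200 comes from `repartition('_id')`: given only a column, Spark hash-partitions the rows by that column into `spark.sql.shuffle.partitions` partitions, which defaults to 200. The plain-Python sketch below mimics the idea of hash partitioning (sharding) by key. It uses `zlib.crc32` as a stand-in hash, whereas Spark uses its own Murmur3-based hash, so the actual partition numbers will differ; the helper names are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 200  # mirrors Spark's default spark.sql.shuffle.partitions


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Map a key to a partition number (crc32 stands in for Spark's Murmur3 hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def hash_partition(rows, key, num_partitions=NUM_PARTITIONS):
    """Group rows (dicts) into shards so that rows with equal keys share a shard."""
    shards = defaultdict(list)
    for row in rows:
        shards[partition_for(row[key], num_partitions)].append(row)
    return shards


rows = [{"_id": "in-2432006"}, {"_id": "in-261076"}, {"_id": "in-2432006"}]
shards = hash_partition(rows, "_id")
```

Because equal keys always hash to the same shard, a later operation on one `_id` only needs to look in a single partition.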

In [None]:
%%spark
linked_df.filter(linked_df.locality == 'United States')[['_id', 'name', 'locality']].show(5)


+----------------+--------------------+-------------+
|             _id|                name|     locality|
+----------------+--------------------+-------------+
|   in-aaronhrose|[given_name -> Aa...|United States|
|    in-akalderon|[given_name -> Av...|United States|
|      in-1solone|[given_name -> Ha...|United States|
|in-actiongarment|[given_name -> Da...|United States|
|   in-abhinethra|[given_name -> Ab...|United States|
+----------------+--------------------+-------------+
only showing top 5 rows

In [None]:
%%spark
linked_df.select("_id", "locality").show(5)


+---------------+--------------------+
|            _id|            locality|
+---------------+--------------------+
|     in-2432006|   Genoa Area, Italy|
|      in-261076|London, United Ki...|
|in-4mikesandahl| Greater Boston Area|
|       in-55432|Salamanca y alred...|
|      in-aaeran|      United Kingdom|
+---------------+--------------------+
only showing top 5 rows

In [None]:
%%spark
### Clean out the list from memory
my_list = []


In [None]:
%%spark
linked_df.createOrReplaceTempView('linked_in')
sqlContext.sql('select * from linked_in').show(5)


+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            _id|                name|            locality|              skills|            industry|             summary|                 url|           also_view|
+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     in-2432006|[given_name -> La...|   Genoa Area, Italy|[HR Consulting, E...|       Risorse umane|Ottobre 2012 - Ce...|http://it.linkedi...|[[http://it.linke...|
|      in-261076|[given_name -> Re...|London, United Ki...|[Information Secu...|  Financial Services|                null|http://uk.linkedi...|[[http://ch.linke...|
|in-4mikesandahl|[given_name -> Mi...| Greater Boston Area|[Lean Manufacturi...|              Design|                null|http://www.linked...|[[http://www.link...|
|       in

In [None]:
%%spark
sqlContext.sql('select _id, name.given_name, name.first_name from linked_in').show(5)


+---------------+----------+----------+
|            _id|given_name|first_name|
+---------------+----------+----------+
|     in-2432006|     Laura|      null|
|      in-261076|     Renan|      null|
|in-4mikesandahl|      Mike|      null|
|       in-55432|     Marta|      null|
|      in-aaeran|     Ankur|      null|
+---------------+----------+----------+
only showing top 5 rows

In [None]:
%%spark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

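# Acronym: first letter of each word; a null locality stays null instead of raising
acro = udf(lambda x: ''.join(n[0] for n in x.split()) if x is not None else None, StringType())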

linked_df.select("_id", acro("locality").alias("acronym")).show(5)


+---------------+-------+
|            _id|acronym|
+---------------+-------+
|     in-2432006|    GAI|
|      in-261076|    LUK|
|in-4mikesandahl|    GBA|
|       in-55432|   SyaE|
|      in-aaeran|     UK|
+---------------+-------+
only showing top 5 rows
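A UDF simply runs an ordinary Python function on each value of the column. Here is the acronym logic as a standalone function, so it can be tested without Spark. The `None` check matters because `locality` is nullable, and a lambda without it would raise an `AttributeError` on a null value. The function name `acronym` is our own choice.

```python
def acronym(text):
    """First letter of each whitespace-separated word; None stays None."""
    if text is None:
        return None
    return "".join(word[0] for word in text.split())
```

Wrapped as `udf(acronym, StringType())`, it could be used in place of the lambda above.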

In [None]:
%%spark
# Which industries are most popular?
sqlContext.sql('select count(_id), industry '+\
               'from linked_in '+\
               'group by industry '+\
               'order by count(_id) desc').\
    show(5)


+----------+--------------------+
|count(_id)|            industry|
+----------+--------------------+
|      1198|Information Techn...|
|       781|   Computer Software|
|       482|Marketing and Adv...|
|       397|            Internet|
|       323|  Financial Services|
+----------+--------------------+
only showing top 5 rows
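The `group by industry ... order by count(_id) desc` query is a counting aggregation. On a small Python list, the same idea is a `collections.Counter`. The rows below are hypothetical stand-ins for `linked_in`; note that `count(_id)` skips null IDs, while a null `industry` forms its own group.

```python
from collections import Counter

# Hypothetical rows; only _id and industry matter here
rows = [
    {"_id": "in-a", "industry": "Internet"},
    {"_id": "in-b", "industry": "Computer Software"},
    {"_id": "in-c", "industry": "Internet"},
    {"_id": None, "industry": "Internet"},   # count(_id) ignores null ids
    {"_id": "in-d", "industry": None},       # null industry is its own group
]

# group by industry with count(_id): skip rows whose _id is null
counts = Counter(r["industry"] for r in rows if r["_id"] is not None)

# order by count desc, keeping the top 5 (like .show(5))
top = counts.most_common(5)
```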

## Graphs

For the next set of examples, we will look at graph-structured data. It turns out our LinkedIn dataset also comes with a list of nodes (identified by integer IDs, each associated with the user ID `_id` used in the `linked_in` table) and a list of edges between those integer IDs.

In [None]:
%%spark

import urllib.request
import zipfile
import os

url = 'https://upenn-bigdataanalytics.s3.amazonaws.com/linkedin.edges.zip'
filehandle, _ = urllib.request.urlretrieve(url)


In [None]:
%%spark

zip_file_object = zipfile.ZipFile(filehandle, 'r')

infolist = zip_file_object.infolist()

print(infolist)




[<ZipInfo filename='linkedin.edges' compress_type=deflate file_size=293652034 compress_size=104566020>]

In [None]:
%%spark

edges = []
MAX = 100000

with zip_file_object.open('linkedin.edges','r') as fname:
  for link in fname:
    str_val = link.decode('utf-8')
    edge = str_val.split(' ')
    edges.append([int(edge[0]), int(edge[1])])
    if len(edges) >= MAX:
      break
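The loop above streams the zipped edge file line by line and stops after `MAX` edges, instead of decompressing all of the roughly 294 MB file. A self-contained version of the same pattern is sketched below, using an in-memory zip built just for the example (its contents are made up); `itertools.islice` takes over the role of the `len(edges) >= MAX` check, and `read_edges` is a hypothetical helper name.

```python
import io
import itertools
import zipfile


def read_edges(zip_bytes, member, max_edges):
    """Read up to max_edges 'from to' integer pairs from a file inside a zip archive."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf, zf.open(member) as fh:
        edges = []
        for raw in itertools.islice(fh, max_edges):   # lines arrive as bytes
            src, dst = raw.decode("utf-8").split()    # split() also drops the newline
            edges.append([int(src), int(dst)])
        return edges


# Build a tiny zip in memory (hypothetical edges) to exercise the reader
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("linkedin.edges", "0 38\n0 101\n1 25\n2 7\n")

edges = read_edges(buf.getvalue(), "linkedin.edges", max_edges=3)
```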



In [None]:
%%spark

from pyspark.sql.types import IntegerType
schema = StructType([
        StructField("from", IntegerType(), True),
        StructField("to", IntegerType(), True)
         ])
# Build a dataframe from the list of [from, to] pairs
edges_df = sqlContext.createDataFrame(edges, schema)

edges_df.show(5)


+----+-------+
|from|     to|
+----+-------+
|   0|2152448|
|   0|1656491|
|   0| 399364|
|   0|  18448|
|   0|  72025|
+----+-------+
only showing top 5 rows

In [None]:
%%spark
edges_df.createOrReplaceTempView('edges')
# `from` and `to` are SQL keywords, so quote them with backticks
sqlContext.sql('select `from` as id, count(`to`) as degree from edges group by `from`').show(5)


+---+------+
| id|degree|
+---+------+
|148|   140|
|463|    93|
|471|    88|
|496|    88|
|833|    76|
+---+------+
only showing top 5 rows
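Grouping by `from` and counting `to` computes each node's out-degree within the sampled edge list. The same computation in plain Python, together with the adjacency-list view of the same edges, might look like this (the edge values are hypothetical):

```python
from collections import Counter, defaultdict

edges = [[0, 38], [0, 101], [1, 25], [1, 38], [1, 101], [38, 101]]

# Out-degree: number of edges leaving each node (select `from`, count(`to`) ... group by `from`)
degree = Counter(src for src, _ in edges)

# Adjacency list: node -> list of nodes it points to
adjacency = defaultdict(list)
for src, dst in edges:
    adjacency[src].append(dst)
```

Nodes that only appear as edge targets (such as 101 here) get no row at all, just as in the SQL result.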

## Traversing the Graph

In [None]:
%%spark

from pyspark.sql.functions import col

# Start with a subset of nodes
start_nodes_df = edges_df[['from']].filter(edges_df['from'] < 1000).\
  select(col('from').alias('id')).drop_duplicates()

start_nodes_df.show(5)

# The neighbors require us to join
# and we'll use Spark DataFrames syntax here
neighbor_nodes_df = start_nodes_df.\
  join(edges_df, start_nodes_df.id == edges_df['from']).\
  select(col('to').alias('id'))

neighbor_nodes_df.show(5)


+---+
| id|
+---+
|148|
|463|
|471|
|496|
|833|
+---+
only showing top 5 rows

+-------+
|     id|
+-------+
|1510404|
|    523|
| 993804|
| 469009|
| 232979|
+-------+
only showing top 5 rows
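Joining the start nodes with `edges_df` on `id == from` and keeping `to` is a one-hop expansion. In plain Python the equi-join pairs every start row with every edge leaving it, so, as with the Spark join, duplicates are kept and they multiply on the next hop. The helper `neighbors` and the edges are hypothetical.

```python
from collections import Counter


def neighbors(start_ids, edges):
    """One hop as an equi-join: each start row is paired with each edge leaving it."""
    multiplicity = Counter(start_ids)   # how many start rows carry each id
    return [dst for src, dst in edges for _ in range(multiplicity[src])]


edges = [[0, 38], [0, 101], [1, 38], [38, 7], [101, 7]]
hop1 = neighbors([0, 1], edges)   # like neighbor_nodes_df
hop2 = neighbors(hop1, edges)     # like neighbor_neighbor_nodes_df
```

Here 38 is reached from both 0 and 1, so it appears twice in `hop1`, and both copies are expanded again in `hop2`.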

In [None]:
%%spark
edges_df[['from']].drop_duplicates().orderBy('from').show()

edges_df.filter(edges_df['from'] == 1).show()


+----+
|from|
+----+
|   0|
|   1|
|   2|
|   3|
|   4|
|   5|
|   6|
|   7|
|   8|
|   9|
|  10|
|  11|
|  12|
|  13|
|  14|
|  15|
|  16|
|  17|
|  18|
|  19|
+----+
only showing top 20 rows

+----+-------+
|from|     to|
+----+-------+
|   1|  77832|
|   1| 542731|
|   1| 317452|
|   1|  27650|
|   1|2662416|
|   1| 104468|
|   1| 176149|
|   1|     25|
|   1|  53282|
|   1| 516132|
|   1|  47142|
|   1| 104488|
|   1| 262186|
|   1|1392685|
|   1| 523471|
|   1| 110639|
|   1| 700465|
|   1|1941562|
|   1| 116809|
|   1|1837130|
+----+-------+
only showing top 20 rows

In [None]:
%%spark
neighbor_neighbor_nodes_df = neighbor_nodes_df.\
  join(edges_df, neighbor_nodes_df.id == edges_df['from']).\
  select(col('to').alias('id'))

neighbor_neighbor_nodes_df.show(5)


+-------+
|     id|
+-------+
| 445099|
| 435723|
|1666062|
| 390673|
|2328084|
+-------+
only showing top 5 rows

In [None]:
%%spark
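def iterate(df, depth):
  # Return (from, to, depth) rows for every walk of exactly `depth` edges in df.
  # Duplicates are kept: one row per walk, not per distinct (from, to) pair.
  df.createOrReplaceTempView('iter')

  # Base case: direct connections (walks of length 1)
  result = sqlContext.sql('select `from`, `to`, 1 as depth from iter')

  for i in range(1, depth):
    # Extend every walk so far by one more edge: self-join on r1.to = r2.from
    result.createOrReplaceTempView('result')
    result = sqlContext.sql('select r1.`from` as `from`, r2.`to` as `to`, r1.depth+1 as depth '
                            'from result r1 join iter r2 '
                            'on r1.`to` = r2.`from`')
  return result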


In [None]:
%%spark
iterate(edges_df.filter(edges_df['from'] < 1000), 1).orderBy('from','to').show()


+----+----+-----+
|from|  to|depth|
+----+----+-----+
|   0|  38|    1|
|   0| 101|    1|
|   0| 121|    1|
|   0| 161|    1|
|   0| 337|    1|
|   0| 487|    1|
|   0| 504|    1|
|   0| 802|    1|
|   0|1379|    1|
|   0|1583|    1|
|   0|1961|    1|
|   0|1982|    1|
|   0|1996|    1|
|   0|2250|    1|
|   0|2409|    1|
|   0|2692|    1|
|   0|3179|    1|
|   0|3250|    1|
|   0|3787|    1|
|   0|4213|    1|
+----+----+-----+
only showing top 20 rows

In [None]:
%%spark
iterate(edges_df.filter(edges_df['from'] < 1000), 2).orderBy('from','to').show()


+----+---+-----+
|from| to|depth|
+----+---+-----+
|   0| 59|    2|
|   0| 66|    2|
|   0|101|    2|
|   0|121|    2|
|   0|121|    2|
|   0|161|    2|
|   0|236|    2|
|   0|236|    2|
|   0|236|    2|
|   0|337|    2|
|   0|337|    2|
|   0|337|    2|
|   0|487|    2|
|   0|487|    2|
|   0|487|    2|
|   0|487|    2|
|   0|504|    2|
|   0|504|    2|
|   0|504|    2|
|   0|504|    2|
+----+---+-----+
only showing top 20 rows

In [None]:
%%spark
iterate(edges_df.filter(edges_df['from'] < 1000), 3).orderBy('from','to').show()


+----+---+-----+
|from| to|depth|
+----+---+-----+
|   0|101|    3|
|   0|101|    3|
|   0|121|    3|
|   0|121|    3|
|   0|121|    3|
|   0|236|    3|
|   0|236|    3|
|   0|236|    3|
|   0|337|    3|
|   0|337|    3|
|   0|337|    3|
|   0|337|    3|
|   0|337|    3|
|   0|337|    3|
|   0|337|    3|
|   0|337|    3|
|   0|487|    3|
|   0|487|    3|
|   0|487|    3|
|   0|487|    3|
+----+---+-----+
only showing top 20 rows
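The repeated rows above are expected: `iterate` joins `result` with `iter` on `r1.to = r2.from` without any `distinct`, so it returns one row per walk of exactly `depth` edges, and a pair such as (0, 121) appears once for every route of that length between the two nodes. The plain-Python sketch below reproduces that behavior on a made-up graph; `iterate_walks` is a hypothetical name. Applying `.distinct()` to the Spark result (or a `set` here) would give the reachable pairs instead.

```python
def iterate_walks(edges, depth):
    """Return (from, to, depth) for every walk of exactly `depth` edges, duplicates kept."""
    result = [(src, dst, 1) for src, dst in edges]        # base case: direct edges
    for _ in range(1, depth):
        # Self-join: extend each walk (f, t, d) by every edge (src, dst) with src == t
        result = [(f, dst, d + 1)
                  for f, t, d in result
                  for src, dst in edges if src == t]
    return result


edges = [(0, 1), (0, 2), (1, 3), (2, 3), (3, 4)]
two_hop = iterate_walks(edges, 2)     # 0 reaches 3 via 1 and via 2
three_hop = iterate_walks(edges, 3)
```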

In [None]:
%%spark
# Clear list of edges from Python memory
# to free up space
edges = []


### Now let's get the list of node IDs
url = 'https://upenn-bigdataanalytics.s3.amazonaws.com/linkedin.nodes.zip'
nodehandle, _ = urllib.request.urlretrieve(url)

zip_file_object = zipfile.ZipFile(nodehandle, 'r')
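nodes = []
MAX = 100000

with zip_file_object.open('linkedin.nodes') as fname:
  for node in fname:
    # Lines arrive as bytes: decode them, since str() on bytes would give strings
    # like "b'in-...'" that can never match the _id values in linked_in
    node_tuple = node.decode('utf-8').split()
    nodes.append([int(node_tuple[0]), node_tuple[1]])
    if len(nodes) >= MAX:
      break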



## Joins in Spark, Beyond Graph Traversals

What if we want to connect our edges to the people from our previous crawl? Unfortunately, the edges use integer node IDs that don't correspond to the `_id` values in the people dataframe. Fortunately, the node data maps each integer ID to a user ID, so we can use it to bridge the two.

We already read the node list above; let's turn it into a dataframe of (node ID, user ID) pairs.
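Conceptually, the join below maps each edge's integer endpoints back to user IDs through the node table, twice: once for `from` and once for `to`. Using a dictionary as the lookup table (a hash join), a plain-Python sketch could look like this. The `friends` helper and all IDs are hypothetical; edges whose endpoints are missing from the node sample are dropped, as an inner join would drop them.

```python
def friends(nodes, edges):
    """Inner-join edges to nodes on both endpoints: (from, to) ints -> (user, friend) ids."""
    user_of = {nid: user for nid, user in nodes}   # build side of the hash join
    return [(user_of[src], user_of[dst])
            for src, dst in edges
            if src in user_of and dst in user_of]   # inner join: drop unmatched edges


nodes = [[0, "in-alice"], [1, "in-bob"], [2, "pub-carol"]]
edges = [[0, 1], [1, 2], [2, 99]]                   # node 99 is not in the sample
pairs = friends(nodes, edges)
```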

In [None]:
%%spark
schema = StructType([
        StructField("nid", IntegerType(), True),
        StructField("user", StringType(), True)
         ])
# Build a dataframe from the list of [nid, user] pairs
nodes_df = sqlContext.createDataFrame(nodes, schema)

nodes_df.show(5)


+---+--------------------+
|nid|                user|
+---+--------------------+
|  0|b'pub-sandra-aran...|
|  1|  b'in-sehrishhafiz'|
|  2|b'pub-heba-bayoum...|
|  3|b'pub-aysha-binbr...|
|  4|   b'in-rubabadowla'|
+---+--------------------+
only showing top 5 rows
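The `b'...'` wrapping in the `user` column is what Python's `str()` produces for a `bytes` value, and the node file is read in binary mode. Such strings can never equal the plain `_id` values in `linked_in` (for example `in-2432006`), which is consistent with the empty result of the final name join below. The sketch uses one line shaped like those in `linkedin.nodes`, based on node 1 shown above, and contrasts `str()` with decoding.

```python
line = b"1 in-sehrishhafiz\n"   # one raw line, as read from the zip in binary mode
nid, user = line.split()        # both parts are still bytes

as_str = str(user)              # "b'in-sehrishhafiz'": the bytes repr, prefix and quotes included
as_text = user.decode("utf-8")  # "in-sehrishhafiz": the actual user ID
node = [int(nid), as_text]      # int() accepts ASCII digits given as bytes
```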

## Finding Friends, by ID

In [None]:
%%spark

nodes_df.createOrReplaceTempView('nodes')
edges_df.createOrReplaceTempView('edges')

friends_df = \
sqlContext.sql('select n1.user, n2.user as friend ' +\
               'from (nodes n1 join edges e on n1.nid = e.from) join nodes n2 on e.to = n2.nid')

friends_df.show(5)



+--------------------+--------------------+
|                user|              friend|
+--------------------+--------------------+
|b'pub-sara-mu\xc3...|b'pub-rebeca-arro...|
|b'pub-gloria-vill...|b'pub-rebeca-arro...|
|b'in-dianacolliab...|b'pub-rebeca-arro...|
|     b'in-maitepena'|b'pub-rebeca-arro...|
|      b'in-alvarovl'|b'pub-bego\xc3\x8...|
+--------------------+--------------------+
only showing top 5 rows

## Connecting Friends to Names

In [None]:
%%spark
friends_df.createOrReplaceTempView('friends')

sqlContext.sql('select u1.name.given_name as user, u2.name.given_name as friend '+\
               'from (linked_in u1 join friends f on u1._id = f.user) join linked_in u2 on u2._id = f.friend').show(5)


+----+------+
|user|friend|
+----+------+
+----+------+