# Lab: DataFrame API and Spark SQL


## Social characteristics of the Marvel Universe

In this lab you will be working with a dataset about the Marvel Comics universe. The source data is a text file that was created with a graph analysis library outside of Spark. It packs a lot of information into a format that is not easy to query with SQL. You will use it in this lab to practice data ingestion, manipulation and analysis using both the DataFrame API and Spark SQL. You can see more about the source data [here](http://bioinfo.uib.es/~joemiro/marvel.html).

## Understanding the data file

As mentioned above, the data file contains information about Marvel characters, publications, and which character appeared in each publication. There are two sections in the file:

- Vertices (section begins in line 1 with a header in the form of `*Vertices 19428 6486`, i.e. 19428 vertices in total, of which 6486 are characters):
    - Characters: lines 2-6487, in the format `6481 "DETHSTRYK"`, where `6481` is the node id and the name is within double quotes
    - Publications: lines 6488-19429, in the same format as characters
- Edgeslist (section begins in line 19430 with a header in the form of `*Edgeslist`): lines 19431 to the end of the file. Each edge line has the form `2 6488 6489 6490 6491 6492 6493 6494 6495 6496`, which relates the character id (the first number) to the ids of the publications that character appears in (all the other numbers).
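
To make the layout concrete, here is a minimal pure-Python sketch (no Spark involved) that parses a hypothetical miniature of the file into node and edge tuples. The sample text, the `parse` helper, and the rule that ids up to the header's second number are characters are illustrative assumptions inferred from the description above. They are not part of the lab.

```python
import re

# Hypothetical miniature of porgat.txt: 4 vertices (2 characters, 2 publications) and 2 edge lines
SAMPLE = """*Vertices 4 2
1 "24-HOUR MAN/EMMANUEL"
2 "3-D MAN/CHARLES CHAN"
3 "AA2 35"
4 "M/PRM 35"
*Edgeslist
1 3
2 3 4"""

VERTEX_RE = re.compile(r'^([0-9]+) "(.*)"$')


def parse(text):
    """Split the text into (node_id, name, node_type) and (character_id, publication_id) tuples."""
    lines = text.splitlines()
    # Header: *Vertices <total vertices> <number of characters>
    _, total, n_chars = lines[0].split()
    total, n_chars = int(total), int(n_chars)

    nodes = []
    for line in lines[1:1 + total]:
        match = VERTEX_RE.match(line)
        node_id, name = int(match.group(1)), match.group(2)
        node_type = "character" if node_id <= n_chars else "publication"
        nodes.append((node_id, name, node_type))

    if lines[1 + total] != "*Edgeslist":
        raise ValueError("expected the *Edgeslist header after the vertices")

    edges = []
    for line in lines[2 + total:]:
        ids = [int(x) for x in line.split()]
        # The first id is the character; every other id is a publication it appears in
        edges.extend((ids[0], pub_id) for pub_id in ids[1:])
    return nodes, edges


nodes, edges = parse(SAMPLE)
print(nodes)
print(edges)  # [(1, 3), (2, 3), (2, 4)]
```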


Use `findspark` to locate your Spark installation and make `pyspark` importable:

In [1]:
import findspark
findspark.init()

Create your SparkSession. You are only going to create a `SparkSession`, not a `SparkContext`.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("marvel").getOrCreate()

Make sure your SparkSession is active:

In [3]:
spark

## Read in the data

Although the data is a flat file with the structure explained earlier, you will be working with this file using [Spark DataFrame API and Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html).

Load the text file located at `s3://bigdatateaching/marvel/porgat.txt` using the [Generic load functions for SparkSQL](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#data-sources). The same file is also available in Azure Blob Storage at `wasbs://marvel@bigdatateaching.blob.core.windows.net/porgat.txt`.

Create a DataFrame called `df_in` with a single field, where each record holds the full text of one line of the original text file.


In [4]:
df_in = spark.read.text("s3://bigdatateaching/marvel/porgat.txt")

Look at the first few lines of `df_in`. What is the default field name?

In [5]:
df_in.show(10)

+--------------------+
|               value|
+--------------------+
|*Vertices 19428 6486|
|1 "24-HOUR MAN/EM...|
|2 "3-D MAN/CHARLE...|
|3 "4-D MAN/MERCURIO"|
|         4 "8-BALL/"|
|               5 "A"|
|           6 "A'YIN"|
|    7 "ABBOTT, JACK"|
|         8 "ABCISSA"|
|            9 "ABEL"|
+--------------------+
only showing top 10 rows



Count the number of rows in `df_in`:

In [6]:
df_in.count()

30520

What happens to a DataFrame when you bring it from the cluster into your Python session? Take the first few records of `df_in` and save them into an object in your Python session:

In [7]:
df_in_10_python = df_in.take(10)

Explore the local Python object. How is it different from the DataFrame in the cluster? Read up on the [PySpark Row object](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=row#pyspark.sql.Row).

In [8]:
df_in_10_python

[Row(value='*Vertices 19428 6486'),
 Row(value='1 "24-HOUR MAN/EMMANUEL"'),
 Row(value='2 "3-D MAN/CHARLES CHAN"'),
 Row(value='3 "4-D MAN/MERCURIO"'),
 Row(value='4 "8-BALL/"'),
 Row(value='5 "A"'),
 Row(value='6 "A\'YIN"'),
 Row(value='7 "ABBOTT, JACK"'),
 Row(value='8 "ABCISSA"'),
 Row(value='9 "ABEL"')]

In [9]:
type(df_in_10_python)

list

## Arrange the data

Because the raw data is plain text, you need to reshape it. By the end of this section you will have two DataFrames: one for nodes (both characters and publications) and one for edges (the relationships between characters and publications).



The DataFrame API and Spark SQL functions live in the [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) module. You can import all of the functions or only the specific ones you need.

Start by importing the `monotonically_increasing_id` function:

In [10]:
from pyspark.sql.functions import monotonically_increasing_id

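Create a new DataFrame called `df_in_idx` that adds a field called `idx`, populated with `monotonically_increasing_id()`, so that each record gets a numeric index. Read more about the function and make sure you understand how it works: the ids it generates are guaranteed to be unique and monotonically increasing, but not consecutive. They line up with the line number minus one only when all of the data sits in a single partition.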
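
The PySpark documentation describes the current implementation as putting the partition id in the upper 31 bits and the record number within the partition in the lower 33 bits. The pure-Python sketch below only simulates that documented layout to show why the ids jump between partitions. It does not call Spark, and `simulated_ids` is a hypothetical helper. You can check how many partitions `df_in` has with `df_in.rdd.getNumPartitions()`. If you need strictly consecutive indices regardless of partitioning, `RDD.zipWithIndex()` is one alternative to consider.

```python
def simulated_ids(partition_sizes):
    """Simulate monotonically_increasing_id(): partition id in the upper 31 bits,
    row number within the partition in the lower 33 bits."""
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        for row_in_partition in range(size):
            ids.append((partition_id << 33) + row_in_partition)
    return ids


print(simulated_ids([5]))     # one partition: [0, 1, 2, 3, 4]
print(simulated_ids([3, 3]))  # two partitions: [0, 1, 2, 8589934592, 8589934593, 8589934594]
```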

In [11]:
df_in_idx = df_in \
    .withColumn("idx",monotonically_increasing_id())

Display the new DataFrame; it now includes the `idx` column.

In [12]:
df_in_idx.show(10)

+--------------------+---+
|               value|idx|
+--------------------+---+
|*Vertices 19428 6486|  0|
|1 "24-HOUR MAN/EM...|  1|
|2 "3-D MAN/CHARLE...|  2|
|3 "4-D MAN/MERCURIO"|  3|
|         4 "8-BALL/"|  4|
|               5 "A"|  5|
|           6 "A'YIN"|  6|
|    7 "ABBOTT, JACK"|  7|
|         8 "ABCISSA"|  8|
|            9 "ABEL"|  9|
+--------------------+---+
only showing top 10 rows



Find the two header rows, which are the only lines that start with `*`:

In [13]:
df_in_idx.filter(df_in_idx.value.rlike(r'^\*')).show()

+--------------------+-----+
|               value|  idx|
+--------------------+-----+
|*Vertices 19428 6486|    0|
|          *Edgeslist|19429|
+--------------------+-----+



From the file structure you know that the two headers are on lines 1 and 19430 (`idx` 0 and 19429), and you want to remove those records. Create a new DataFrame called `df_idx_no_hdr` with a SQL query that selects all records except the two header rows.

Before you can query a DataFrame with Spark SQL, you need to "register" it as a temporary view.

In [14]:
# remove the header rows
df_in_idx.createOrReplaceTempView("df_in_idx")
df_idx_no_hdr = spark.sql('select * from df_in_idx where idx !=0 and idx != 19429')

Check that the header rows were removed:

In [15]:
df_idx_no_hdr.count()

30518

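Now you will create three separate DataFrames: `characters`, `publications`, and `relationships`. Use the `idx` field together with the line ranges described earlier. Because `idx` equals the original line number minus one, characters are `idx` 1-6486, publications are `idx` 6487-19428, and relationships are every `idx` after the `*Edgeslist` header (19429).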
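
As a quick sanity check on those boundaries, the arithmetic below (plain Python) predicts the row count each filter should return. It uses the counts from the header and from `df_in.count()`, and it assumes `idx` equals the line number minus one, as it does in this run.

```python
TOTAL_LINES = 30520                # df_in.count()
N_VERTICES, N_CHARS = 19428, 6486  # from the "*Vertices 19428 6486" header
EDGES_HDR_IDX = N_VERTICES + 1     # idx of the "*Edgeslist" row (line 19430)

char_idx = range(1, N_CHARS + 1)                 # idx 1..6486
pub_idx = range(N_CHARS + 1, N_VERTICES + 1)     # idx 6487..19428
rel_idx = range(EDGES_HDR_IDX + 1, TOTAL_LINES)  # idx 19430..30519

print(len(char_idx), len(pub_idx), len(rel_idx))  # 6486 12942 11090
# Together with the two header rows, these ranges cover every line of the file
assert len(char_idx) + len(pub_idx) + len(rel_idx) + 2 == TOTAL_LINES
```

These predictions match the `count()` results shown in the cells that follow.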

In [16]:
from pyspark.sql.functions import col
characters = df_idx_no_hdr.filter(col("idx") <= 6486)

In [17]:
characters.count()

6486

In [18]:
publications = df_idx_no_hdr \
    .filter((col("idx") >6486) & (col("idx") <= 19428))

In [19]:
publications.count()

12942

In [20]:
relationships = df_idx_no_hdr \
    .filter(df_idx_no_hdr["idx"] > 19428)

In [21]:
relationships.count()

11090

Register the characters and publications dataframes in order to run SQL commands:

In [22]:
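# regexp_extract(str, regexp, idx) returns capture group idx of the first match,
# e.g. in Spark SQL: SELECT regexp_extract('100-200', '(\\d+)-(\\d+)', 1) returns '100'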
characters.createOrReplaceTempView("char")
publications.createOrReplaceTempView("pub")

Use the `regexp_extract` function within a SQL statement, together with a regular expression, to create three fields from the whole line: the first is the integer (the node id), the second is the text **between** the double quotes (the name), and the third indicates whether the node is a character or a publication. Create two separate DataFrames, one for characters and the other for publications.
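
If you want to experiment with the patterns before running them in Spark, the sketch below is a rough pure-Python analogue of `regexp_extract`. The helper name, the use of Python's `re` module, and `ID_PATTERN` (one possible pattern for the leading integer) are assumptions for illustration. Like the Spark function, it returns the requested capture group of the first match, and an empty string when the pattern does not match. For simple patterns like these, Python's `re` and the Java regex engine that Spark uses behave the same way. Writing `[0-9]` instead of `\d` also avoids having to double the backslash inside a Spark SQL string literal.

```python
import re


def regexp_extract(value, pattern, idx):
    """Rough analogue of Spark's regexp_extract: return capture group `idx`
    of the first match, or "" if the pattern does not match."""
    match = re.search(pattern, value)
    if match is None:
        return ""
    return match.group(idx) or ""  # a group that did not participate also yields ""


ID_PATTERN = r'^([0-9]+) '
NAME_PATTERN = r'^[0-9]+ "(.*)"$'

print(regexp_extract('7 "ABBOTT, JACK"', ID_PATTERN, 1))    # 7
print(regexp_extract('7 "ABBOTT, JACK"', NAME_PATTERN, 1))  # ABBOTT, JACK
print(repr(regexp_extract('*Edgeslist', NAME_PATTERN, 1)))  # ''
```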

In [23]:
char_df = spark.sql("""select cast(regexp_extract(value, '^([0-9]+) ', 1) as int) as node_id,
    regexp_extract(value, '^[0-9]+ "(.*)"$', 1) as name, 'character' as node_type from char""")

In [24]:
publication_df = spark.sql("""select cast(regexp_extract(value, '^([0-9]+) ', 1) as int) as node_id,
    regexp_extract(value, '^[0-9]+ "(.*)"$', 1) as name, 'publication' as node_type from pub""")

In [25]:
char_df.show(50)

+-------+--------------------+---------+
|node_id|                name|node_type|
+-------+--------------------+---------+
|      1|24-HOUR MAN/EMMANUEL|character|
|      2|3-D MAN/CHARLES CHAN|character|
|      3|    4-D MAN/MERCURIO|character|
|      4|             8-BALL/|character|
|      5|                   A|character|
|      6|               A'YIN|character|
|      7|        ABBOTT, JACK|character|
|      8|             ABCISSA|character|
|      9|                ABEL|character|
|     10|ABOMINATION/EMIL BLO|character|
|     11|ABOMINATION | MUTANT|character|
|     12|         ABOMINATRIX|character|
|     13|             ABRAXAS|character|
|     14|          ADAM 3,031|character|
|     15|             ABSALOM|character|
|     16|ABSORBING MAN/CARL C|character|
|     17|ABSORBING MAN | MUTA|character|
|     18|                ACBA|character|
|     19|ACHEBE, REVEREND DOC|character|
|     20|            ACHILLES|character|
|     21|  ACHILLES II/HELMUT|character|
|     22|  ACROB

In [26]:
publication_df.show(50)

+-------+--------+-----------+
|node_id|    name|  node_type|
+-------+--------+-----------+
|   6487|  AA2 35|publication|
|   6488|M/PRM 35|publication|
|   6489|M/PRM 36|publication|
|   6490|M/PRM 37|publication|
|   6491|   WI? 9|publication|
|   6492|   AVF 4|publication|
|   6493|   AVF 5|publication|
|   6494|  H2 251|publication|
|   6495|  H2 252|publication|
|   6496|   COC 1|publication|
|   6497|   T 208|publication|
|   6498|   T 214|publication|
|   6499|   T 215|publication|
|   6500|   T 216|publication|
|   6501|   T 440|publication|
|   6502|   CM 51|publication|
|   6503|    Q 14|publication|
|   6504|    Q 16|publication|
|   6505|  CA3 36|publication|
|   6506| SLEEP 2|publication|
|   6507| SLEEP 1|publication|
|   6508|SLEEP 19|publication|
|   6509|  W2 159|publication|
|   6510|  W2 160|publication|
|   6511|  W2 161|publication|
|   6512|XCAL 110|publication|
|   6513|XCAL 109|publication|
|   6514|XCAL 107|publication|
|   6515|XCAL 108|publication|
|   6516

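Stack the character and publication DataFrames into a single DataFrame called `nodes_df`, and cache it. Note that `union` matches columns by position, not by name, so both DataFrames must list their columns in the same order (`unionByName` matches them by name instead).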

In [27]:
nodes_df = char_df.union(publication_df).cache()

In [28]:
nodes_df.filter(col("node_id") > 19410).show()

+-------+--------------------+-----------+
|node_id|                name|  node_type|
+-------+--------------------+-----------+
|  19411|              YC 4/4|publication|
|  19412|              YC 4/6|publication|
|  19413|            KZ3 11/2|publication|
|  19414|            KZ3 12/2|publication|
|  19415|               H' 98|publication|
|  19416|         SGT. FURY 8|publication|
|  19417|   CA: MEDUSA EFFECT|publication|
|  19418|SPIDER-MAN: FEAR ITS|publication|
|  19419|            TOTZ 4/6|publication|
|  19420|              TOTZ 5|publication|
|  19421|              TOTZ 6|publication|
|  19422|              TOTZ 7|publication|
|  19423|              TOTZ 8|publication|
|  19424|           BIZADV 33|publication|
|  19425|             WI 25/2|publication|
|  19426|              AA2 30|publication|
|  19427|              AA2 20|publication|
|  19428|              AA2 38|publication|
+-------+--------------------+-----------+



Now you will work on the relationships dataframe. Import the `split` and `explode` functions:

In [29]:
from pyspark.sql.functions import split, explode

In [None]:
rel_rdd = relationships.select("value").rdd

In [None]:
relationships.show()

In [None]:
r2 = relationships.withColumn("newcol", split(col("value"), " ")).cache()  # split each line on spaces into an array column

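Now, using a chain of DataFrame API methods, start with the `value` field in the `relationships` DataFrame and create an `edges_df` DataFrame with one row per (character, publication) pair: split each line into an array, take its first element as the character id, explode the array into one row per element, and drop the row where the element is the character id itself.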

In [None]:
r2.show()

In [30]:
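edges_df = (relationships
    .withColumn("newcol", split(col("value"), " "))                    # each line becomes an array of id strings
    .withColumn("character", col("newcol").getItem(0))                 # the first id is the character
    .select("character", explode(col("newcol")).alias("publication"))  # one row per array element
    .filter(col("character") != col("publication"))                    # drop the row holding the character's own id
    .cache())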
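
The chain above is easiest to follow one edge line at a time. This pure-Python sketch mirrors each step on a single string; `explode_edges` is a hypothetical helper, not a Spark API. Note that `explode` also emits the character id itself, which is why the final filter is needed, and that the ids stay strings because `split` returns an array of strings. One alternative you could consider is `posexplode`, keeping only positions greater than 0, which drops the first element without comparing values.

```python
def explode_edges(line):
    """Mimic split -> getItem(0) -> explode -> filter for one line of the edge list."""
    parts = line.split(" ")                           # split(col("value"), " ")
    character = parts[0]                              # col("newcol").getItem(0)
    exploded = [(character, item) for item in parts]  # explode: one row per element, including parts[0]
    return [(c, p) for c, p in exploded if c != p]    # filter(col("character") != col("publication"))


print(explode_edges("2 6488 6489 6490"))  # [('2', '6488'), ('2', '6489'), ('2', '6490')]
```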

In [31]:
# Register the nodes and edges DataFrames as temporary views
nodes_df.createOrReplaceTempView("nodes_df")
edges_df.createOrReplaceTempView("edges_df")

In [32]:
spark.sql("select count(*) from edges_df").collect()

[Row(count(1)=96662)]

## Create an analytical dataset

You will now create an analytical dataset using Spark SQL, joining both tables (`nodes_df` and `edges_df`) so that each edge carries the character and publication names you need to run some analytics on the data.
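
The query below joins `nodes_df` twice: once to look up the character name and once for the publication name. Conceptually each `left join` is a lookup that keeps the edge even when no node matches. Here is a small pure-Python sketch of that idea. It uses a dictionary of a few nodes taken from the outputs in this lab and one made-up edge that matches no node; `left_join_names` is a hypothetical helper.

```python
# A few nodes from the outputs in this lab, keyed by node id
nodes = {1: "24-HOUR MAN/EMMANUEL", 2: "3-D MAN/CHARLES CHAN", 6487: "AA2 35", 6488: "M/PRM 35"}
# Edges hold string ids, as produced by split(); the last edge is made up and matches no node
edges = [("1", "6487"), ("2", "6488"), ("2", "99999")]


def left_join_names(edges, nodes):
    """Return (character_id, character, publication_id, publication) rows; unmatched ids give None (SQL NULL)."""
    rows = []
    for char_id, pub_id in edges:
        c, p = int(char_id), int(pub_id)
        rows.append((c, nodes.get(c), p, nodes.get(p)))  # two lookups, like the aliases c and p
    return rows


for row in left_join_names(edges, nodes):
    print(row)
```

An inner join would drop the third row instead of keeping it with a missing publication name.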



In [33]:
edges_df.show(10)

+---------+-----------+
|character|publication|
+---------+-----------+
|        1|       6487|
|        2|       6488|
|        2|       6489|
|        2|       6490|
|        2|       6491|
|        2|       6492|
|        2|       6493|
|        2|       6494|
|        2|       6495|
|        2|       6496|
+---------+-----------+
only showing top 10 rows



In [34]:
nodes_df.show(10)

+-------+--------------------+---------+
|node_id|                name|node_type|
+-------+--------------------+---------+
|      1|24-HOUR MAN/EMMANUEL|character|
|      2|3-D MAN/CHARLES CHAN|character|
|      3|    4-D MAN/MERCURIO|character|
|      4|             8-BALL/|character|
|      5|                   A|character|
|      6|               A'YIN|character|
|      7|        ABBOTT, JACK|character|
|      8|             ABCISSA|character|
|      9|                ABEL|character|
|     10|ABOMINATION/EMIL BLO|character|
+-------+--------------------+---------+
only showing top 10 rows



In [35]:
analytics = spark.sql("""select e.character as character_id, c.name as character, 
                      e.publication as publication_id, p.name as publication
                      from edges_df e 
                      left join nodes_df c on e.character = c.node_id
                      left join nodes_df p on e.publication = p.node_id
                      """).cache()


In [36]:
analytics.show(10)

+------------+--------------------+--------------+-----------+
|character_id|           character|publication_id|publication|
+------------+--------------------+--------------+-----------+
|           1|24-HOUR MAN/EMMANUEL|          6487|     AA2 35|
|           2|3-D MAN/CHARLES CHAN|          6488|   M/PRM 35|
|           2|3-D MAN/CHARLES CHAN|          6489|   M/PRM 36|
|           2|3-D MAN/CHARLES CHAN|          6490|   M/PRM 37|
|           2|3-D MAN/CHARLES CHAN|          6491|      WI? 9|
|           2|3-D MAN/CHARLES CHAN|          6492|      AVF 4|
|           2|3-D MAN/CHARLES CHAN|          6493|      AVF 5|
|           2|3-D MAN/CHARLES CHAN|          6494|     H2 251|
|           2|3-D MAN/CHARLES CHAN|          6495|     H2 252|
|           2|3-D MAN/CHARLES CHAN|          6496|      COC 1|
+------------+--------------------+--------------+-----------+
only showing top 10 rows



What are the top 10 publications with the highest number of characters?

In [38]:
analytics.createOrReplaceTempView("analytics")
spark.sql("""select publication, count(*) as char_ct from analytics 
            group by publication order by char_ct desc limit 10""").show()

+--------------------+-------+
|         publication|char_ct|
+--------------------+-------+
|               COC 1|    111|
|MARVEL MYSTERY COMIC|     92|
|                IW 3|     91|
|                IW 1|     90|
|              H2 279|     87|
|                IW 4|     80|
|                IW 2|     76|
|            MAXSEC 3|     72|
|              FF 370|     63|
|              M/GN 1|     60|
+--------------------+-------+
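
The SQL above groups the joined rows by publication name, counts them, and keeps the ten largest groups. The same group, count, and sort pattern can be sketched in plain Python with `collections.Counter`, on hypothetical rows. One design point worth noticing: grouping by the name alone would merge two different publication ids if they ever shared a name, while grouping by `publication_id` and `publication` together would keep them apart.

```python
from collections import Counter

# Hypothetical (character, publication) rows shaped like the analytics view
rows = [
    ("CHAR A", "PUB X"), ("CHAR B", "PUB X"), ("CHAR C", "PUB X"),
    ("CHAR A", "PUB Y"), ("CHAR B", "PUB Y"),
    ("CHAR C", "PUB Z"),
]


def top_publications(rows, n):
    counts = Counter(publication for _, publication in rows)  # group by publication, count(*)
    return counts.most_common(n)                               # order by count desc, limit n


print(top_publications(rows, 2))  # [('PUB X', 3), ('PUB Y', 2)]
```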



In [None]:
spark.stop()  # stop the SparkSession; no separate SparkContext variable was created in this lab

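## Saving a DataFrame to CSV
```
# Spark 2.0+ ships a built-in CSV writer, so the external com.databricks.spark.csv package is no longer needed
# The output path becomes a directory holding one part file per partition
df.write \
    .option("header", "true") \
    .csv("path-in-hdfs-or-cloud")
```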

This lab is based on [https://vincentlauzon.com/2018/01/24/azure-databricks-spark-sql-data-frames/](https://vincentlauzon.com/2018/01/24/azure-databricks-spark-sql-data-frames/). You can see the code in that blog post to perform the same operations using RDD commands.

## Parking lot

In [None]:
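from pyspark.sql import SQLContext
sqlContext = SQLContext(spark.sparkContext)  # legacy entry point; the SparkSession already covers it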

Let's look at the file.

Count the number of records in the file.

In [None]:
df_in.createOrReplaceTempView('df_in_view')

In [None]:
spark.sql('select row_number() over (order by "some_column") as num, * from df_in_view').take(5)

In [None]:
df_in.rdd.getNumPartitions()

In [None]:
df_python = spark.sql("select count(*) from relationships").collect()

In [None]:
df_python

In [None]:
relationshipsDf.count()

In [None]:
spark.sql("select * from relationships limit 10").show()

In [None]:
r2 = spark.read.csv("s3://bigdatateaching/marvel/relationship",header=True)

In [None]:
r2.show()

In [None]:
#  Let's do the same for characters, reusing char_df with the column names the queries below expect
charactersDf = char_df.selectExpr("node_id as charId", "name")
charactersDf.createOrReplaceTempView("characters")


In [None]:
#  and for publications, reusing publication_df
publicationsDf = publication_df.selectExpr("node_id as pubId", "name")
publicationsDf.createOrReplaceTempView("publications")

In [None]:
relationshipsDf.show(10)

The following cell shows the standard way of running a SQL query on Spark. This query ranks pairs of Marvel characters by the number of publications in which they appear together.

In [None]:
df1 = spark.sql("""SELECT c1.name AS name1, c2.name AS name2, sub.charId1, sub.charId2, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, COUNT(r1.pubId, r2.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  WHERE r1.charId < r2.charId
  AND r1.pubId=r2.pubId
  GROUP BY r1.charId, r2.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
ORDER BY sub.pubCount DESC
LIMIT 10""").cache()
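
The duo query is a self-join of the relationships on the publication id, followed by a count per character pair. The pure-Python sketch below expresses the same idea by grouping characters per publication and counting each sorted pair, where sorting plays the role of `r1.charId < r2.charId`. The helper `top_pairs` and the integer ids in the example are hypothetical. It counts distinct shared publications, which matches the SQL count as long as there are no duplicate (charId, pubId) rows.

```python
from collections import Counter, defaultdict
from itertools import combinations


def top_pairs(edges, n):
    """edges: iterable of (char_id, pub_id). Return the n character pairs sharing the most publications."""
    chars_by_pub = defaultdict(set)
    for char_id, pub_id in edges:
        chars_by_pub[pub_id].add(char_id)
    pair_counts = Counter()
    for chars in chars_by_pub.values():
        for c1, c2 in combinations(sorted(chars), 2):  # c1 < c2, like r1.charId < r2.charId
            pair_counts[(c1, c2)] += 1
    return pair_counts.most_common(n)


sample_edges = [(1, 10), (2, 10), (3, 10), (1, 11), (2, 11), (3, 12)]
print(top_pairs(sample_edges, 2))  # [((1, 2), 2), ((1, 3), 1)]
```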


In [None]:
df1.take(10)

In [None]:
df2 = spark.sql("""
SELECT c1.name AS name1, c2.name AS name2, c3.name AS name3, sub.charId1, sub.charId2, sub.charId3, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, r3.charId AS charId3, COUNT(r1.pubId, r2.pubId, r3.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  CROSS JOIN relationships AS r3
  WHERE r1.charId < r2.charId
  AND r2.charId < r3.charId
  AND r1.pubId=r2.pubId
  AND r2.pubId=r3.pubId
  GROUP BY r1.charId, r2.charId, r3.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
INNER JOIN characters c3 ON c3.charId=sub.charId3
ORDER BY sub.pubCount DESC
LIMIT 10
""").cache()


In [None]:
df2.show(10)