# Lab: DataFrame API and Spark SQL

There is an [accompanying reference notebook](reference/reference-dataframes.ipynb) that shows many DataFrame and Spark SQL examples from the book [Learning PySpark by Denny Lee and Tomasz Drabas](https://learning.oreilly.com/library/view/learning-pyspark/9781786463708/).


## Social characteristics of the Marvel Universe

In this lab you will be working with a dataset about the Marvel Comics universe. The source data is a text file that was produced with a graph analysis library outside of Spark, so it packs a lot of information into a format that is not easy to query with SQL. You will use it to practice data ingestion, manipulation, and analysis with both the DataFrame API and Spark SQL. You can read more about the source data [here](http://bioinfo.uib.es/~joemiro/marvel.html).

## Understanding the data file

As mentioned above, the data file contains information about Marvel characters, publications, and which characters appeared in each publication. The file has two sections:

- Vertices (the section begins on line 1 with a header of the form `*Vertices 19428 6486`):
    - Characters: lines 2-6487, in the format `6481 "DETHSTRYK"`, where `6481` is the node ID and the name is within double quotes
    - Publications: lines 6488-19429, in the same format as characters
- Edgeslist (the section begins on line 19430 with the header `*Edgeslist`): lines 19431 to the end of the file. Each line has the format `2 6488 6489 6490 6491 6492 6493 6494 6495 6496`, which relates a character ID (the first number) to publication IDs (all other numbers).

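To make this layout concrete, here is a minimal pure-Python sketch that parses a tiny made-up sample in the same format. The function name `parse_porgat` and the sample lines are hypothetical, and the sketch assumes (consistent with the line ranges above) that the second number in the `*Vertices` header is the number of characters. It only illustrates the structure; the lab itself does this work in Spark.

```python
import re


def parse_porgat(lines):
    """Hypothetical helper: split lines in the porgat.txt layout into nodes and edges."""
    n_characters = 0
    section = None
    vertices, edges = [], []
    for line in lines:
        if line.startswith("*Vertices"):
            # Header such as "*Vertices 19428 6486": total nodes, then (assumed) character count
            _, _total, chars = line.split()
            n_characters = int(chars)
            section = "vertices"
        elif line.startswith("*Edgeslist"):
            section = "edges"
        elif section == "vertices":
            # Node line such as '6481 "DETHSTRYK"': numeric ID, then the name in double quotes
            match = re.match(r'(\d+) "(.*)"', line)
            node_id, name = int(match.group(1)), match.group(2)
            node_type = "Character" if node_id <= n_characters else "Publication"
            vertices.append((node_id, name, node_type))
        elif section == "edges":
            # Edge line: a character ID followed by one or more publication IDs
            character, *publications = line.split()
            edges.extend((int(character), int(p)) for p in publications)
    return vertices, edges


# Made-up sample in the same layout (2 characters, 2 publications)
sample = [
    "*Vertices 4 2",
    '1 "HERO A"',
    '2 "HERO B"',
    '3 "BOOK 1"',
    '4 "BOOK 2"',
    "*Edgeslist",
    "1 3",
    "2 3 4",
]
vertices, edges = parse_porgat(sample)
print(vertices)
print(edges)
```

Each edge line fans out into one (character, publication) pair per publication ID, which is exactly the reshaping you will do later with `split` and `explode`.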

Use `findspark` to locate your Spark installation and make `pyspark` importable:

```python
import findspark
findspark.init()
```

Create your SparkSession. You are only going to create a `SparkSession`, not a `SparkContext`.

```python
from pyspark.sql import SparkSession
spark = SparkSession.builder \
     .appName("Test SparkSession") \
     .getOrCreate()
```

Make sure your SparkSession is active:

```python
spark
```

## Read in the data

Although the data is a flat file with the structure explained earlier, you will work with it using the [Spark DataFrame API and Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html).

Load the text file with Spark SQL's [generic load functions](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#data-sources). The file is located at `s3://bigdatateaching/marvel/porgat.txt`, and a copy is also available in Azure Blob Storage at `wasbs://marvel@bigdatateaching.blob.core.windows.net/porgat.txt`.

Create a DataFrame called `df_in` with a single field, where each record is the full text of one line of the original file.


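```python
# Read each line of the file as one record in a single string column
df_in = spark.read.text("s3://bigdatateaching/marvel/porgat.txt")
# On Azure, read the Blob Storage copy instead:
# df_in = spark.read.text("wasbs://marvel@bigdatateaching.blob.core.windows.net/porgat.txt")
```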

Look at the first few lines of `df_in`. What is the default field name?

```python
df_in.show()
```

```
+--------------------+
|               value|
+--------------------+
|*Vertices 19428 6486|
|1 "24-HOUR MAN/EM...|
|2 "3-D MAN/CHARLE...|
|3 "4-D MAN/MERCURIO"|
|         4 "8-BALL/"|
|               5 "A"|
|           6 "A'YIN"|
|    7 "ABBOTT, JACK"|
|         8 "ABCISSA"|
|            9 "ABEL"|
|10 "ABOMINATION/E...|
|11 "ABOMINATION |...|
|    12 "ABOMINATRIX"|
|        13 "ABRAXAS"|
|     14 "ADAM 3,031"|
|        15 "ABSALOM"|
|16 "ABSORBING MAN...|
|17 "ABSORBING MAN...|
|           18 "ACBA"|
|19 "ACHEBE, REVER...|
+--------------------+
only showing top 20 rows
```



Count the number of rows in `df_in`:

```python
df_in.count()
```

```
30520
```

What happens to a DataFrame when you bring it from the cluster into your Python session? Use `take()` to get the first 10 records of `df_in` and save them into an object in your local Python session:

```python
df_in_10_python = df_in.take(10)
```

Explore the local Python object. How is it different from the DataFrame in the cluster? Read up on the [PySpark Row object](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=row#pyspark.sql.Row).

```python
df_in_10_python
```

```
[Row(value='*Vertices 19428 6486'),
 Row(value='1 "24-HOUR MAN/EMMANUEL"'),
 Row(value='2 "3-D MAN/CHARLES CHAN"'),
 Row(value='3 "4-D MAN/MERCURIO"'),
 Row(value='4 "8-BALL/"'),
 Row(value='5 "A"'),
 Row(value='6 "A\'YIN"'),
 Row(value='7 "ABBOTT, JACK"'),
 Row(value='8 "ABCISSA"'),
 Row(value='9 "ABEL"')]
```

```python
type(df_in_10_python)
```

```
list
```

```python
import pandas as pd
```

## Arrange the data

Because the raw data is plain text, it needs to be restructured. By the end of this section you will have two DataFrames: one for nodes (both characters and publications) and one for edges (the relationships between characters and publications).



The built-in functions used by the DataFrame API (and their Spark SQL counterparts) live in the [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) module. You can import all of them or only the specific functions you need.

Start by importing the `monotonically_increasing_id` function:

```python
from pyspark.sql.functions import monotonically_increasing_id
```

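Create a new DataFrame called `df_in_idx` by adding a field called `idx` that uses `monotonically_increasing_id` to give each record a unique numeric ID. Read the function's documentation and make sure you understand how it works: the IDs are guaranteed to be unique and increasing, but not consecutive across partitions. The output below shows consecutive IDs starting at 0, which indicates that this small file was read as a single partition; that is why `idx` equals the original line number minus one here, and you should not rely on this for larger, multi-partition data.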

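The Spark documentation describes the current implementation of `monotonically_increasing_id` as putting the partition ID in the upper 31 bits of a 64-bit integer and the record number within each partition in the lower 33 bits. The pure-Python sketch below (the function name `monotonic_id` is hypothetical) reproduces that arithmetic to show why IDs are consecutive within one partition but jump between partitions.

```python
def monotonic_id(partition_index: int, record_number: int) -> int:
    # Documented layout: partition index in the upper 31 bits,
    # record number within the partition in the lower 33 bits
    return (partition_index << 33) | record_number


# A single partition yields 0, 1, 2, ... so idx == line number - 1
print([monotonic_id(0, r) for r in range(3)])  # [0, 1, 2]
# The first record of a second partition starts at 2**33, not at the next integer
print(monotonic_id(1, 0))  # 8589934592
```

If you ever need truly consecutive indices on multi-partition data, `RDD.zipWithIndex()` is a common alternative, at the cost of an extra Spark job when there is more than one partition.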
```python
df_in_idx = df_in.withColumn("idx", monotonically_increasing_id())
```

Display the new DataFrame; you should see the new `idx` column:

```python
df_in_idx.show(10)
```

```
+--------------------+---+
|               value|idx|
+--------------------+---+
|*Vertices 19428 6486|  0|
|1 "24-HOUR MAN/EM...|  1|
|2 "3-D MAN/CHARLE...|  2|
|3 "4-D MAN/MERCURIO"|  3|
|         4 "8-BALL/"|  4|
|               5 "A"|  5|
|           6 "A'YIN"|  6|
|    7 "ABBOTT, JACK"|  7|
|         8 "ABCISSA"|  8|
|            9 "ABEL"|  9|
+--------------------+---+
only showing top 10 rows
```



From the file structure, we know the two headers are on lines 1 and 19430, and we want to exclude those lines (records) from the data. Because `idx` starts at 0, they correspond to `idx` values 0 and 19429. Create a new DataFrame called `df_idx_no_hdr` that uses a SQL query to select all records except the headers.

Before you can query a DataFrame with Spark SQL, you need to "register" it as a temporary view:

```python
df_in_idx.createOrReplaceTempView("df_in_idx")
```

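You can confirm which `idx` values hold the two header lines by filtering for lines that start with `*`:

```python
# A raw string keeps the backslash, so the regex matches a literal '*' at the start of the line
df_in_idx.filter(df_in_idx.value.rlike(r'^\*')).show()
```

```
+--------------------+-----+
|               value|  idx|
+--------------------+-----+
|*Vertices 19428 6486|    0|
|          *Edgeslist|19429|
+--------------------+-----+
```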



```python
df_idx_no_hdr = spark.sql("select * from df_in_idx where idx != 0 and idx != 19429")
```

Check that the header rows were removed:

```python
df_idx_no_hdr.count()
```

```
30518
```

```python
df_idx_no_hdr.createOrReplaceTempView("df_idx_no_hdr")
```

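Now create three separate DataFrames: `characters`, `publications`, and `relationships`. You know the original line numbers that partition the file, and `idx` is the line number minus one, so translate those line ranges into `idx` ranges: characters are `idx` 1-6486, publications are `idx` 6487-19428, and relationships are everything after the `*Edgeslist` header (`idx` 19429).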

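As a quick sanity check, the pure-Python arithmetic below converts the line ranges from the file description into `idx` ranges (assuming `idx = line number - 1`, as in this lab's single-partition read) and computes the row counts you should expect. The constant and function names are illustrative; `TOTAL_LINES` is the value returned by `df_in.count()` above.

```python
TOTAL_LINES = 30520  # df_in.count() above


def line_to_idx(line_number: int) -> int:
    # idx starts at 0 on the first line of the file
    return line_number - 1


ranges = {
    "characters": (line_to_idx(2), line_to_idx(6487)),
    "publications": (line_to_idx(6488), line_to_idx(19429)),
    "relationships": (line_to_idx(19431), line_to_idx(TOTAL_LINES)),
}
counts = {name: hi - lo + 1 for name, (lo, hi) in ranges.items()}
print(ranges)
print(counts)
```

The counts (6486, 12942, and 11090) add up to 30518, which is every line except the two headers, and they match the `count()` results of the three DataFrames below.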
```python
characters = spark.sql("select * from df_idx_no_hdr where idx between 1 and 6486")
publications = spark.sql("select * from df_idx_no_hdr where idx between 6487 and 19428")
relationships = spark.sql("select * from df_idx_no_hdr where idx > 19428")
```

```python
characters.count()
```

```
6486
```

```python
characters.show()
```

```
+--------------------+---+
|               value|idx|
+--------------------+---+
|1 "24-HOUR MAN/EM...|  1|
|2 "3-D MAN/CHARLE...|  2|
|3 "4-D MAN/MERCURIO"|  3|
|         4 "8-BALL/"|  4|
|               5 "A"|  5|
|           6 "A'YIN"|  6|
|    7 "ABBOTT, JACK"|  7|
|         8 "ABCISSA"|  8|
|            9 "ABEL"|  9|
|10 "ABOMINATION/E...| 10|
|11 "ABOMINATION |...| 11|
|    12 "ABOMINATRIX"| 12|
|        13 "ABRAXAS"| 13|
|     14 "ADAM 3,031"| 14|
|        15 "ABSALOM"| 15|
|16 "ABSORBING MAN...| 16|
|17 "ABSORBING MAN...| 17|
|           18 "ACBA"| 18|
|19 "ACHEBE, REVER...| 19|
|       20 "ACHILLES"| 20|
+--------------------+---+
only showing top 20 rows
```



```python
publications.count()
```

```
12942
```

```python
publications.show(truncate=False)
```

```
+---------------+----+
|value          |idx |
+---------------+----+
|6487 "AA2 35"  |6487|
|6488 "M/PRM 35"|6488|
|6489 "M/PRM 36"|6489|
|6490 "M/PRM 37"|6490|
|6491 "WI? 9"   |6491|
|6492 "AVF 4"   |6492|
|6493 "AVF 5"   |6493|
|6494 "H2 251"  |6494|
|6495 "H2 252"  |6495|
|6496 "COC 1"   |6496|
|6497 "T 208"   |6497|
|6498 "T 214"   |6498|
|6499 "T 215"   |6499|
|6500 "T 216"   |6500|
|6501 "T 440"   |6501|
|6502 "CM 51"   |6502|
|6503 "Q 14"    |6503|
|6504 "Q 16"    |6504|
|6505 "CA3 36"  |6505|
|6506 "SLEEP 2" |6506|
+---------------+----+
only showing top 20 rows
```



```python
relationships.count()
```

```
11090
```

```python
relationships.show()
```

```
+--------------------+-----+
|               value|  idx|
+--------------------+-----+
|              1 6487|19430|
|2 6488 6489 6490 ...|19431|
|3 6497 6498 6499 ...|19432|
|    4 6506 6507 6508|19433|
|    5 6509 6510 6511|19434|
|6 6512 6513 6514 ...|19435|
|              7 6516|19436|
|         8 6517 6518|19437|
|         9 6519 6520|19438|
|10 6521 6522 6523...|19439|
|10 6536 6537 6538...|19440|
|10 6551 6552 6553...|19441|
|             11 6566|19442|
|   12 6567 6568 6569|19443|
|13 6570 6571 6572...|19444|
|14 6574 6575 6576...|19445|
|15 6578 6579 6580...|19446|
|16 6582 6583 6584...|19447|
|16 6597 6598 6599...|19448|
|16 6612 6613 6614...|19449|
+--------------------+-----+
only showing top 20 rows
```



Register the `characters` and `publications` DataFrames so you can run SQL queries against them:

```python
characters.createOrReplaceTempView("characters")
publications.createOrReplaceTempView("publications")
```

Use the `regexp_extract` function inside a SQL statement, with a regular expression, to create three fields from the whole line: the first is the integer (the node ID), the second is the text **between** the double quotes, and the third is whether the node is a character or a publication. Create two separate DataFrames, one for characters and one for publications.

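To see what the two patterns capture, here is a pure-Python sketch using the `re` module. The helper name `regexp_extract_py` is hypothetical; it mimics Spark's `regexp_extract`, which returns the requested group, or an empty string when the pattern (or that group) does not match. These particular patterns behave the same in Python's `re` and in the Java regex engine that Spark uses.

```python
import re


def regexp_extract_py(value: str, pattern: str, group: int) -> str:
    # Mimics Spark's regexp_extract: the requested group, or "" when there is no match
    match = re.search(pattern, value)
    if match is None or match.group(group) is None:
        return ""
    return match.group(group)


line = '6481 "DETHSTRYK"'
name = regexp_extract_py(line, r'"(.*)"', 1)     # group 1: the text between the double quotes
node_id = regexp_extract_py(line, r"[0-9]+", 0)  # group 0: the whole first run of digits
print(name, node_id)  # DETHSTRYK 6481
```

Because the search returns the leftmost match, the node ID at the start of the line wins even when the name itself contains digits, as in `14 "ADAM 3,031"`.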
```python
from pyspark.sql.functions import regexp_extract, col
```

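```python
# Spark SQL treats backslashes in string literals as escapes by default,
# so [0-9] is used instead of \d to keep the pattern simple
df_char = spark.sql("""
    SELECT regexp_extract(value, '"(.*)"', 1) AS name,
           regexp_extract(value, '[0-9]+', 0) AS node_id,
           IF(idx < 6487, 'Character', 'Publication') AS Type
    FROM characters
""")
```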

```python
df_pub = spark.sql("""
    SELECT regexp_extract(value, '"(.*)"', 1) AS name,
           regexp_extract(value, '[0-9]+', 0) AS node_id,
           IF(idx < 6487, 'Character', 'Publication') AS Type
    FROM publications
""")
```

```python
df_char.show(truncate=False)
```

```
+--------------------+-------+---------+
|name                |node_id|Type     |
+--------------------+-------+---------+
|24-HOUR MAN/EMMANUEL|1      |Character|
|3-D MAN/CHARLES CHAN|2      |Character|
|4-D MAN/MERCURIO    |3      |Character|
|8-BALL/             |4      |Character|
|A                   |5      |Character|
|A'YIN               |6      |Character|
|ABBOTT, JACK        |7      |Character|
|ABCISSA             |8      |Character|
|ABEL                |9      |Character|
|ABOMINATION/EMIL BLO|10     |Character|
|ABOMINATION | MUTANT|11     |Character|
|ABOMINATRIX         |12     |Character|
|ABRAXAS             |13     |Character|
|ADAM 3,031          |14     |Character|
|ABSALOM             |15     |Character|
|ABSORBING MAN/CARL C|16     |Character|
|ABSORBING MAN | MUTA|17     |Character|
|ACBA                |18     |Character|
|ACHEBE, REVEREND DOC|19     |Character|
|ACHILLES            |20     |Character|
+--------------------+-------+---------+
only showing top 20 rows
```

```python
df_pub.show(truncate=False)
```

```
+--------+-------+-----------+
|name    |node_id|Type       |
+--------+-------+-----------+
|AA2 35  |6487   |Publication|
|M/PRM 35|6488   |Publication|
|M/PRM 36|6489   |Publication|
|M/PRM 37|6490   |Publication|
|WI? 9   |6491   |Publication|
|AVF 4   |6492   |Publication|
|AVF 5   |6493   |Publication|
|H2 251  |6494   |Publication|
|H2 252  |6495   |Publication|
|COC 1   |6496   |Publication|
|T 208   |6497   |Publication|
|T 214   |6498   |Publication|
|T 215   |6499   |Publication|
|T 216   |6500   |Publication|
|T 440   |6501   |Publication|
|CM 51   |6502   |Publication|
|Q 14    |6503   |Publication|
|Q 16    |6504   |Publication|
|CA3 36  |6505   |Publication|
|SLEEP 2 |6506   |Publication|
+--------+-------+-----------+
only showing top 20 rows
```



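Stack the character and publication DataFrames into a single DataFrame called `nodes_df`, and cache it. Note that `union` matches columns by position, not by name, so both DataFrames must have their columns in the same order (`unionByName` matches them by name instead).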

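```python
# union() stacks the rows; cache() keeps the result in memory because it is reused in joins
nodes_df = df_char.union(df_pub).cache()
```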

```python
nodes_df.show()
```

```
+--------------------+-------+---------+
|                name|node_id|     Type|
+--------------------+-------+---------+
|24-HOUR MAN/EMMANUEL|      1|Character|
|3-D MAN/CHARLES CHAN|      2|Character|
|    4-D MAN/MERCURIO|      3|Character|
|             8-BALL/|      4|Character|
|                   A|      5|Character|
|               A'YIN|      6|Character|
|        ABBOTT, JACK|      7|Character|
|             ABCISSA|      8|Character|
|                ABEL|      9|Character|
|ABOMINATION/EMIL BLO|     10|Character|
|ABOMINATION | MUTANT|     11|Character|
|         ABOMINATRIX|     12|Character|
|             ABRAXAS|     13|Character|
|          ADAM 3,031|     14|Character|
|             ABSALOM|     15|Character|
|ABSORBING MAN/CARL C|     16|Character|
|ABSORBING MAN | MUTA|     17|Character|
|                ACBA|     18|Character|
|ACHEBE, REVEREND DOC|     19|Character|
|            ACHILLES|     20|Character|
+--------------------+-------+---------+
only showing top 20 rows
```

Now you will work on the relationships dataframe. Import the `split` and `explode` functions:

```python
from pyspark.sql.functions import split, explode
```

```python
relationships.createOrReplaceTempView("relationships")
```

```python
# Split each space-separated line into an array of ID strings, and cache it for reuse
r2 = relationships.withColumn("newcol", split(col("value"), " ")).cache()
```

```python
r2.show()
```

```
+--------------------+-----+--------------------+
|               value|  idx|              newcol|
+--------------------+-----+--------------------+
|              1 6487|19430|           [1, 6487]|
|2 6488 6489 6490 ...|19431|[2, 6488, 6489, 6...|
|3 6497 6498 6499 ...|19432|[3, 6497, 6498, 6...|
|    4 6506 6507 6508|19433|[4, 6506, 6507, 6...|
|    5 6509 6510 6511|19434|[5, 6509, 6510, 6...|
|6 6512 6513 6514 ...|19435|[6, 6512, 6513, 6...|
|              7 6516|19436|           [7, 6516]|
|         8 6517 6518|19437|     [8, 6517, 6518]|
|         9 6519 6520|19438|     [9, 6519, 6520]|
|10 6521 6522 6523...|19439|[10, 6521, 6522, ...|
|10 6536 6537 6538...|19440|[10, 6536, 6537, ...|
|10 6551 6552 6553...|19441|[10, 6551, 6552, ...|
|             11 6566|19442|          [11, 6566]|
|   12 6567 6568 6569|19443|[12, 6567, 6568, ...|
|13 6570 6571 6572...|19444|[13, 6570, 6571, ...|
|14 6574 6575 6576...|19445|[14, 6574, 6575, ...|
|15 6578 6579 6580...|19446|[15, 6578, 6579, ...|
```


Now, using a chain of DataFrame API methods, start from the `value` field of the `relationships` DataFrame and create a DataFrame that puts the first value of each edge line in a `character` field and then "explodes" the remaining values into a `publication` column, producing one row per character-publication combination.

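Conceptually, `split` turns each edge line into an array, element 0 is the character, and `explode` emits one output row per remaining array element. A pure-Python sketch of that logic (the function name `explode_edges` is hypothetical; IDs stay strings, as they do after `split` in Spark) looks like this:

```python
def explode_edges(lines):
    rows = []
    for line in lines:
        parts = line.split(" ")                   # like split(col("value"), " ")
        character, publications = parts[0], parts[1:]
        for publication in publications:          # like explode(): one row per array element
            rows.append((character, publication))
    return rows


print(explode_edges(["1 6487", "2 6488 6489 6490"]))
# [('1', '6487'), ('2', '6488'), ('2', '6489'), ('2', '6490')]
```

On Spark 2.4 or later you could likely stay entirely in the DataFrame API, without the RDD round trip and join used below, by exploding a slice of the array, for example `explode(expr("slice(newcol, 2, size(newcol) - 1)"))` (the SQL `slice` function is 1-based). Treat that as a suggestion to verify against your Spark version.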
```python
# Extract the first array element (the character ID) into a column called "character"
r21 = r2.withColumn("character", r2.newcol[0])
r21.createOrReplaceTempView("r21")
r21.show()
```

```
+--------------------+-----+--------------------+---------+
|               value|  idx|              newcol|character|
+--------------------+-----+--------------------+---------+
|              1 6487|19430|           [1, 6487]|        1|
|2 6488 6489 6490 ...|19431|[2, 6488, 6489, 6...|        2|
|3 6497 6498 6499 ...|19432|[3, 6497, 6498, 6...|        3|
|    4 6506 6507 6508|19433|[4, 6506, 6507, 6...|        4|
|    5 6509 6510 6511|19434|[5, 6509, 6510, 6...|        5|
|6 6512 6513 6514 ...|19435|[6, 6512, 6513, 6...|        6|
|              7 6516|19436|           [7, 6516]|        7|
|         8 6517 6518|19437|     [8, 6517, 6518]|        8|
|         9 6519 6520|19438|     [9, 6519, 6520]|        9|
|10 6521 6522 6523...|19439|[10, 6521, 6522, ...|       10|
|10 6536 6537 6538...|19440|[10, 6536, 6537, ...|       10|
|10 6551 6552 6553...|19441|[10, 6551, 6552, ...|       10|
|             11 6566|19442|          [11, 6566]|       11|
|   12 6567 6568 6569|19443|[12, 6567, 6
```

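Extract the remaining values (the publication IDs) from the `newcol` array. The solution below does this by dropping down to the underlying RDD and building a new DataFrame keyed by `idx`: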

```python
# Map each row to (idx, publication IDs after the first element) and name the columns
r23 = r2.rdd.map(lambda x: (x.idx, x.newcol[1:])).toDF(["ID", "Char"])
```

```python
r23.createOrReplaceTempView("r23")
```

Join the two DataFrames on the row index and sort by `idx`:

```python
r24 = r21.join(r23, r21.idx == r23.ID, how="left").sort(col("idx"))
```

Drop the duplicate `ID` column:

```python
r25 = r24.drop("ID")
```

```python
# explode() emits one row per element of the Char array
edges_df = r25.withColumn("publication", explode(r25.Char))
```

```python
edges_df.show()
```

```
+--------------------+-----+--------------------+---------+--------------------+-----------+
|               value|  idx|              newcol|character|                Char|publication|
+--------------------+-----+--------------------+---------+--------------------+-----------+
|              1 6487|19430|           [1, 6487]|        1|              [6487]|       6487|
|2 6488 6489 6490 ...|19431|[2, 6488, 6489, 6...|        2|[6488, 6489, 6490...|       6488|
|2 6488 6489 6490 ...|19431|[2, 6488, 6489, 6...|        2|[6488, 6489, 6490...|       6489|
|2 6488 6489 6490 ...|19431|[2, 6488, 6489, 6...|        2|[6488, 6489, 6490...|       6490|
|2 6488 6489 6490 ...|19431|[2, 6488, 6489, 6...|        2|[6488, 6489, 6490...|       6491|
|2 6488 6489 6490 ...|19431|[2, 6488, 6489, 6...|        2|[6488, 6489, 6490...|       6492|
|2 6488 6489 6490 ...|19431|[2, 6488, 6489, 6...|        2|[6488, 6489, 6490...|       6493|
|2 6488 6489 6490 ...|19431|[2, 6488, 6489, 6...|        2|[6488, 6489
```

## Create an analytical dataset

You will now create an analytical dataset with Spark SQL by joining the two tables (`nodes_df` and `edges_df`), so that you have the data you need to run some analytics.

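The query below performs two left joins against the same `nodes_df` table: once to look up the character's name and once to look up the publication's name. The pure-Python sketch that follows mimics this with dictionary lookups on a few made-up rows (the sample IDs and names are hypothetical); an unmatched ID becomes `None`, just as a left join yields `NULL`.

```python
nodes = {"1": "HERO A", "2": "HERO B", "3": "BOOK 1", "4": "BOOK 2"}  # node_id -> name
edges = [("1", "3"), ("2", "3"), ("2", "4"), ("2", "99")]           # (character, publication)

analytics = [
    # dict.get returns None for a missing key, like the NULL a LEFT JOIN produces
    (character, nodes.get(character), publication, nodes.get(publication))
    for character, publication in edges
]
for row in analytics:
    print(row)
```

Because both joins are left joins, every edge survives even if one of its IDs were missing from `nodes_df`; an inner join would silently drop such rows.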


```python
edges_df.createOrReplaceTempView("edges_df")
nodes_df.createOrReplaceTempView("nodes_df")
```

```python
analytics = spark.sql("""select e.character as character_id, c.name as character,
                      e.publication as publication_id, p.name as publication
                      from edges_df e
                      left join nodes_df c on e.character = c.node_id
                      left join nodes_df p on e.publication = p.node_id
                      """).cache()
```


```python
analytics.show(10)
```

```
+------------+--------------------+--------------+-----------+
|character_id|           character|publication_id|publication|
+------------+--------------------+--------------+-----------+
|           1|24-HOUR MAN/EMMANUEL|          6487|     AA2 35|
|           2|3-D MAN/CHARLES CHAN|          6488|   M/PRM 35|
|           2|3-D MAN/CHARLES CHAN|          6489|   M/PRM 36|
|           2|3-D MAN/CHARLES CHAN|          6490|   M/PRM 37|
|           2|3-D MAN/CHARLES CHAN|          6491|      WI? 9|
|           2|3-D MAN/CHARLES CHAN|          6492|      AVF 4|
|           2|3-D MAN/CHARLES CHAN|          6493|      AVF 5|
|           2|3-D MAN/CHARLES CHAN|          6494|     H2 251|
|           2|3-D MAN/CHARLES CHAN|          6495|     H2 252|
|           2|3-D MAN/CHARLES CHAN|          6496|      COC 1|
+------------+--------------------+--------------+-----------+
only showing top 10 rows
```



What are the top 10 publications with the highest number of characters?

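The `group by ... order by ... limit` query below counts rows per publication and keeps the largest counts. In plain Python the same idea is a `collections.Counter` with `most_common`; the sample pairs here are made up for illustration.

```python
from collections import Counter

# (character, publication) pairs, like rows of the analytics table
pairs = [
    ("HERO A", "BOOK 1"), ("HERO B", "BOOK 1"), ("HERO C", "BOOK 1"),
    ("HERO A", "BOOK 2"), ("HERO B", "BOOK 2"),
    ("HERO C", "BOOK 3"),
]
char_ct = Counter(publication for _, publication in pairs)  # like count(*) ... group by publication
top = char_ct.most_common(2)                                 # like order by char_ct desc limit 2
print(top)  # [('BOOK 1', 3), ('BOOK 2', 2)]
```

Ties at the cutoff are worth keeping in mind: `most_common` keeps first-seen order for equal counts, while Spark makes no ordering guarantee among tied rows, so add a secondary sort key such as `publication` if you need a deterministic top 10.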
```python
analytics.createOrReplaceTempView("analytics")
spark.sql("""select publication, count(*) as char_ct from analytics
            group by publication order by char_ct desc limit 10""").show()
```

```
+--------------------+-------+
|         publication|char_ct|
+--------------------+-------+
|               COC 1|    111|
|MARVEL MYSTERY COMIC|     92|
|                IW 3|     91|
|                IW 1|     90|
|              H2 279|     87|
|                IW 4|     80|
|                IW 2|     76|
|            MAXSEC 3|     72|
|              FF 370|     63|
|                IW 6|     60|
+--------------------+-------+
```




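## Saving a DataFrame to a CSV file

Spark 2.0 and later include a built-in CSV data source, so you can use the `csv` format instead of the older `com.databricks.spark.csv` package name. Spark writes the output as a directory of part files rather than a single CSV file, and the Spark session must still be active when you run the write.

```python
# df stands for any DataFrame, for example analytics
df.write \
    .format("csv") \
    .option("header", "true") \
    .save("path-in-hdfs-or-cloud")
```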

This lab is based on [https://vincentlauzon.com/2018/01/24/azure-databricks-spark-sql-data-frames/](https://vincentlauzon.com/2018/01/24/azure-databricks-spark-sql-data-frames/). You can see the code in that blog post to perform the same operations using RDD commands.

Before you close the Jupyter notebook, it is best to close the connection to the Spark cluster. If you don't, you may leave an "orphan" connection that keeps consuming cluster resources.

```python
spark.stop()
```