# Lab: DataFrame API and Spark SQL


## Social characteristics of the Marvel Universe

In this lab you will be working with a dataset about the Marvel Comics universe. The source data is a text file that was created with a graph analysis library outside of Spark, so the text file contains a lot of information and is not in a format that is easy to query with SQL. You are going to use it in this lab to practice data ingestion, manipulation and analysis using both the DataFrame API and Spark SQL. You can see more about the source data [here](http://bioinfo.uib.es/~joemiro/marvel.html).

## Understanding the data file

As mentioned above, the data file contains information about Marvel characters, publications, and which characters appear in each publication. There are two sections in the file:

- Vertices (section begins in line 1 with a header in the form of `*Vertices 19428 6486`):
    - Characters: lines 2-6487 in the format `6481 "DETHSTRYK"`, where `6481` is the node id and the name is within double quotes
    - Publications: lines 6488-19429 in the same format as characters
- Edgeslist (section begins in line 19430 with a header in the form of `*Edgeslist`): lines 19431 to the end of the file. The edge information is in the following format: `2 6488 6489 6490 6491 6492 6493 6494 6495 6496`. This represents a relationship between the character id (the first number) and the publication ids (all other numbers).
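
The layout described above can be sketched in plain Python (not Spark) on a tiny made-up sample. The function name `split_sections` and the sample rows are illustrative assumptions, and the sketch reads the two numbers in the `*Vertices` header as the total node count and the character count, which is consistent with the line ranges listed above.

```python
def split_sections(lines):
    """Group lines under the most recent header line (one starting with '*')."""
    sections = {}
    current = None
    for line in lines:
        if line.startswith("*"):
            # Header such as "*Vertices 19428 6486" or "*Edgeslist"
            current = line.split()[0].lstrip("*")
            sections[current] = []
        elif current is not None and line.strip():
            sections[current].append(line)
    return sections


# Made-up sample in the same layout as the real file (hypothetical names)
sample = [
    '*Vertices 4 2',
    '1 "CHARACTER A"',
    '2 "CHARACTER B"',
    '3 "PUBLICATION X"',
    '4 "PUBLICATION Y"',
    '*Edgeslist',
    '1 3 4',
    '2 4',
]

sections = split_sections(sample)

# Assumption: header numbers are (total vertices, number of characters)
n_vertices, n_characters = map(int, sample[0].split()[1:3])
characters = sections["Vertices"][:n_characters]
publications = sections["Vertices"][n_characters:]
edge_lines = sections["Edgeslist"]
```

In the lab itself the same partitioning is done by row index rather than by scanning for headers, because the positions of the two headers are already known.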

Type `spark` to see the attributes of your `SparkSession`. The `SparkSession` is your entry point to using the DataFrame API.

In [0]:
spark

Type `sc` to see the `SparkContext` attributes. The `SparkContext` is your entry point to the RDD API.

In [0]:
sc

## Read in the data

Although the data is a flat file with the structure explained earlier, you will be working with this file using [Spark DataFrame API and Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html).

In the `00-setup-data-connection` workbook you created a mount point for the cloud location of the data.

Load the file `/mnt/course-datasets/marvel/porgat.txt` using the [generic load functions for Spark SQL](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#data-sources).

Create a DataFrame called `df_in` with a single field, where each record is the full text per line in the original text file.

In [0]:
path = "/mnt/course-datasets/marvel/porgat.txt"
df_in = spark.read.text(path)

Look at the first few lines of `df_in`. What is the default field name?

In [0]:
df_in.head(10)

Count the number of rows in `df_in`:

In [0]:
number_of_rows = df_in.count()
print(number_of_rows)

What happens to a DataFrame when you bring it into your Python session from the cluster? Get the first few records of `df_in` and save them into an object in your Python session:

In [0]:
databricks_lab = df_in.head(10)

Explore the local Python object. How is it different from the DataFrame in the cluster? Read up on the [PySpark Row object](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=row#pyspark.sql.Row).

In [0]:
print(databricks_lab)

In [0]:
df_in.head(10)

## Arrange the data

The data is still raw text, so in this section you will parse it into two DataFrames: one for nodes (both characters and publications) and one for edges (the relationships between characters and publications).

The DataFrame API and Spark SQL functions are in the [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) module. You can import all of the functions or specific functions as needed.

Start by importing the `monotonically_increasing_id` function:

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

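Create a new DataFrame called `df_in_idx` that adds a field called `idx`, using the `monotonically_increasing_id` function to give each record a unique, increasing numeric index. Read the function's documentation to understand how it works: the generated ids are guaranteed to be unique and increasing, but they are consecutive only within a single partition. The rest of this lab relies on the file being read into one partition, so that `idx` equals the original line number minus one.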

In [0]:
df_in_idx = df_in.withColumn("idx", monotonically_increasing_id())
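
To see why the ids are not always consecutive, here is a plain-Python model (not Spark code) of the id layout that the PySpark documentation describes for the current implementation: the partition id goes in the upper 31 bits and the record number within the partition in the lower 33 bits. The helper name `simulated_ids` is a hypothetical name used only for this sketch.

```python
def simulated_ids(partition_sizes):
    """Model the documented id layout: (partition_id << 33) + row_in_partition."""
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        for row_in_partition in range(size):
            ids.append((partition_id << 33) + row_in_partition)
    return ids


# One partition: ids are consecutive, which is the case this lab assumes
one_partition = simulated_ids([5])

# Two partitions of three rows each: the second partition jumps to 1 << 33
two_partitions = simulated_ids([3, 3])
```

If `df_in` had more than one partition, filters on fixed `idx` values would no longer line up with the original line numbers, so it is worth checking `df_in.rdd.getNumPartitions()` before relying on them.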

Display the new DataFrame to see the new column.

In [0]:
df_in_idx.show(10)

In [0]:
df_in_idx.head(10)

We know from the data file that the two headers are in lines 1 and 19430, and we want to remove those lines (records) from the data. Because `idx` starts at 0, they correspond to `idx` values 0 and 19429. Create a new DataFrame called `df_idx_no_hdr` by using a SQL query to select all records except the two headers.

Before you can run a SparkSQL query, you need to "register" a dataframe.

In [0]:
# register the view
df_in_idx.createOrReplaceTempView("df_in_idx")

# run a SQL query
df_idx_no_hdr = spark.sql('select * from df_in_idx where idx !=0 and idx != 19429')

Check that the header rows were removed:

In [0]:
df_idx_no_hdr.head(10)

Now you will create three separate DataFrames: `char` for characters, `pub` for publications, and `relationships` for the edge list. You know the original line numbers that partition the file, so use them with the `idx` field (remember that `idx` is the line number minus one).

In [0]:
from pyspark.sql.functions import col


In [0]:
char = df_in_idx.filter(df_in_idx.idx.between(1,6486))
char.show(10)

In [0]:
pub = df_in_idx.filter(df_in_idx.idx.between(6487,19428))
pub.show(10)

In [0]:
relationships = df_in_idx.filter(df_in_idx.idx.between(19430,30519))
relationships.show(10)
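
The bounds used in the three filters above follow from the `*Vertices 19428 6486` header and the zero-based `idx`. The arithmetic can be sketched in plain Python; the function name `idx_ranges` is hypothetical, and `total_lines=30520` is an assumption inferred from the upper bound 30519 in the `relationships` filter rather than checked against the file.

```python
def idx_ranges(vertices_header, total_lines):
    """Derive inclusive idx ranges for each section from the *Vertices header."""
    _, n_vertices, n_characters = vertices_header.split()
    n_vertices, n_characters = int(n_vertices), int(n_characters)

    # idx is 0-based, so the *Vertices header (line 1) sits at idx 0
    characters = (1, n_characters)
    publications = (n_characters + 1, n_vertices)
    edges_header = n_vertices + 1            # the *Edgeslist line
    edges = (edges_header + 1, total_lines - 1)

    return {
        "characters": characters,
        "publications": publications,
        "edges_header": edges_header,
        "edges": edges,
    }


# total_lines is an assumed value, inferred from the filter bound above
ranges = idx_ranges("*Vertices 19428 6486", total_lines=30520)
```

Because `between` is inclusive on both ends, these pairs can be passed to it directly.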

Register the `char` and `pub` DataFrames as temporary views so you can run SQL commands against them:

In [0]:
# registering characters and publications
char.createOrReplaceTempView("char")
pub.createOrReplaceTempView("pub")

Use the `regexp_extract` function within a SQL statement and a regular expression to create three fields from the whole line: the first is the integer node id, the second is the text **between** the double quotes, and the third indicates whether the node is a character or a publication. Create two new separate DataFrames, one for characters and the other for publications.

In [0]:
char_df = spark.sql("select cast(regexp_extract(value, '^([0-9]+) ', 1) as int) as node_id, regexp_extract(value, '^[0-9]+ \"(.*)\"$', 1) as name, \"character\" as node_type from char")

In [0]:
publication_df = spark.sql("select cast(regexp_extract(value, '^([0-9]+) ', 1) as int) as node_id, regexp_extract(value, '^[0-9]+ \"(.*)\"$', 1) as name, \"publication\" as node_type from pub")

In [0]:
char_df.show(50)

In [0]:
publication_df.show(50)
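
To check the regular expression outside of Spark, the same pattern can be tried with Python's `re` module on the sample line from the file description. This is only a sketch: the pattern adds a capture group around the leading integer, and `parse_node` is a hypothetical helper name.

```python
import re

# Same shape as the pattern in the SQL above, with the node id also captured
NODE_PATTERN = re.compile(r'^([0-9]+) "(.*)"$')


def parse_node(line, node_type):
    """Return (node_id, name, node_type), or None when the line does not match."""
    match = NODE_PATTERN.match(line)
    if match is None:
        # Note: Spark's regexp_extract returns an empty string here, not None
        return None
    return int(match.group(1)), match.group(2), node_type


parsed = parse_node('6481 "DETHSTRYK"', "character")
header = parse_node("*Edgeslist", "character")
```

Group 1 is the node id and group 2 is the text between the double quotes, matching the group index passed as the last argument of `regexp_extract`.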

Stack both the character and publications into a single dataframe called `nodes_df`, and cache it.

In [0]:
nodes_df = char_df.union(publication_df).cache()

In [0]:
nodes_df.filter(col("node_id") > 19410).show()

Now you will work on the relationships dataframe. Import the `split` and `explode` functions:

In [0]:
from pyspark.sql.functions import split, explode

In [0]:
rel_rdd = relationships.select("value").rdd

In [0]:
relationships.show()

In [0]:
r2 = relationships.withColumn("newcol", split(col("value"), " ")).cache() #convert value to array

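Now, using a chain of DataFrame API methods, start with the `value` field in the `relationships` DataFrame and create `edges_df`: split each line into an array, take the first element as the character id, explode the array into one row per element, and drop the row where the exploded value is the character id itself.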

In [0]:
r2.show()

In [0]:
edges_df = relationships.withColumn("newcol", split(col("value"), " ")) \
    .withColumn("character", col("newcol").getItem(0)) \
    .select("character", explode(col("newcol")).alias("publication")) \
    .filter(col("character") != col("publication")).cache()

In [0]:
edges_df.show(50)
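
The effect of the `split`, `getItem(0)`, `explode` and `filter` chain on a single line can be modelled in plain Python. This is a sketch of the logic rather than Spark code, and `explode_edges` is a hypothetical helper name.

```python
def explode_edges(line):
    """Turn one edge-list line into (character, publication) pairs."""
    parts = line.split(" ")      # like split(col("value"), " ")
    character = parts[0]         # like col("newcol").getItem(0)
    # explode yields one row per array element; the filter then drops
    # the row where the element is the character id itself
    return [(character, part) for part in parts if part != character]


edges = explode_edges("2 6488 6489 6490")
```

As in `edges_df`, both values are still strings at this point, because they come from splitting a text line.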

In [0]:
# Register the nodes and edges DataFrames as temporary views
nodes_df.createOrReplaceTempView("nodes_df")
edges_df.createOrReplaceTempView("edges_df")

In [0]:
spark.sql("select count(*) from edges_df").collect()

## Create an analytical dataset

You will now create an analytical dataset using Spark SQL by joining both tables (`nodes_df` and `edges_df`) so you have the data you need to run some analytics.

In [0]:
edges_df.show(10)

In [0]:
nodes_df.show(10)

In [0]:
analytics = spark.sql("""select e.character as character_id, c.name as character, 
                      e.publication as publication_id, p.name as publication
                      from edges_df e 
                      left join nodes_df c on e.character = c.node_id
                      left join nodes_df p on e.publication = p.node_id
                      """).cache()


In [0]:
analytics.show(10)
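
The query joins `nodes_df` twice, once to look up the character name and once to look up the publication name. A plain-Python model of that double left join on made-up data is sketched below; the names, ids, and the helper `left_join_names` are hypothetical, and the dictionary lookup assumes each node id appears only once.

```python
# Made-up nodes keyed by integer node id (hypothetical names)
nodes = {1: "CHARACTER A", 3: "PUBLICATION X"}

# Edge ids are strings, as they are after splitting a text line
edges = [("1", "3"), ("1", "99")]


def left_join_names(edges, nodes):
    """Look up both names per edge; a missing id yields None, as in a left join."""
    rows = []
    for character_id, publication_id in edges:
        rows.append((
            character_id,
            nodes.get(int(character_id)),
            publication_id,
            nodes.get(int(publication_id)),
        ))
    return rows


rows = left_join_names(edges, nodes)
```

Every edge is kept even when an id has no matching node, which is why a left join is a safe choice while you are still validating the parsed data.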

What are the top 10 publications with the highest number of characters?

In [0]:
analytics.createOrReplaceTempView("analytics")
spark.sql("""select publication, count(*) as char_ct from analytics 
            group by publication order by char_ct desc limit 10""").show()
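
The `group by ... order by ... limit` pattern in this query can be modelled in plain Python with `collections.Counter`. The rows and the helper name `top_publications` are made up for the sketch.

```python
from collections import Counter

# Made-up (character, publication) rows
rows = [
    ("A", "X"), ("B", "X"), ("C", "X"),
    ("A", "Y"), ("B", "Y"),
    ("A", "Z"),
]


def top_publications(rows, n):
    """Count rows per publication and return the n largest counts."""
    counts = Counter(publication for _, publication in rows)
    return counts.most_common(n)


top2 = top_publications(rows, 2)
```

As in the SQL, the count is the number of rows per publication, so it equals the number of characters only if each character and publication pair appears once.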

## Saving a DataFrame to a CSV

In [0]:
# "csv" is the built-in CSV data source in Spark 2.0 and later
analytics.write \
    .format("csv") \
    .option("header", "true") \
    .save("path-in-hdfs-or-cloud")

This lab is based on [https://vincentlauzon.com/2018/01/24/azure-databricks-spark-sql-data-frames/](https://vincentlauzon.com/2018/01/24/azure-databricks-spark-sql-data-frames/). You can see the code in that blog post to perform the same operations using RDD commands.