# Databricks hands-on Challenge

The goal of this exercise is to simulate a real data engineering and data science scenario at one of our customers. This includes:
- obtaining a data set
- processing the data to clean it up
- exploring it with visualizations and SQL queries
- exporting it to Cosmos DB
- using a machine learning algorithm to make generalizations and predictions


This exercise requires an Azure Subscription and the use of some Azure services in addition to Azure Databricks.

_Your real goal, should you accept it, is to discover one of the most important results in stellar astrophysics._

## Download and uncompress the data

Gaia is an ESA satellite that maps light sources in the sky. A sample file from Gaia Data Release 2 (Gaia DR2) is available here (size 12415259 bytes): http://cdn.gea.esac.esa.int/Gaia/gdr2/gaia_source/csv/GaiaSource_1000172165251650944_1000424567594791808.csv.gz

The full Gaia DR2 dataset includes **61000** files like this for a total size of over 550 GB.

**Hint**: use the %sh command to jump into the Linux shell, wget to get the file, and gzip to decompress the file.

**Hint 2**: wget -O lets you specify the output file path (e.g., /tmp/filename.csv.gz) and gzip -d -f can be used for decompressing.
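If you would rather stay in a Python cell than use %sh, the decompression step can be sketched with the standard library alone. This is only an illustrative alternative to gzip -d: the helper name `decompress_gzip` and the example paths are assumptions, not part of the exercise.

```python
import gzip
import shutil
from pathlib import Path


def decompress_gzip(src: str, dest: str) -> Path:
    """Stream-decompress the gzip file at `src` into `dest` and return the destination path."""
    dest_path = Path(dest)
    # Create the target folder if it does not exist yet.
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    # copyfileobj streams in chunks, so the whole file never has to fit in memory.
    with gzip.open(src, "rb") as compressed, open(dest_path, "wb") as plain:
        shutil.copyfileobj(compressed, plain)
    return dest_path


# Hypothetical usage on the driver node, after the download step:
# decompress_gzip("/tmp/filename.csv.gz", "/tmp/filename.csv")
```

Unlike gzip -d, this sketch leaves the compressed source file in place.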

In [3]:
%sh

# TODO: optionally, create a folder to hold the file (e.g., /mnt/gaiadr2)

# TODO: download the file with wget

# TODO: decompress the file

# TODO: list the contents of the folder

## Create an Azure Storage Account in the Azure Portal

1. Create an Azure Blob Storage account. See instructions here: https://docs.microsoft.com/en-gb/azure/storage/common/storage-quickstart-create-account

2. Create a Container in the Storage account. See instructions here: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-portal

3. Write down the name of the storage account, container name, and one of the access keys (https://docs.microsoft.com/en-us/azure/storage/common/storage-account-manage)

Notes:
- use the same region where the Databricks cluster was created
- use Performance=Standard, Account Kind = Blob Storage, Replication = LRS and Access Tier = Hot

## Mount the Storage Account/Container in Databricks

This will make the storage account visible in Databricks' distributed file system (DBFS). Mounted folders are persistent across the workspace, so this only has to be done once.

See the section "Mount an Azure Blob Storage container" here: https://docs.databricks.com/spark/latest/data-sources/azure/azure-storage.html
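As a rough sketch of what the mount call needs, the helper below assembles the three arguments that `dbutils.fs.mount` takes for a Blob Storage container when authenticating with an account key. The helper name `blob_mount_args` and all placeholder values are hypothetical; `dbutils` itself only exists inside a Databricks notebook, so the actual call is shown as a comment.

```python
def blob_mount_args(account: str, container: str, access_key: str, mount_point: str) -> dict:
    """Build keyword arguments for dbutils.fs.mount() using account-key authentication."""
    host = f"{account}.blob.core.windows.net"
    return {
        # wasbs:// is the TLS-secured Blob Storage scheme.
        "source": f"wasbs://{container}@{host}",
        "mount_point": mount_point,
        # The config key carries the storage account name; the value is the access key.
        "extra_configs": {f"fs.azure.account.key.{host}": access_key},
    }


# Inside a Databricks notebook (all values are placeholders you must replace):
# dbutils.fs.mount(**blob_mount_args("<account>", "<container>", "<access-key>", "/mnt/<folder>"))
```

Pasting an access key into a notebook is acceptable for a throwaway exercise, but a Databricks secret scope is usually a safer place to keep it.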

In [6]:
try:
    # TODO - MOUNT THE STORAGE
except:
    print("(Mount already existed, ignoring)")

## Move the file into the DBFS location you just mounted

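**Hint**: use dbutils.fs.mv(source, destination). Paths passed to dbutils.fs refer to DBFS by default, while a file downloaded with %sh sits on the driver's local disk, so prefix the source path with file: (e.g., file:/tmp/filename.csv). List the contents of the target location using dbutils.fs.ls(foldername).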

In [8]:
try:
  # TODO: move the file to the azure storage mount point 
except :
  print("(file not found -- moved already?)")

# TODO: List the files in Azure Storage

## Load the dataset into a Spark Dataframe and display it

Don't forget to account for the header row and to infer the schema from the data. Documentation here: https://docs.databricks.com/spark/latest/data-sources/read-csv.html#reading-files

Also note down the number of jobs that Spark will execute to do this task.
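One possible shape for the load step is sketched below. It assumes the `spark` session object that Databricks notebooks provide; the helper name `load_gaia_csv` and the example path are hypothetical.

```python
def load_gaia_csv(spark, path: str):
    """Read a CSV file into a Spark DataFrame.

    header=True takes column names from the first row instead of treating it as data.
    inferSchema=True asks Spark to scan the data and pick column types,
    rather than reading every column as a string.
    """
    return spark.read.csv(path, header=True, inferSchema=True)


# Hypothetical usage in a notebook cell (replace the path with your mount point):
# rawGaiaDF = load_gaia_csv(spark, "/mnt/<folder>/<filename>.csv")
```

Schema inference needs its own pass over the file, which is worth keeping in mind when you count the Spark jobs for this cell.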

In [10]:
rawGaiaDF = #TODO
display(rawGaiaDF)

## Print the schema of the raw Dataframe

As a curiosity, the data model is described here: https://gea.esac.esa.int/archive/documentation/GDR2/Gaia_archive/chap_datamodel/sec_dm_main_tables/ssec_dm_gaia_source.html

In [12]:
# TODO

## Clean up the dataframe

The convention for column names in Spark is camel case. Create a new dataframe by renaming the following relevant columns to the right convention:
- designation
- ra: renamed to "rightAscention"
- dec: renamed to "declination"
- astrometric_pseudo_colour: renamed to "starColour"
- radial_velocity: renamed to "radialVelocity"
- teff_val: renamed to "effectiveTemperature"
- radius_val: renamed to "stelarRadius"
- lum_val: renamed to "stelarLuminosity"

**Note 1**: do this in a single multi-line instruction. After you run the command, see the number of Spark jobs that were created. Now add a display() call to see a preview of the data, and compare the number of Spark jobs created. 

**Note 2**: unlike pandas DataFrames, Spark dataframes are immutable. Even when it looks like you are updating a value in a Spark dataframe, the operation returns a new dataframe and leaves the original unchanged.

**Hint**: you can use the col() Spark function to refer to a column, and alias() to change its name. The .select() function of the Dataframe allows you to do a query on a dataframe, with its result being another dataframe. https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/functions.html
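One way to keep the renaming in a single instruction is to drive select() from a mapping, as in the sketch below. The names `RENAMES` and `rename_columns` are hypothetical, the target names are spelled exactly as in the list above, and keeping `parallax` is an assumption made here because the later distance calculation needs that column.

```python
# Original column name -> name in the cleaned-up dataframe (spelled as in the exercise).
RENAMES = {
    "designation": "designation",
    "ra": "rightAscention",
    "dec": "declination",
    "astrometric_pseudo_colour": "starColour",
    "radial_velocity": "radialVelocity",
    "teff_val": "effectiveTemperature",
    "radius_val": "stelarRadius",
    "lum_val": "stelarLuminosity",
    # Assumption: not in the rename list, but required later to compute "distance".
    "parallax": "parallax",
}


def rename_columns(df, renames=RENAMES):
    """Return a new DataFrame with only the mapped columns, under their new names."""
    # Imported lazily so the mapping above can be inspected without Spark installed.
    from pyspark.sql.functions import col

    return df.select(*[col(old).alias(new) for old, new in renames.items()])


# Hypothetical usage in a notebook cell:
# cleanedUpDF = rename_columns(rawGaiaDF)
```

Because select() is a transformation, calling `rename_columns` only builds a query plan; nothing is computed until an action such as display() runs.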

In [14]:
from pyspark.sql.functions import col

cleanedUpDF = # TODO
  
display(cleanedUpDF)

## Add two calculated columns to the dataframe

- Add a column named "distance" with the mathematical value **abs(1/parallax)** (the absolute value of 1 divided by the parallax column). This is the distance from us to the star, in units of _parsecs_.
- Add a column named "magnitude", with the following mathematical value: **stelarLuminosity + 5 + 5*log(distance)**. Absolute magnitude is a way to measure how bright a star shines in the sky, at a fixed standard distance.

**Hint:** new columns can be created with the .withColumn() method of a dataframe. More information: https://docs.azuredatabricks.net/spark/1.6/sparkr/functions/withColumn.html#withcolumn
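To sanity-check the two formulas before writing them as column expressions, here is the same arithmetic for a single star in plain Python. It follows the exercise's simplified formulas as written; the function names and the sample values are made up for illustration. `math.log` is the natural logarithm, which matches what the single-argument `log` from pyspark.sql.functions computes.

```python
import math


def distance_from_parallax(parallax: float) -> float:
    """distance = abs(1 / parallax), as defined in the exercise."""
    return abs(1.0 / parallax)


def exercise_magnitude(stelar_luminosity: float, distance: float) -> float:
    """magnitude = stelarLuminosity + 5 + 5 * log(distance), using the natural log."""
    return stelar_luminosity + 5 + 5 * math.log(distance)


# Made-up sample values, not taken from the Gaia file.
sample_distance = distance_from_parallax(-0.5)  # abs() makes a negative parallax positive
sample_magnitude = exercise_magnitude(1.0, sample_distance)
print(sample_distance, sample_magnitude)
```

In the dataframe version, each of these expressions becomes the second argument of a .withColumn() call, with col("parallax"), col("stelarLuminosity") and col("distance") standing in for the plain numbers.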

In [16]:
from pyspark.sql.functions import log, abs

fullGaiaDF = # TODO

## Data partitioning

Although it doesn't look like it, Spark Dataframes are a distributed data structure. The dataframe you've just loaded and processed is actually distributed across the machines in the cluster. When you run a command, Spark jobs are spun up to execute a distributed query, collect the results and present them to you. In the following cell, can you print the number of partitions of the dataframe?

**Hint**: Dataframes are a higher level abstraction over another data structure called RDDs, which have a method to get the number of partitions - https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/rdd/RDD.html#getNumPartitions()

In [18]:
# TODO

## Register the Dataframe as a table

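To be able to use SQL directly in cells to query a dataframe, you have to register it as a table. Do this with the registerTempTable() method of the dataframe. Name the temporary table "GaiaDR2". On Spark 2.0 and later, registerTempTable() is deprecated and createOrReplaceTempView() is its replacement.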

More information here: https://docs.azuredatabricks.net/spark/1.6/sparkr/functions/registerTempTable.html#registertemptable

In [20]:
# TODO

## Save your processed dataframe in Apache Parquet format

Parquet is a columnar, highly optimized data format with schema support. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk. Spark has native support to write dataframes in Parquet.

In the next cell, write out your processed dataframe as Parquet.
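A minimal sketch of the write step, assuming you want re-running the cell to replace earlier output rather than fail because the path already exists. The helper name `write_parquet` is hypothetical.

```python
def write_parquet(df, path: str) -> None:
    """Write `df` to `path` in Parquet format, overwriting output from a previous run."""
    # mode("overwrite") replaces existing data at the path; the default mode raises an error.
    df.write.mode("overwrite").parquet(path)


# Hypothetical usage in a notebook cell:
# write_parquet(fullGaiaDF, writePath)
```

Spark writes the result as a folder of part files, not a single file, so expect a directory when you list the path.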

In [22]:
writePath = "mnt/gaiadr2.parquet"

# TODO

In [23]:
%fs ls mnt/

## Query the table data using SQL

Use the following cells to write SQL queries. Write SQL directly rather than Python.

In [25]:
%sql

-- todo: what is the minimum distance from a star to Earth, and which is the star with the largest magnitude?

In [26]:
%sql

-- todo: how many stars are closer to us than 1 parsec?

## Native visualizations

You can integrate visualizations using libraries such as matplotlib. Databricks, however, also includes some native visualization support. To demonstrate this:

a) in the next cell write a SQL query to read the columns "starColour" and "stelarLuminosity" where starColour is greater than 1.2 and stelarLuminosity is less than 5.

b) click the icon with types of graphs, under the result of the query, and pick a Scatter plot. Click "Plot Options" and make sure that in the Value field you have first the starColour (x-axis) and then the stelarLuminosity (y-axis) fields.

In [28]:
%sql

-- todo


## Optional - insert the data into Cosmos DB

Cosmos DB is Azure's globally distributed, multi-model, NoSQL database, and there is a connector for moving data between Databricks and Cosmos DB. In the next cell, write out the contents of the dataframe into a Cosmos DB database.

Relevant links:

- How to create a CosmosDB and install the connector: https://codewithsarath.com/querying-cosmos-db-in-azure-databricks-using-cosmos-db-spark-connector/
- Use the Azure CosmosDb Spark connector: https://docs.databricks.com/spark/latest/data-sources/azure/cosmosdb-connector.html
- Azure CosmosDb Spark Connector User Guide - https://github.com/Azure/azure-cosmosdb-spark/wiki/Azure-Cosmos-DB-Spark-Connector-User-Guide

**Important**: Remember to delete the CosmosDb Account you created, as costs can be high!

In [30]:
# TODO

## Congratulations!

If you made it this far, you've just **[sort of]** found one of the most important correlations of modern stellar physics: there is a relationship between the colour of a star and how bright it shines. Astrophysicists call this the Hertzsprung–Russell diagram. More information can be found here: https://en.wikipedia.org/wiki/Hertzsprung%E2%80%93Russell_diagram and some real diagrams from Gaia with the correct maths are here: https://www.cosmos.esa.int/web/gaia/gaiadr2_hrd

_(disclaimer: this was a very simplified and approximate approach, the calculations actually have many errors!)_