# Large-Scale Distributed Systems: SDSS Project

In [None]:
!date

## Introduction

For a summary of the project context, including the logistics of the project, please refer to the [project description](https://github.com/glouppe/info8002-large-scale-database-systems/raw/master/projects/project_analysis.pdf).

## Spark Configuration

In [None]:
from pyspark import SparkConf
from pyspark.sql import Row
from pyspark.sql import SparkSession

import pyspark
import os

In [None]:
# Now, in order to use the libraries, make sure the libs are included in $PATH and $PYTHONPATH.
# Add this to your .bashrc or .zshrc depending on your shell.
!export SPARK_HOME=/opt/apache-spark   # CHANGE DESTINATION PATH IF DESIRED.
!export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
!export PATH="$SPARK_HOME/bin:$PATH"

# Usually this is done at a cluster level, so you should not have to worry about it.
!export JAVA_OPTS="-Xms1024m -Xmx2048m" # Adjust to your requirements.

In [None]:
# We will need this at a later stage, I will come back to this later.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-avro_2.11:4.0.0 pyspark-shell'

In [None]:
# Define the name of your application.
application_name = "Large-Scale Distributed Systems: SDSS"

# Define the master address, for local mode this is local[*].
# If you don't want to use all cores on your machine please specify local[n].
master = "local[*]"
# Number of executors.
num_executors = 1
# Number of child processes per executors.
num_processes = 2
# Total number of parallel processes.
num_workers = num_executors * num_processes

In [None]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", str(num_processes))
conf.set("spark.executor.instances", str(num_executors))
conf.set("spark.executor.memory", "4g") # Adjust according to your requirements.
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

In [None]:
# Allocate a Spark session based on the provided configuration and application name.
# Which may, or may not, already be created (getOrCreate()).
spark = SparkSession.builder.config(conf=conf).appName(application_name).getOrCreate()
# Create a shortcut for the SparkContext.
sc = spark.sparkContext

## Retrieving the list of data-files

As I mentioned in the Spark Tutorial, a common beginner's mistake is to load all the data in the driver process, and then parallize the data to the executors. This is fine for smaller toy-problems, but obviously this doesn't scale to larger datasets. An alternative would be to apply the following idea:

Assuming there is an underlying distributed filesystem, like HDFS or NFS, the absolute path of the data files remain identical across all other nodes. A possiblity would be to first obtain the paths of all data files, parallelize these to the executors, and let the executors read and parse the FITS files in parallel.

In [None]:
# Do something.

## Inspecting a single FITS file


Let us first investigate the structure of a single FITS file. A FITS file always consists of a set of HDU (Header Data Unit). However, some HDU's might be a table-like structure, an image, or some other binary format. The specific type can always be inferred from the HDU list as shown below.

In [None]:
from astropy.io import fits

# Lets inspect the file structure of a single FITS file.
hdulist = fits.open(plates[0])

In [None]:
# Show the information on all HDU's in the FITS file.
hdulist.info()

In [None]:
# Describe the header of the 'BinTableHDU'.
hdulist[1].header

In [None]:
# Close the HDU access handle.
hdulist.close()

## Parsing FITS files with Apache Spark

Since we are only interested in a single state of an observation, we filter all the plates plates which have been recorded more than once.

In [None]:
# Get plates.

Parallelize all unique plates to the executors.

In [None]:
# Parallelize plates.

The following cell will apply a lambda function handling all paths that have been allocated to a single partition. In this partion, we iterate over every provided absolute path, and process every records associated with the data table. Furthermore, we only extract the required data that we need to solve the questions posed below. Nevertheless, there are some special cases that we need to be aware of. For instance, every fiber has an associated `warning` level. To improve the quality of the analysis, we only select the fibers which have no warnings assigned to them, i.e., `ZWARNING = 0` [SDSS ZWARNING description](http://www.sdss3.org/dr8/algorithms/bitmask_zwarning.php). Afterwards, we convert the RDD to a *DataFrame*.

In [None]:
def parse_plates(iterator):
    # Iterate through all plates in the current iterator.
    for path in iterator:
        # Open the specified FITS file.
        hdulist = fits.open(path)
        # Iterate through all records in the HDU.
        for record in hdulist[1].data:
            # Check if the current object has spectra problems.
            if record['ZWARNING'] == 0:
                d = {}
                # Extract the desired columns, and convert them to a more favourable type.
                d['plate']          = int(record['PLATE'])
                d['tile']           = int(record['TILE'])
                d['mjd']            = int(record['MJD'])
                d['fiber']          = int(record['FIBERID'])
                d['ra']             = float(record['PLUG_RA'])
                d['dec']            = float(record['PLUG_DEC'])
                d['class']          = record['CLASS']
                d['subclass']       = record['SUBCLASS']
                d['z']              = float(record['Z'])
                d['z-err']          = float(record['Z_ERR'])
                d['spectro-flux']   = record['SPECTROFLUX'].tolist()
                yield Row(**d)
        hdulist.close()

# Read a DataFrame of objects from the plate paths RDD.
objects = # Read plates and convert to DataFrame.

objects.printSchema()

In [None]:
print("Number of objects without problems: " + str(objects.count()))

**Tip:** It might be useful to write the dataframe to disk, this would prevent you from having to parse all the individual FITS files (which eventually benefits the query performance).

## Questions

### Question 0

To show you what an example use-case would be, let us assume that we are interested in the stars which are moving towards us. Meaning, they are *blueshifted*, i.e., $z < 0$. This particular information could be used to set-up a survey which is interested in answering the question whether those particular stars might *one-day* disturb the stability of our solar-system. Therefore, we want to extract their coordinates (in terms of Right Ascension and Declination) fur further, and more detailed observations.

In [None]:
objects.where('class = "STAR"').where('z < 0').select('ra', 'dec').distinct().show()

# Instead of .show() you could call .collect() which returns a list.

### Question 1

*Looking at galaxies, is the expansion of the Universe similar across all regions of the sky? Meaning, is the redshift of the galaxies about equal across the sky?*

In [None]:
# Extract the galaxies.

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Make a plot with the collected data.

### Question 2

*Is the expansion of the Universe accelerating?*

In [None]:
import pyspark.sql.functions as F

# Compute the acceleration, and make a plot.

### Question 3

*What is the average velocity of the galaxies which are redshifted?*

However, for large redshifts (~ z >= 1) it does not make sense to use the traditional equations as in [http://www.ifa.hawaii.edu/users/acowie/class05/home9_sol.html](http://www.ifa.hawaii.edu/users/acowie/class05/home9_sol.html). Therefore, one needs to take relativistic effects into account. As a tip, it might be interested to read these pages on Wikipedia: [https://en.wikipedia.org/wiki/Peculiar_velocity](Peculiar Velocity) and [https://en.wikipedia.org/wiki/Recessional_velocity](Recessional Velocity).

In [None]:
# Code goes here.

### Question 4

*What is the average velocity of the quasars which are redshifted?*

Relativistic effects are very important here! Why?

In [None]:
# Code goes here.

### Question 5

*Are there galaxies with a relatively small flux which are blueshifted?*

**Tip**: [https://en.wikipedia.org/wiki/Absolute_magnitude](Absolute Magnitude) & [https://en.wikipedia.org/wiki/Apparent_magnitude](Apparent Magnitude).

In [None]:
def spectro_flux_to_magnitude(row):
    """Converts the spectro-flux filter array to magnitudes
    according to:
    
    SPECTROFLUX 5-element array of integrated flux in each of the
    5 SDSS imaging filters (ugriz); the units are nanomaggies,
    which is 1 at 22.5 magnitude; convert to magnitudes with
    22.5 - 2.5 * LOG_10(SPECTROFLUX);
    
    The u-band and z-band counts are meaningless and should not be used.
    
    So, we apply said equation by taking the average of the gri filters.
    """
    d = row.asDict()
    spectroflux = d['spectro-flux']
    d['hasflux'] = (np.count_nonzero(spectroflux) > 0) # Why is this important?
    averaged_flux = np.abs(np.average(spectroflux[1:4]))
    magnitude = 22.5 - 2.5 * np.log10(averaged_flux)
    del d['spectro-flux']
    d['magnitude'] = float(magnitude)
    
    return Row(**d)


# Code goes here.

### Question 6

*What is the distribution of the spectral type of all observed stars?*

In [None]:
# Make a histogram for all spectral types.

## Question 7

*How does the query performance scale when tuning the number of partitions, relative to the amount of CPU cores that are available? *

In [None]:
# Code goes here.

## Conclusion

*Your conclusion here.*

In [None]:
# Summary with plots describing the results.

## Stopping the Spark Session

In [None]:
spark.stop()