# Introduction
## Big data & Hadoop

There was a time when a researcher could gather all the available data in their field in a small home library and produce results with pen and paper. Personal computers and laptops have extended our storage and processing power further than we ever expected, but even they can no longer cope with the amount of data produced today.

Nowadays, scientific experiments generate so much data that it does not fit in a personal computer, nor even in a single data center such as PIC. This huge need for computing and storage resources is one of the factors driving worldwide scientific collaborations. The dramatic increase in capacity and performance required by current experiments also calls for specific architectures to store and process all this data.

Big Data platforms are combinations of hardware and software designed to handle massive amounts of data. The most popular one is Hadoop, which is based on designs originally published by Google in several papers describing, among other things:
 - a distributed file system (GFS, whose Hadoop counterpart is HDFS)
 - the MapReduce programming model

## HDFS

The Hadoop Distributed File System (HDFS) is the basis of the Hadoop platform, and it is built to run on clusters of commodity computers. In this architecture, anywhere from dozens to thousands of inexpensive computers work in a coordinated manner to store and process the data. Because of the large number of elements involved (computer components, network, power, etc.), the platform was designed from the ground up to be fault tolerant: should any element fail at any time, the system detects the condition and recovers from it transparently, without the user noticing.

HDFS works by splitting files into blocks (128 MiB by default) and replicating each block on several cluster nodes, so that if a node fails, the data is still accessible from another replica.

![HDFS](../resources/hdfs.png)
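To make the splitting and replication concrete, here is a minimal plain-Python sketch (no Hadoop involved). The helper names `split_into_blocks` and `place_replicas` are hypothetical, the replication factor of 3 is an assumption (it is the usual HDFS default), and the round-robin placement is a deliberate simplification: real HDFS uses a rack-aware placement policy.

```python
BLOCK_SIZE = 128 * 1024**2  # 128 MiB, as described above
REPLICATION = 3             # assumption: the usual HDFS default


def split_into_blocks(file_size, block_size=BLOCK_SIZE):
    """Return the size in bytes of each block needed to store `file_size` bytes."""
    n_full, remainder = divmod(file_size, block_size)
    blocks = [block_size] * n_full
    if remainder:
        blocks.append(remainder)  # the last block only holds the leftover bytes
    return blocks


def place_replicas(n_blocks, nodes, replication=REPLICATION):
    """Naive round-robin placement: each block goes to `replication` distinct nodes."""
    if len(nodes) < replication:
        raise ValueError("need at least as many nodes as replicas")
    return {
        block: [nodes[(block + r) % len(nodes)] for r in range(replication)]
        for block in range(n_blocks)
    }


blocks = split_into_blocks(300 * 1024**2)
placement = place_replicas(len(blocks), ["node1", "node2", "node3", "node4"])
print([b // 1024**2 for b in blocks])  # → [128, 128, 44] (sizes in MiB)
print(placement)
```

With four nodes and three replicas per block, every block still has two copies left if any single node fails.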

## MapReduce

MapReduce is a programming model for generating and processing big data sets with parallel, distributed algorithms. It is inspired by the map and reduce functions commonly used in functional programming, but its key contribution is the scalability and fault tolerance achieved by optimizing the execution engine.

In MapReduce, data operations are defined with respect to data structured in (key, value) pairs:
 - `Map` takes one pair of data in one data domain and returns a list of pairs in a different domain: `Map(k1, v1) → list(k2, v2)`.
   The Map function is applied in parallel to every pair (keyed by `k1`) in the input dataset. This produces a list of pairs (keyed by `k2`) for each call. After that, the MapReduce framework collects all pairs with the same key (`k2`) from all lists and groups them together, creating one group for each key.
 - `Reduce` is then applied in parallel to each group, which in turn produces a collection of values in the same domain: `Reduce(k2, list(v2)) → list(v3)`.
   Each Reduce call typically produces either one value `v3` or an empty return, though one call is allowed to return more than one value. The returns of all calls are collected as the desired result list.

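The three stages above (map, group by key, reduce) can be simulated sequentially in plain Python. This is only a sketch of the programming model, assuming a word-count job; the function names are hypothetical, and a real MapReduce framework would run the map and reduce calls in parallel across the cluster and perform the grouping (the "shuffle") over the network.

```python
from collections import defaultdict


def map_fn(k1, v1):
    """Map(k1, v1) -> list(k2, v2): emit a (word, 1) pair for each word of a line."""
    return [(word, 1) for word in v1.split()]


def reduce_fn(k2, values):
    """Reduce(k2, list(v2)) -> list(v3): add up all the counts of one word."""
    return [sum(values)]


def run_map_reduce(records, mapper, reducer):
    # Map phase: apply the mapper to every (k1, v1) input pair
    intermediate = []
    for k1, v1 in records:
        intermediate.extend(mapper(k1, v1))
    # Grouping ("shuffle") phase: collect all values sharing the same key k2
    groups = defaultdict(list)
    for k2, v2 in intermediate:
        groups[k2].append(v2)
    # Reduce phase: one reducer call per group
    return {k2: reducer(k2, values) for k2, values in groups.items()}


# Input pairs: (line number, line text)
lines = [(0, "to be or not to be"), (1, "to see or not to see")]
print(run_map_reduce(lines, map_fn, reduce_fn))
# → {'to': [4], 'be': [2], 'or': [2], 'not': [2], 'see': [2]}
```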




## Apache Spark

Apache Spark is an open-source cluster-computing framework that can run on top of Apache Hadoop. It generalizes the MapReduce model and offers a richer interface for iterative and multi-stage algorithms and operations. Apache Spark is based on a specialized data structure called the resilient distributed dataset (RDD), which facilitates the implementation of iterative algorithms and interactive/exploratory analysis. Compared to a pure MapReduce implementation, the latency of Spark applications may be reduced by several orders of magnitude, largely because intermediate results can be kept in memory instead of being written to disk between steps.

Apache Spark comprises several modules that add processing capabilities on top of RDDs, such as:
 - Spark SQL: structured data like database result sets
 - Spark Streaming: real-time data
 - Spark MLlib: machine learning
 - Spark GraphX: graph processing

For this course, we will introduce the mechanics of working with large datasets using Spark. Ideally, each of you would have an entire Hadoop cluster to work with, but we are not CERN... so we will use Spark's ability to run locally, without a cluster. Later, the same code can run on top of a Hadoop cluster with almost no changes (essentially the master URL passed to `SparkContext` and the data paths).

# Map & Reduce

### Note:

Spark operations can be classified as either:
 - ACTIONS: trigger a computation and return a result to the driver
    - reduce, collect, aggregate, count, take, ...
 - TRANSFORMATIONS: lazily return a new RDD with the transformation applied (think of composing functions); nothing is computed until an action is called
    - map, filter, flatMap, groupBy, join, ...

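The difference between lazy transformations and actions can be mimicked with a toy class. `MiniRDD` below is hypothetical and single-machine: it only records transformations and runs them when an action is called, which illustrates the behaviour described above, not how Spark is implemented internally.

```python
class MiniRDD:
    """Toy, single-machine stand-in for an RDD (hypothetical, for illustration)."""

    def __init__(self, data, ops=()):
        self._data = data
        self._ops = tuple(ops)  # pending transformations, not executed yet

    # Transformations: return a new MiniRDD and compute nothing
    def map(self, f):
        return MiniRDD(self._data, self._ops + (("map", f),))

    def filter(self, predicate):
        return MiniRDD(self._data, self._ops + (("filter", predicate),))

    # Actions: run every pending transformation and return a result
    def collect(self):
        result = []
        for x in self._data:
            keep = True
            for kind, f in self._ops:
                if kind == "map":
                    x = f(x)
                elif not f(x):  # a filter that rejects x
                    keep = False
                    break
            if keep:
                result.append(x)
        return result

    def count(self):
        return len(self.collect())


calls = []


def square(x):
    calls.append(x)  # record every time the map function actually runs
    return x * x


evens = MiniRDD(range(5)).map(square).filter(lambda x: x % 2 == 0)
print(calls)            # → [] (nothing has been computed yet)
print(evens.collect())  # → [0, 4, 16]
print(len(calls))       # → 5
```

Calling another action on `evens` re-runs the whole pipeline; Spark behaves the same way unless the RDD is cached with `cache()`.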
## Concepts

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

In [2]:
# We define our input
l = range(10)
l

range(0, 10)

In [3]:
# We "upload" it as an RDD
rdd = sc.parallelize(l)
rdd

PythonRDD[1] at RDD at PythonRDD.scala:48

### map()

In [4]:
# We define a map function
def power_of_2(k):
    return 2**k

# And we apply it to our RDD. map() is a transformation, so nothing is computed yet: we only get a new RDD back
rdd.map(power_of_2)

PythonRDD[2] at RDD at PythonRDD.scala:48

In [5]:
# So we use the collect() action to retrieve all results
rdd.map(power_of_2).collect()

### WARNING ###
# Never do this with real datasets: collect() transfers ALL the data to the driver (and then to your browser), most likely killing it.

[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]

### reduce()

In [6]:
# What about summing everything?
# We define a reduce function (Spark requires it to be commutative and associative)
def sum_everything(k1, k2):
    return k1 + k2

# And we apply the reduce operation
rdd.reduce(sum_everything)

45
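Spark's `reduce()` expects a commutative and associative function because it reduces each partition independently and then combines the partial results. The plain-Python sketch below (the helper `partitioned_reduce` is hypothetical) mimics that two-step process: addition gives the same answer regardless of partitioning, whereas subtraction, which is not associative, does not. The sketch keeps partial results in order, so it only exposes the associativity requirement.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(data, f, n_partitions):
    """Reduce each partition separately, then reduce the partial results."""
    data = list(data)
    size = -(-len(data) // n_partitions)  # ceiling division
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(f, part) for part in partitions]
    return reduce(f, partials)


print(partitioned_reduce(range(10), add, 1), partitioned_reduce(range(10), add, 3))  # → 45 45
print(partitioned_reduce(range(10), sub, 1), partitioned_reduce(range(10), sub, 3))  # → -45 9
```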

In [7]:
# Or we can use the built-in `sum()` action
rdd.sum()

45

### pipelining

In [8]:
# What if I wanted to compute the sum of the powers of 2?
rdd.map(power_of_2).reduce(sum_everything)
# or, equivalently (only the value of the last line is displayed):
rdd.map(power_of_2).sum()

1023

In [9]:
# How can we count the number of elements in the array?
rdd.count()

10

Ok, too easy; this is supposed to be a map & reduce tutorial...

How can we do it WITHOUT the count() action, using just map & reduce?

**SPOILER**: add 1 for each element in the RDD:
 - Build a map function that transforms any element into a 1.
 - Then apply our `sum_everything` reduce function.

In [10]:
def set_to_1(k):
    return 1

rdd.map(set_to_1).reduce(sum_everything)

10

## Unstructured data

In [20]:
# Load all Shakespeare works (the path is relative to the notebook's working directory)
shakespeare = sc.textFile('../resources/shakespeare.txt')

In [21]:
# Show the first lines
shakespeare.take(10)

['This is the 100th Etext file presented by Project Gutenberg, and',
 'is presented in cooperation with World Library, Inc., from their',
 'Library of the Future and Shakespeare CDROMS.  Project Gutenberg',
 'often releases Etexts that are NOT placed in the Public Domain!!',
 '',
 'Shakespeare',
 '',
 '*This Etext has certain copyright implications you should read!*',
 '',
 '<<THIS ELECTRONIC VERSION OF THE COMPLETE WORKS OF WILLIAM']

In [22]:
# Get the longest line
def keep_longest(k1, k2):
    if len(k1) > len(k2):
        return k1
    else:
        return k2

shakespeare.reduce(keep_longest)

"    whither wilt?' ROSALIND. Nay, you might keep that check for it, till you met your"

In [23]:
# Compute the average line length
def line_length(k):
    return len(k)

shakespeare.map(line_length).sum() / shakespeare.count()

42.85645529343704
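The cell above launches two separate jobs over the file (`sum()` and `count()`). A common map & reduce alternative is to map each line to a `(length, 1)` pair and add the pairs up in a single reduce. The sketch below runs on a plain Python list with `functools.reduce`; the function names and sample lines are hypothetical.

```python
from functools import reduce


def to_pair(line):
    """Map step: turn a line into a (length, 1) pair."""
    return (len(line), 1)


def add_pairs(a, b):
    """Reduce step: add lengths and counts element-wise (associative and commutative)."""
    return (a[0] + b[0], a[1] + b[1])


lines = ["This is the 100th Etext file", "", "Shakespeare"]
total_length, n_lines = reduce(add_pairs, map(to_pair, lines))
print(total_length / n_lines)  # → 13.0
```

On the RDD, the same two functions would be used as `shakespeare.map(to_pair).reduce(add_pairs)`.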

### flatMap() vs map()

In [24]:
# Split each line into words: map() returns one list of words per line
def split_by_space(k):
    return k.split(' ')

shakespeare.map(split_by_space).take(2)

[['This',
  'is',
  'the',
  '100th',
  'Etext',
  'file',
  'presented',
  'by',
  'Project',
  'Gutenberg,',
  'and'],
 ['is',
  'presented',
  'in',
  'cooperation',
  'with',
  'World',
  'Library,',
  'Inc.,',
  'from',
  'their']]

In [25]:
# flatMap() flattens the per-line lists into a single RDD of words
shakespeare.flatMap(split_by_space).take(15)

['This',
 'is',
 'the',
 '100th',
 'Etext',
 'file',
 'presented',
 'by',
 'Project',
 'Gutenberg,',
 'and',
 'is',
 'presented',
 'in',
 'cooperation']
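In plain Python terms, `map()` corresponds to applying the function to each element and keeping the results as they are, while `flatMap()` additionally concatenates the resulting lists. A small sketch on two hypothetical lines, using `itertools.chain.from_iterable` for the flattening:

```python
from itertools import chain


def split_by_space(k):
    return k.split(' ')


lines = ["to be or", "not to be"]

# map(): exactly one output element per input line, here a list of words
mapped = [split_by_space(line) for line in lines]

# flatMap(): apply the same function, then flatten the lists into one sequence
flat_mapped = list(chain.from_iterable(split_by_space(line) for line in lines))

print(mapped)       # → [['to', 'be', 'or'], ['not', 'to', 'be']]
print(flat_mapped)  # → ['to', 'be', 'or', 'not', 'to', 'be']
```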

### lambda functions

In [26]:
shakespeare.flatMap(
    lambda k: k.split(' ') # Split into words
).take(15)

['This',
 'is',
 'the',
 '100th',
 'Etext',
 'file',
 'presented',
 'by',
 'Project',
 'Gutenberg,',
 'and',
 'is',
 'presented',
 'in',
 'cooperation']

### filter()

In [27]:
# Retrieve 10 words longer than 15 characters
shakespeare.flatMap(
    lambda k: k.split(' ') # Split into words
).filter(
    lambda k: len(k)>15    # Keep words longer than 15 characters
).take(10)

['shaks12.zip*****',
 'julianc@netcom.com',
 '100,000,000=Trillion]',
 'hart@vmd.cso.uiuc.edu',
 'mrcnext.cso.uiuc.edu',
 'hart@vmd.cso.uiuc.edu',
 '>internet:hart@.vmd.cso.uiuc.edu',
 'internet!vmd.cso.uiuc.edu!Hart',
 '(72600.2026@compuserve.com);',
 'self-substantial']

### Exercise

How many times did Shakespeare use the word 'murder'? (**case insensitive**)

In [28]:
shakespeare.flatMap(
    lambda k: k.split(' ') # Split into words
).map(
    lambda k: k.lower()    # Lowercase, for a case-insensitive comparison
).filter(
    lambda k: k == 'murder'
).count()

44
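The count above only includes tokens exactly equal to `'murder'`: since `split(' ')` keeps punctuation attached, occurrences such as `murder,` or `murder!` are missed. A minimal sketch of a regex-based tokenizer follows; `tokenize` and `count_word` are hypothetical helpers, and treating words as runs of letters and apostrophes is a simplifying assumption.

```python
import re

# Hypothetical tokenizer: lowercase the line, then keep runs of letters and
# apostrophes, so punctuation attached to a word no longer hides it.
WORD_RE = re.compile(r"[a-z']+")


def tokenize(line):
    return WORD_RE.findall(line.lower())


def count_word(lines, word):
    """Count how many tokens in `lines` are exactly `word`."""
    return sum(1 for line in lines for token in tokenize(line) if token == word)


sample = ["Murder most foul, as in the best it is;", "O murder, murder!"]

# split(' ') as in the cell above: 'murder,' and 'murder!' do not match
naive = sum(1 for line in sample for k in line.split(' ') if k.lower() == 'murder')
print(naive, count_word(sample, "murder"))  # → 1 3
```

With Spark, `tokenize` could be passed straight to `flatMap`, e.g. `shakespeare.flatMap(tokenize).filter(lambda k: k == 'murder').count()`; we have not run it, but it should find at least as many occurrences as the cell above.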

### Exercise

Show 10 words longer than 15 characters

In [29]:
shakespeare.flatMap(
    lambda k: k.split(' ')
).filter(
    lambda k: len(k)>15
).take(10)

['shaks12.zip*****',
 'julianc@netcom.com',
 '100,000,000=Trillion]',
 'hart@vmd.cso.uiuc.edu',
 'mrcnext.cso.uiuc.edu',
 'hart@vmd.cso.uiuc.edu',
 '>internet:hart@.vmd.cso.uiuc.edu',
 'internet!vmd.cso.uiuc.edu!Hart',
 '(72600.2026@compuserve.com);',
 'self-substantial']

### Exercise
Show all words longer than 15 characters, dropping those that contain any of the following characters (`. , -`)

In [30]:
shakespeare.flatMap(
    lambda k: k.split(' ')
).filter(
    lambda k: not (set('.,-') & set(k))
).filter(
    lambda k: len(k)>15
).collect()

['incomprehensible',
 'Gloucestershire;',
 "NORTHUMBERLAND'S",
 "NORTHUMBERLAND'S",
 'Gloucestershire;',
 "Northumberland's",
 'KING_HENRY_VIII|EPILOGUE',
 'enfranchisement!"',
 'misconstruction;',
 'honorificabilitudinitatibus;',
 'Anthropophaginian',
 "disproportion'd;",
 'Gloucestershire;',
 'indistinguishable']
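The filter `not (set('.,-') & set(k))` keeps a word only when the intersection between its characters and the forbidden set is empty. The sketch below (hypothetical function names) compares it with an equivalent `any()` formulation. Note that characters such as `;`, `'`, `!` or `|` are not in the forbidden set, which is why words like `Gloucestershire;` survive in the output above.

```python
FORBIDDEN = set('.,-')


def keep_with_sets(word):
    # The intersection is empty (falsy) when no forbidden character appears
    return not (FORBIDDEN & set(word))


def keep_with_any(word):
    # Same test with any(): stops at the first forbidden character
    return not any(c in FORBIDDEN for c in word)


words = ["incomprehensible", "self-substantial", "Gutenberg,", "Gloucestershire;"]
print([w for w in words if keep_with_sets(w)])  # → ['incomprehensible', 'Gloucestershire;']
```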

### Exercise

Retrieve the longest word (without `. , -`), reusing the `keep_longest` reduce function.

In [31]:
shakespeare.flatMap(
    lambda k: k.split(' ')
).filter(
    lambda k: not (set('.,-') & set(k))
).reduce(
    keep_longest
)

'honorificabilitudinitatibus;'

Which, as you all know, means "the state of being able to achieve honours".

## Working with key-values

### List the 10 most used words

In [None]:
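shakespeare.flatMap(
    lambda k: k.split(' ')
).filter(
    lambda k: k and not (set('.,-') & set(k))  # also drop empty strings
).map(
    lambda k: (k, 1)                          # one (word, 1) pair per word
).reduceByKey(lambda v1, v2: v1 + v2          # add up the counts of each word
).takeOrdered(10, key=lambda kv: -kv[1])      # keep the 10 most frequent words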

## Structured data

In [47]:
from pyspark.sql import SQLContext
# SQLContext still works, but since Spark 2.0 SparkSession is the preferred entry point
sqlc = SQLContext(sc)

In [83]:
gaia = sqlc.read.csv('../resources/gaia.csv.bz2', comment='#', header=True, inferSchema=True)
gaia

DataFrame[ra: double, dec: double, phot_g_mean_mag: double, phot_g_mean_flux_error: double]

In [84]:
gaia.count()

4468363

In [85]:
gaia.head(5)

[Row(ra=290.21102203284977, dec=-30.512276670546044, phot_g_mean_mag=19.298657696970963, phot_g_mean_flux_error=2.1969861437903213),
 Row(ra=276.8551136254776, dec=-45.56829255155762, phot_g_mean_mag=19.71404456309139, phot_g_mean_flux_error=2.8153702962592555),
 Row(ra=291.2714016666681, dec=-30.168438928928577, phot_g_mean_mag=19.88970685445606, phot_g_mean_flux_error=2.2826860042571675),
 Row(ra=276.8386275958223, dec=-45.573612128533604, phot_g_mean_mag=18.739001037286407, phot_g_mean_flux_error=3.0656237334224734),
 Row(ra=291.2731316966207, dec=-30.163064937921643, phot_g_mean_mag=18.553687802187834, phot_g_mean_flux_error=7.280203675814308)]

### Add a healpix column

In [92]:
import healpy as hp
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [113]:
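hpix_order = 3  # HEALPix order: nside = 2**hpix_order = 8

def hpix(ra, dec):
    # Convert (ra, dec) in degrees into HEALPix colatitude theta and longitude phi in radians
    phi = math.pi/180. * ra
    theta = math.pi/180. * (90.0 - dec)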
    
    return int(
        hp.pixelfunc.ang2pix(2**hpix_order, theta, phi, nest=True)
    ) # Cast from numpy.int64 to python int

# Define UDF and return type
hpix_udf = udf(hpix, IntegerType())
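The `hpix` function converts equatorial coordinates in degrees (right ascension $\alpha$ = `ra`, declination $\delta$ = `dec`) into the colatitude $\theta$ and longitude $\phi$ in radians that `ang2pix` expects, while the order fixes the map resolution:

$$
\theta = \frac{\pi}{180}\,(90 - \delta), \qquad \phi = \frac{\pi}{180}\,\alpha, \qquad N_\mathrm{side} = 2^{\mathrm{order}} = 2^3 = 8, \qquad N_\mathrm{pix} = 12\,N_\mathrm{side}^2 = 768
$$

With 768 pixels (indices 0 to 767), `IntegerType` is more than enough to hold the result.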

In [114]:
gaia.ra

Column<b'ra'>

In [115]:
hpix_udf(gaia.ra, gaia.dec)

Column<b'hpix(ra, dec)'>

In [116]:
gaia_hpix = gaia.withColumn("hpix", hpix_udf(gaia.ra, gaia.dec))
gaia_hpix

DataFrame[ra: double, dec: double, phot_g_mean_mag: double, phot_g_mean_flux_error: double, hpix: int]

In [117]:
gaia_hpix.head(5)

[Row(ra=290.21102203284977, dec=-30.512276670546044, phot_g_mean_mag=19.298657696970963, phot_g_mean_flux_error=2.1969861437903213, hpix=750),
 Row(ra=276.8551136254776, dec=-45.56829255155762, phot_g_mean_mag=19.71404456309139, phot_g_mean_flux_error=2.8153702962592555, hpix=744),
 Row(ra=291.2714016666681, dec=-30.168438928928577, phot_g_mean_mag=19.88970685445606, phot_g_mean_flux_error=2.2826860042571675, hpix=750),
 Row(ra=276.8386275958223, dec=-45.573612128533604, phot_g_mean_mag=18.739001037286407, phot_g_mean_flux_error=3.0656237334224734, hpix=744),
 Row(ra=291.2731316966207, dec=-30.163064937921643, phot_g_mean_mag=18.553687802187834, phot_g_mean_flux_error=7.280203675814308, hpix=750)]

### Group by healpix and count