In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# create the SparkSession object,
# which is the entry point into all functionality in Spark
spark = (SparkSession
         .builder
         .master('local[*]')  # set it to run on all cores on a local machine
         .appName('Practice')
         .config(conf = SparkConf())
         .getOrCreate())

# set the log level to ERROR to prevent
# the terminal from showing too much information
sc = spark.sparkContext
sc.setLogLevel('ERROR')

# 1. magic for inline plot
# 2. magic to print version
# 3. magic so that the notebook will reload external python modules
%matplotlib inline
%load_ext watermark
%load_ext autoreload 
%autoreload 2

%watermark -a 'Ethen' -d -t -v -p numpy,pandas,matplotlib,pyspark

Ethen 2017-09-09 10:35:04 

CPython 3.5.2
IPython 6.1.0

numpy 1.13.1
pandas 0.20.3
matplotlib 2.0.0
pyspark 2.2.0


# Spark

## Background

Spark is a fast, general-purpose framework for large-scale data processing.

- It is often compared with Hadoop MapReduce and claims to be 10 to 100 times faster, depending on whether we're running it in memory or on disk. Of course, your mileage may vary, but it tends to be faster in most cases and offers some more modern features. One reason it's faster is that it builds a **directed acyclic graph (DAG)** of our workload and evaluates it lazily. At a high level, this means Spark will not execute our task until we explicitly ask for the result. Because of this, it can optimize the execution of all the intermediate steps whenever possible.
- The framework gives us access to the power of horizontal scaling, i.e. we can start off by writing Spark code that runs on our laptop for development purposes, and the same code can then run on a cluster of computers or in the cloud. At a high level, we start by developing the driver program, which works together with a cluster manager. Spark comes packaged with its own cluster manager, but we can also use other options such as YARN. The cluster manager is responsible for distributing the work defined by our driver script among multiple nodes/machines. Every machine that we run on has an executor process, which has its own cache and list of tasks. Each executor is in charge of finishing the operation we've defined on the chunk of data it receives; the results are then aggregated back together and passed along to the next step. As end users, for the most part, all we need to worry about is the logic of our driver program, i.e. how we wish to process the data. Spark and the cluster manager take care of efficiently distributing the workload among the machines at their disposal.
- Spark itself consists of many components. **Spark Core** deals with basic operations, such as manipulating our data and collecting the results of various operations. On top of that core are libraries that make complex operations easier, e.g. **Spark Streaming** for dealing with data that arrives in real time; **Spark SQL** for manipulating data in a SQL-like manner; **Spark ML/MLlib** for machine learning on large datasets; and **GraphX** for extracting attributes from network-like (graph) data.
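
To make the "nothing runs until we ask for the result" idea concrete, here is a minimal plain-Python sketch. It is only an analogy, not Spark's implementation: the `LazyPipeline` class and its `map`/`filter`/`collect` methods are hypothetical names chosen to resemble Spark's vocabulary. Transformations merely record a plan, and the action at the end executes every recorded step in a single pass over the data.

```python
class LazyPipeline:
    """Toy stand-in for a lazily evaluated dataset (an analogy, not Spark itself)."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)  # the recorded plan; nothing has run yet

    def map(self, func):
        # transformation: returns a new pipeline with one more recorded step
        return LazyPipeline(self._data, self._steps + (('map', func),))

    def filter(self, predicate):
        # transformation: also only recorded, not executed
        return LazyPipeline(self._data, self._steps + (('filter', predicate),))

    def collect(self):
        """Action: only now are the recorded steps executed, in one pass."""
        results = []
        for element in self._data:
            keep = True
            for kind, func in self._steps:
                if kind == 'map':
                    element = func(element)
                elif not func(element):
                    keep = False
                    break
            if keep:
                results.append(element)
        return results


calls = []


def square(x):
    calls.append(x)  # track when the function actually runs
    return x * x


pipeline = LazyPipeline(range(5)).map(square).filter(lambda x: x % 2 == 0)
calls_before_action = len(calls)  # no call to square has happened yet
collected = pipeline.collect()    # the action triggers the computation
```

Because the whole plan is known before anything executes, the `map` and `filter` steps can be fused into one loop instead of materializing an intermediate list, which is a small-scale version of the kind of optimization the DAG enables.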


## Resilient Distributed Dataset (RDD)

The **Resilient Distributed Dataset (RDD)** was Spark's primary user-facing API from its inception. At its core, an RDD is an immutable distributed collection of the elements of our data, partitioned across the nodes of our cluster, that can be operated on in parallel through a low-level API offering transformations and actions.

The whole idea is that by working with RDDs, all we need to worry about is the operations we wish to perform on our data. The RDD then takes care of the three properties in its name: 1) Resilient: it is fault-tolerant, so when running on multiple machines the job can still move on with the execution if one machine happens to go down. 2) Distributed: the operations are parallelized across multiple machines in an efficient manner. 3) Dataset: at its core, it is still rows and rows of data.
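
The three properties can be mimicked in a few lines of plain Python. This is a conceptual sketch only: the helper names `partition` and `run_partition` are hypothetical, the "workers" are just list elements, and a real RDD tracks the lineage needed for recomputation on its own.

```python
from functools import reduce


def partition(data, num_partitions):
    """Dataset + Distributed: split the rows into one chunk per hypothetical worker."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def run_partition(chunk):
    """The work each worker does independently on its chunk: sum of squares."""
    return sum(x * x for x in chunk)


data = list(range(1, 11))
chunks = partition(data, num_partitions=3)
partial_sums = [run_partition(chunk) for chunk in chunks]

# Resilient: pretend the worker holding the second partition went down.
# Only that partition has to be recomputed from its source chunk.
partial_sums[1] = None
partial_sums[1] = run_partition(chunks[1])

# aggregate the per-partition results back together
total = reduce(lambda a, b: a + b, partial_sums)
```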


## DataFrames

Like an RDD, a DataFrame is an immutable distributed collection of data. Unlike an RDD, the data is organized into named columns, like a table in a relational database. Designed to make processing large datasets even easier, the DataFrame lets developers impose a structure onto a distributed collection of data, allowing a higher-level abstraction; it provides a domain-specific language API to manipulate our distributed data; and it makes Spark accessible to a wider audience, beyond specialized data engineers.

# Reference

- [Blog: A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)

# Spark DataFrames

## Practice 1

Count the number of times each rating value occurs.

In [2]:
data_path = os.path.join('ml-100k', 'u.data')
df = spark.read.csv(data_path, sep = '\t', header = None)

# probably the easiest way to rename multiple columns
df = df.toDF('user', 'item', 'rating', 'timestamp')
df.show(5)

+----+----+------+---------+
|user|item|rating|timestamp|
+----+----+------+---------+
| 196| 242|     3|881250949|
| 186| 302|     3|891717742|
|  22| 377|     1|878887116|
| 244|  51|     2|880606923|
| 166| 346|     1|886397596|
+----+----+------+---------+
only showing top 5 rows



In [13]:
rating_col = 'rating'
results = (df
           .select(rating_col)
           .groupBy(rating_col)
           .count()
           .toPandas())
results

|   | rating | count |
|---|--------|-------|
| 0 | 3 | 27145 |
| 1 | 5 | 21201 |
| 2 | 1 | 6110 |
| 3 | 4 | 34174 |
| 4 | 2 | 11370 |


## Practice 2

Find the average rating per item.

In [4]:
# cast the ratings to float as opposed to the original string type
# so we can perform the average operation
item_col = 'item'
results = (df
           .withColumn(rating_col, F.col(rating_col).cast('float'))
           .groupBy(item_col)
           .mean(rating_col)
           .toPandas())
results.head()

|   | item | avg(rating) |
|---|------|-------------|
| 0 | 829 | 2.647059 |
| 1 | 1436 | 2.5 |
| 2 | 467 | 3.791667 |
| 3 | 691 | 3.5 |
| 4 | 1090 | 2.405405 |


## Practice 3

Find the minimum temperature per station.

In [5]:
data_path = os.path.join('sparkScala', 'code', '1800.csv')
df = (spark
      .read.csv(data_path, sep = ',', header = None)
      .select('_c0', '_c1', '_c2', '_c3')  # default column names follow the pattern _c[number]
      .toDF('station', 'date', 'entry_type', 'temperature'))
df.show(5)

+-----------+--------+----------+-----------+
|    station|    date|entry_type|temperature|
+-----------+--------+----------+-----------+
|ITE00100554|18000101|      TMAX|        -75|
|ITE00100554|18000101|      TMIN|       -148|
|GM000010962|18000101|      PRCP|          0|
|EZE00100082|18000101|      TMAX|        -86|
|EZE00100082|18000101|      TMIN|       -135|
+-----------+--------+----------+-----------+
only showing top 5 rows



In [6]:
from pyspark.sql.types import FloatType


def to_fahrenheit(record):
    """converts a temperature in tenths of a degree Celsius to Fahrenheit"""
    record = float(record) * 0.1 * 9 / 5 + 32
    return record


temperature_col = 'temperature'
udf_to_fahrenheit = F.udf(to_fahrenheit, FloatType())
results = (df
           .withColumn(temperature_col, udf_to_fahrenheit(F.col(temperature_col)))
           .filter(F.col('entry_type') == 'TMIN')
           .groupBy('station')
           .min(temperature_col)
           .toPandas())
results

|   | station | min(temperature) |
|---|---------|------------------|
| 0 | ITE00100554 | 5.36 |
| 1 | EZE00100082 | 7.7 |
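
As a quick sanity check on the conversion, the arithmetic can be reproduced in plain Python without Spark. The sketch below applies the same formula to the two `TMIN` readings visible in the preview above (`-148` and `-135`, kept as strings because the CSV columns are read as strings); the dictionary `tmin_readings` is only an illustrative container, and rounding to two decimals is an assumption made for display.

```python
def to_fahrenheit(record):
    """converts a temperature in tenths of a degree Celsius to Fahrenheit"""
    return float(record) * 0.1 * 9 / 5 + 32


# TMIN readings taken from the five-row preview of the data
tmin_readings = {'ITE00100554': '-148', 'EZE00100082': '-135'}

converted = {
    station: round(to_fahrenheit(value), 2)
    for station, value in tmin_readings.items()
}
```

For example, -148 tenths of a degree is -14.8 °C, and -14.8 × 9 / 5 + 32 = 5.36 °F, which agrees with the value reported for station `ITE00100554`.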


## Practice 4

Word count.

In [7]:
# toy example
sentence = spark.createDataFrame([('Hi, you!',),
                                  (' No under_score!',),
                                  (' *      Remove punctuation then spaces  * ',)], ['sentence'])
sentence.show()

+--------------------+
|            sentence|
+--------------------+
|            Hi, you!|
|     No under_score!|
| *      Remove pu...|
+--------------------+



We'll write a function that does some standard text preprocessing:

- All punctuation should be removed.
- Any leading or trailing spaces on a line should be removed.
- Words should be counted independently of their capitalization (e.g., Spark and spark should be counted as the same word).

For these tasks we shouldn't need user-defined functions (UDFs). Unlike UDFs, Spark SQL functions operate directly on the JVM, so they can work on the data in its "native" representation without a lot of serialization and deserialization, and they are typically well integrated with both Catalyst and Tungsten, which means they can be optimized in the execution plan. Writing UDFs in a non-native language such as Python can lead to even more overhead, as it involves moving data between the Python interpreter and the JVM.

In [8]:
def clean(column):
    """
    Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Arguments
    ---------
    column : Column
        A Column containing a sentence.

    Returns
    -------
    cleaned : Column
        A Column with clean-up operations applied.
    """
    removed_punct = F.regexp_replace(column, r'[^a-zA-Z0-9\s]', '')
    cleaned = F.trim(F.lower(removed_punct))
    return cleaned

In [9]:
word_col = 'word'
cleaned_col = 'cleaned'

# after cleaning up the text, we apply the .split spark SQL function
# to split the words into an array and apply .explode to return a new
# row for every element in the array (documentation page listed below)
# http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html
(sentence
 .select(clean(F.col('sentence')).alias(cleaned_col))
 .select(F.explode(F.split(cleaned_col, r'\s+')).alias(word_col))
 .groupBy(word_col)
 .count()
 .show())

+-----------+-----+
|       word|count|
+-----------+-----+
| underscore|    1|
|        you|    1|
|     remove|    1|
|     spaces|    1|
|         no|    1|
|punctuation|    1|
|         hi|    1|
|       then|    1|
+-----------+-----+
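
To see what each cleaning step does to the toy sentences, the same logic can be traced in plain Python with the standard `re` module and `collections.Counter`. This is only a rough mirror of the Spark pipeline for inspection purposes: `clean_text` is a hypothetical helper, and Python's `str.strip` removes all surrounding whitespace whereas `F.trim` removes spaces, which makes no difference for this input.

```python
import re
from collections import Counter


def clean_text(text):
    """Removes punctuation, lower-cases, and strips leading/trailing whitespace."""
    removed_punct = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return removed_punct.lower().strip()


sentences = [
    'Hi, you!',
    ' No under_score!',
    ' *      Remove punctuation then spaces  * ',
]

cleaned = [clean_text(sentence) for sentence in sentences]

# split on runs of whitespace (like F.split) and flatten (like F.explode)
word_counts = Counter(
    word
    for sentence in cleaned
    for word in re.split(r'\s+', sentence)
)
```

Note that the underscore in `under_score` is dropped because `_` is not in the allowed character class, which is why the Spark output lists `underscore` as a single word.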



## Practice 5

Find the most popular movies (popular as in the number of times each one has been viewed).

Broadcast variables: explicitly sending a read-only piece of data to all the nodes, so that it is only sent across the network once and is available whenever it's needed.

In [None]:
# note: sc.broadcast is the method that ships a read-only variable to all the nodes
data_path = os.path.join('ml-100k', 'u.data')
df = (spark
      .read.csv(data_path, sep = '\t', header = None)
      .toDF('user', 'item', 'rating', 'timestamp'))

df.show(5)

In [None]:
item_col = 'item'
results = (df
           .select(item_col)
           .groupBy(item_col)
           .count()
           .orderBy(F.desc('count'))
           .toPandas())
results.head()

# Reference

- [Stackoverflow: Spark functions vs UDF performance?](https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance)

In [None]:
data = {
    'hostid': [1, 1, 2, 2],
    'itemname': ['A', 'B', 'A', 'C'],
    'itemvalue': [10, 3, 9, 40]
}

data = pd.DataFrame(data)
df = spark.createDataFrame(data)
df.show()

[Stackoverflow: Pivot rows in mysql](https://stackoverflow.com/questions/1241178/mysql-rows-to-columns/9668036#9668036)

In [None]:
df.createOrReplaceTempView("history")
sql_df1 = spark.sql(
    """
    SELECT 
        hostid,
        CASE WHEN itemname = "A" THEN itemvalue END AS A,
        CASE WHEN itemname = "B" THEN itemvalue END AS B,
        CASE WHEN itemname = "C" THEN itemvalue END AS C
    FROM 
        history
    """
)
sql_df1.show()

In [None]:
sql_df1.createOrReplaceTempView('history_extended')
sql_df2 = spark.sql(
    """
    SELECT
        hostid,
        MIN(A) AS A,
        MIN(B) AS B,
        MIN(C) AS C
    FROM 
        history_extended
    GROUP BY 
        hostid
    """
)
sql_df2.show()

In [None]:
sql_df1 = spark.sql(
    """
    SELECT
        hostid, 
        SUM( IF(itemname = 'A', itemvalue, 0) ) AS A,  
        SUM( IF(itemname = 'B', itemvalue, 0) ) AS B, 
        SUM( IF(itemname = 'C', itemvalue, 0) ) AS C 
    FROM 
        history
    GROUP BY
        hostid
    """
)
sql_df1.show()
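
The two pivot variants above differ in how they treat a missing host/item combination, which is easy to overlook. The sketch below checks this without a Spark session by running equivalent queries on the same four rows with Python's built-in `sqlite3` module. This swaps in SQLite for Spark SQL, so it is an assumption-laden stand-in: the two-step `CASE`/`MIN` approach is collapsed into one query, and Spark's `IF(cond, value, 0)` is written as `CASE WHEN ... ELSE 0 END`.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE history (hostid INTEGER, itemname TEXT, itemvalue INTEGER)')
conn.executemany(
    'INSERT INTO history VALUES (?, ?, ?)',
    [(1, 'A', 10), (1, 'B', 3), (2, 'A', 9), (2, 'C', 40)],
)

# variant 1: CASE without ELSE yields NULL, and MIN ignores NULLs
min_case_query = """
    SELECT
        hostid,
        MIN(CASE WHEN itemname = 'A' THEN itemvalue END) AS A,
        MIN(CASE WHEN itemname = 'B' THEN itemvalue END) AS B,
        MIN(CASE WHEN itemname = 'C' THEN itemvalue END) AS C
    FROM history
    GROUP BY hostid
    ORDER BY hostid
"""

# variant 2: missing combinations contribute 0 to the sum
sum_if_query = """
    SELECT
        hostid,
        SUM(CASE WHEN itemname = 'A' THEN itemvalue ELSE 0 END) AS A,
        SUM(CASE WHEN itemname = 'B' THEN itemvalue ELSE 0 END) AS B,
        SUM(CASE WHEN itemname = 'C' THEN itemvalue ELSE 0 END) AS C
    FROM history
    GROUP BY hostid
    ORDER BY hostid
"""

pivot_with_nulls = conn.execute(min_case_query).fetchall()
pivot_with_zeros = conn.execute(sum_if_query).fetchall()
conn.close()
```

In the first variant a host with no reading for an item gets a null (`None` in Python), while in the second it gets `0`, which cannot be told apart from a genuine reading of zero. Which behavior is preferable depends on how the pivoted table is used downstream.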