# Spark Skeleton
This notebook is a skeleton for using Spark from within a Jupyter Notebook. Currently, only Python 2 is supported. To run standalone Python Spark programs, use `spark-submit` from the command line.

## Initialization

In [1]:
"""
Load packages and create context objects...
"""
import os
import platform
import sys
sys.path.append('/usr/hdp/2.4.2.0-258/spark/python')
os.environ["SPARK_HOME"] = '/usr/hdp/2.4.2.0-258/spark'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.11:1.2.0 pyspark-shell'
import py4j
import pyspark
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
from pyspark.storagelevel import StorageLevel
sc = SparkContext()
import atexit
atexit.register(lambda: sc.stop())
print("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version %s
      /_/
""" % sc.version)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/



Now, the Spark context is initialized. 

### Best Practice:
Close and halt Jupyter notebooks once you have finished working on them. The processes and resource allocations of a notebook persist even if you close the browser window.


## Example: Loading Data Files
This Spark implementation reads data files from the Hadoop Distributed File System (HDFS). Larger files are usually broken up into many smaller chunks so that multiple processes can read them without interference. In addition, most Hadoop components decompress compressed files on the fly.

In this example we load a text file and create a Spark Resilient Distributed Dataset (RDD) in which each row holds one line from the text file. Our data file is a collection of Twitter status records: one JSON-encoded record per line.

In [2]:
tweets = sc.textFile("/user/molnar/data/election2012/cache-117000000.json.gz")
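To make the "one JSON-encoded record per line" layout and the on-the-fly decompression concrete, here is a small local sketch that uses only the Python standard library instead of Spark. The file name and the two sample records are made up for illustration; `sc.textFile` does the equivalent decompression and line splitting for files on HDFS.

```python
import gzip
import json
import os
import tempfile

# Hypothetical sample records standing in for Twitter status objects.
sample_records = [
    {"id": 1, "entities": {"hashtags": [{"text": "spark"}]}},
    {"id": 2, "entities": {"hashtags": []}},
]

# Write one JSON document per line into a gzip-compressed file.
path = os.path.join(tempfile.mkdtemp(), "sample.json.gz")
with gzip.open(path, "wb") as f:
    for rec in sample_records:
        f.write((json.dumps(rec) + "\n").encode("utf-8"))

# Read it back: the gzip module decompresses while we iterate over lines.
with gzip.open(path, "rb") as f:
    lines = [raw.decode("utf-8").rstrip("\n") for raw in f]

# Each line is an independent JSON document, so lines can be decoded one by one.
decoded = [json.loads(line) for line in lines]
print(decoded[0]["entities"]["hashtags"][0]["text"])
```

Because every line is a complete JSON document, the lines can be handed to different workers without any coordination between them.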

Let's look at the first record. We're going to decode the data string and encode it back to text for nicer formatting:

In [3]:
type(tweets)

pyspark.rdd.RDD

In [4]:
the_very_first_tweet = tweets.take(1)[0]  # `take()` always returns a list, even if there's just one row

import json
print(json.dumps(json.loads(the_very_first_tweet), indent=4, sort_keys=True))

{
    "contributors": null, 
    "coordinates": null, 
    "created_at": "Sun Nov 11 17:24:37 +0000 2012", 
    "entities": {
        "hashtags": [
            {
                "indices": [
                    111, 
                    133
                ], 
                "text": "HAPPYBIRTHDAYAVALANNA"
            }
        ], 
        "urls": [], 
        "user_mentions": [
            {
                "id": 307879426, 
                "id_str": "307879426", 
                "indices": [
                    3, 
                    17
                ], 
                "name": "Kidrauhl \u221e", 
                "screen_name": "xBieberHoodie"
            }
        ]
    }, 
    "favorited": false, 
    "geo": null, 
    "id": 267679234212888576, 
    "id_str": "267679234212888576", 
    "in_reply_to_screen_name": null, 
    "in_reply_to_status_id": null, 
    "in_reply_to_status_id_str": null, 
    "in_reply_to_user_id": null, 
    "in_reply_to_user_id_str": null, 
    "place": 

Let's extract the hashtags. In this case there is only one hashtag, but the data structure still treats it as a list.

In [5]:
json.loads(the_very_first_tweet)['entities']['hashtags'][0]['text']

u'HAPPYBIRTHDAYAVALANNA'

The extraction process is a bit more complicated than what can be written in the $\lambda$-function format. We therefore define a function that deals with records that do not include hashtags.

Conventionally, programmers test their assumptions before executing an operation, i.e., they first check whether hashtags exist ("look before you leap"). However, if we expect only a few exceptions, the try-except structure makes for cleaner code ("easier to ask for forgiveness than permission").

In [7]:
# easier to ask for forgiveness than permission
def extract_hash_EAFP(tw):
    try:
        return json.loads(tw)['entities']['hashtags']
    except (ValueError, KeyError, TypeError):  # malformed JSON or missing fields
        return []

# look before you leap
def extract_hash_LBYL(tw):
    t = json.loads(tw)
    if 'entities' in t:
        ent = t['entities']
        if 'hashtags' in ent:
            return ent['hashtags']
    return []

hashtags = tweets.flatMap(extract_hash_EAFP).map(lambda x: (x['text'], 1))
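The difference between the two styles is easiest to see on a few hand-made records. The sketch below is plain Python without Spark: the sample strings, the lowercase function names, and the `flat_map` helper are hypothetical stand-ins for the real tweets, the functions above, and `RDD.flatMap`.

```python
import json

def extract_hash_eafp(tw):
    # Try the happy path and fall back to an empty list on any expected failure.
    try:
        return json.loads(tw)['entities']['hashtags']
    except (ValueError, KeyError, TypeError):
        return []

def extract_hash_lbyl(tw):
    # Check every assumption explicitly before accessing the fields.
    t = json.loads(tw)
    if 'entities' in t and 'hashtags' in t['entities']:
        return t['entities']['hashtags']
    return []

def flat_map(func, items):
    # Local stand-in for RDD.flatMap: apply func and concatenate the results.
    return [out for item in items for out in func(item)]

samples = [
    '{"entities": {"hashtags": [{"text": "a"}, {"text": "b"}]}}',
    '{"entities": {}}',                      # no hashtags field
    '{"delete": {"status": {"id": 1}}}',     # no entities field at all
]

tags_eafp = [h['text'] for h in flat_map(extract_hash_eafp, samples)]
tags_lbyl = [h['text'] for h in flat_map(extract_hash_lbyl, samples)]
print(tags_eafp == tags_lbyl)

# Only the EAFP version also survives a line that is not valid JSON.
print(extract_hash_eafp('not json'))
```

Both versions agree on well-formed records. The LBYL version still raises on a line that is not valid JSON unless yet another check is added, which illustrates why the try-except form stays shorter.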

In [10]:
tweets.flatMap(extract_hash_EAFP).take(10)

[{u'indices': [111, 133], u'text': u'HAPPYBIRTHDAYAVALANNA'},
 {u'indices': [117, 125], u'text': u'badgers'},
 {u'indices': [129, 135], u'text': u'WHUFC'},
 {u'indices': [19, 24], u'text': u'OOMF'},
 {u'indices': [0, 18], u'text': u'UnFilmQuiMaMarqu\xe9'},
 {u'indices': [92, 97], u'text': u'brfc'},
 {u'indices': [107, 112], u'text': u'lufc'},
 {u'indices': [122, 127], u'text': u'bcfc'},
 {u'indices': [131, 140], u'text': u'Panthers'},
 {u'indices': [98, 103], u'text': u'jobs'}]

In [11]:
%%time
hashtags.take(10)

CPU times: user 6.79 ms, sys: 3.38 ms, total: 10.2 ms
Wall time: 80.5 ms


[(u'HAPPYBIRTHDAYAVALANNA', 1),
 (u'badgers', 1),
 (u'WHUFC', 1),
 (u'OOMF', 1),
 (u'UnFilmQuiMaMarqu\xe9', 1),
 (u'brfc', 1),
 (u'lufc', 1),
 (u'bcfc', 1),
 (u'Panthers', 1),
 (u'jobs', 1)]

We added a second element of 1 to each hashtag so that we can count them in the "good old-fashioned map-reduce" way.

In [12]:
%%time
tagcounts = hashtags.reduceByKey(lambda a, b: a+b)    # the first element of the tuple is the key

CPU times: user 10.8 ms, sys: 5.92 ms, total: 16.7 ms
Wall time: 45.8 ms
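What `reduceByKey` computes can be sketched in a few lines of plain Python. This shows the semantics only, not how Spark distributes the work across partitions; the `reduce_by_key` helper and the sample pairs are made up for illustration.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(func, pairs):
    # Group the values by key, then fold each group with the binary function.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

# Hypothetical (hashtag, 1) pairs, as produced by the map step above.
pairs = [('Iran', 1), ('tcot', 1), ('Iran', 1), ('Obama', 1), ('Iran', 1)]

counts = reduce_by_key(lambda a, b: a + b, pairs)
print(sorted(counts.items()))  # [('Iran', 3), ('Obama', 1), ('tcot', 1)]
```

The function passed in should be associative and commutative (addition is both), because the order in which partial results are combined is not fixed in a distributed setting.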


In [13]:
%%time
tagcounts.take(10)

CPU times: user 13.4 ms, sys: 7.34 ms, total: 20.8 ms
Wall time: 1min 33s


[(u'ImFeelingHim', 1),
 (u'Northindians', 1),
 (u'feelsorryforhim', 2),
 (u'Debts', 16),
 (u'Poetry', 1),
 (u'woods', 1),
 (u'hanging', 3),
 (u'areyoukiddingme', 2),
 (u'closecalls', 1),
 (u'EconomicCollapse', 1)]
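The timings above fit Spark's lazy evaluation model: `reduceByKey` returned within milliseconds because a transformation only describes a computation, whereas `take` is an action that actually runs it. A count per key also needs every record before it is final, which is why `tagcounts.take(10)` is so much slower than `hashtags.take(10)`. Python generators behave in a loosely similar way, as the following sketch shows; it is not Spark code, and the `expensive` function and the call log are invented for the demonstration.

```python
calls = []

def expensive(x):
    calls.append(x)  # record that real work happened
    return x * 2

# Defining the pipeline does no work yet, much like an RDD transformation.
pipeline = (expensive(x) for x in range(1000))
work_after_definition = len(calls)

# Pulling three results forces only as much evaluation as is needed.
first_three = [next(pipeline) for _ in range(3)]
work_after_take = len(calls)

print(work_after_definition)  # 0
print(work_after_take)        # 3
print(first_three)            # [0, 2, 4]
```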

Now, let's just see them in order. For that we need to swap elements in the tuples: the count result becomes the key. Then we can sort.

In [15]:
%%time
sorted_tagcounts = tagcounts.map(lambda kv: (kv[1], kv[0])).sortByKey(ascending=False)

CPU times: user 3.64 ms, sys: 0 ns, total: 3.64 ms
Wall time: 2.83 ms


In [16]:
%%time
sorted_tagcounts.take(10)

CPU times: user 8.09 ms, sys: 2.63 ms, total: 10.7 ms
Wall time: 251 ms


[(8854, u'Iran'),
 (6283, u'tcot'),
 (6197, u'ReligiousFreedom'),
 (6196, u'Nadarkhani'),
 (4890, u'Israel'),
 (3647, u'Obama'),
 (2436, u'ImACeleb'),
 (2392, u'EMAVoteOneDirection'),
 (2374, u'Gaza'),
 (2361, u'Pakistan')]
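The swap-then-sort step can be tried out locally on a handful of pairs. The sketch below is plain Python; the variable names are hypothetical, and the four pairs are taken from the unsorted `tagcounts` output shown earlier. It also shows an alternative that ranks by count without swapping, which is the idea behind PySpark's `RDD.takeOrdered` with a `key` function.

```python
import heapq

# A few (hashtag, count) pairs from the unsorted output above.
tagcounts_local = [('Debts', 16), ('hanging', 3), ('Poetry', 1), ('feelsorryforhim', 2)]

# Swap (tag, count) -> (count, tag) so the count becomes the sort key,
# then sort in descending order.
swapped = [(count, tag) for tag, count in tagcounts_local]
ranked = sorted(swapped, reverse=True)
print(ranked[:2])  # [(16, 'Debts'), (3, 'hanging')]

# Alternative without swapping: pick the n largest by an explicit key.
top2 = heapq.nlargest(2, tagcounts_local, key=lambda kv: kv[1])
print(top2)  # [('Debts', 16), ('hanging', 3)]
```

When only the top few entries are needed, selecting the largest elements avoids a full sort of all keys.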

## Example: Loading CSV Files
Data files in CSV format can be treated in similar fashion to JSON files. One can either read them into RDDs and then create a table by splitting the text line into column values per row, or use the SparkSQL package to create Spark DataFrames.
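For the first approach, splitting each text line into column values, a naive `line.split(',')` breaks as soon as a quoted field contains a comma. The sketch below uses Python's `csv` module for a single line instead; such a function could be passed to `map` on an RDD of lines. The column names and sample lines are invented for illustration and do not come from the data set used here.

```python
import csv

def parse_csv_line(line):
    # csv.reader understands quoted fields; feed it a one-element list of lines.
    return next(csv.reader([line]))

# Hypothetical header and data lines.
header_line = 'donor_id,name,income'
data_lines = ['1,"Smith, Jane",52000', '2,Lee,NA']

header = parse_csv_line(header_line)
rows = [dict(zip(header, parse_csv_line(line))) for line in data_lines]
print(rows[0]['name'])  # Smith, Jane

# The naive split cuts the quoted name in two and yields four fields.
naive = data_lines[0].split(',')
print(len(naive))  # 4
```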

First we need to create a SQLContext, then use that new object to read a CSV file.

In [30]:
sqlctx = SQLContext(sc)
import pandas as pd
import numpy as np

In [9]:
%%time
df_donorsummary = sqlctx.read.format('com.databricks.spark.csv')\
    .options(header=True, inferschema=False)\
    .load('/user/mgrace/red_cross/donor_summary912016.csv')

CPU times: user 2.4 ms, sys: 1.04 ms, total: 3.44 ms
Wall time: 80.2 ms


Spark can infer the type of each column to some degree. However, inference requires an additional pass over the data, so it may take significantly longer, and it often leads to crashes. Therefore, it's usually better to load the data as string values and then transform each column explicitly.
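The per-column transformation amounts to a conversion that tolerates values which do not parse. Here is a minimal plain-Python sketch of that idea; the helper name and the sample values are hypothetical. In Spark, the analogous step would typically go through `Column.cast`, which, as far as I know, also yields null for strings that cannot be parsed.

```python
def to_float_or_none(value):
    """Convert a string to float; return None when it does not parse."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

# Hypothetical raw string column with typical problem values.
raw_column = ['52000', ' 61000.5 ', 'NA', '', None]

typed_column = [to_float_or_none(v) for v in raw_column]
print(typed_column)  # [52000.0, 61000.5, None, None, None]
```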

In [5]:
df_donorsummary.printSchema()

root
 |-- bzd_assessedhomevalue: string (nullable = true)
 |-- bzd_avg_bank_credit6: string (nullable = true)
 |-- bzd_avg_inq_all12: string (nullable = true)
 |-- bzd_avg_maxcred_install6: string (nullable = true)
 |-- bzd_avg_mos_autopay: string (nullable = true)
 |-- bzd_avg_numauto12: string (nullable = true)
 |-- bzd_descretionary_spend: string (nullable = true)
 |-- bzd_income: string (nullable = true)
 |-- bzd_lengthofresidence: string (nullable = true)
 |-- bzd_mortg_equity: string (nullable = true)
 |-- bzd_numberadultsinhh: string (nullable = true)
 |-- bzd_numberofchildrenhh: string (nullable = true)
 |-- bzd_realty_mortgremain: string (nullable = true)
 |-- bzd_realty_mospay: string (nullable = true)
 |-- bzd_tt_all_in_name: string (nullable = true)
 |-- bzd_tt_buy_am: string (nullable = true)
 |-- bzd_tt_go_flow: string (nullable = true)
 |-- bzd_tt_look_now: string (nullable = true)
 |-- bzd_tt_nsueh: string (nullable = true)
 |-- bzd_tt_penny: string (nullable = true)
 |

In [31]:
df_donorsummary.show(20)

+---------------------+--------------------+-----------------+------------------------+-------------------+-----------------+-----------------------+----------+---------------------+----------------+--------------------+----------------------+----------------------+-----------------+------------------+-------------+--------------+---------------+------------+------------+-----------------+-----------------+------------------+---------------------+-----------+-------------------+-------------------+--------------+---------------------+-------------------+---------------------+----------------+-------------------+--------------------+-------------------+----------------+----------------+-------------------+------------------+--------------------+--------------------+-------------------+---------------+------------------+------------------------+-------------------+---------------+-------------------+------------------+--------------+------------------+-------------------+----------------

In [55]:
import re
re.match(r'^\s*NA\s*$', '  NA  ')

<_sre.SRE_Match at 0x3a716b0>
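To check which strings the pattern accepts, it helps to run it against a few edge cases. The candidate strings and the `is_na` helper below are made up for this check.

```python
import re

# Matches 'NA' optionally surrounded by whitespace, and nothing else.
NA_PATTERN = re.compile(r'^\s*NA\s*$')

def is_na(value):
    return NA_PATTERN.match(value) is not None

candidates = ['NA', '  NA  ', 'NAN', 'N A', 'BANANA', '']
flags = [is_na(v) for v in candidates]
print(flags)  # [True, True, False, False, False, False]
```

The anchors `^` and `$` are what keep ordinary values that merely contain the letters "NA" from being treated as missing.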

The following code replaces `'NA'` strings with `None` (null) values so that missing values are handled properly.

In [61]:
from pyspark.sql.functions import col, when

def NA_as_null(x):
    # A Column has no map() method; rlike() applies the regular expression inside Spark.
    # when() without otherwise() yields null for rows that do not meet the condition.
    return when(~col(x).rlike(r'^\s*NA\s*$'), col(x))

# Start from the original DataFrame and accumulate the replacement for every column.
dfna = df_donorsummary
for c in df_donorsummary.columns:
    dfna = dfna.withColumn(c, NA_as_null(c))

dfna.printSchema()

In [46]:
dfna.show(10)

+---------------------+--------------------+-----------------+------------------------+-------------------+-----------------+-----------------------+----------+---------------------+----------------+--------------------+----------------------+----------------------+-----------------+------------------+-------------+--------------+---------------+------------+------------+-----------------+-----------------+------------------+---------------------+-----------+-------------------+-------------------+--------------+---------------------+-------------------+---------------------+----------------+-------------------+--------------------+-------------------+----------------+----------------+-------------------+------------------+--------------------+--------------------+-------------------+---------------+------------------+------------------------+-------------------+---------------+-------------------+------------------+--------------+------------------+-------------------+----------------

## Example: Hive Database
This example creates a DataFrame through a HiveContext and saves it as a Hive table.

### Best Practice:
Preprocess and clean your raw data files (in JSON or CSV) and save them into a Hive database for further use.

In [2]:
hctx = HiveContext(sc)

In [22]:
tpdf = hctx.createDataFrame(tagcounts)

In [23]:
tpdf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



In [45]:
tpdf.write.mode('overwrite').saveAsTable('elections2012_hashtags_ranked')

In [None]:
tagcntsdf = sqlctx.createDataFrame(tagcounts)
tagcntsdf.printSchema()

In [None]:
tagcntsdf.write.json("/user/molnar/data/election2012/top_hashtag_counts")

### Query Table

In [3]:
df = hctx.sql("select * from elections2012_hashtags_ranked order by `_2` desc limit 100")

In [5]:
pandas_df = df.toPandas()

In [6]:
type(pandas_df)

pandas.core.frame.DataFrame

In [7]:
pandas_df

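|   | _1                  | _2   |
|---|---------------------|------|
| 0 | Iran                | 8854 |
| 1 | tcot                | 6283 |
| 2 | ReligiousFreedom    | 6197 |
| 3 | Nadarkhani          | 6196 |
| 4 | Israel              | 4890 |
| 5 | Obama               | 3647 |
| 6 | ImACeleb            | 2436 |
| 7 | EMAVoteOneDirection | 2392 |
| 8 | Gaza                | 2374 |
| 9 | Pakistan            | 2361 |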
