# Configure

An introduction to Spark configuration can be found [here](https://spark.apache.org/docs/latest/configuration.html).

local[K,F] : Run Spark locally with K worker threads and F maxFailures (see `spark.task.maxFailures` for an explanation of this variable: the number of failures of any particular task before giving up on the job; the total number of failures spread across different tasks will not cause the job to fail).

local[*] : Run Spark locally with as many worker threads as logical cores on your machine.
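The master URL forms above can be made concrete with a small parser. `parse_local_master` is a hypothetical helper, not part of PySpark (Spark parses these strings itself). It assumes that omitting F leaves a single allowed failure, which is how Spark's local mode behaves by default.

```python
import os
import re

# Matches "local", "local[K]", "local[*]", "local[K,F]" and "local[*,F]".
_LOCAL_MASTER = re.compile(r"^local(?:\[(\d+|\*)(?:\s*,\s*(\d+))?\])?$")


def parse_local_master(url):
    """Return (worker_threads, max_failures) for a local master URL.

    Hypothetical helper for illustration only.
    """
    match = _LOCAL_MASTER.match(url)
    if match is None:
        raise ValueError("not a local master URL: {}".format(url))
    threads, failures = match.groups()
    if threads is None:        # plain "local": a single worker thread
        n_threads = 1
    elif threads == "*":       # one thread per logical core
        n_threads = os.cpu_count() or 1
    else:
        n_threads = int(threads)
    # Assumption: without an explicit F, one task failure ends the job (no retries).
    max_failures = int(failures) if failures is not None else 1
    return n_threads, max_failures


print(parse_local_master("local[4,2]"))   # (4, 2)
```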

In [72]:
# supporting functions
def fCtxSummary(context):
    """Print version, master URL and configuration details of a SparkContext."""
    print("SUMMARY",
          "\nSpark version: {}".format(context.version),
          "\nPython version: {}".format(context.pythonVer),
          "\nMaster url: {}".format(context.master))
    print("\n")
    print("CONFIG",
          "\nWorker node path: {}".format(context.sparkHome),   # SPARK_HOME on worker nodes; None if unset
          "\nUser name: {}".format(context.sparkUser()),
          "\nApp name: {}".format(context.appName),
          "\nApp id: {}".format(context.applicationId),
          "\nDefault parallelism: {}".format(context.defaultParallelism),
          "\nMin rdd partitions: {}".format(context.defaultMinPartitions))


### Spark Context

In [None]:
from pyspark import SparkConf, SparkContext
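conf = (SparkConf()
        .setMaster("local[*]")                 # one worker thread per logical core
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))   # in local mode the executor runs inside the driver JVM, so this has little effect
sc = SparkContext(conf=conf)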

In [71]:
fCtxSummary(sc)

SUMMARY 
Spark version: 2.3.1 
Python version: 3.6 
Master url: local[*]


CONFIG 
Worker node path: None 
User name: jovyan 
App name: My app 
App id: local-1541700632731 
Default parallelism: 4 
Min rdd partitions: 2


In [4]:
! whereis spark

spark: /usr/local/spark


In [5]:
import os
os.chdir('/usr/local/spark')
#os.listdir()

In [11]:
%%timeit

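# Test: estimate pi by Monte Carlo sampling (4 x fraction of random points inside the unit quarter circle)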
from random import random
def sample(p):
    x,y = random(), random()
    return 1 if x*x + y*y <1 else 0

count = sc.parallelize(range(0,10000000)).map(sample).reduce(lambda a,b: a+b)
sc.master, 4.0 * count / 10000000

12.2 s ± 440 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
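The timed cell estimates π by Monte Carlo sampling. The fraction of uniform random points in the unit square that satisfy x² + y² < 1 approaches π/4, so multiplying it by 4 estimates π. Below is a plain-Python version of the same estimate (no Spark, and a fixed seed for repeatability) that shows what the parallel job computes. `estimate_pi` is a hypothetical name.

```python
import random


def estimate_pi(n_samples, seed=0):
    """Monte Carlo estimate of pi, mirroring the sample()/reduce() job above."""
    rng = random.Random(seed)            # seeded generator so runs are repeatable
    inside = 0
    for _ in range(n_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:            # point lies inside the quarter circle
            inside += 1
    return 4.0 * inside / n_samples      # area ratio (pi/4) scaled by 4


print(estimate_pi(200000))
```

The error shrinks roughly like 1/√n, so the 10,000,000 samples in the Spark version give about three correct decimal places.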



# Basics of Spark

In [73]:
# supporting functions
def fRddSummary(rdd):
    print("Partitions: {}".format( rdd.getNumPartitions() ),
      "\nNumber of records (file lines): {}".format(rdd.count())
     )


### Getting Data into RDDs

In [88]:
# New RDD, created programmatically
# list of tuples
rdd_prgm_map = sc.parallelize([('a',2),('a',3),('d',1),('b',1)])

In [75]:
# sequence of values
rdd_prgm = sc.parallelize(range(0, 10))
rdd_prgm = sc.range(0, 10)     # equivalent shortcut; this definition is the one used below

In [76]:
fRddSummary(rdd_prgm)

Partitions: 4 
Number of records (file lines): 10


In [86]:
rdd_prgm.sum()       # sum elements

45

In [90]:
rdd_prgm_map.collectAsMap()   # return as a dict; for duplicate keys the last value wins

{'a': 3, 'd': 1, 'b': 1}

In [78]:
# New RDD, the typical way: read text files, one record per line across all files
rdd_files = sc.textFile("file:///usr/local/spark/licenses/")
# a glob pattern also works, e.g. sc.textFile("file:///usr/local/spark/licenses/*.txt")

In [79]:
fRddSummary(rdd_files)

Partitions: 38 
Number of records (file lines): 1131


In [51]:
# New RDD of (file path, file content) pairs: one record per file, so count() returns the number of files
rdd_pairs = sc.wholeTextFiles("file:///usr/local/spark/licenses/")

In [80]:
fRddSummary(rdd_pairs)

Partitions: 2 
Number of records (file lines): 38


In [63]:
# keys 
names = rdd_pairs.keys()
names.collect()[0:4]

['file:/usr/local/spark/licenses/LICENSE-netlib.txt',
 'file:/usr/local/spark/licenses/LICENSE-jpmml-model.txt',
 'file:/usr/local/spark/licenses/LICENSE-scalacheck.txt',
 'file:/usr/local/spark/licenses/LICENSE-d3.min.js.txt']

In [64]:
# values
filedata = rdd_pairs.values()
filedata.collect()[0][0:100]

'Copyright (c) 2013 Samuel Halliday\nCopyright (c) 1992-2011 The University of Tennessee and The Unive'

# Fundamentals of partitions

[When to repartition or coalesce an RDD with unbalanced partitions](https://stackoverflow.com/questions/33505050/how-to-know-when-to-repartition-coalesce-rdd-with-unbalanced-partitions-without?noredirect=1&lq=1)

In [170]:
rdd_files.getNumPartitions()

38

In [174]:
# repartition performs a full shuffle to spread the data evenly, useful e.g. after a filter has removed most records
new_rdd = rdd_files.repartition(40)    # new RDD with 40 partitions; prefer it for increasing the count
new_rdd.getNumPartitions()

40

In [169]:
new_rdd.unpersist()                   # drop cached (persisted) blocks; the RDD itself stays usable

MapPartitionsRDD[143] at coalesce at NativeMethodAccessorImpl.java:0

In [173]:
new_rdd = rdd_files.coalesce(2)       # reduce to 2 partitions by merging existing ones, avoiding a full shuffle
new_rdd.getNumPartitions()

2
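The difference between the two calls can be modeled in plain Python. This is a simplified sketch, not Spark's implementation. `coalesce_model` merges whole neighbouring partitions (Spark's coalesce also takes data locality into account), and `repartition_model` deals records out round-robin (Spark starts each input partition at a random offset). Both names are hypothetical. The sketch shows why only a full shuffle can even out the skew a filter leaves behind.

```python
def coalesce_model(partitions, n):
    """Merge whole input partitions into n groups; records never move individually."""
    groups = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        groups[i * n // len(partitions)].extend(part)   # neighbouring partitions share a group
    return groups


def repartition_model(partitions, n):
    """Full shuffle: deal every record round-robin into n new partitions."""
    groups = [[] for _ in range(n)]
    position = 0
    for part in partitions:
        for record in part:
            groups[position % n].append(record)
            position += 1
    return groups


# Skewed input, e.g. what remains after a filter removed most records.
skewed = [list(range(9)), [9], [], []]
print([len(p) for p in coalesce_model(skewed, 2)])      # [10, 0]
print([len(p) for p in repartition_model(skewed, 2)])   # [5, 5]
```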

[different storage configurations](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.StorageLevel)

In [166]:
rdd_files.getStorageLevel()   # (useDisk, useMemory, useOffHeap, deserialized, replication): all False means not persisted

StorageLevel(False, False, False, False, 1)

[get partition sizes](https://stackoverflow.com/questions/41068112/spark-find-each-partition-size-for-rdd?noredirect=1&lq=1)

In [175]:
l = rdd_prgm.glom().map(len).collect()  # glom turns each partition into a list; len gives its size
print('Min Partition Size: ', min(l), '. Max Partition Size: ', max(l), '. Avg Partition Size: ', sum(l)/len(l), '. Total Partitions: ', len(l))

Min Partition Size:  2 . Max Partition Size:  3 . Avg Partition Size:  2.5 . Total Partitions:  4
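The sizes 2, 3, 2, 3 behind these numbers come from how a 10-element range such as `rdd_prgm` is sliced into 4 contiguous partitions. The sketch below reproduces that split in plain Python. It is modeled on PySpark's slicing of ranges in `parallelize`, which `sc.range` uses. `range_partitions` is a hypothetical helper, and other input types or partitioners may split data differently.

```python
def range_partitions(start, end, num_slices):
    """Split range(start, end) into num_slices contiguous chunks.

    Chunk i covers [start + i*size//num_slices, start + (i+1)*size//num_slices).
    """
    size = end - start
    bounds = [start + i * size // num_slices for i in range(num_slices + 1)]
    return [list(range(bounds[i], bounds[i + 1])) for i in range(num_slices)]


parts = range_partitions(0, 10, 4)
sizes = [len(p) for p in parts]
print(parts)   # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
print(min(sizes), max(sizes), sum(sizes) / len(sizes), len(sizes))   # 2 3 2.5 4
```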


In [177]:
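l = rdd_prgm.mapPartitions(lambda it: [sum(1 for _ in it)])   # lazy: one element count per partition
print(l)   # shows the RDD, not the counts; call l.collect() to run it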

PythonRDD[154] at RDD at PythonRDD.scala:49


[Repartitioning a DataFrame by range](https://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-dataframe?rq=1)

In [None]:
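# PySpark version of the Scala df.repartitionByRange(42, $"k"); needs Spark 2.4+, and df / column "k" are placeholders
partitioned_by_range = df.repartitionByRange(42, "k")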

# RDD Operations

In [150]:
rdd_prgm.count()

10

In [159]:
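rdd_prgm.sum()      # 45
rdd_prgm.max()      # 9
rdd_prgm.mean()     # 4.5
rdd_prgm.stdev()    # population standard deviation
rdd_prgm.stats()    # all of the above at once; only this last value is displayed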

(count: 10, mean: 4.5, stdev: 2.87228132327, max: 9.0, min: 0.0)

In [160]:
rdd_prgm.histogram(3)

([0, 3, 6, 9], [3, 3, 4])
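`histogram(3)` splits the interval between the minimum and maximum into three evenly spaced buckets. Every bucket is half-open except the last, which also includes the maximum. That is why 9 lands in the last bucket, giving it four values. A plain-Python sketch of that rule follows (`even_histogram` is a hypothetical name, not PySpark's code):

```python
def even_histogram(values, buckets):
    """Evenly spaced histogram; buckets are [a, b) except the last, which is [a, b]."""
    values = list(values)
    lo, hi = min(values), max(values)
    if lo == hi:                           # degenerate case: a single bucket
        return [lo, hi], [len(values)]
    width = (hi - lo) / buckets
    edges = [lo + i * width for i in range(buckets)] + [hi]
    counts = [0] * buckets
    for v in values:
        # clamp so that v == hi falls into the last (closed) bucket
        counts[min(int((v - lo) / width), buckets - 1)] += 1
    return edges, counts


print(even_histogram(range(10), 3))   # ([0.0, 3.0, 6.0, 9], [3, 3, 4])
```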

__Selecting data__

In [179]:
rdd_prgm.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [182]:
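rdd_prgm.take(5)    # first 5 elements, [0, 1, 2, 3, 4] (not displayed: only the last expression is)
rdd_prgm.top(3)     # the 3 largest elements, in descending order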

[9, 8, 7]

In [183]:
rdd_files.first()

'Copyright (c) 2013 Samuel Halliday'

In [189]:
rdd_prgm.sample(False, 0.15, 81).collect()   # withReplacement=False, fraction=0.15, seed=81; the size is random and can be 0

[]
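Without replacement, `sample(False, fraction, seed)` keeps each element independently with probability `fraction`. The result size is therefore random and only *expected* to be `fraction * count`. With 10 elements and a fraction of 0.15, an empty result has probability 0.85¹⁰ ≈ 0.20, so the `[]` above is not surprising. Below is a plain-Python sketch of that Bernoulli sampling. `bernoulli_sample` is a hypothetical name, and because Spark uses its own random stream, the same seed will not select the same elements.

```python
import random


def bernoulli_sample(data, fraction, seed=None):
    """Keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]


# Probability that none of 10 elements is kept at fraction 0.15
p_empty = (1 - 0.15) ** 10
print(round(p_empty, 3))                       # 0.197
print(bernoulli_sample(range(10), 0.15, 81))   # size varies with the seed
```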

In [188]:
rdd_prgm.filter(lambda x: x % 2).collect()     # keep odd numbers (x % 2 is truthy)

[1, 3, 5, 7, 9]

In [190]:
rdd_prgm_map.filter(lambda kv: kv[0] == 'a').collect()   # keep pairs whose key is 'a'

[('a', 2), ('a', 3)]

In [191]:
rdd_prgm_map.distinct().collect()   # remove duplicate elements (order is not preserved)

[('a', 2), ('b', 1), ('a', 3), ('d', 1)]

__Applying Functions__

In [213]:
rdd = sc.parallelize([('a',7),('a',2),('b',2)])

In [224]:
rdd.sortBy(lambda x: x[1]).collect()

[('a', 2), ('b', 2), ('a', 7)]

In [225]:
rdd.sortByKey().collect()

[('a', 7), ('a', 2), ('b', 2)]

In [219]:
def g(x): print('hi!')
rdd.foreach(g)      # runs g on the executors, so nothing is printed in the notebook

In [216]:
rdd.map(lambda x: x+(x[1],x[0])).collect()        # apply a function to each RDD element

[('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]

In [209]:
rdd.map(lambda x: (x[1],x[0]) ).collect()          

[(7, 'a'), (2, 'a'), (2, 'b')]

In [211]:
rdd.flatMap(lambda x: (x[1],x[0]) ).collect()            # flatten result

[7, 'a', 2, 'a', 2, 'b']

In [212]:
rdd = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
rdd.flatMapValues(lambda x: x).collect()   # apply a flatMap function to each (key, value) without changing keys

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

__Reshaping Data__

In [226]:
rdd = sc.parallelize([('a',7),('a',2),('b',2)])

In [227]:
rdd.reduceByKey(lambda x, y: x + y).collect()    # sum the values for each key

[('b', 2), ('a', 9)]

In [229]:
rdd.reduce(lambda a, b: a + b)                   # tuple + tuple concatenates, so this flattens all pairs into one tuple

('a', 7, 'a', 2, 'b', 2)

In [236]:
rdd = sc.parallelize(range(5))

In [237]:
rdd.groupBy(lambda x: x%2).mapValues(list).collect()

[(0, [0, 2, 4]), (1, [1, 3])]

In [239]:
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd.groupByKey().mapValues(list).collect()

[('b', [2]), ('a', [7, 2])]

[fold](https://stackoverflow.com/questions/29150202/pyspark-fold-method-output)

In [252]:
rdd = sc.parallelize(range(5))
rdd.fold(0, lambda a,b:a+b)

10
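`fold` applies the zero value once per partition and once more when merging the partition results, so it must be a neutral element for the operator (0 for addition). The plain-Python model below follows that two-level scheme. `simulate_fold` is a hypothetical name, and the two-partition layout is an assumption.

```python
from functools import reduce


def simulate_fold(partitions, zero, op):
    """Model of RDD.fold: fold each partition from `zero`, then fold the results from `zero`."""
    partials = [reduce(op, part, zero) for part in partitions]   # one result per partition
    return reduce(op, partials, zero)                             # merge step uses `zero` again


def add(a, b):
    return a + b


parts = [[0, 1], [2, 3, 4]]          # assumed split of range(5) into two partitions
print(simulate_fold(parts, 0, add))  # 10
print(simulate_fold(parts, 1, add))  # 13: the zero is added once per partition plus once for the merge
```

With a real RDD the surplus grows with the number of partitions, so a non-neutral zero value gives results that change with partitioning.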

In [248]:
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd.foldByKey(0, lambda a,b:a+b).collect()

[('b', 2), ('a', 9)]

In [257]:
rdd = sc.parallelize(range(5))
rdd.keyBy(lambda x: str(x)).collect()                # create (f(x), x) pairs, using the function result as the key

[('0', 0), ('1', 1), ('2', 2), ('3', 3), ('4', 4)]

In [258]:
rdd = sc.parallelize(range(5))
seqOp = (lambda x,y: (x[0]+y,x[1]+1))
combOp = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd.aggregate((0,0),seqOp,combOp)    # seqOp folds each partition into (sum, count); combOp merges the partition results

(10, 5)
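`aggregate` generalizes `fold`. `seqOp` folds elements into a per-partition accumulator that may have a different type from the elements, and `combOp` merges those accumulators. The `(sum, count)` pair above is enough to derive a mean. Below is a plain-Python model with an assumed two-partition split (`simulate_aggregate` is hypothetical):

```python
from functools import reduce


def simulate_aggregate(partitions, zero, seq_op, comb_op):
    """Model of RDD.aggregate: seq_op within each partition, comb_op across partitions."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)


def seq_op(acc, x):
    return (acc[0] + x, acc[1] + 1)      # add the element to the sum, bump the count


def comb_op(a, b):
    return (a[0] + b[0], a[1] + b[1])    # add partial sums and partial counts


total, count = simulate_aggregate([[0, 1], [2, 3, 4]], (0, 0), seq_op, comb_op)
print((total, count), total / count)     # (10, 5) 2.0
```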

### Example: Word Count

In [138]:
import re

In [139]:
rdd_files.count()

1131

In [141]:
flattend = rdd_files.filter(lambda line: len(line) > 0) \
                    .flatMap(lambda line: re.split(r'\W+', line))   # split on runs of non-word characters

In [142]:
flattend.take(6)

['Copyright', 'c', '2013', 'Samuel', 'Halliday', 'Copyright']

In [143]:
kvpairs = flattend.filter(lambda word: len(word)>0) \
                  .map(lambda word:(word.lower(),1))

In [144]:
kvpairs.take(6)

[('copyright', 1),
 ('c', 1),
 ('2013', 1),
 ('samuel', 1),
 ('halliday', 1),
 ('copyright', 1)]

In [145]:
countsbyword = kvpairs.reduceByKey(lambda v1, v2: v1+v2)  \
                      .sortByKey(ascending=False)

In [146]:
countsbyword.take(6)

[('zstd', 2),
 ('zstandard', 1),
 ('zope', 3),
 ('zeiger', 1),
 ('your', 1),
 ('you', 1)]

In [147]:
topwords = countsbyword.map(lambda wc: (wc[1],wc[0]) )  \
                       .sortByKey(ascending=False)

In [148]:
topwords.take(5)

[(612, 'the'), (476, 'of'), (445, 'or'), (329, 'and'), (257, 'in')]
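You can check the same pipeline (drop empty lines, split on non-word characters, lowercase, count, rank by count) on a small input in plain Python with `collections.Counter`. Here `Counter` plays the role of `reduceByKey`. `word_counts` is a hypothetical helper, and the sample lines are made up for the check.

```python
import re
from collections import Counter


def word_counts(lines):
    """Count lowercase words, splitting on runs of non-word characters like the RDD version."""
    counts = Counter()
    for line in lines:
        if len(line) == 0:                    # same filter as the first RDD step
            continue
        for word in re.split(r"\W+", line):
            if word:                          # re.split can yield empty strings at the edges
                counts[word.lower()] += 1
    return counts


text = ["Copyright (c) 2013 Samuel Halliday", "", "Copyright of the code"]
print(word_counts(text).most_common(1))   # [('copyright', 2)]
```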

In [290]:
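# saveAsTextFile writes a directory of part-* files (not a single .txt file) and fails if the path already exists
topwords.saveAsTextFile("/usr/local/spark-2.1.0-bin-hadoop2.7/data/wordcounts.txt")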

# SQLContext

[ref](http://nbviewer.jupyter.org/github/tfolkman/learningwithdata/blob/master/Getting_Started_With_Spark_DataFrame.ipynb)

In [5]:
import json

In [7]:
import requests

In [9]:
req = requests.get('https://data.ny.gov/resource/unag-2p27.json?$limit=50000')

In [14]:
content = json.loads(req.text)
with open("../data/ny_salaries.json", "w") as f:
    json.dump(content, f)

In [6]:
! ls ../data

CompTot-Env.rda  ny_salaries.json  prjCCppJavaCshVb.db


In [1]:
from pyspark import SparkContext
sc = SparkContext("local")   # fresh kernel; if a context already exists, use SparkContext.getOrCreate() instead

In [3]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)   # legacy entry point; since Spark 2.0, SparkSession.builder.getOrCreate() is preferred

In [4]:
df = sqlContext.read.json("../data/ny_salaries.json")

In [7]:
df.count()

50000

In [5]:
df.printSchema()

root
 |-- actual_salary_paid: string (nullable = true)
 |-- authority_name: string (nullable = true)
 |-- base_annualized_salary: string (nullable = true)
 |-- department: string (nullable = true)
 |-- exempt_indicator: string (nullable = true)
 |-- extra_pay: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- fiscal_year_end_date: string (nullable = true)
 |-- group: string (nullable = true)
 |-- has_employees: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- middle_initial: string (nullable = true)
 |-- other_compensation: string (nullable = true)
 |-- overtime_paid: string (nullable = true)
 |-- paid_by_another_entity: string (nullable = true)
 |-- paid_by_state_or_local_government: string (nullable = true)
 |-- pay_type: string (nullable = true)
 |-- performance_bonus: string (nullable = true)
 |-- title: string (nullable = true)
 |-- total_compensation: string (nullable = true)



In [9]:
df.select('actual_salary_paid').show(3)

+------------------+
|actual_salary_paid|
+------------------+
|          28725.55|
|          31973.09|
|          68654.00|
+------------------+
only showing top 3 rows



In [10]:
df.filter(df["total_compensation"].cast("float") > 100000).show(3)   # the column is a string, so cast before comparing

(Output truncated in this export: only the header of the 20-column table survived.)

In [11]:
from pyspark.sql.functions import desc
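# every column was read as a string; cast to float so it can be averaged (astype is an alias of cast)
df_cast = df.withColumn("total_compensation_float", df.total_compensation.astype("float"))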

In [18]:
type(df_cast)

pyspark.sql.dataframe.DataFrame

In [38]:
list(reversed(df_cast.columns))[0:5]

['total_compensation_float',
 'total_compensation',
 'title',
 'performance_bonus',
 'pay_type']

In [45]:
top10 = (df_cast
         .groupBy("department")
         .mean("total_compensation_float")
         .orderBy(desc("avg(total_compensation_float)"))
         .take(10))

In [46]:
top10

[Row(department='8705 Bariatrics', avg(total_compensation_float)=332580.46875),
 Row(department='BARIATRICS', avg(total_compensation_float)=285044.1979166667),
 Row(department='DR. SPERRY', avg(total_compensation_float)=279918.5),
 Row(department='8720 Clinical Medicine', avg(total_compensation_float)=262896.0859375),
 Row(department='8715 Oral Oncology Maxolfacial', avg(total_compensation_float)=228185.66666666666),
 Row(department='ORAL ONCOLOGY & MAXOFACIAL', avg(total_compensation_float)=216544.56640625),
 Row(department='PHYSICIANS - CARDIOL', avg(total_compensation_float)=214361.0),
 Row(department='OFFICE OF THE SR. VI', avg(total_compensation_float)=214259.40625),
 Row(department='Office of the Chief Executive', avg(total_compensation_float)=212708.86979166666),
 Row(department='CL-CONNECTIVE TIS.', avg(total_compensation_float)=210000.0)]
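Every column was read as a string, so the average only makes sense after the numeric cast. The `groupBy(...).mean(...)` plus `orderBy(desc(...))` steps correspond to the plain-Python computation below. The records and the helper name `mean_by_key` are made up for illustration. Values that cannot be parsed are skipped here, which is assumed to match Spark, where such a cast yields null and `mean` ignores nulls.

```python
from collections import defaultdict


def mean_by_key(rows, key, value):
    """Average float(row[value]) per row[key], skipping values that do not parse."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        try:
            v = float(row[value])         # the JSON fields are strings, as in the schema
        except (TypeError, ValueError):
            continue                      # unparseable value: ignored, like a null in avg()
        sums[row[key]] += v
        counts[row[key]] += 1
    means = {k: sums[k] / counts[k] for k in sums}
    # order by average, highest first, like orderBy(desc(...))
    return sorted(means.items(), key=lambda kv: kv[1], reverse=True)


rows = [
    {"department": "A", "total_compensation": "100.0"},
    {"department": "A", "total_compensation": "300.0"},
    {"department": "B", "total_compensation": "250.5"},
    {"department": "B", "total_compensation": "n/a"},
]
print(mean_by_key(rows, "department", "total_compensation"))   # [('B', 250.5), ('A', 200.0)]
```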

# Tuning Spark

References:
* [one](http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/)
* [two](http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/)
* [shuffle](http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/)

# Fundamental Exercises

* [ref](https://github.com/jadianes/spark-py-notebooks)
* [ref](https://github.com/MingChen0919/learning-apache-spark)