## Figuring out Spark 2.0

aka the *Decimal Project*

### Notes/Discoveries

* dates are a real headache

### References / Read

* http://www.agildata.com/apache-spark-rdd-vs-dataframe-vs-dataset/
* https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
* http://blog.cloudera.com/blog/2015/07/how-to-do-data-quality-checks-using-apache-spark-dataframes/
* https://www.infoq.com/articles/apache-spark-sql
* http://blog.brakmic.com/data-science-for-losers-part-5-spark-dataframes/


In [1]:
##### CELL A #####

# Basic setup required in all notebooks
 
appName='archetest'
master='local[*]' #local spark-master

# Explicitly define python 2 since we have both 2 & 3 installed
import os
os.environ['PYSPARK_PYTHON'] = '/opt/conda/envs/python2/bin/python'

# Establish spark context or session
#from pyspark import SparkContext, SparkConf
#conf = SparkConf().setAppName(appName).setMaster(master)
#sc = SparkContext(conf=conf)


# A SparkSession can be created using a builder pattern
from pyspark.sql import SparkSession
spark = (SparkSession
         .builder
         .master(master)
         .appName(appName)
         .getOrCreate())

In [2]:
##### CELL A-2 #####

# do something to prove it works

# Old way
#rdd = sc.parallelize(range(1000))
#rdd.takeSample(False, 5)

# Spark 2
# ok- rdd's are still available, but the spark dataframe/set is now the preferred data structure, 
# albeit a bit more complex.  For example, you don't create a dataframe or rdd from a random  
# list, you create a range that is ALREADY a dataframe.

r = spark.range(10) #creates a dataframe/dataset
print type(r),r
r.sample(False,.5)
r.take(5)

<class 'pyspark.sql.dataframe.DataFrame'> DataFrame[id: bigint]


[Row(id=0), Row(id=1), Row(id=2), Row(id=3), Row(id=4)]

In [3]:
# Somewhat Handy methods

def prSomeDS(inputDS,howMany,msg=None):
    """
    Debugging shorthand

    Args: A Spark 2 DataSet, how many items to print, and optional message

    Returns: None
        
    """
    print msg
    #inputDS.take(howMany).foreach(println)  #doesn't work with all spark2 datasets
    for x in inputDS.take(howMany):
        print x    


In [4]:
#stop here  # halt execution with an error

## File Access

In [5]:
##### CELL READ A #####
# read local file into a dataset

datafile_local = "/home/jovyan/work/fotd.csv"

#dataRDD = sc.textFile(datafile_local)  #old way

dataDS = spark.read.text(datafile_local) # spark2
#spark2 type is <class 'pyspark.sql.dataframe.DataFrame'>
print type(dataDS)
dataDS.take(2)

<class 'pyspark.sql.dataframe.DataFrame'>


[Row(value=u'sheet,Date,Event,Lesson,attendees_reg,attendees_student,attendees_special,attendees_board,addendees_free,addendees_coupon,addendees_comp,fee_reg,fee_student,fee_special,fee_board,donations,caller,band'),
 Row(value=u'4/16/2016,4/16/2016,Contra Dance Financial Worksheet,-1,37,11,0,4,6,0,6,$9.00,$5.00,$8.00,$9.00,$37.00,Patricia Dancen,Steam')]

In [6]:
##### CELL READ B #####

# Define the datafile and establish HDFS access, spark 2

datafile_HDFS = r'hdfs://172.18.0.2:9000/user/root/testData/fotd.csv'
d = spark.read.text(datafile_HDFS)
print type(d)
prSomeDS(d,2,'fotd sample from hdfs store')


<class 'pyspark.sql.dataframe.DataFrame'>
fotd sample from hdfs store
Row(value=u'sheet,Date,Event,Lesson,attendees_reg,attendees_student,attendees_special,attendees_board,addendees_free,addendees_coupon,addendees_comp,fee_reg,fee_student,fee_special,fee_board,donations,caller,band')
Row(value=u'4/16/2016,4/16/2016,Contra Dance Financial Worksheet,-1,37,11,0,4,6,0,6,$9.00,$5.00,$8.00,$9.00,$37.00,Patricia Dancen,Steam')


In [7]:
##### CELL READ  #####
# Read as default csv file

raw_data2 = spark.read.csv(datafile_HDFS, header = "true")
print type(raw_data2)
raw_data2.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- sheet: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Lesson: string (nullable = true)
 |-- attendees_reg: string (nullable = true)
 |-- attendees_student: string (nullable = true)
 |-- attendees_special: string (nullable = true)
 |-- attendees_board: string (nullable = true)
 |-- addendees_free: string (nullable = true)
 |-- addendees_coupon: string (nullable = true)
 |-- addendees_comp: string (nullable = true)
 |-- fee_reg: string (nullable = true)
 |-- fee_student: string (nullable = true)
 |-- fee_special: string (nullable = true)
 |-- fee_board: string (nullable = true)
 |-- donations: string (nullable = true)
 |-- caller: string (nullable = true)
 |-- band: string (nullable = true)



In [None]:
##### CELL READ  #####
# Try reading with a pre-defined schema

from pyspark.sql.types import *

In [52]:
##### CELL READ  #####
# Open a simple file with pre-defined schema

datafile_sample1_HDFS = r'hdfs://172.18.0.2:9000/user/root/testData/simpleSampleClean.csv'
schema = StructType([StructField("line", StringType(), False),
                     StructField("aStr", StringType(), False),
                     StructField("aDate", DateType(), False),
                     StructField("anInt", IntegerType(), False),
                     StructField("aFloat", DoubleType(), False),
                     StructField("aNote", StringType(), False)
                     ])  
raw_sample1 = spark.read.csv(datafile_sample1_HDFS,
                             schema = schema,
                             mode = 'PERMISSIVE',
                             sep = ',',
                             comment='#'
                            )
print type(raw_sample1)
raw_sample1.printSchema()

raw_sample1.show()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- line: string (nullable = true)
 |-- aStr: string (nullable = true)
 |-- aDate: date (nullable = true)
 |-- anInt: integer (nullable = true)
 |-- aFloat: double (nullable = true)
 |-- aNote: string (nullable = true)



Py4JJavaError: An error occurred while calling o552.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 28, localhost): java.lang.IllegalArgumentException
	at java.sql.Date.valueOf(Date.java:140)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:291)
	at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:115)
	at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:84)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:125)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:124)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:128)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException
	at java.sql.Date.valueOf(Date.java:140)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:291)
	at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:115)
	at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:84)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:125)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:124)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:128)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more


In [41]:

# sheet,Date,Event,Lesson,
# attendees_reg,attendees_student,attendees_special,attendees_board,addendees_free,addendees_coupon,addendees_comp,
# fee_reg,fee_student,fee_special,fee_board,donations,caller,band
schema = StructType([ \
                     StructField("sheet"      , StringType(), False),
                     StructField("event"      , StringType(), False),
                     StructField("event_date" , StringType(), False),
                     StructField("lesson"     , IntegerType(), False),
                     StructField("num_reg"    , IntegerType(), False),
                     StructField("num_student", IntegerType(), False),
                     StructField("num_special", IntegerType(), False),
                     StructField("num_board"  , IntegerType(), False),
                     StructField("num_free"   , IntegerType(), False),
                     StructField("num_coupon" , IntegerType(), False),
                     StructField("num_comp"   , IntegerType(), False),
                     StructField("fee_reg"    , DecimalType(4,2), False),
                     StructField("fee_student", DecimalType(4,2), False),
                     StructField("fee_special", DecimalType(4,2), False),
                     StructField("fee_board"  , DecimalType(4,2), False),
                     StructField("donations"  , DecimalType(10,2), False),
                     StructField("caller"     , StringType(), False),
                     StructField("band"       , StringType(), False)
                    ])
         


raw_data3 = spark.read.csv(datafile_HDFS, 
                           schema = schema,
                           mode = 'PERMISSIVE',
                           sep = ',',
                           comment='S',  #skips header row starting with "Sheet",

                           )
print type(raw_data3)
raw_data3.printSchema()

raw_data3.show(1)

## Data Conversion
Get Data into a useable format

In [8]:
##### Cell Data Conversion-A #####

from pyspark.sql.functions import regexp_extract, regexp_replace, trim, col, lower, to_date

# extraction patterns
re_us_currency = r'(\d*\.\d\d)'
re_integer = r'(\d*)'
re_float = r'(\d*\.?\d*)'

# try user defined function to customize parsing
#spark.udf.register("addone",(x:Int)=>x+1)

In [9]:
##### Cell Data Conversion-B #####

# Transform into new dataset/dataframe with appropriate data types
dataDS2 = raw_data2.select(to_date(regexp_replace(raw_data2.Date,r'/',r'-')).alias('newdate'),
                   lower(raw_data2.Event).alias('event'),
                   lower(raw_data2.caller).alias('caller'),
                   lower(raw_data2.band).alias('band'),
                   regexp_extract(raw_data2.Lesson,re_integer,1).alias('inlesson').cast("int"), 
                   regexp_extract(raw_data2.attendees_reg,re_integer,1).alias('attendees_reg').cast("int"), 
                   regexp_extract(raw_data2.attendees_student,re_integer,1).alias('attendees_student').cast("int"), 
                   regexp_extract(raw_data2.attendees_special,re_integer,1).alias('attendees_special').cast("int"), 
                   regexp_extract(raw_data2.attendees_board,re_integer,1).alias('attendees_board').cast("int"), 
                   regexp_extract(raw_data2.addendees_coupon,re_integer,1).alias('addendees_coupon').cast("int"), 
                   regexp_extract(raw_data2.addendees_coupon,re_integer,1).alias('addendees_coupon').cast("int"), 
                   regexp_extract(raw_data2.fee_reg,re_us_currency,1).alias("fee_reg").cast("decimal(10,2)"),
                   regexp_extract(raw_data2.fee_student,re_us_currency,1).alias("fee_student").cast("decimal(10,2)"),
                   regexp_extract(raw_data2.fee_special,re_us_currency,1).alias("fee_special").cast("decimal(10,2)"),
                   regexp_extract(raw_data2.fee_board,re_us_currency,1).alias("fee_board").cast("decimal(10,2)"),
                   regexp_extract(raw_data2.donations,re_us_currency,1).alias("donations").cast("decimal(10,2)")
                  )
              
dataDS2.printSchema()
#prSomeDS(dataDS2,3,"transformed data")
dataDS2.show(3)


root
 |-- newdate: date (nullable = true)
 |-- event: string (nullable = true)
 |-- caller: string (nullable = true)
 |-- band: string (nullable = true)
 |-- inlesson: integer (nullable = true)
 |-- attendees_reg: integer (nullable = true)
 |-- attendees_student: integer (nullable = true)
 |-- attendees_special: integer (nullable = true)
 |-- attendees_board: integer (nullable = true)
 |-- addendees_coupon: integer (nullable = true)
 |-- addendees_coupon: integer (nullable = true)
 |-- fee_reg: decimal(10,2) (nullable = true)
 |-- fee_student: decimal(10,2) (nullable = true)
 |-- fee_special: decimal(10,2) (nullable = true)
 |-- fee_board: decimal(10,2) (nullable = true)
 |-- donations: decimal(10,2) (nullable = true)

+-------+--------------------+---------------+--------------------+--------+-------------+-----------------+-----------------+---------------+----------------+----------------+-------+-----------+-----------+---------+---------+
|newdate|               event|         cal

In [10]:
##### Cell Data Conversion-C #####

# validation check: there should be no null values



In [11]:
# what CAN I do with this spark session object?
dir(spark)

['Builder',
 '__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__enter__',
 '__exit__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_conf',
 '_createFromLocal',
 '_createFromRDD',
 '_inferSchema',
 '_inferSchemaFromList',
 '_instantiatedContext',
 '_jsc',
 '_jsparkSession',
 '_jvm',
 '_jwrapped',
 '_sc',
 '_wrapped',
 'builder',
 'catalog',
 'conf',
 'createDataFrame',
 'newSession',
 'range',
 'read',
 'readStream',
 'sparkContext',
 'sql',
 'stop',
 'streams',
 'table',
 'udf',
 'version']

In [44]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None) method of pyspark.sql.readwriter.DataFrameReader instance
    Loads a CSV file and returns the result as a  :class:`DataFrame`.
    
    This function will go through the input once to determine the input schema if
    ``inferSchema`` is enabled. To avoid going through the entire data once, disable
    ``inferSchema`` option or specify the schema explicitly using ``schema``.
    
    :param path: string, or list of strings, for input path(s).
    :param schema: an optional :class:`StructType` for the input schema.
    :param sep: sets the single character as a separator for each field