# Spark

* consistent, composable APIs to build an application out of smaller libraries
* Language APIs make it possible to run Spark code using various programming languages (Scala, Python, SQL)
* manages and coordinates the execution of tasks on data across a cluster of computers
* driver and executor processes
* low-level "unstructured" APIs (RDDs) and higher-level structured APIs (DataFrames)
* tools and libraries: Structured Streaming, machine learning (MLlib), etc.

## Initiate Spark Session

## Machine Learning in Spark

Preparing features using DataFrame transformations.

In [None]:
# read in data



In [None]:
staticDataFrame.printSchema()

In [None]:
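# processing timestamp: fill nulls with 0, derive the day of the week
# from InvoiceDate, and reduce the data to 5 partitions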
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
  .na.fill(0)\
  .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
  .coalesce(5)

### Train-Test Splits

In [None]:
# split the data manually along date (time series data)
trainDataFrame = preppedDataFrame\
  .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
  .where("InvoiceDate >= '2011-07-01'")
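
The two filters above partition the data at a single cutoff, so every row lands in exactly one of the two sets. A minimal plain-Python sketch of the same logic, assuming `InvoiceDate` is a timestamp and the literal `'2011-07-01'` is read as midnight at the start of that day (the rows are hypothetical, not taken from the dataset):

```python
from datetime import datetime

# Hypothetical (InvoiceDate, Quantity) rows standing in for preppedDataFrame.
rows = [
    (datetime(2010, 12, 1, 8, 26), 6),
    (datetime(2011, 6, 30, 23, 59), 3),
    (datetime(2011, 7, 1, 0, 0), 5),
    (datetime(2011, 7, 1, 8, 30), 2),
]

# Assumption: the string cutoff corresponds to midnight on 1 July 2011.
cutoff = datetime(2011, 7, 1)

# Everything strictly before the cutoff is training data;
# the cutoff instant itself and everything after it is test data.
train = [row for row in rows if row[0] < cutoff]
test = [row for row in rows if row[0] >= cutoff]
```

Because the conditions are exact complements, no row is dropped or duplicated, and every training row precedes every test row in time.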

In [None]:
# alternative: train validation splits (part IV)


In [None]:
# alternative: cross validation splits (part IV)


In [None]:
# back to processing the timestamp:
# turn the day-of-week strings into numerical indices
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
  .setInputCol("day_of_week")\
  .setOutputCol("day_of_week_index")

# one-hot encode the index, since day of week is categorical
# (the index values carry no meaningful order)
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")
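
To make the two stages concrete, here is a plain-Python sketch of what they compute. It mirrors the default behaviour as I understand it: `StringIndexer` gives the most frequent label index 0, and `OneHotEncoder` drops the last category. The helper names `fit_string_indexer` and `one_hot` are hypothetical, ties are broken alphabetically as an assumption of this sketch, and Spark returns sparse vectors rather than lists.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; break ties alphabetically (sketch assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

days = ["Monday", "Tuesday", "Monday", "Friday", "Monday", "Tuesday"]
labels = fit_string_indexer(days)
encoded = [one_hot(labels[d], len(labels)) for d in days]
```

With three distinct days, each encoded value has two slots; the least frequent day ("Friday" here) is represented by all zeros.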

In [None]:
# assemble the feature columns into a single vector column
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
  .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
  .setOutputCol("features")
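
Conceptually, the assembler concatenates the input columns, in the order given, into one flat vector per row. A minimal plain-Python sketch of that idea (the `assemble` helper and the sample row are hypothetical):

```python
def assemble(row, input_cols):
    """Flatten scalar and list-valued columns into one feature list, in column order."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            # Vector-valued column: append every element.
            features.extend(float(v) for v in value)
        else:
            # Scalar column: append the single value.
            features.append(float(value))
    return features

# Hypothetical row after indexing and one-hot encoding.
row = {"UnitPrice": 2.55, "Quantity": 6, "day_of_week_encoded": [0.0, 1.0]}
features = assemble(row, ["UnitPrice", "Quantity", "day_of_week_encoded"])
```

The length of the result is the number of scalar columns plus the width of each encoded vector, so the order passed to `setInputCols` determines which position holds which feature.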

In [None]:
# processing strings

### Make a Pipeline

In [None]:
# make a pipeline
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
  .setStages([indexer, encoder, vectorAssembler])

In [None]:
# two-step process
# first fit the transformers to this particular dataset
fittedPipeline = transformationPipeline.fit(trainDataFrame)

# then transform the training data using the fitted pipeline
transformedTraining = fittedPipeline.transform(trainDataFrame)
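
The fit-then-transform pattern can be sketched in plain Python. Fitting walks the stages in order, fits any stage that needs to learn from the data, and passes the transformed rows to the next stage. The resulting fitted pipeline can then be applied to other data with the mappings learned from the training set. All class names below are hypothetical stand-ins, not Spark classes.

```python
class LabelIndexerModel:
    """Fitted stage: applies a label-to-index mapping learned earlier."""
    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, rows):
        # Raises KeyError for a label that was not seen during fit.
        return [{**r, "day_index": self.mapping[r["day"]]} for r in rows]

class LabelIndexer:
    """Unfitted stage: learns the mapping from the data passed to fit()."""
    def fit(self, rows):
        labels = sorted({r["day"] for r in rows})
        return LabelIndexerModel({label: i for i, label in enumerate(labels)})

class MiniPipelineModel:
    """Fitted pipeline: every stage is already a transformer."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Stages with fit() learn from the data; others are used as they are.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return MiniPipelineModel(fitted)

train = [{"day": "Monday"}, {"day": "Friday"}]
test = [{"day": "Friday"}]

fitted = MiniPipeline([LabelIndexer()]).fit(train)
transformed_train = fitted.transform(train)
transformed_test = fitted.transform(test)  # reuses the mapping learned from train
```

Fitting only on the training rows keeps information from the test set out of the learned mappings; the trade-off is that a label appearing only in the test set has no index.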

In [None]:
# cache transformed dataset for hyperparameter tuning
transformedTraining.cache()