### Running Production Applications
* Turn your interactive exploration into production applications with `spark-submit`.
* `spark-submit` sends your application code to a cluster and launches it there.
* You can do this with all of Spark's supported cluster managers.
##### Structured APIs - Datasets, DataFrames, SQL
##### Low-level APIs - RDDs, Distributed Variables

To run the sample Python application on your local machine, run the following from the directory where you downloaded Spark:
```./bin/spark-submit --master local ./examples/src/main/python/pi.py 10```
By changing the `--master` argument, you can submit the same application to a cluster running Spark's standalone cluster manager, YARN, or Mesos.
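
To make the role of the master argument concrete, here is a minimal sketch in plain Python that only assembles the command line for different targets and does not launch anything. The helper `build_submit_command` and the host names are hypothetical; the master URL forms (`local`, `yarn`, `spark://host:port`, `mesos://host:port`) follow Spark's documented conventions.

```python
def build_submit_command(master, app_path, app_args=()):
    """Assemble a spark-submit argument list for the given master URL."""
    return ["./bin/spark-submit", "--master", master, app_path, *map(str, app_args)]

APP = "./examples/src/main/python/pi.py"

# Only the value passed to --master changes between targets.
# The host names below are placeholders, not real endpoints.
commands = {
    "local": build_submit_command("local", APP, [10]),
    "standalone": build_submit_command("spark://master-host:7077", APP, [10]),
    "yarn": build_submit_command("yarn", APP, [10]),
    "mesos": build_submit_command("mesos://mesos-host:5050", APP, [10]),
}

for target, argv in commands.items():
    print(target, "->", " ".join(argv))
```

The application path and its argument stay identical across all four commands, which is the point of the note above.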

#### Datasets: Type-Safe Structured APIs
* The Dataset API is not available in Python and R, because those languages are dynamically typed.
* A DataFrame is a distributed collection of objects of type Row that can hold various types of tabular data.
* The Dataset API gives users the ability to assign a Java/Scala class to the records within a DataFrame and manipulate it as a collection of typed objects, similar to a Java ArrayList or Scala Seq.

In [4]:
staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/FileStore/tables/2010_12_01-ec65d.csv")
#.csv("/FileStore/tables/2015_summary-ebaee.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema


In [5]:
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .show(5)
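
As a rough illustration of what the grouped, windowed sum above computes, the following plain-Python sketch (no Spark required) groups rows by customer and by one-day bucket and sums `UnitPrice * Quantity`. The sample rows and the helper `daily_totals` are made up for illustration, and bucketing by calendar date is a simplification of how Spark aligns its windows.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample rows: (CustomerId, UnitPrice, Quantity, InvoiceDate)
rows = [
    (17850, 2.55, 6, "2010-12-01 08:26:00"),
    (17850, 3.39, 6, "2010-12-01 08:28:00"),
    (13047, 1.69, 6, "2010-12-01 08:34:00"),
    (17850, 2.75, 8, "2010-12-02 09:01:00"),
]

def daily_totals(rows):
    """Sum UnitPrice * Quantity per (customer, calendar day)."""
    totals = defaultdict(float)
    for customer_id, unit_price, quantity, invoice_date in rows:
        ts = datetime.strptime(invoice_date, "%Y-%m-%d %H:%M:%S")
        day = ts.date()  # the 1-day bucket this row falls into
        totals[(customer_id, day)] += unit_price * quantity
    return dict(totals)

totals = daily_totals(rows)
for (customer_id, day), total in sorted(totals.items()):
    print(customer_id, day, round(total, 2))
```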


In [6]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("/FileStore/tables/*.csv")


In [7]:
purchaseByCustomerPerHour = streamingDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")


In [8]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()
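
The combination of `maxFilesPerTrigger` set to 1 and the `complete` output mode can be pictured as follows: each trigger reads one more file, updates the running aggregate, and rewrites the whole result table. The sketch below mimics that behavior in plain Python; the batches and the `run_stream` generator are hypothetical stand-ins, not Spark's streaming engine.

```python
from collections import defaultdict

# Hypothetical micro-batches: each inner list stands for one input file,
# with rows of (CustomerId, day, total_cost).
batches = [
    [(17850, "2010-12-01", 35.64), (13047, "2010-12-01", 10.14)],
    [(17850, "2010-12-02", 22.00), (13047, "2010-12-01", 5.00)],
]

def run_stream(batches):
    """Yield the full result table after each batch (complete-mode style)."""
    state = defaultdict(float)  # running totals kept across batches
    for batch in batches:
        for customer_id, day, total_cost in batch:
            state[(customer_id, day)] += total_cost
        yield dict(state)  # snapshot of the whole table, not just the changes

snapshots = list(run_stream(batches))
for i, table in enumerate(snapshots, start=1):
    print("after batch", i, table)
```

Querying the in-memory table at different moments corresponds to looking at different snapshots, which is why repeated runs of the SQL query can return different totals while the stream is still processing files.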


In [9]:
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .show(5)


### Machine Learning and Advanced Analytics
* MLlib - allows for preprocessing, munging, training of models, and making predictions at scale on data.
* Models trained in MLlib can be used to make predictions in Structured Streaming as well.
* MLlib algorithms require that data be represented as numerical values.

In [11]:
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
  .na.fill(0)\
  .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
  .coalesce(5)


In [12]:
trainDataFrame = preppedDataFrame\
  .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
  .where("InvoiceDate >= '2011-07-01'")


In [13]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
  .setInputCol("day_of_week")\
  .setOutputCol("day_of_week_index")


* StringIndexer - converts text to numerical values, e.g. Saturday to 6.
* OneHotEncoder - encodes each of those numbers as its own column of Boolean (0/1) values.
* All machine learning algorithms in Spark take a Vector type as input, which must be a set of numerical values; VectorAssembler builds this vector.
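
The three steps can be sketched in plain Python to show the shape of the data at each stage. This is an illustration, not Spark's implementation: the helper names are hypothetical, and dense lists stand in for Spark's vector types. It does follow two defaults of the Spark transformers: StringIndexer assigns indices by descending label frequency, so the actual index for a given day depends on the data, and OneHotEncoder drops the last category.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index, most frequent label first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

def assemble(*parts):
    """Concatenate scalars and lists into one flat feature list."""
    features = []
    for part in parts:
        features.extend(part if isinstance(part, list) else [float(part)])
    return features

days = ["Monday", "Saturday", "Monday", "Sunday", "Monday", "Saturday"]
mapping = fit_string_indexer(days)      # Monday -> 0.0, Saturday -> 1.0, Sunday -> 2.0
encoded = one_hot(mapping["Saturday"], len(mapping))
features = assemble(2.55, 6, encoded)   # UnitPrice, Quantity, encoded day
print(mapping, encoded, features)
```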

In [15]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")


In [16]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
  .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
  .setOutputCol("features")


In [17]:
# Set this process up as a pipeline so that any future data we need to transform goes through the exact same process.
# Preparing data for training is a two-step process: 1. fit, 2. transform.
# Fitting is needed because StringIndexer must know how many unique values there are to index.
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
  .setStages([indexer, encoder, vectorAssembler])


In [18]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)


In [19]:
transformedTraining = fittedPipeline.transform(trainDataFrame)
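
The fit-then-transform pattern used above can be sketched without Spark. In the toy classes below, `LabelIndexer` and `SimplePipeline` are hypothetical names; rows are plain dictionaries, labels are indexed alphabetically for simplicity, and `fit` returns the same object rather than a separate fitted model as Spark does.

```python
class LabelIndexer:
    """Estimator-like stage: must see the data before it can transform it."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col
        self.mapping = None

    def fit(self, rows):
        labels = sorted({row[self.input_col] for row in rows})
        self.mapping = {label: i for i, label in enumerate(labels)}
        return self

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]

class SimplePipeline:
    """Fit each stage in order, feeding it the output of the previous stages."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        for stage in self.stages:
            rows = stage.fit(rows).transform(rows)
        return self

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

train = [{"day_of_week": "Monday"}, {"day_of_week": "Friday"}]
test = [{"day_of_week": "Friday"}]

fitted = SimplePipeline([LabelIndexer("day_of_week", "day_of_week_index")]).fit(train)
print(fitted.transform(train))
print(fitted.transform(test))  # reuses the mapping learned from the training data
```

Fitting once and reusing the fitted pipeline is what guarantees that training and test data are encoded with the same mapping.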


In [20]:
# In Spark, training ML models is a two-phase process. First we initialize an untrained model (the Algorithm, here KMeans), and then we train it (the AlgorithmModel, here kmModel).
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
  .setK(20)\
  .setSeed(1)  # Python 3 has no long-integer suffix, so 1L becomes 1


In [21]:
kmModel = kmeans.fit(transformedTraining)
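
To show the two phases in isolation, here is a small k-means sketch in plain Python. It uses the textbook Lloyd's algorithm with seeded random initialization, which is an assumption made for brevity; Spark's KMeans defaults to a parallel k-means++ style initialization and runs distributed. The function names and sample points are hypothetical.

```python
import random

def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))

def closest(point, centers):
    """Index of the center nearest to point."""
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))

def fit_kmeans(points, k, seed, max_iter=20):
    """Plain Lloyd's algorithm: returns the list of learned cluster centers."""
    rng = random.Random(seed)
    centers = rng.sample(points, k)  # seeded random initialization
    for _ in range(max_iter):
        clusters = [[] for _ in range(k)]
        for p in points:
            clusters[closest(p, centers)].append(p)
        # Recompute each center as the mean of its cluster; keep it if the cluster is empty.
        new_centers = [
            [sum(dim) / len(cluster) for dim in zip(*cluster)] if cluster else centers[i]
            for i, cluster in enumerate(clusters)
        ]
        if new_centers == centers:
            break
        centers = new_centers
    return centers

points = [[0.0, 0.0], [0.1, 0.1], [0.2, 0.2], [10.0, 10.0], [10.1, 10.1], [10.2, 10.2]]
centers = fit_kmeans(points, k=2, seed=1)       # phase 1: fit produces the "model"
labels = [closest(p, centers) for p in points]  # phase 2: the model assigns clusters
print(centers, labels)
```

The untrained algorithm is only a configuration (`k`, `seed`); the learned centers play the role of the trained model that is later applied to new data.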


In [22]:
transformedTest = fittedPipeline.transform(testDataFrame)


### Lower-Level APIs
* Spark includes a number of lower-level primitives that allow arbitrary Java and Python object manipulation via RDDs. Virtually everything in Spark is built on top of RDDs.
* DataFrame operations are compiled down to these lower-level tools for convenient and extremely efficient distributed execution.
* RDDs are lower level than DataFrames because they reveal physical execution characteristics (like partitions) to end users.

In [24]:
# One thing you might use RDDs for is to parallelize raw data that you have stored in memory on the driver machine.
from pyspark.sql import Row

spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()
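
Partitions are the physical detail that RDDs expose. As an illustrative sketch only, and not Spark's own code, the following plain-Python function shows one way a driver-side list could be cut into contiguous, near-equal chunks, one per partition. The name `slice_into_partitions` is hypothetical.

```python
def slice_into_partitions(data, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal size."""
    n = len(data)
    return [
        data[(i * n) // num_slices : ((i + 1) * n) // num_slices]
        for i in range(num_slices)
    ]

partitions = slice_into_partitions([1, 2, 3, 4, 5, 6, 7], 3)
print(partitions)  # [[1, 2], [3, 4], [5, 6, 7]]
```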
