# Starting Point: SparkSession
The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder():

In [1]:
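import org.apache.spark.sql.SparkSession
// getOrCreate() returns the existing session if the notebook kernel has already started one
val spark = SparkSession
  .builder()
  .appName("Spark SQL instacart example")
  .config("spark.some.config.option", "some-value") // placeholder option: replace with real settings or remove
  .getOrCreate()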

spark = org.apache.spark.sql.SparkSession@434f30ef



In [3]:
spark.version

2.4.4

# Creating DataFrames

In [5]:
// Read a CSV file that has a header row, letting Spark infer the column types
// (inferSchema costs one extra pass over each file)
def readCsv(path: String): org.apache.spark.sql.DataFrame = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(path)

val aisles = readCsv("aisles.csv")
val departments = readCsv("departments.csv")
val order_products_prior = readCsv("order_products__prior.csv")
val order_products_train = readCsv("order_products__train.csv")
val orders = readCsv("orders.csv")
val products = readCsv("products.csv")

aisles = [aisle_id: int, aisle: string]
departments = [department_id: int, department: string]
order_products_prior = [order_id: int, product_id: int ... 2 more fields]
order_products_train = [order_id: int, product_id: int ... 2 more fields]
orders = [order_id: int, user_id: int ... 5 more fields]
products = [product_id: int, product_name: string ... 2 more fields]


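Because `inferSchema` makes Spark read each file an extra time to guess column types, declaring the schema up front is a common alternative for larger files such as the order tables. Below is a minimal sketch, assuming local mode. It writes a two-row toy stand-in for `aisles.csv` to a temp directory and reads it back with a DDL schema string, which `DataFrameReader.schema` accepts from Spark 2.3 onward. The rows are hypothetical, though the column names match the `aisles` schema above. The helper names `readAisles` and `writeToyCsv` are illustrative.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ExplicitSchemaSketch {
  // Read an aisles-style CSV with a declared schema instead of inferSchema
  def readAisles(spark: SparkSession, path: String): DataFrame =
    spark.read
      .option("header", "true")
      .schema("aisle_id INT, aisle STRING") // DDL schema string (Spark 2.3+)
      .csv(path)

  // Write a toy two-row CSV (hypothetical data) and return its path
  def writeToyCsv(): Path = {
    val file = Files.createTempDirectory("aisles-sketch").resolve("aisles.csv")
    val body = "aisle_id,aisle\n1,toy aisle one\n2,toy aisle two\n"
    Files.write(file, body.getBytes(StandardCharsets.UTF_8))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("explicit schema sketch")
      .master("local[*]") // assumption: run locally
      .getOrCreate()
    val aisles = readAisles(spark, writeToyCsv().toString)
    aisles.printSchema()
    aisles.show()
    spark.stop()
  }
}
```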

# Running SQL Queries Programmatically
The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.

In [17]:
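// Register each DataFrame as a temporary view so it can be queried by name with spark.sql
// createOrReplaceTempView is session-scoped: the view disappears when this SparkSession ends
// (a cross-session view would use createGlobalTempView and live in the `global_temp` database)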
aisles.createOrReplaceTempView("aisles")
departments.createOrReplaceTempView("departments")
order_products_prior.createOrReplaceTempView("order_products_prior")
order_products_train.createOrReplaceTempView("order_products_train")
orders.createOrReplaceTempView("orders")
products.createOrReplaceTempView("products")
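The views registered above are session-scoped, so other sessions cannot see them. For contrast, here is a minimal sketch of a global temporary view. Such a view is registered in the system database `global_temp` and can be queried from any session of the same application. The toy DataFrame and the view name `aisles_global` are hypothetical, and local mode is assumed.

```scala
import org.apache.spark.sql.SparkSession

object TempViewSketch {
  // Returns (is the session-scoped view visible from a new session?, rows read via global_temp)
  def viewVisibility(spark: SparkSession): (Boolean, Long) = {
    import spark.implicits._
    // Hypothetical toy rows shaped like the aisles table
    val aisles = Seq((1, "toy aisle one"), (2, "toy aisle two")).toDF("aisle_id", "aisle")

    aisles.createOrReplaceTempView("aisles")              // visible to this session only
    aisles.createOrReplaceGlobalTempView("aisles_global") // shared via the global_temp database

    val other = spark.newSession()                        // same SparkContext, separate temp views
    val localVisible = other.catalog.tableExists("aisles")
    val globalRows = other.sql("select * from global_temp.aisles_global").count()
    (localVisible, globalRows)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("temp view sketch")
      .master("local[*]") // assumption: run locally
      .getOrCreate()
    println(viewVisibility(spark))
    spark.stop()
  }
}
```

On a fresh local session, `main` prints `(false,2)`. The new session cannot see `aisles`, but it reads both rows through `global_temp.aisles_global`.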

In [18]:
// Organize the data by shopping basket: one row per order holding the set of product names in it
import org.apache.spark.sql.functions.collect_set
val rawData = spark.sql("""
select p.product_name, o.order_id
from products p
inner join order_products_train o
on o.product_id = p.product_id""")
// collect_set drops duplicate names within an order; FP-growth requires unique items per basket
val baskets = rawData.groupBy("order_id").agg(collect_set("product_name").alias("items"))
baskets.createOrReplaceTempView("baskets")
baskets.show(3)

+--------+--------------------+
|order_id|               items|
+--------+--------------------+
|    1342|[Raw Shrimp, Seed...|
|    1591|[Cracked Wheat, S...|
|    4519|[Beet Apple Carro...|
+--------+--------------------+
only showing top 3 rows


rawData = [product_name: string, order_id: int]
baskets = [order_id: int, items: array<string>]


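The same basket construction can also be written with the DataFrame API instead of SQL. This sketch makes a few assumptions. It uses toy `products` and `order_products_train`-style frames that contain only the columns the query needs, with made-up rows, and the helper name `buildBaskets` is hypothetical. `collect_set` matters here because Spark's FP-growth rejects any basket that contains the same item twice.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.collect_set

object BasketSketch {
  // Join line items to product names, then gather one set of names per order
  def buildBaskets(products: DataFrame, orderProducts: DataFrame): DataFrame =
    orderProducts
      .join(products, "product_id") // inner equi-join on product_id
      .groupBy("order_id")
      .agg(collect_set("product_name").alias("items"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("basket sketch")
      .master("local[*]") // assumption: run locally
      .getOrCreate()
    import spark.implicits._

    // Hypothetical toy rows; order 11 lists product 3 twice
    val products = Seq((1, "Banana"), (2, "Limes"), (3, "Large Lemon")).toDF("product_id", "product_name")
    val orderProducts = Seq((10, 1), (10, 2), (11, 2), (11, 3), (11, 3)).toDF("order_id", "product_id")

    buildBaskets(products, orderProducts).show(false)
    spark.stop()
  }
}
```

In the toy data, order 11 references product 3 twice, but its basket still holds only two distinct names.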

# Train ML Model
To understand how often items are purchased together (e.g. peanut butter and jelly), we will use association rule mining for market basket analysis. [Spark MLlib](http://spark.apache.org/docs/latest/mllib-guide.html) implements two algorithms for frequent pattern mining (FPM): [FP-growth](https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#fp-growth) and [PrefixSpan](https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#prefixspan). The difference between them is ordering. FP-growth ignores any ordering of the items within an itemset, whereas PrefixSpan is designed for sequential pattern mining, where the itemsets are ordered. We will use FP-growth because the order in which items go into a basket is not important for this use case.

Note: we use the DataFrame-based Scala API (`org.apache.spark.ml.fpm.FPGrowth`), which lets us set a minimum confidence for the generated association rules via `setMinConfidence`.
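To make "frequency" concrete: the support of an itemset is the fraction of baskets that contain all of its items. FP-growth returns every itemset whose support is at least `minSupport`. The plain-Scala sketch below computes the same quantity by brute-force enumeration over toy baskets. It is deliberately not FP-growth, which avoids enumerating candidates by compressing the baskets into a prefix tree (the FP-tree). The sketch only shows what the algorithm's output means. The baskets and helper names are hypothetical.

```scala
object SupportSketch {
  // Fraction of baskets that contain every item in `itemset`
  def support(baskets: Seq[Set[String]], itemset: Set[String]): Double =
    baskets.count(b => itemset.subsetOf(b)).toDouble / baskets.size

  // Brute-force frequent itemsets up to size maxSize (exponential: toy data only)
  def frequentItemsets(baskets: Seq[Set[String]], minSupport: Double, maxSize: Int): Map[Set[String], Double] = {
    val items = baskets.flatten.distinct
    (1 to maxSize).iterator
      .flatMap(k => items.combinations(k).map(_.toSet))
      .map(s => s -> support(baskets, s))
      .filter { case (_, sup) => sup >= minSupport }
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical toy baskets
    val baskets = Seq(
      Set("peanut butter", "jelly", "bread"),
      Set("peanut butter", "jelly"),
      Set("bread", "milk"),
      Set("peanut butter", "bread")
    )
    frequentItemsets(baskets, minSupport = 0.5, maxSize = 2).foreach(println)
  }
}
```

With these four baskets and `minSupport = 0.5`, the frequent itemsets are {peanut butter} and {bread} at 0.75 each, plus {jelly}, {peanut butter, jelly}, and {peanut butter, bread} at 0.5 each. Milk appears in only one basket and is dropped.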

## Use FP-growth

In [21]:
// Extract the items column: FPGrowth expects one array<string> of unique items per basket
val baskets_ds = spark.sql("select items from baskets")

baskets_ds = [items: array<string>]



In [22]:
import org.apache.spark.ml.fpm.FPGrowth
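// Use FPGrowth: minSupport 0.001 keeps itemsets found in at least 0.1% of baskets;
// minConfidence 0 keeps every association rule generated from those itemsets
val fpgrowth = new FPGrowth().setItemsCol("items").setMinSupport(0.001).setMinConfidence(0)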
val model = fpgrowth.fit(baskets_ds)

fpgrowth = fpgrowth_aa0bdac30b6d
model = fpgrowth_aa0bdac30b6d


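Running the same estimator on a few toy baskets makes the fitted model's outputs easier to inspect. Here is a self-contained sketch, assuming local mode. The baskets and thresholds (`minSupport` 0.5, `minConfidence` 0.6) are illustrative and are not tuned for the Instacart data. Besides `freqItemsets` and `associationRules`, the model's `transform` adds a `prediction` column. That column lists the consequents of every rule whose antecedent is contained in a basket.

```scala
import org.apache.spark.ml.fpm.{FPGrowth, FPGrowthModel}
import org.apache.spark.sql.{DataFrame, SparkSession}

object FPGrowthSketch {
  // Fit FP-growth on a DataFrame with an array<string> column named "items"
  def fit(baskets: DataFrame): FPGrowthModel =
    new FPGrowth()
      .setItemsCol("items")
      .setMinSupport(0.5)    // illustrative: itemsets in at least half of the baskets
      .setMinConfidence(0.6) // illustrative: rules that hold at least 60% of the time
      .fit(baskets)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fpgrowth sketch")
      .master("local[*]") // assumption: run locally
      .getOrCreate()
    import spark.implicits._

    // Hypothetical toy baskets; items within one basket must be unique
    val baskets = Seq(
      Seq("peanut butter", "jelly", "bread"),
      Seq("peanut butter", "jelly"),
      Seq("bread", "milk"),
      Seq("peanut butter", "bread")
    ).toDF("items")

    val model = fit(baskets)
    model.freqItemsets.show(false)       // items, freq
    model.associationRules.show(false)   // antecedent, consequent, confidence (plus lift in Spark 2.4+)
    model.transform(baskets).show(false) // original items plus a prediction column
    spark.stop()
  }
}
```

On these baskets the model keeps five frequent itemsets and four rules. For example, jelly => peanut butter has confidence 1.0, because both jelly baskets also contain peanut butter.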

## Most Frequent Itemsets

In [23]:
// Register the frequent itemsets (columns: items, freq) as a view for SQL queries
val mostPopularItemInABasket = model.freqItemsets
mostPopularItemInABasket.createOrReplaceTempView("mostPopularItemInABasket")

mostPopularItemInABasket = [items: array<string>, freq: bigint]



In [25]:
val sqlDF = spark.sql("select items, freq from mostPopularItemInABasket where size(items) > 2 order by freq desc limit 20")
sqlDF.show()

+--------------------+----+
|               items|freq|
+--------------------+----+
|[Organic Hass Avo...| 710|
|[Organic Raspberr...| 649|
|[Organic Baby Spi...| 587|
|[Organic Raspberr...| 531|
|[Organic Hass Avo...| 497|
|[Organic Avocado,...| 484|
|[Organic Avocado,...| 477|
|[Limes, Large Lem...| 452|
|[Organic Cucumber...| 424|
|[Limes, Organic A...| 389|
|[Organic Raspberr...| 381|
|[Organic Avocado,...| 379|
|[Organic Baby Spi...| 376|
|[Organic Blueberr...| 374|
|[Large Lemon, Org...| 371|
|[Organic Cucumber...| 366|
|[Organic Lemon, O...| 353|
|[Limes, Organic A...| 352|
|[Organic Whole Mi...| 339|
|[Organic Avocado,...| 334|
+--------------------+----+



sqlDF = [items: array<string>, freq: bigint]



# Review Association Rules
In addition to `freqItemsets`, the FP-growth model also generates association rules (`associationRules`). For example, if a shopper purchases peanut butter, how likely is it that they will also purchase jelly? For more information, a good reference is Susan Li's [A Gentle Introduction on Market Basket Analysis — Association Rules](https://towardsdatascience.com/a-gentle-introduction-on-market-basket-analysis-association-rules-fa4b986a40ce).
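The peanut butter and jelly question maps onto two standard rule metrics. For a rule X => Y, confidence is support(X ∪ Y) / support(X): the share of baskets containing X that also contain Y. Lift is confidence divided by support(Y). A lift above 1 means Y is bought more often alongside X than it is overall. Here is a plain-Scala sketch on hypothetical baskets; the helper names are illustrative.

```scala
object RuleMetricsSketch {
  // Fraction of baskets that contain every item in `items`
  def support(baskets: Seq[Set[String]], items: Set[String]): Double =
    baskets.count(b => items.subsetOf(b)).toDouble / baskets.size

  // confidence(X => Y) = support(X ∪ Y) / support(X)
  def confidence(baskets: Seq[Set[String]], x: Set[String], y: Set[String]): Double =
    support(baskets, x ++ y) / support(baskets, x)

  // lift(X => Y) = confidence(X => Y) / support(Y)
  def lift(baskets: Seq[Set[String]], x: Set[String], y: Set[String]): Double =
    confidence(baskets, x, y) / support(baskets, y)

  def main(args: Array[String]): Unit = {
    // Hypothetical toy baskets
    val baskets = Seq(
      Set("peanut butter", "jelly", "bread"),
      Set("peanut butter", "jelly"),
      Set("bread", "milk"),
      Set("peanut butter", "bread")
    )
    val pb = Set("peanut butter")
    val jelly = Set("jelly")
    println(s"confidence(peanut butter => jelly) = ${confidence(baskets, pb, jelly)}")
    println(s"lift(peanut butter => jelly)       = ${lift(baskets, pb, jelly)}")
  }
}
```

Three of the four toy baskets contain peanut butter, and two of those also contain jelly, so confidence is 2/3. Jelly appears in half of all baskets, which gives a lift of (2/3) / 0.5 = 4/3.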

## View Generated Association Rules

In [26]:
// Register the generated association rules as a view for SQL queries
val ifThen = model.associationRules
ifThen.createOrReplaceTempView("ifThen")

ifThen = [antecedent: array<string>, consequent: array<string> ... 2 more fields]



In [33]:
val sqlDF = spark.sql("select antecedent as `antecedent (if)`, consequent as `consequent (then)`, confidence from ifThen order by confidence desc limit 20")
sqlDF.show()

+--------------------+--------------------+-------------------+
|     antecedent (if)|   consequent (then)|         confidence|
+--------------------+--------------------+-------------------+
|[Organic Raspberr...|[Bag of Organic B...| 0.5984251968503937|
|[Organic Cucumber...|[Bag of Organic B...|           0.546875|
|[Organic Kiwi, Or...|[Bag of Organic B...| 0.5459770114942529|
|[Organic Navel Or...|[Bag of Organic B...| 0.5412186379928315|
|[Yellow Onions, S...|            [Banana]| 0.5357142857142857|
|[Organic Whole St...|[Bag of Organic B...| 0.5314685314685315|
|[Organic Navel Or...|[Bag of Organic B...| 0.5283018867924528|
|[Organic Raspberr...|[Bag of Organic B...|  0.521099116781158|
|[Organic D'Anjou ...|[Bag of Organic B...| 0.5170454545454546|
|[Organic Unsweete...|[Bag of Organic B...| 0.5141065830721003|
|[Organic Broccoli...|[Bag of Organic B...| 0.5048231511254019|
|[Organic Lemon, O...|[Bag of Organic B...| 0.4989106753812636|
|[Organic Hass Avo...|[Bag of Organic B.

sqlDF = [antecedent (if): array<string>, consequent (then): array<string> ... 1 more field]



In [34]:
sqlDF.write.format("csv").mode("overwrite").option("sep",",").save("ar.csv")

Name: org.apache.spark.sql.AnalysisException
Message: CSV data source does not support array<string> data type.;
StackTrace:   at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:69)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:67)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:67)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyWriteSchema(DataSourceUtils.scala:34)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:100)
  at org.apache.spark.sql.execution.datasourc
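The write fails because the CSV data source cannot represent `array<string>` columns. One workaround is to flatten every array column with `concat_ws` before writing, assuming a delimited string is acceptable downstream. In the sketch below, the `;` separator, the helper name `flattenArrays`, the toy rows, and the temp output path are all illustrative. Alternatively, formats that support nested types, such as Parquet or JSON, can store the arrays unchanged.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.types.{ArrayType, StringType}

object CsvExportSketch {
  // Replace every array<string> column with a sep-delimited string column so CSV can hold it
  def flattenArrays(df: DataFrame, sep: String = ";"): DataFrame =
    df.schema.fields.foldLeft(df) { (acc, field) =>
      field.dataType match {
        // backticks quote column names containing spaces or parentheses
        case ArrayType(StringType, _) => acc.withColumn(field.name, concat_ws(sep, col(s"`${field.name}`")))
        case _                        => acc
      }
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv export sketch")
      .master("local[*]") // assumption: run locally
      .getOrCreate()
    import spark.implicits._

    // Hypothetical rows shaped like sqlDF above
    val rules = Seq(
      (Seq("peanut butter"), Seq("jelly"), 0.67),
      (Seq("peanut butter", "bread"), Seq("jelly"), 0.5)
    ).toDF("antecedent (if)", "consequent (then)", "confidence")

    val out = Files.createTempDirectory("ar").resolve("ar.csv").toString // hypothetical output path
    flattenArrays(rules).write.mode("overwrite").option("header", "true").csv(out)
    // Alternative that keeps the arrays: rules.write.mode("overwrite").parquet(...)
    spark.stop()
  }
}
```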