# Apache Spark Intro

**Help Preparing the pyspark env:** https://github.com/boyander/runpyspark

**Important**: You have to run this notebook using `pyspark` command.

1. Create the `SparkSession`
2. Open spark dashboard in  ["http://localhost:4040"]("http://localhost:4040") 

https://spark.apache.org/docs/latest/monitoring.html

Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:

A list of scheduler stages and tasks
A summary of RDD sizes and memory usage
Environmental information.
Information about the running executors
You can access this interface by simply opening http://<driver-node>:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).

In [1]:
# Prepare a SparkContext
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [2]:
# Open Spark Dashboard in browser tab
import webbrowser
webbrowser.open("http://localhost:4040")

True

# Estimate PI 

In [3]:
# https://www.geogebra.org/m/cF7RwK3H
import random

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

NUM_SAMPLES = 10000000
count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside).count()
print("Pi is roughly {}".format(4.0 * count / NUM_SAMPLES))

Pi is roughly 3.1413392


# Some Spark commands

In [5]:
# Loading a CSV
df = spark.read.format("csv").option("header", "true").load("../data/breadbasket_dms.csv")
df.show()

+----------+--------+-----------+-------------+
|      Date|    Time|Transaction|         Item|
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|          1|        Bread|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:07:57|          3|Hot chocolate|
|2016-10-30|10:07:57|          3|          Jam|
|2016-10-30|10:07:57|          3|      Cookies|
|2016-10-30|10:08:41|          4|       Muffin|
|2016-10-30|10:13:03|          5|       Coffee|
|2016-10-30|10:13:03|          5|       Pastry|
|2016-10-30|10:13:03|          5|        Bread|
|2016-10-30|10:16:55|          6|    Medialuna|
|2016-10-30|10:16:55|          6|       Pastry|
|2016-10-30|10:16:55|          6|       Muffin|
|2016-10-30|10:19:12|          7|    Medialuna|
|2016-10-30|10:19:12|          7|       Pastry|
|2016-10-30|10:19:12|          7|       Coffee|
|2016-10-30|10:19:12|          7|          Tea|
|2016-10-30|10:20:51|          8|       

In [6]:
%%time
# Grouping operation with distinct count
from pyspark.sql.functions import collect_list, approx_count_distinct

q = df.groupby(df.Transaction).agg(collect_list("Item"), approx_count_distinct("Item"))
print(q.show())
print(q.head())

+-----------+--------------------+---------------------------+
|Transaction|  collect_list(Item)|approx_count_distinct(Item)|
+-----------+--------------------+---------------------------+
|       1090|[Brownie, Coffee,...|                          4|
|       1159|             [Bread]|                          1|
|       1436|      [Coffee, Soup]|                          2|
|       1512|[Hearty & Seasona...|                          3|
|       1572|    [Pastry, Coffee]|                          2|
|       2069|             [Bread]|                          1|
|       2088|      [Scandinavian]|                          1|
|       2136|[Hot chocolate, Tea]|                          2|
|       2162|[Coffee, Tea, Jui...|                          7|
|       2294|         [Tea, NONE]|                          2|
|       2904|[NONE, Sandwich, ...|                          4|
|        296|[Farm House, Scan...|                          2|
|       3210|      [Bread, Bread]|                     

In [7]:
from pyspark.sql import functions as F
df_pivoted = df.groupBy("Transaction").pivot("Item").agg(F.lit(1)).na.fill(0)
df_pivoted.select(df_pivoted.columns[:7]).show()

+-----------+----------+------------------------+---------+---------------+--------+-----+
|Transaction|Adjustment|Afternoon with the baker|Alfajores|Argentina Night|Art Tray|Bacon|
+-----------+----------+------------------------+---------+---------------+--------+-----+
|       4821|         0|                       0|        0|              0|       0|    0|
|       9030|         0|                       0|        0|              0|       0|    0|
|        296|         0|                       0|        0|              0|       0|    0|
|       2904|         0|                       0|        0|              0|       0|    0|
|       1572|         0|                       0|        0|              0|       0|    0|
|       6613|         0|                       0|        0|              0|       0|    0|
|       6194|         0|                       0|        1|              0|       0|    0|
|       7273|         0|                       0|        0|              0|       0|    0|

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import IntegerType

df_pivoted = df_pivoted.withColumn("Transaction", df_pivoted["Transaction"].cast(IntegerType()))
print(df_pivoted.columns[:])
print(len(df_pivoted.columns))

vecAssembler = VectorAssembler(inputCols=df_pivoted.columns[3:], outputCol="Features")
new_df = vecAssembler.transform(df_pivoted)
X = new_df.select('Features')
X.head(5)

['Transaction', 'Adjustment', 'Afternoon with the baker', 'Alfajores', 'Argentina Night', 'Art Tray', 'Bacon', 'Baguette', 'Bakewell', 'Bare Popcorn', 'Basket', 'Bowl Nic Pitt', 'Bread', 'Bread Pudding', 'Brioche and salami', 'Brownie', 'Cake', 'Caramel bites', 'Cherry me Dried fruit', 'Chicken Stew', 'Chicken sand', 'Chimichurri Oil', 'Chocolates', 'Christmas common', 'Coffee', 'Coffee granules ', 'Coke', 'Cookies', 'Crepes', 'Crisps', 'Drinking chocolate spoons ', 'Duck egg', 'Dulce de Leche', 'Eggs', "Ella's Kitchen Pouches", 'Empanadas', 'Extra Salami or Feta', 'Fairy Doors', 'Farm House', 'Focaccia', 'Frittata', 'Fudge', 'Gift voucher', 'Gingerbread syrup', 'Granola', 'Hack the stack', 'Half slice Monster ', 'Hearty & Seasonal', 'Honey', 'Hot chocolate', 'Jam', 'Jammie Dodgers', 'Juice', 'Keeping It Local', 'Kids biscuit', 'Lemon and coconut', 'Medialuna', 'Mighty Protein', 'Mineral water', 'Mortimer', 'Muesli', 'Muffin', 'My-5 Fruit Shoot', 'NONE', 'Nomad bag', 'Olum & polenta', 

[Row(Features=SparseVector(93, {9: 1.0, 72: 1.0})),
 Row(Features=SparseVector(93, {21: 1.0, 48: 1.0})),
 Row(Features=SparseVector(93, {35: 1.0, 73: 1.0})),
 Row(Features=SparseVector(93, {21: 1.0, 60: 1.0, 72: 1.0, 82: 1.0})),
 Row(Features=SparseVector(93, {21: 1.0, 64: 1.0}))]

In [9]:
from pyspark.mllib.clustering import KMeans

num_clusters = 4
data = X.rdd.map(lambda x: x[0].toArray()) 
clusters = KMeans.train(data, num_clusters, maxIterations=15, initializationMode="random")


In [10]:
import pandas as pd

df = pd.DataFrame({
    'Transaction': df_pivoted.select("Transaction").toPandas()['Transaction'],
    'Label': clusters.predict(X.rdd.map(lambda x: x[0].toArray())).collect()
})
df.head()


Unnamed: 0,Transaction,Label
0,4821,3
1,9030,0
2,296,1
3,2904,0
4,1572,0


In [11]:
df["Label"].value_counts()

0    4519
1    2728
3    2230
2      54
Name: Label, dtype: int64