# Apache Spark Intro

**Important**: This notebook must be run with the `pyspark` command.

1. Create the `SparkSession`
2. Open the Spark dashboard at [http://localhost:4040](http://localhost:4040)

https://spark.apache.org/docs/latest/monitoring.html

Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:

- A list of scheduler stages and tasks
- A summary of RDD sizes and memory usage
- Environmental information
- Information about the running executors

You can access this interface by simply opening `http://<driver-node>:4040` in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).
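
The port behaviour described above can be checked from Python. The following is a minimal sketch using only the standard library, not anything provided by Spark: `is_port_open` and `find_open_ui_ports` are hypothetical helper names, and scanning five ports is an arbitrary assumption. It only tests whether something accepts TCP connections on each port, so a hit suggests a Spark UI may be there but does not prove it.

```python
import socket


def is_port_open(host, port, timeout=0.5):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def find_open_ui_ports(host="localhost", first_port=4040, how_many=5):
    """Probe successive ports starting at first_port and list the open ones."""
    return [
        port
        for port in range(first_port, first_port + how_many)
        if is_port_open(host, port)
    ]
```

Calling `find_open_ui_ports()` while a context is running would normally include 4040, unless that port was already taken by another application. Once a `SparkContext` exists, its `uiWebUrl` attribute reports the actual address directly.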

In [None]:
# Prepare a SparkSession (used as `spark` below) and its SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [None]:
# Open Spark Dashboard in browser tab
import webbrowser
webbrowser.open("http://localhost:4040")

# Estimate Pi

In [None]:
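import random

def inside(_):
    # Draw a random point in the unit square; the RDD element itself is ignored
    x, y = random.random(), random.random()
    return x*x + y*y < 1  # True if the point lies inside the quarter circle

NUM_SAMPLES = 10000
count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside).count()
# The fraction of points inside the quarter circle approximates pi/4
print("Pi is roughly {}".format(4.0 * count / NUM_SAMPLES))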
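
The estimate works because a point drawn uniformly from the unit square falls inside the quarter circle x² + y² < 1 with probability π/4, so four times the observed fraction approximates π. As a sanity check that does not need Spark, here is a plain-Python sketch of the same computation. The name `estimate_pi` and the fixed seed are illustrative choices, not part of the notebook.

```python
import random


def estimate_pi(num_samples, seed=None):
    """Monte Carlo estimate of pi from num_samples random points."""
    rng = random.Random(seed)  # private generator, so a seed makes runs repeatable
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # point falls inside the quarter circle
            hits += 1
    return 4.0 * hits / num_samples


print("Pi is roughly {}".format(estimate_pi(100000, seed=42)))
```

The error shrinks roughly like 1/sqrt(n), so with 10,000 samples the result is typically within a few hundredths of π.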

# Some Spark commands

In [None]:
# Loading a CSV
df = spark.read.format("csv").option("header", "true").load("./data/breadbasket_dms.csv")
df.show()

In [None]:
# Grouping operation with distinct count
from pyspark.sql.functions import collect_list, approx_count_distinct
q = df.groupby(df.Transaction).agg(collect_list("Item"), approx_count_distinct("Item"))
q.show()  # show() prints the table itself and returns None
print(q.head())
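
To make the aggregation concrete, the sketch below reproduces in plain Python what `collect_list` and a distinct count compute for each group. The rows are a tiny made-up sample, not taken from `breadbasket_dms.csv`. Note that `approx_count_distinct` is an approximate estimator, whereas this sketch counts exactly.

```python
from collections import defaultdict

# Hypothetical (Transaction, Item) rows, for illustration only
rows = [
    (1, "Bread"),
    (2, "Coffee"),
    (2, "Coffee"),
    (3, "Tea"),
    (3, "Cake"),
    (3, "Bread"),
]

items_by_transaction = defaultdict(list)
for transaction, item in rows:
    items_by_transaction[transaction].append(item)  # like collect_list("Item")

summary = {
    transaction: (items, len(set(items)))  # exact distinct count per group
    for transaction, items in items_by_transaction.items()
}

for transaction, (items, distinct) in sorted(summary.items()):
    print(transaction, items, distinct)
```

Duplicates stay in the collected list but count only once in the distinct count, which is why the two aggregates can differ for the same transaction.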

In [None]:
from pyspark.sql import functions as F
df_pivoted = df.groupBy("Transaction").pivot("Item").agg(F.lit(1)).na.fill(0)
df_pivoted.select(df_pivoted.columns[:7]).show()
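
The pivot turns the long (Transaction, Item) table into one row per transaction with a 0/1 column per item. A plain-Python sketch of that reshaping, on hypothetical rows, could look like this. `pivot_to_indicator` is an illustrative helper, not a Spark function.

```python
def pivot_to_indicator(rows):
    """Return (item_columns, matrix) with one 0/1 row per transaction."""
    items = sorted({item for _, item in rows})  # one column per distinct item
    baskets = {}
    for transaction, item in rows:
        baskets.setdefault(transaction, set()).add(item)
    matrix = {
        transaction: [1 if item in basket else 0 for item in items]
        for transaction, basket in baskets.items()
    }
    return items, matrix


# Hypothetical (Transaction, Item) rows, for illustration only
rows = [(1, "Bread"), (2, "Coffee"), (2, "Tea"), (3, "Bread"), (3, "Coffee")]
columns, matrix = pivot_to_indicator(rows)

print(["Transaction"] + columns)
for transaction in sorted(matrix):
    print([transaction] + matrix[transaction])
```

Each row of the result is the kind of binary basket vector that the clustering step later consumes.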

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import IntegerType

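# The CSV was read without a schema, so cast the Transaction id from string to integer
df_pivoted = df_pivoted.withColumn("Transaction", df_pivoted["Transaction"].cast(IntegerType()))
print(df_pivoted.columns)
print(len(df_pivoted.columns))

# columns[0] is Transaction; note that [3:] also leaves out the first two item columns
vecAssembler = VectorAssembler(inputCols=df_pivoted.columns[3:], outputCol="Features")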
new_df = vecAssembler.transform(df_pivoted)
X = new_df.select('Features')
X.head(5)

In [None]:
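from pyspark.mllib.clustering import KMeans  # RDD-based MLlib API

num_clusters = 4
# KMeans.train takes an RDD of vectors, so convert each ml Vector to a NumPy array
clusters = KMeans.train(X.rdd.map(lambda x: x[0].toArray()), num_clusters, maxIterations=15, initializationMode="random")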
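
For intuition about what the clustering step does, here is a minimal pure-Python sketch of Lloyd's algorithm with random initialization. It is an illustration under simplifying assumptions (small in-memory data, squared Euclidean distance, a fixed iteration cap) and not MLlib's implementation. The names `kmeans` and `closest` and the sample points are hypothetical.

```python
import random


def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def closest(point, centers):
    """Index of the center nearest to point."""
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))


def kmeans(points, k, max_iterations=15, seed=None):
    """Return (centers, labels) after at most max_iterations Lloyd steps."""
    rng = random.Random(seed)
    centers = [list(p) for p in rng.sample(points, k)]  # random initial centers
    labels = [closest(p, centers) for p in points]
    for _ in range(max_iterations):
        for i in range(k):
            members = [p for p, label in zip(points, labels) if label == i]
            if members:  # keep the old center if a cluster is empty
                centers[i] = [sum(col) / len(members) for col in zip(*members)]
        new_labels = [closest(p, centers) for p in points]
        if new_labels == labels:  # assignments are stable: converged
            break
        labels = new_labels
    return centers, labels


# Hypothetical binary basket vectors, for illustration only
points = [[1, 1, 0, 0], [1, 0, 0, 0], [1, 1, 0, 0],
          [0, 0, 1, 1], [0, 0, 0, 1], [0, 0, 1, 1]]
centers, labels = kmeans(points, k=2, seed=0)
print(labels)
```

The two alternating steps (recompute each center as the mean of its members, then reassign every point to its nearest center) are what `maxIterations` bounds in the call above.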


In [None]:
import pandas as pd

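# Note: this rebinds `df` from the Spark DataFrame to a pandas DataFrame.
# The two columns are matched by position, which assumes both results keep the same row order.
df = pd.DataFrame({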
    'Transaction': df_pivoted.select("Transaction").toPandas()['Transaction'],
    'Label': clusters.predict(X.rdd.map(lambda x: x[0].toArray())).collect()
})
df.head()


In [None]:
df["Label"].value_counts()