# Spark: Getting Started

## Step 0: Prerequisites & Installation

Follow [these instructions](https://docs.databricks.com/notebooks/notebooks-manage.html#import-a-notebook) to import this notebook into Databricks.

Run these commands in your terminal (just once) if you want to run Spark locally.

 * These instructions require a Mac with [Anaconda3](https://anaconda.com/) and [Homebrew](https://brew.sh/) installed.
 * Useful for small data only. For larger data, try [Databricks](https://databricks.com/).

```bash
# Make Homebrew aware of old versions of casks
brew tap homebrew/cask-versions
# (older Homebrew releases: brew tap caskroom/versions)

# Install Java 1.8 (OpenJDK 8)
brew install --cask homebrew/cask-versions/adoptopenjdk8
# (older Homebrew releases used `brew cask install` instead, e.g.
#  brew cask install caskroom/versions/adoptopenjdk8  or  brew cask install adoptopenjdk8)

# Install the current version of Spark
brew install apache-spark

# Install Py4J (connects PySpark to the Java Virtual Machine)
pip install py4j

# If your login shell is zsh (the macOS default since Catalina),
# append the lines below to ~/.zshrc instead of ~/.bash_profile.

# Add JAVA_HOME to .bash_profile (makes Java 1.8 your default JVM)
echo "export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)" >> ~/.bash_profile

# Add SPARK_HOME to .bash_profile; `brew --prefix` resolves the installed
# Spark, so the path does not hard-code a version such as 3.0.1
echo "export SPARK_HOME=$(brew --prefix apache-spark)/libexec" >> ~/.bash_profile

# Add PySpark to PYTHONPATH in .bash_profile; single quotes stop $SPARK_HOME
# and $PYTHONPATH from being expanded now, before SPARK_HOME is defined
echo 'export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH' >> ~/.bash_profile

# Update current environment
source ~/.bash_profile
```
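
Once the profile has been sourced, you can sanity-check the result from Python (for example in the first notebook cell). This is a minimal sketch: `check_spark_env` is a hypothetical helper, and it only assumes the three variable names exported above.

```python
import os


def check_spark_env(env=None):
    """Hypothetical helper: list problems with the variables exported above."""
    env = os.environ if env is None else env
    problems = []
    for name in ("JAVA_HOME", "SPARK_HOME"):
        if not env.get(name):
            problems.append(f"{name} is not set")
    spark_home = env.get("SPARK_HOME", "")
    python_path = env.get("PYTHONPATH", "").split(os.pathsep)
    # PySpark is importable only if $SPARK_HOME/python is on PYTHONPATH
    if spark_home and os.path.join(spark_home, "python") not in python_path:
        problems.append("PYTHONPATH does not include $SPARK_HOME/python")
    return problems


if __name__ == "__main__":
    for problem in check_spark_env():
        print("Warning:", problem)
```

An empty list means all three variables look consistent; passing a plain dict instead of `os.environ` makes the check easy to try out.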

## Step 1: Create a SparkSession with a SparkContext

In [0]:
!echo $JAVA_HOME

In [0]:
#!brew upgrade apache-spark

In [0]:
#!pip install py4j

In [0]:
#!pip install pyspark

In [0]:
from pyspark.sql import SparkSession

# On Databricks, `spark` already exists and getOrCreate() simply returns it;
# when running locally, this line starts a new local SparkSession
spark = SparkSession.builder.getOrCreate()

In [0]:
sc = spark.sparkContext

In [0]:
spark

In [0]:
sc

## Step 2: Download some Amazon reviews (Toys & Games)

In [0]:
# Data is already in the repo, but you can also get it with
# !wget http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Toys_and_Games_5.json.gz
# !gunzip reviews_Toys_and_Games_5.json.gz

Follow [these instructions](https://docs.databricks.com/data/data.html#import-data-1) to import `reviews_Toys_and_Games_5.json` into Databricks.

## Step 3: Create a Spark DataFrame

In [0]:
# When running locally, point this at wherever you saved the file
# (e.g. a relative path such as 'reviews_Toys_and_Games_5.json')
df = spark.read.json('/FileStore/tables/reviews_Toys_and_Games_5.json')

In [0]:
df.persist()

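This last command, `.persist()`, marks the DataFrame to be cached: nothing is stored until the first action (such as `count()`) computes it, after which later queries reuse the cached data instead of re-reading the JSON file. See [this page](https://unraveldata.com/to-cache-or-not-to-cache/) for when caching pays off. `.cache()` does the same thing with the default storage level (for DataFrames, memory with spill-over to disk), whereas `.persist()` also lets you choose the storage level explicitly, e.g. `StorageLevel.MEMORY_ONLY`. See [this answer](https://stackoverflow.com/questions/26870537/what-is-the-difference-between-cache-and-persist) for the difference.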

In [0]:
type(df)

In [0]:
df.show(5)  # show() prints 20 rows by default

In [0]:
pdf = df.limit(5).toPandas()
pdf

In [0]:
type(pdf)

In [0]:
df.count()

In [0]:
df.columns

In [0]:
df.printSchema()

`nullable = true` means that the column may contain null values.

In [0]:
df.describe().show()

In [0]:
df.describe('overall').show()

In [0]:
reviews_df = df[['asin', 'overall']]

In [0]:
reviews_df.show()

In [0]:
def show(df, n=5):
    return df.limit(n).toPandas()

In [0]:
show(reviews_df)

In [0]:
reviews_df.count()

In [0]:
sorted_review_df = reviews_df.sort('overall')

In [0]:
show(sorted_review_df)

In [0]:
import pyspark.sql.functions as F

In [0]:
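# countDistinct gives the number of different rating values,
# not the number of reviews per rating (the SQL query below does that)
counts = reviews_df.agg(F.countDistinct('overall'))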

In [0]:
counts.show()
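
For the ratings column, `countDistinct` answers "how many different star values occur?" and, like SQL's `COUNT(DISTINCT ...)`, it ignores nulls. A minimal pure-Python sketch of that semantics, using a hypothetical list of ratings in place of the `overall` column and a hypothetical `count_distinct` helper:

```python
# Hypothetical ratings standing in for the 'overall' column; None plays a SQL NULL.
ratings = [5.0, 4.0, 5.0, 3.0, None, 5.0, 1.0, 4.0]


def count_distinct(values):
    """Number of different non-null values, mirroring F.countDistinct."""
    return len({v for v in values if v is not None})


distinct_count = count_distinct(ratings)  # 4: the ratings 1.0, 3.0, 4.0, 5.0
print(distinct_count)
```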

In [0]:
query = """
SELECT overall, COUNT(*)
FROM reviews
GROUP BY overall
ORDER BY overall
"""

In [0]:
reviews_df.createOrReplaceTempView('reviews')

In [0]:
output = spark.sql(query)

In [0]:
show(output)

In [0]:
output.collect()

In [0]:
# Sanity check: the per-rating counts should add up to the total (expect 0)
reviews_df.count() - sum(row[1] for row in output.collect())
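
Why should that difference be 0? `GROUP BY` puts every row into exactly one group (nulls form a group of their own), and `COUNT(*)` counts every row in its group, so the group sizes always sum to the row count. A small pure-Python sketch of the same query, with hypothetical `(asin, overall)` pairs standing in for `reviews_df`:

```python
from collections import Counter

# Hypothetical (asin, overall) pairs standing in for reviews_df.
reviews = [("A1", 5.0), ("A2", 4.0), ("A1", 5.0), ("A3", 1.0), ("A2", 5.0)]

# SELECT overall, COUNT(*) FROM reviews GROUP BY overall ORDER BY overall
grouped = sorted(Counter(overall for _, overall in reviews).items())

# Each row lands in exactly one group, so the group sizes sum to the row count.
difference = len(reviews) - sum(count for _, count in grouped)
print(grouped, difference)
```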

In [0]:
type(reviews_df)

Convert to RDD!

In [0]:
reviews_df.rdd

In [0]:
type(reviews_df.rdd)

### Count the words in the first row

In [0]:
row_one = df.first()

In [0]:
row_one

In [0]:
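def word_count(text):
    # Null reviews arrive as None; return None instead of crashing on .split()
    if text is None:
        return None
    return len(text.split())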

In [0]:
word_count(row_one['reviewText'])

In [0]:
from pyspark.sql.types import IntegerType

#'udf' is for User Defined Function!

word_count_udf = F.udf(word_count, returnType=IntegerType())
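
`F.udf` wraps an ordinary Python function so that Spark calls it once per row on the column you pass in; a null cell reaches the function as `None`, so a bare `text.split()` would raise `AttributeError` on a missing review. The sketch below imitates that row-by-row behaviour in plain Python. The rows and the `with_column` helper are hypothetical stand-ins, not Spark APIs.

```python
# Hypothetical rows standing in for Spark Row objects; None plays a null reviewText.
rows = [
    {"asin": "A1", "reviewText": "Great toy, my kids love it"},
    {"asin": "A2", "reviewText": None},
    {"asin": "A3", "reviewText": "  Broke   after two days "},
]


def word_count(text):
    """Whitespace-delimited word count; passes nulls through as None."""
    if text is None:
        return None
    return len(text.split())


def with_column(rows, name, func, source):
    """Hypothetical helper: mimics df.withColumn(name, udf(df[source])) row by row."""
    # Build new dicts so the input rows stay unchanged, as DataFrames are immutable
    return [{**row, name: func(row[source])} for row in rows]


counts = with_column(rows, "wordCount", word_count, "reviewText")
print([row["wordCount"] for row in counts])
```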

In [0]:
review_text_col = df['reviewText']

In [0]:
counts_df = df.withColumn('wordCount', word_count_udf(review_text_col))

In [0]:
# Our show() helper returns 5 rows by default; .T transposes the
# resulting pandas DataFrame so the long columns are easier to read.

show(counts_df).T

In [0]:
# Register word_count_udf so that we can use it with SQL!
# See documentation here:
# https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-UDFRegistration.html

# Point the 'reviews' view at the full df, which includes reviewText
df.createOrReplaceTempView('reviews')
spark.udf.register('word_count', word_count_udf)

In [0]:
# Now we can use our function in a SQL query!

In [0]:
query = """
SELECT asin, overall, reviewText, word_count(reviewText) AS wordCount
FROM reviews
"""

In [0]:
counts_df = spark.sql(query)

In [0]:
show(counts_df)
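
Registering a Python function under a SQL name is not unique to Spark. As an analogy only (this is Python's built-in `sqlite3` module, not Spark), `Connection.create_function` does the same job, which makes the idea easy to try without a cluster. The table contents below are hypothetical:

```python
import sqlite3


def word_count(text):
    """Whitespace-delimited word count; SQL NULL arrives as None."""
    return None if text is None else len(text.split())


conn = sqlite3.connect(":memory:")
# Register the Python function under the SQL name 'word_count' (1 argument)
conn.create_function("word_count", 1, word_count)

conn.execute("CREATE TABLE reviews (asin TEXT, overall REAL, reviewText TEXT)")
conn.executemany(
    "INSERT INTO reviews VALUES (?, ?, ?)",
    [("A1", 5.0, "Great toy, my kids love it"), ("A2", 2.0, None)],
)

# Same shape as the Spark SQL query above
query = """
SELECT asin, overall, reviewText, word_count(reviewText) AS wordCount
FROM reviews
"""
rows = conn.execute(query).fetchall()
conn.close()
print(rows)
```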

In [0]:
def count_all_the_things(text):
    # [number of characters, number of words]; None for a null review
    if text is None:
        return None
    return [len(text), len(text.split())]

In [0]:
from pyspark.sql.types import ArrayType, IntegerType
count_udf = F.udf(count_all_the_things, returnType=ArrayType(IntegerType()))

In [0]:
counts_df = df.withColumn('counts', count_udf(df['reviewText']))

In [0]:
show(counts_df, 1)

In [0]:
slim_counts_df = (
    df.drop('reviewTime', 'helpful')
      .withColumn('counts', count_udf(df['reviewText']))
      .drop('reviewText')
)
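
The order of that chain matters: `counts` is computed from `reviewText`, so `reviewText` can only be dropped afterwards. Here is a rough sketch of the same three steps applied to a single hypothetical review stored as a dict; `slim` is a hypothetical helper, not a Spark function.

```python
# Hypothetical review row; the keys mirror columns used in the cells above.
row = {
    "asin": "A1",
    "overall": 5.0,
    "helpful": [0, 0],
    "reviewTime": "01 2, 2014",
    "reviewText": "Fun game",
}


def count_all_the_things(text):
    """[character count, word count], or None for a null review."""
    if text is None:
        return None
    return [len(text), len(text.split())]


def slim(row):
    # 1. drop('reviewTime', 'helpful'): build a new dict without those keys
    out = {k: v for k, v in row.items() if k not in ("reviewTime", "helpful")}
    # 2. withColumn('counts', ...) still needs reviewText, so it runs first
    out["counts"] = count_all_the_things(out["reviewText"])
    # 3. drop('reviewText') once it is no longer needed
    del out["reviewText"]
    return out


slim_row = slim(row)
print(slim_row)
```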

In [0]:
show(slim_counts_df, n=1)

In [0]:
aggs = counts_df.groupBy('reviewerID').agg({'overall': 'mean'})
# collect() returns one Row per reviewer to the driver; show(aggs) previews just 5
aggs.collect()
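
Conceptually, `groupBy('reviewerID').agg({'overall': 'mean'})` keeps a running sum and count per reviewer and divides at the end, skipping null ratings. A pure-Python sketch of that idea, where the data and `mean_by_key` are hypothetical:

```python
from collections import defaultdict

# Hypothetical (reviewerID, overall) pairs standing in for counts_df.
reviews = [("R1", 5.0), ("R2", 3.0), ("R1", 4.0), ("R2", 1.0), ("R3", 2.0)]


def mean_by_key(pairs):
    """Average value per key, skipping None like SQL's avg."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [running sum, non-null count]
    for key, value in pairs:
        acc = totals[key]  # create the group even if all its values are null
        if value is not None:
            acc[0] += value
            acc[1] += 1
    # A group whose values are all null averages to None (null)
    return {key: (s / n if n else None) for key, (s, n) in totals.items()}


averages = mean_by_key(reviews)
print(averages)
```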

### A few more basic commands

Please refer also to the [official programming guide](http://spark.apache.org/docs/latest/rdd-programming-guide.html).

In [0]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

In [0]:
# Type `distData.` and press Tab to list the available RDD methods
distData.collect()

In [0]:
def multiply(a, b):
    return a * b

In [0]:
distData.reduce(multiply)

In [0]:
distData.filter(lambda x: x < 4).collect()
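
The same two operations can be sketched locally with `functools.reduce` and a list comprehension. Spark reduces each partition separately and then combines the partial results, so the Spark programming guide asks for a function that is commutative and associative. The two-way partition split below is an assumption chosen for illustration; it shows why `multiply` is safe while subtraction is not.

```python
from functools import reduce
from operator import sub

data = [1, 2, 3, 4, 5]


def multiply(a, b):
    return a * b


# Local analogue of distData.reduce(multiply)
product = reduce(multiply, data)  # 120

# Simulate two partitions: reduce each one, then combine the partial results
partitions = [data[:2], data[2:]]
partial = [reduce(multiply, part) for part in partitions]  # [2, 60]
combined = reduce(multiply, partial)  # 120, same as the sequential result

# Subtraction is not associative, so the partitioned result differs
sequential = reduce(sub, data)  # -13
by_partition = reduce(sub, [reduce(sub, part) for part in partitions])  # 5

# Local analogue of distData.filter(lambda x: x < 4).collect()
small = [x for x in data if x < 4]  # [1, 2, 3]
print(product, combined, sequential, by_partition, small)
```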

### Reading files

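`sc.textFile()` reads .txt files into an RDD of lines, and `spark.read.json()` reads .json files into a DataFrame.

Going the other way, `.toJSON()` turns a DataFrame into an RDD of JSON strings, one per row. The cells below round-trip `counts_df` this way.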

In [0]:
dfjson = counts_df.toJSON()

In [0]:
df2 = spark.read.json(dfjson)
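
The round trip above turns every row into a JSON string and then parses those strings back, re-inferring a schema along the way (so integer fields may come back as `long`). A stdlib sketch of the same idea, with hypothetical rows standing in for `counts_df`:

```python
import json

# Hypothetical rows standing in for counts_df; counts is [characters, words].
rows = [
    {"asin": "A1", "overall": 5.0, "counts": [8, 2]},
    {"asin": "A2", "overall": 3.0, "counts": [15, 3]},
]

# Like toJSON(): one JSON document (a string) per row
json_lines = [json.dumps(row) for row in rows]

# Like spark.read.json(rdd_of_strings): parse each string back into a record
parsed = [json.loads(line) for line in json_lines]
print(json_lines[0])
```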

In [0]:
df2.printSchema()

In [0]:
counts_df

In [0]:
# Caution: toPandas() collects the entire DataFrame onto the driver
type(df.toPandas())