<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Objectives" data-toc-modified-id="Objectives-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Objectives</a></span></li><li><span><a href="#Spark:-Getting-Started" data-toc-modified-id="Spark:-Getting-Started-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Spark: Getting Started</a></span><ul class="toc-item"><li><span><a href="#Optional-Step-0:-Prerequisites-&amp;-Installation-for-Databricks-or-Local-Run" data-toc-modified-id="Optional-Step-0:-Prerequisites-&amp;-Installation-for-Databricks-or-Local-Run-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>Optional Step 0: Prerequisites &amp; Installation for Databricks or Local Run</a></span><ul class="toc-item"><li><span><a href="#Databricks-Setup" data-toc-modified-id="Databricks-Setup-2.1.1"><span class="toc-item-num">2.1.1&nbsp;&nbsp;</span>Databricks Setup</a></span></li></ul></li><li><span><a href="#Step-1:-Create-a-SparkSession-with-a-SparkContext" data-toc-modified-id="Step-1:-Create-a-SparkSession-with-a-SparkContext-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>Step 1: Create a SparkSession with a SparkContext</a></span></li><li><span><a href="#Step-2:-Download-some-Amazon-reviews-(Toys-&amp;-Games)" data-toc-modified-id="Step-2:-Download-some-Amazon-reviews-(Toys-&amp;-Games)-2.3"><span class="toc-item-num">2.3&nbsp;&nbsp;</span>Step 2: Download some Amazon reviews (Toys &amp; Games)</a></span><ul class="toc-item"><li><span><a href="#Optional:-For-Databricks-Setup" data-toc-modified-id="Optional:-For-Databricks-Setup-2.3.1"><span class="toc-item-num">2.3.1&nbsp;&nbsp;</span>Optional: For Databricks Setup</a></span></li></ul></li><li><span><a href="#Step-3:-Create-a-Spark-DataFrame" data-toc-modified-id="Step-3:-Create-a-Spark-DataFrame-2.4"><span class="toc-item-num">2.4&nbsp;&nbsp;</span>Step 3: Create a Spark DataFrame</a></span></li><li><span><a href="#Exploring-the-DataFrame" data-toc-modified-id="Exploring-the-DataFrame-2.5"><span class="toc-item-num">2.5&nbsp;&nbsp;</span>Exploring the DataFrame</a></span><ul class="toc-item"><li><span><a href="#Count-the-Words-in-the-First-Row" data-toc-modified-id="Count-the-Words-in-the-First-Row-2.5.1"><span class="toc-item-num">2.5.1&nbsp;&nbsp;</span>Count the Words in the First Row</a></span></li><li><span><a href="#A-Few-More-Basic-Commands" data-toc-modified-id="A-Few-More-Basic-Commands-2.5.2"><span class="toc-item-num">2.5.2&nbsp;&nbsp;</span>A Few More Basic Commands</a></span></li></ul></li><li><span><a href="#Reading-files" data-toc-modified-id="Reading-files-2.6"><span class="toc-item-num">2.6&nbsp;&nbsp;</span>Reading files</a></span></li></ul></li></ul></div>

<a href="https://colab.research.google.com/github/flatiron-school/ds-spark/blob/main/spark-programming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Run for Google Colab environment
!pip install pyspark==3.2.1
!apt install openjdk-8-jdk-headless -qq

In [None]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Objectives

- Use `pyspark` to manipulate data

# Spark: Getting Started

## Optional Step 0: Prerequisites & Installation for Databricks or Local Run

> If you run this notebook in Google Colab (by clicking the "*Open in Colab*" button at the top of this notebook), you can skip to [Step 1](#Step-1:-Create-a-SparkSession-with-a-SparkContext).

### Databricks Setup

Follow [these instructions](https://docs.databricks.com/notebooks/notebooks-manage.html#import-a-notebook) to import this notebook into Databricks.

For a local installation, see [these instructions](https://github.com/learn-co-curriculum/dsc-spark-docker-installation).

## Step 1: Create a SparkSession with a SparkContext

In [None]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [None]:
spark

In [None]:
sc

## Step 2: Download some Amazon reviews (Toys & Games)

In [None]:
# Get data directly from repo
!wget https://github.com/flatiron-school/ds-spark/releases/download/v1.0/reviews_Toys_and_Games_5.json.gz

### Optional: For Databricks Setup

Follow [these instructions](https://docs.databricks.com/data/data.html#import-data-1) to import `reviews_Toys_and_Games_5.json.gz` into Databricks.

## Step 3: Create a Spark DataFrame

In [None]:
# Adjust this path if the file is stored elsewhere (e.g., on Databricks or in a local install).
# spark.read.json() decompresses .gz files automatically.
df = spark.read.json('reviews_Toys_and_Games_5.json.gz')

In [None]:
df.persist()

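This last command, `.persist()`, marks the DataFrame to be kept in storage. By default it is held in memory and spills to disk if needed, so later actions can reuse it instead of re-reading and re-parsing the file. Persistence is lazy: nothing is stored until the first action, such as `.count()`, runs. For when caching pays off, see [this page](https://unraveldata.com/to-cache-or-not-to-cache/). `.persist()` is similar to `.cache()`, but it is more flexible because it lets you choose the storage level, whereas `.cache()` always uses the default one. See [this Stack Overflow answer](https://stackoverflow.com/questions/26870537/what-is-the-difference-between-cache-and-persist).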

In [None]:
type(df)

In [None]:
df.show(5)  # print the first 5 rows (the default is 20)

In [None]:
pdf = df.limit(5).toPandas()
pdf

In [None]:
type(pdf)

In [None]:
df.count()

In [None]:
df.columns

In [None]:
df.printSchema()

`nullable = true` means that the column may contain null values.

In [None]:
df.describe().show()

In [None]:
df.describe('overall').show()

In [None]:
reviews_df = df[['asin', 'overall']]

In [None]:
reviews_df.show()

In [None]:
def show(df, n=5):
    """Return the first n rows as a pandas DataFrame for nicer notebook display."""
    return df.limit(n).toPandas()

In [None]:
show(reviews_df)

In [None]:
reviews_df.count()

In [None]:
sorted_review_df = reviews_df.sort('overall')

In [None]:
show(sorted_review_df)

In [None]:
# Number of distinct star ratings (not the number of reviews per rating)
counts = reviews_df.agg(F.countDistinct('overall'))

In [None]:
counts.show()

In [None]:
query = """
SELECT overall, COUNT(*)
FROM reviews
GROUP BY overall
ORDER BY overall
"""

In [None]:
reviews_df.createOrReplaceTempView('reviews')

In [None]:
output = spark.sql(query)

In [None]:
show(output)

In [None]:
output.collect()

In [None]:
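# Sanity check: the per-rating counts should add up to the total (expect 0).
# Collect once and sum over every group instead of assuming exactly 5 ratings.
reviews_df.count() - sum(row[1] for row in output.collect())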

In [None]:
type(reviews_df)

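Every DataFrame is backed by an RDD (resilient distributed dataset) of `Row` objects, which you can access through the `.rdd` attribute: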

In [None]:
reviews_df.rdd

In [None]:
type(reviews_df.rdd)

## Exploring the DataFrame

### Count the Words in the First Row

In [None]:
row_one = df.first()

In [None]:
row_one

In [None]:
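def word_count(text):
    # Spark passes SQL NULLs to Python UDFs as None, so guard against them
    return len(text.split()) if text is not None else None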

In [None]:
word_count(row_one['reviewText'])

In [None]:
# `udf` stands for User-Defined Function: F.udf() wraps a plain Python
# function so Spark can apply it to every value in a column
word_count_udf = F.udf(word_count, returnType=IntegerType())

In [None]:
review_text_col = df['reviewText']

In [None]:
counts_df = df.withColumn('wordCount', word_count_udf(review_text_col))

In [None]:
# Our show() helper displays 5 rows by default; .T transposes the result
# so each column of this wide DataFrame appears as a row.
show(counts_df).T

In [None]:
word_count_udf = F.udf(word_count, IntegerType())

# Registering our word_count() function so that we
# can use it with SQL! See documentation here:
# https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-UDFRegistration.html

df.createOrReplaceTempView('reviews')
spark.udf.register('word_count', word_count_udf)

In [None]:
# Now we can use our function in a SQL query!

query = """
SELECT asin, overall, reviewText, word_count(reviewText) AS wordCount
FROM reviews
"""

In [None]:
counts_df = spark.sql(query)

In [None]:
show(counts_df)

In [None]:
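def count_all_the_things(text):
    # Return [number of characters, number of words]; None for null reviews
    return [len(text), len(text.split())] if text is not None else None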

In [None]:
count_udf = F.udf(count_all_the_things, returnType=ArrayType(IntegerType()))

In [None]:
counts_df = df.withColumn('counts', count_udf(df['reviewText']))

In [None]:
show(counts_df, 1)

In [None]:
slim_counts_df = (
    df.drop('reviewTime', 'helpful')  # drop() accepts several column names at once
      .withColumn('counts', count_udf(df['reviewText']))
      .drop('reviewText')
)

In [None]:
show(slim_counts_df, n=1)

In [None]:
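# Average star rating per reviewer. collect() would pull every group back to
# the driver, so preview a few rows with our show() helper instead.
aggs = counts_df.groupBy('reviewerID').agg({'overall': 'mean'})
show(aggs)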

### A Few More Basic Commands

For more on RDD operations such as `parallelize()`, `reduce()`, and `filter()`, see the [official RDD programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html).

In [None]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

In [None]:
def multiply(a, b):
    return a * b

In [None]:
distData.reduce(multiply)

In [None]:
distData.filter(lambda x: x < 4).collect()

## Reading files

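- `sc.textFile()` reads text files (such as `.txt`) into an RDD with one string per line.
- `spark.read.json()` reads JSON files into a DataFrame. It also accepts an RDD of JSON strings, such as the one returned by a DataFrame's `.toJSON()` method; the cells below use this to round-trip `counts_df` through JSON.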

In [None]:
dfjson = counts_df.toJSON()

In [None]:
df2 = spark.read.json(dfjson)

In [None]:
df2.printSchema()

In [None]:
counts_df

In [None]:
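# toPandas() collects the entire DataFrame to the driver, so use it only when the data fits in memory
type(df.toPandas())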